zulip/zerver/openapi/openapi.py

226 lines
9.2 KiB
Python

# Set of helper functions to manipulate the OpenAPI files that define our REST
# API's specification.
import os
from typing import Any, Dict, List, Optional, Set
OPENAPI_SPEC_PATH = os.path.abspath(os.path.join(
os.path.dirname(__file__),
'../openapi/zulip.yaml'))
# A list of exceptions we allow when running validate_against_openapi_schema.
# The validator will ignore these keys when they appear in the "content"
# passed.
EXCLUDE_PROPERTIES = {
'/register': {
'post': {
'200': ['max_message_id', 'realm_emoji'],
},
},
'/zulip-outgoing-webhook': {
'post': {
'200': ['result', 'msg', 'message'],
},
},
}
class OpenAPISpec():
def __init__(self, path: str) -> None:
self.path = path
self.last_update: Optional[float] = None
self.data: Optional[Dict[str, Any]] = None
def reload(self) -> None:
# Because importing yamole (and in turn, yaml) takes
# significant time, and we only use python-yaml for our API
# docs, importing it lazily here is a significant optimization
# to `manage.py` startup.
#
# There is a bit of a race here...we may have two processes
# accessing this module level object and both trying to
# populate self.data at the same time. Hopefully this will
# only cause some extra processing at startup and not data
# corruption.
from yamole import YamoleParser
with open(self.path) as f:
yaml_parser = YamoleParser(f)
self.data = yaml_parser.data
self.last_update = os.path.getmtime(self.path)
def spec(self) -> Dict[str, Any]:
"""Reload the OpenAPI file if it has been modified after the last time
it was read, and then return the parsed data.
"""
last_modified = os.path.getmtime(self.path)
# Using != rather than < to cover the corner case of users placing an
# earlier version than the current one
if self.last_update != last_modified:
self.reload()
assert(self.data)
return self.data
class SchemaError(Exception):
pass
openapi_spec = OpenAPISpec(OPENAPI_SPEC_PATH)
def get_schema(endpoint: str, method: str, response: str) -> Dict[str, Any]:
if len(response) == 3:
schema = (openapi_spec.spec()['paths'][endpoint][method.lower()]['responses']
[response]['content']['application/json']['schema'])
return schema
else:
resp_code = int(response[4])
response = response[0:3]
schema = (openapi_spec.spec()['paths'][endpoint][method.lower()]['responses']
[response]['content']['application/json']['schema']["oneOf"][resp_code])
return schema
def get_openapi_fixture(endpoint: str, method: str,
response: str='200') -> Dict[str, Any]:
"""Fetch a fixture from the full spec object.
"""
return (get_schema(endpoint, method, response)['example'])
def get_openapi_description(endpoint: str, method: str) -> str:
"""Fetch a description from the full spec object.
"""
description = openapi_spec.spec()['paths'][endpoint][method.lower()]['description']
return description
def get_openapi_paths() -> Set[str]:
return set(openapi_spec.spec()['paths'].keys())
def get_openapi_parameters(endpoint: str, method: str,
include_url_parameters: bool=True) -> List[Dict[str, Any]]:
openapi_endpoint = openapi_spec.spec()['paths'][endpoint][method.lower()]
# We do a `.get()` for this last bit to distinguish documented
# endpoints with no parameters (empty list) from undocumented
# endpoints (KeyError exception).
parameters = openapi_endpoint.get('parameters', [])
# Also, we skip parameters defined in the URL.
if not include_url_parameters:
parameters = [parameter for parameter in parameters if
parameter['in'] != 'path']
return parameters
def get_openapi_return_values(endpoint: str, method: str,
include_url_parameters: bool=True) -> List[Dict[str, Any]]:
openapi_endpoint = openapi_spec.spec()['paths'][endpoint][method.lower()]
response = openapi_endpoint['responses']['200']['content']['application/json']['schema']
# In cases where we have used oneOf, the schemas only differ in examples
# So we can choose any.
if 'oneOf' in response:
response = response['oneOf'][0]
response = response['properties']
return response
exclusion_list: List[str] = []
def validate_against_openapi_schema(content: Dict[str, Any], endpoint: str,
method: str, response: str) -> None:
"""Compare a "content" dict with the defined schema for a specific method
in an endpoint.
"""
# Check if the response matches its code
if response.startswith('2') and (content.get('result', 'success').lower() != 'success'):
raise SchemaError("Response is not 200 but is validating against 200 schema")
global exclusion_list
schema = get_schema(endpoint, method, response)
# In a single response schema we do not have two keys with the same name.
# Hence exclusion list is declared globally
exclusion_list = (EXCLUDE_PROPERTIES.get(endpoint, {}).get(method, {}).get(response, []))
# Code is not declared but appears in various 400 responses. If common, it can be added
# to 400 response schema
if response.startswith('4'):
exclusion_list.append('code')
validate_object(content, schema)
def validate_array(content: List[Any], schema: Dict[str, Any]) -> None:
valid_types: List[type] = []
if 'oneOf' in schema['items']:
for valid_type in schema['items']['oneOf']:
valid_types.append(to_python_type(valid_type['type']))
else:
valid_types.append(to_python_type(schema['items']['type']))
for item in content:
if type(item) not in valid_types:
raise SchemaError('Wrong data type in array')
# We can directly check for objects and arrays as
# there are no mixed arrays consisting of objects
# and arrays.
if 'object' in valid_types:
if 'oneOf' not in schema['items']:
validate_object(item, schema['items'])
continue
# If the object was not an opaque object then
# the continue statement above should have
# been executed.
if type(item) is dict:
raise SchemaError('Opaque object in array')
if 'items' in schema['items']:
validate_array(item, schema['items'])
def validate_object(content: Dict[str, Any], schema: Dict[str, Any]) -> None:
for key, value in content.items():
if key in exclusion_list:
continue
# Check that the key is defined in the schema
if key not in schema['properties']:
raise SchemaError('Extraneous key "{}" in the response\'s '
'content'.format(key))
# Check that the types match
expected_type: List[type] = []
if 'oneOf' in schema['properties'][key]:
for types in schema['properties'][key]['oneOf']:
expected_type.append(to_python_type(types['type']))
else:
expected_type.append(to_python_type(schema['properties'][key]['type']))
actual_type = type(value)
# We have only define nullable property if it is nullable
if value is None and 'nullable' in schema['properties'][key]:
continue
if actual_type not in expected_type:
raise SchemaError('Expected type {} for key "{}", but actually '
'got {}'.format(expected_type, key, actual_type))
if expected_type is list:
validate_array(value, schema['properties'][key])
if 'properties' in schema['properties'][key]:
validate_object(value, schema['properties'][key])
continue
if 'additionalProperties' in schema['properties'][key]:
for child_keys in value:
if type(value[child_keys]) is list:
validate_array(value[child_keys],
schema['properties'][key]['additionalProperties'])
continue
validate_object(value[child_keys],
schema['properties'][key]['additionalProperties'])
continue
# If the object is not opaque then continue statements
# will be executed above and this will be skipped
if expected_type is dict:
raise SchemaError(f'Opaque object "{key}"')
# Check that at least all the required keys are present
if 'required' in schema:
for req_key in schema['required']:
if req_key in exclusion_list:
continue
if req_key not in content.keys():
raise SchemaError('Expected to find the "{}" required key'.format(req_key))
def to_python_type(py_type: str) -> type:
"""Transform an OpenAPI-like type to a Python one.
https://swagger.io/docs/specification/data-models/data-types
"""
TYPES = {
'string': str,
'number': float,
'integer': int,
'boolean': bool,
'array': list,
'object': dict,
}
return TYPES[py_type]