
415 lines
14 KiB

import threading
from collections import defaultdict
from functools import wraps
from types import FunctionType
from typing import (
import orjson
from django.core.exceptions import ValidationError
from django.http import HttpRequest, HttpResponse
from django.utils.translation import gettext as _
from typing_extensions import Literal
from zerver.lib.exceptions import ErrorCode, InvalidJSONError, JsonableError
from zerver.lib.types import Validator, ViewFuncT
class RequestConfusingParmsError(JsonableError):
data_fields = ["var_name1", "var_name2"]
def __init__(self, var_name1: str, var_name2: str) -> None:
self.var_name1: str = var_name1
self.var_name2: str = var_name2
def msg_format() -> str:
return _("Can't decide between '{var_name1}' and '{var_name2}' arguments")
class RequestVariableMissingError(JsonableError):
data_fields = ["var_name"]
def __init__(self, var_name: str) -> None:
self.var_name: str = var_name
def msg_format() -> str:
return _("Missing '{var_name}' argument")
class RequestVariableConversionError(JsonableError):
data_fields = ["var_name", "bad_value"]
def __init__(self, var_name: str, bad_value: Any) -> None:
self.var_name: str = var_name
self.bad_value = bad_value
def msg_format() -> str:
return _("Bad value for '{var_name}': {bad_value}")
# Used in conjunction with @has_request_variables, below
ResultT = TypeVar("ResultT")
class _REQ(Generic[ResultT]):
# NotSpecified is a sentinel value for determining whether a
# default value was specified for a request variable. We can't
# use None because that could be a valid, user-specified default
class _NotSpecified:
NotSpecified = _NotSpecified()
def __init__(
whence: Optional[str] = None,
converter: Optional[Callable[[str], ResultT]] = None,
default: Union[_NotSpecified, ResultT, None] = NotSpecified,
json_validator: Optional[Validator[ResultT]] = None,
str_validator: Optional[Validator[ResultT]] = None,
argument_type: Optional[str] = None,
intentionally_undocumented: bool = False,
documentation_pending: bool = False,
aliases: Sequence[str] = [],
path_only: bool = False,
) -> None:
"""whence: the name of the request variable that should be used
for this parameter. Defaults to a request variable of the
same name as the parameter.
converter: a function that takes a string and returns a new
value. If specified, this will be called on the request
variable value before passing to the function
default: a value to be used for the argument if the parameter
is missing in the request
json_validator: similar to converter, but takes an already
parsed JSON data structure. If specified, we will parse the
JSON request variable value before passing to the function
str_validator: Like json_validator, but doesn't parse JSON
argument_type: pass 'body' to extract the parsed JSON
corresponding to the request body
aliases: alternate names for the POST var
path_only: Used for parameters included in the URL that we still want
to validate via REQ's hooks.
self.post_var_name = whence
self.func_var_name: Optional[str] = None
self.converter = converter
self.json_validator = json_validator
self.str_validator = str_validator
self.default = default
self.argument_type = argument_type
self.aliases = aliases
self.intentionally_undocumented = intentionally_undocumented
self.documentation_pending = documentation_pending
self.path_only = path_only
assert converter is None or (
json_validator is None and str_validator is None
), "converter and json_validator are mutually exclusive"
assert (
json_validator is None or str_validator is None
), "json_validator and str_validator are mutually exclusive"
# This factory function ensures that mypy can correctly analyze REQ.
# Note that REQ claims to return a type matching that of the parameter
# of which it is the default value, allowing type checking of view
# functions using has_request_variables. In reality, REQ returns an
# instance of class _REQ to enable the decorator to scan the parameter
# list for _REQ objects and patch the parameters as the true types.
# See also this documentation to learn how @overload helps here.
# https://zulip.readthedocs.io/en/latest/testing/mypy.html#using-overload-to-accurately-describe-variations
# Overload 1: converter
def REQ(
whence: Optional[str] = ...,
converter: Callable[[str], ResultT],
default: ResultT = ...,
intentionally_undocumented: bool = ...,
documentation_pending: bool = ...,
aliases: Sequence[str] = ...,
path_only: bool = ...,
) -> ResultT:
# Overload 2: json_validator
def REQ(
whence: Optional[str] = ...,
default: ResultT = ...,
json_validator: Validator[ResultT],
intentionally_undocumented: bool = ...,
documentation_pending: bool = ...,
aliases: Sequence[str] = ...,
path_only: bool = ...,
) -> ResultT:
# Overload 3: no converter/json_validator, default: str or unspecified, argument_type=None
def REQ(
whence: Optional[str] = ...,
default: str = ...,
str_validator: Optional[Validator[str]] = ...,
intentionally_undocumented: bool = ...,
documentation_pending: bool = ...,
aliases: Sequence[str] = ...,
path_only: bool = ...,
) -> str:
# Overload 4: no converter/validator, default=None, argument_type=None
def REQ(
whence: Optional[str] = ...,
default: None,
str_validator: Optional[Validator[str]] = ...,
intentionally_undocumented: bool = ...,
documentation_pending: bool = ...,
aliases: Sequence[str] = ...,
path_only: bool = ...,
) -> Optional[str]:
# Overload 5: argument_type="body"
def REQ(
whence: Optional[str] = ...,
default: ResultT = ...,
argument_type: Literal["body"],
intentionally_undocumented: bool = ...,
documentation_pending: bool = ...,
aliases: Sequence[str] = ...,
path_only: bool = ...,
) -> ResultT:
# Implementation
def REQ(
whence: Optional[str] = None,
converter: Optional[Callable[[str], ResultT]] = None,
default: Union[_REQ._NotSpecified, ResultT] = _REQ.NotSpecified,
json_validator: Optional[Validator[ResultT]] = None,
str_validator: Optional[Validator[ResultT]] = None,
argument_type: Optional[str] = None,
intentionally_undocumented: bool = False,
documentation_pending: bool = False,
aliases: Sequence[str] = [],
path_only: bool = False,
) -> ResultT:
return cast(
arguments_map: Dict[str, List[str]] = defaultdict(list)
# Extracts variables from the request object and passes them as
# named function arguments. The request object must be the first
# argument to the function.
# To use, assign a function parameter a default value that is an
# instance of the _REQ class. That parameter will then be automatically
# populated from the HTTP request. The request object must be the
# first argument to the decorated function.
# This should generally be the innermost (syntactically bottommost)
# decorator applied to a view, since other decorators won't preserve
# the default parameter values used by has_request_variables.
# Note that this can't be used in helper functions which are not
# expected to call json_error or json_success, as it uses json_error
# internally when it encounters an error
def has_request_variables(view_func: ViewFuncT) -> ViewFuncT:
num_params = view_func.__code__.co_argcount
default_param_values = cast(FunctionType, view_func).__defaults__
if default_param_values is None:
default_param_values = ()
num_default_params = len(default_param_values)
default_param_names = view_func.__code__.co_varnames[num_params - num_default_params :]
post_params = []
view_func_full_name = ".".join([view_func.__module__, view_func.__name__])
for (name, value) in zip(default_param_names, default_param_values):
if isinstance(value, _REQ):
value.func_var_name = name
if value.post_var_name is None:
value.post_var_name = name
# Record arguments that should be documented so that our
# automated OpenAPI docs tests can compare these against the code.
if (
not value.intentionally_undocumented
and not value.documentation_pending
and not value.path_only
def _wrapped_view_func(request: HttpRequest, *args: object, **kwargs: object) -> HttpResponse:
for param in post_params:
func_var_name = param.func_var_name
if param.path_only:
# For path_only parameters, they should already have
# been passed via the URL, so there's no need for REQ
# to do anything.
# TODO: Either run validators for path_only parameters
# or don't declare them using REQ.
assert func_var_name in kwargs
if func_var_name in kwargs:
assert func_var_name is not None
if param.argument_type == "body":
val = orjson.loads(request.body)
except orjson.JSONDecodeError:
raise InvalidJSONError(_("Malformed JSON"))
kwargs[func_var_name] = val
# This is a view bug, not a user error, and thus should throw a 500.
assert param.argument_type is None, "Invalid argument type"
post_var_names = [param.post_var_name]
post_var_names += param.aliases
default_assigned = False
post_var_name: Optional[str] = None
for req_var in post_var_names:
if req_var in request.POST:
val = request.POST[req_var]
elif req_var in request.GET:
val = request.GET[req_var]
# This is covered by test_REQ_aliases, but coverage.py
# fails to recognize this for some reason.
continue # nocoverage
if post_var_name is not None:
assert req_var is not None
raise RequestConfusingParmsError(post_var_name, req_var)
post_var_name = req_var
if post_var_name is None:
post_var_name = param.post_var_name
assert post_var_name is not None
if param.default is _REQ.NotSpecified:
raise RequestVariableMissingError(post_var_name)
val = param.default
default_assigned = True
if param.converter is not None and not default_assigned:
val = param.converter(val)
except JsonableError:
except Exception:
raise RequestVariableConversionError(post_var_name, val)
# json_validator is like converter, but doesn't handle JSON parsing; we do.
if param.json_validator is not None and not default_assigned:
val = orjson.loads(val)
except orjson.JSONDecodeError:
raise JsonableError(_('Argument "{}" is not valid JSON.').format(post_var_name))
val = param.json_validator(post_var_name, val)
except ValidationError as error:
raise JsonableError(error.message)
# str_validators is like json_validator, but for direct strings (no JSON parsing).
if param.str_validator is not None and not default_assigned:
val = param.str_validator(post_var_name, val)
except ValidationError as error:
raise JsonableError(error.message)
kwargs[func_var_name] = val
return view_func(request, *args, **kwargs)
return cast(ViewFuncT, _wrapped_view_func) # https://github.com/python/mypy/issues/1927
local = threading.local()
def get_current_request() -> Optional[HttpRequest]:
"""Returns the current HttpRequest object; this should only be used by
logging frameworks, which have no other access to the current
request. All other codepaths should pass through the current
request object, rather than rely on this thread-local global.
return getattr(local, "request", None)
def set_request(req: HttpRequest) -> None:
setattr(local, "request", req)
def unset_request() -> None:
if hasattr(local, "request"):
delattr(local, "request")