import logging from functools import wraps from typing import Any, Callable from django.conf import settings from django.http import HttpRequest, HttpResponse from django.urls import path from django.urls.resolvers import URLPattern from django.utils.crypto import constant_time_compare from django.utils.translation import gettext as _ from django.views.decorators.csrf import csrf_exempt from typing_extensions import Concatenate, ParamSpec, override from zerver.decorator import get_basic_credentials, process_client from zerver.lib.exceptions import ( ErrorCode, JsonableError, RateLimitedError, RemoteServerDeactivatedError, UnauthorizedError, ) from zerver.lib.rate_limiter import should_rate_limit from zerver.lib.request import RequestNotes from zerver.lib.rest import default_never_cache_responses, get_target_view_function_or_response from zerver.lib.subdomains import get_subdomain from zerver.models import Realm from zilencer.models import ( RateLimitedRemoteZulipServer, RemoteZulipServer, get_remote_server_by_uuid, ) logger = logging.getLogger(__name__) ParamT = ParamSpec("ParamT") class InvalidZulipServerError(JsonableError): code = ErrorCode.INVALID_ZULIP_SERVER data_fields = ["role"] def __init__(self, role: str) -> None: self.role: str = role @staticmethod @override def msg_format() -> str: return "Zulip server auth failure: {role} is not registered -- did you run `manage.py register_server`?" class InvalidZulipServerKeyError(InvalidZulipServerError): @staticmethod @override def msg_format() -> str: return "Zulip server auth failure: key does not match role {role}" def rate_limit_remote_server( request: HttpRequest, remote_server: RemoteZulipServer, domain: str ) -> None: if not should_rate_limit(request): return try: RateLimitedRemoteZulipServer(remote_server, domain=domain).rate_limit_request(request) except RateLimitedError as e: logger.warning("Remote server %s exceeded rate limits on domain %s", remote_server, domain) raise e def validate_remote_server( request: HttpRequest, role: str, api_key: str, ) -> RemoteZulipServer: try: remote_server = get_remote_server_by_uuid(role) except RemoteZulipServer.DoesNotExist: raise InvalidZulipServerError(role) if not constant_time_compare(api_key, remote_server.api_key): raise InvalidZulipServerKeyError(role) if remote_server.deactivated: raise RemoteServerDeactivatedError if ( get_subdomain(request) != Realm.SUBDOMAIN_FOR_ROOT_DOMAIN and not settings.DEVELOPMENT_DISABLE_PUSH_BOUNCER_DOMAIN_CHECK ): # Sometimes we may want to test push bouncer logic in development. raise JsonableError(_("Invalid subdomain for push notifications bouncer")) RequestNotes.get_notes(request).remote_server = remote_server process_client(request) return remote_server def authenticated_remote_server_view( view_func: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse] ) -> Callable[Concatenate[HttpRequest, ParamT], HttpResponse]: @wraps(view_func) def _wrapped_view_func( request: HttpRequest, /, *args: ParamT.args, **kwargs: ParamT.kwargs ) -> HttpResponse: role, api_key = get_basic_credentials(request) if "@" in role: raise JsonableError(_("Must validate with valid Zulip server API key")) try: remote_server = validate_remote_server(request, role, api_key) except JsonableError as e: raise UnauthorizedError(e.msg) rate_limit_remote_server(request, remote_server, domain="api_by_remote_server") return view_func(request, remote_server, *args, **kwargs) return _wrapped_view_func @default_never_cache_responses @csrf_exempt def remote_server_dispatch(request: HttpRequest, /, **kwargs: Any) -> HttpResponse: result = get_target_view_function_or_response(request, kwargs) if isinstance(result, HttpResponse): return result target_function, view_flags = result return authenticated_remote_server_view(target_function)(request, **kwargs) def remote_server_path( route: str, **handlers: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse], ) -> URLPattern: return path(route, remote_server_dispatch, handlers)