decorators: Migrate to typed_endpoint.

Modify `semgrep-py.yml` to treat arguments after `*args` as
keyword-only arguments.
This commit is contained in:
Kenneth Rodrigues 2024-08-15 14:15:39 +05:30 committed by Tim Abbott
parent 47ebef057e
commit 86bf0d3ea3
3 changed files with 19 additions and 14 deletions

View File

@ -266,6 +266,10 @@ rules:
@typed_endpoint @typed_endpoint
def $F(..., *, ...)-> ...: def $F(..., *, ...)-> ...:
... ...
- pattern-not-inside: |
@typed_endpoint
def $F(..., *args, ...)-> ...:
...
message: | message: |
@typed_endpoint should not be used without keyword-only parameters. @typed_endpoint should not be used without keyword-only parameters.
Make parameters to be parsed from the request as keyword-only, Make parameters to be parsed from the request as keyword-only,

View File

@ -45,10 +45,11 @@ from zerver.lib.exceptions import (
) )
from zerver.lib.queue import queue_json_publish from zerver.lib.queue import queue_json_publish
from zerver.lib.rate_limiter import is_local_addr, rate_limit_request_by_ip, rate_limit_user from zerver.lib.rate_limiter import is_local_addr, rate_limit_request_by_ip, rate_limit_user
from zerver.lib.request import REQ, RequestNotes, has_request_variables from zerver.lib.request import RequestNotes
from zerver.lib.response import json_method_not_allowed from zerver.lib.response import json_method_not_allowed
from zerver.lib.subdomains import get_subdomain, user_matches_subdomain from zerver.lib.subdomains import get_subdomain, user_matches_subdomain
from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime
from zerver.lib.typed_endpoint import typed_endpoint
from zerver.lib.users import is_2fa_verified from zerver.lib.users import is_2fa_verified
from zerver.lib.utils import has_api_key_format from zerver.lib.utils import has_api_key_format
from zerver.lib.webhooks.common import notify_bot_owner_about_invalid_json from zerver.lib.webhooks.common import notify_bot_owner_about_invalid_json
@ -333,10 +334,10 @@ def webhook_view(
# Variadic generics are necessary: https://github.com/python/typing/issues/193 # Variadic generics are necessary: https://github.com/python/typing/issues/193
def _wrapped_view_func(view_func: Callable[..., HttpResponse]) -> Callable[..., HttpResponse]: def _wrapped_view_func(view_func: Callable[..., HttpResponse]) -> Callable[..., HttpResponse]:
@csrf_exempt @csrf_exempt
@has_request_variables
@wraps(view_func) @wraps(view_func)
@typed_endpoint
def _wrapped_func_arguments( def _wrapped_func_arguments(
request: HttpRequest, /, api_key: str = REQ(), *args: object, **kwargs: object request: HttpRequest, /, *args: object, api_key: str, **kwargs: object
) -> HttpResponse: ) -> HttpResponse:
user_profile = validate_api_key( user_profile = validate_api_key(
request, request,
@ -685,10 +686,10 @@ def authenticated_uploads_api_view(
) -> Callable[[Callable[..., HttpResponse]], Callable[..., HttpResponse]]: ) -> Callable[[Callable[..., HttpResponse]], Callable[..., HttpResponse]]:
def _wrapped_view_func(view_func: Callable[..., HttpResponse]) -> Callable[..., HttpResponse]: def _wrapped_view_func(view_func: Callable[..., HttpResponse]) -> Callable[..., HttpResponse]:
@csrf_exempt @csrf_exempt
@has_request_variables
@wraps(view_func) @wraps(view_func)
@typed_endpoint
def _wrapped_func_arguments( def _wrapped_func_arguments(
request: HttpRequest, /, api_key: str = REQ(), *args: object, **kwargs: object request: HttpRequest, /, *args: object, api_key: str, **kwargs: object
) -> HttpResponse: ) -> HttpResponse:
user_profile = validate_api_key(request, None, api_key, False) user_profile = validate_api_key(request, None, api_key, False)
if not skip_rate_limiting: if not skip_rate_limiting:
@ -910,8 +911,7 @@ def authenticated_json_view(
# from command-line tools into Django. We protect them from the # from command-line tools into Django. We protect them from the
# outside world by checking a shared secret, and also the originating # outside world by checking a shared secret, and also the originating
# IP (for now). # IP (for now).
@has_request_variables def authenticate_internal_api(request: HttpRequest, *, secret: str) -> bool:
def authenticate_internal_api(request: HttpRequest, secret: str = REQ("secret")) -> bool:
return is_local_addr(request.META["REMOTE_ADDR"]) and constant_time_compare( return is_local_addr(request.META["REMOTE_ADDR"]) and constant_time_compare(
secret, settings.SHARED_SECRET secret, settings.SHARED_SECRET
) )
@ -929,14 +929,15 @@ def internal_api_view(
def _wrapped_view_func( def _wrapped_view_func(
view_func: Callable[Concatenate[HttpRequest, ParamT], HttpResponse], view_func: Callable[Concatenate[HttpRequest, ParamT], HttpResponse],
) -> Callable[Concatenate[HttpRequest, ParamT], HttpResponse]: ) -> Callable[..., HttpResponse]:
@csrf_exempt @csrf_exempt
@require_post @require_post
@wraps(view_func) @wraps(view_func)
@typed_endpoint
def _wrapped_func_arguments( def _wrapped_func_arguments(
request: HttpRequest, /, *args: ParamT.args, **kwargs: ParamT.kwargs request: HttpRequest, /, *args: ParamT.args, secret: str, **kwargs: ParamT.kwargs
) -> HttpResponse: ) -> HttpResponse:
if not authenticate_internal_api(request): if not authenticate_internal_api(request, secret=secret):
raise AccessDeniedError raise AccessDeniedError
request_notes = RequestNotes.get_notes(request) request_notes = RequestNotes.get_notes(request)
is_tornado_request = request_notes.tornado_handler_id is not None is_tornado_request = request_notes.tornado_handler_id is not None

View File

@ -1034,7 +1034,7 @@ class TestInternalNotifyView(ZulipTestCase):
) )
with self.settings(SHARED_SECRET=secret): with self.settings(SHARED_SECRET=secret):
self.assertTrue(authenticate_internal_api(request)) self.assertTrue(authenticate_internal_api(request, secret=secret))
self.assertEqual( self.assertEqual(
orjson.loads(self.internal_notify(False, request).content).get("msg"), orjson.loads(self.internal_notify(False, request).content).get("msg"),
self.BORING_RESULT, self.BORING_RESULT,
@ -1050,7 +1050,7 @@ class TestInternalNotifyView(ZulipTestCase):
tornado_handler=dummy_handler, tornado_handler=dummy_handler,
) )
with self.settings(SHARED_SECRET=secret): with self.settings(SHARED_SECRET=secret):
self.assertTrue(authenticate_internal_api(request)) self.assertTrue(authenticate_internal_api(request, secret=secret))
self.assertEqual( self.assertEqual(
orjson.loads(self.internal_notify(True, request).content).get("msg"), orjson.loads(self.internal_notify(True, request).content).get("msg"),
self.BORING_RESULT, self.BORING_RESULT,
@ -1083,7 +1083,7 @@ class TestInternalNotifyView(ZulipTestCase):
) )
with self.settings(SHARED_SECRET="broken"): with self.settings(SHARED_SECRET="broken"):
self.assertFalse(authenticate_internal_api(request)) self.assertFalse(authenticate_internal_api(request, secret=secret))
with self.assertRaises(AccessDeniedError) as access_denied_error: with self.assertRaises(AccessDeniedError) as access_denied_error:
self.internal_notify(True, request) self.internal_notify(True, request)
self.assertEqual(access_denied_error.exception.http_status_code, 403) self.assertEqual(access_denied_error.exception.http_status_code, 403)
@ -1096,7 +1096,7 @@ class TestInternalNotifyView(ZulipTestCase):
) )
with self.settings(SHARED_SECRET=secret): with self.settings(SHARED_SECRET=secret):
self.assertFalse(authenticate_internal_api(request)) self.assertFalse(authenticate_internal_api(request, secret=secret))
with self.assertRaises(AccessDeniedError) as context: with self.assertRaises(AccessDeniedError) as context:
self.internal_notify(True, request) self.internal_notify(True, request)
self.assertEqual(context.exception.http_status_code, 403) self.assertEqual(context.exception.http_status_code, 403)