users: Migrate to typed_endpoint.

This commit is contained in:
Kenneth Rodrigues 2024-05-11 20:34:34 +05:30 committed by Tim Abbott
parent aaf90a1da0
commit 428ca713d0
4 changed files with 127 additions and 107 deletions

View File

@ -1,5 +1,8 @@
from typing import Optional from typing import List, Optional
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.translation import gettext as _
from pydantic_core import PydanticCustomError from pydantic_core import PydanticCustomError
# The Pydantic.StringConstraints does not have validation for the string to be # The Pydantic.StringConstraints does not have validation for the string to be
@ -16,3 +19,18 @@ def check_string_fixed_length(string: str, length: int) -> Optional[str]:
}, },
) )
return string return string
def check_int_in(val: int, possible_values: List[int]) -> int:
if val not in possible_values:
raise ValueError(_("Not in the list of possible values"))
return val
def check_url(val: str) -> str:
validate = URLValidator()
try:
validate(val)
return val
except ValidationError:
raise ValueError(_("Not a URL"))

View File

@ -1721,7 +1721,7 @@ class BotTest(ZulipTestCase, UploadSerializeMixin):
# invalid URL test case. # invalid URL test case.
bot_info["payload_url"] = orjson.dumps("http://127.0.0.:5002").decode() bot_info["payload_url"] = orjson.dumps("http://127.0.0.:5002").decode()
result = self.client_post("/json/bots", bot_info) result = self.client_post("/json/bots", bot_info)
self.assert_json_error(result, "payload_url is not a URL") self.assert_json_error(result, "Invalid payload_url: Value error, Not a URL")
def test_get_bot_handler(self) -> None: def test_get_bot_handler(self) -> None:
# Test for valid service. # Test for valid service.
@ -1804,7 +1804,7 @@ class BotTest(ZulipTestCase, UploadSerializeMixin):
user_profile=self.example_user("hamlet"), user_profile=self.example_user("hamlet"),
service_name="followup", service_name="followup",
config_data=orjson.dumps({"invalid": ["config", "value"]}).decode(), config_data=orjson.dumps({"invalid": ["config", "value"]}).decode(),
assert_json_error_msg="config_data contains a value that is not a string", assert_json_error_msg='config_data["invalid"] is not a string',
**extras, **extras,
) )

View File

@ -0,0 +1,14 @@
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.typed_endpoint_validators import check_int_in, check_url
class ValidatorTestCase(ZulipTestCase):
def test_check_int_in(self) -> None:
check_int_in(3, [1, 2, 3])
with self.assertRaisesRegex(ValueError, "Not in the list of possible values"):
check_int_in(3, [1, 2])
def test_check_url(self) -> None:
check_url("https://example.com")
with self.assertRaisesRegex(ValueError, "Not a URL"):
check_url("https://127.0.0..:5000")

View File

@ -1,5 +1,5 @@
from email.headerregistry import Address from email.headerregistry import Address
from typing import Any, Dict, List, Mapping, Optional, Union from typing import Any, Dict, List, Mapping, Optional, TypeAlias, Union
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import AnonymousUser from django.contrib.auth.models import AnonymousUser
@ -7,6 +7,8 @@ from django.core.files.uploadedfile import UploadedFile
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect from django.shortcuts import redirect
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from pydantic import AfterValidator, BaseModel, Json, StringConstraints
from typing_extensions import Annotated
from zerver.actions.bots import ( from zerver.actions.bots import (
do_change_bot_owner, do_change_bot_owner,
@ -46,11 +48,17 @@ from zerver.lib.exceptions import (
) )
from zerver.lib.integrations import EMBEDDED_BOTS from zerver.lib.integrations import EMBEDDED_BOTS
from zerver.lib.rate_limiter import rate_limit_spectator_attachment_access_by_file from zerver.lib.rate_limiter import rate_limit_spectator_attachment_access_by_file
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success from zerver.lib.response import json_success
from zerver.lib.send_email import FromAddress, send_email from zerver.lib.send_email import FromAddress, send_email
from zerver.lib.streams import access_stream_by_id, access_stream_by_name, subscribed_to_stream from zerver.lib.streams import access_stream_by_id, access_stream_by_name, subscribed_to_stream
from zerver.lib.types import ProfileDataElementUpdateDict, ProfileDataElementValue, Validator from zerver.lib.typed_endpoint import (
ApiParamConfig,
PathOnly,
typed_endpoint,
typed_endpoint_without_parameters,
)
from zerver.lib.typed_endpoint_validators import check_int_in, check_url
from zerver.lib.types import ProfileDataElementUpdateDict
from zerver.lib.upload import upload_avatar_image from zerver.lib.upload import upload_avatar_image
from zerver.lib.url_encoding import append_url_query_string from zerver.lib.url_encoding import append_url_query_string
from zerver.lib.users import ( from zerver.lib.users import (
@ -73,19 +81,6 @@ from zerver.lib.users import (
validate_user_custom_profile_data, validate_user_custom_profile_data,
) )
from zerver.lib.utils import generate_api_key from zerver.lib.utils import generate_api_key
from zerver.lib.validator import (
check_bool,
check_capped_string,
check_dict,
check_dict_only,
check_int,
check_int_in,
check_list,
check_none_or,
check_string,
check_union,
check_url,
)
from zerver.models import Service, Stream, UserProfile from zerver.models import Service, Stream, UserProfile
from zerver.models.realms import ( from zerver.models.realms import (
DisposableEmailError, DisposableEmailError,
@ -101,20 +96,31 @@ from zerver.models.users import (
) )
from zproject.backends import check_password_strength from zproject.backends import check_password_strength
RoleParamType: TypeAlias = Annotated[
int,
AfterValidator(
lambda x: check_int_in(
x,
UserProfile.ROLE_TYPES,
)
),
]
def check_last_owner(user_profile: UserProfile) -> bool: def check_last_owner(user_profile: UserProfile) -> bool:
owners = set(user_profile.realm.get_human_owner_users()) owners = set(user_profile.realm.get_human_owner_users())
return user_profile.is_realm_owner and not user_profile.is_bot and len(owners) == 1 return user_profile.is_realm_owner and not user_profile.is_bot and len(owners) == 1
@has_request_variables @typed_endpoint
def deactivate_user_backend( def deactivate_user_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
user_id: int, *,
deactivation_notification_comment: Optional[str] = REQ( user_id: PathOnly[int],
str_validator=check_capped_string(max_length=2000), default=None deactivation_notification_comment: Optional[
), Annotated[str, StringConstraints(max_length=2000)]
] = None,
) -> HttpResponse: ) -> HttpResponse:
target = access_user_by_id(user_profile, user_id, for_admin=True) target = access_user_by_id(user_profile, user_id, for_admin=True)
if target.is_realm_owner and not user_profile.is_realm_owner: if target.is_realm_owner and not user_profile.is_realm_owner:
@ -189,39 +195,20 @@ def reactivate_user_backend(
return json_success(request) return json_success(request)
check_profile_data: Validator[List[Dict[str, Optional[Union[int, ProfileDataElementValue]]]]] = ( class ProfileDataElement(BaseModel):
check_list( id: int
check_dict_only( value: Optional[Union[str, List[int]]]
[
("id", check_int),
(
"value",
check_none_or(
check_union([check_string, check_list(check_int)]),
),
),
]
),
)
)
@has_request_variables @typed_endpoint
def update_user_backend( def update_user_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
user_id: int, *,
full_name: Optional[str] = REQ(default=None), user_id: PathOnly[int],
role: Optional[int] = REQ( full_name: Optional[str] = None,
default=None, role: Optional[Json[RoleParamType]] = None,
json_validator=check_int_in( profile_data: Optional[Json[List[ProfileDataElement]]] = None,
UserProfile.ROLE_TYPES,
),
),
profile_data: Optional[List[Dict[str, Optional[Union[int, ProfileDataElementValue]]]]] = REQ(
default=None,
json_validator=check_profile_data,
),
) -> HttpResponse: ) -> HttpResponse:
target = access_user_by_id( target = access_user_by_id(
user_profile, user_id, allow_deactivated=True, allow_bots=True, for_admin=True user_profile, user_id, allow_deactivated=True, allow_bots=True, for_admin=True
@ -251,16 +238,16 @@ def update_user_backend(
if profile_data is not None: if profile_data is not None:
clean_profile_data: List[ProfileDataElementUpdateDict] = [] clean_profile_data: List[ProfileDataElementUpdateDict] = []
for entry in profile_data: for entry in profile_data:
assert isinstance(entry["id"], int) assert isinstance(entry.id, int)
assert not isinstance(entry["value"], int) assert not isinstance(entry.value, int)
if entry["value"] is None or not entry["value"]: if entry.value is None or not entry.value:
field_id = entry["id"] field_id = entry.id
check_remove_custom_profile_field_value(target, field_id) check_remove_custom_profile_field_value(target, field_id)
else: else:
clean_profile_data.append( clean_profile_data.append(
{ {
"id": entry["id"], "id": entry.id,
"value": entry["value"], "value": entry.value,
} }
) )
validate_user_custom_profile_data(target.realm.id, clean_profile_data) validate_user_custom_profile_data(target.realm.id, clean_profile_data)
@ -347,27 +334,21 @@ def get_stream_name(stream: Optional[Stream]) -> Optional[str]:
@require_member_or_admin @require_member_or_admin
@has_request_variables @typed_endpoint
def patch_bot_backend( def patch_bot_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
bot_id: int, *,
full_name: Optional[str] = REQ(default=None), bot_id: PathOnly[int],
role: Optional[int] = REQ( full_name: Optional[str] = None,
default=None, role: Optional[Json[RoleParamType]] = None,
json_validator=check_int_in( bot_owner_id: Optional[Json[int]] = None,
UserProfile.ROLE_TYPES, config_data: Optional[Json[Dict[str, str]]] = None,
), service_payload_url: Optional[Json[Annotated[str, AfterValidator(check_url)]]] = None,
), service_interface: Json[int] = 1,
bot_owner_id: Optional[int] = REQ(json_validator=check_int, default=None), default_sending_stream: Optional[str] = None,
config_data: Optional[Dict[str, str]] = REQ( default_events_register_stream: Optional[str] = None,
default=None, json_validator=check_dict(value_validator=check_string) default_all_public_streams: Optional[Json[bool]] = None,
),
service_payload_url: Optional[str] = REQ(json_validator=check_url, default=None),
service_interface: int = REQ(json_validator=check_int, default=1),
default_sending_stream: Optional[str] = REQ(default=None),
default_events_register_stream: Optional[str] = REQ(default=None),
default_all_public_streams: Optional[bool] = REQ(default=None, json_validator=check_bool),
) -> HttpResponse: ) -> HttpResponse:
bot = access_bot_by_id(user_profile, bot_id) bot = access_bot_by_id(user_profile, bot_id)
@ -454,9 +435,9 @@ def patch_bot_backend(
@require_member_or_admin @require_member_or_admin
@has_request_variables @typed_endpoint_without_parameters
def regenerate_bot_api_key( def regenerate_bot_api_key(
request: HttpRequest, user_profile: UserProfile, bot_id: int request: HttpRequest, user_profile: UserProfile, bot_id: PathOnly[int]
) -> HttpResponse: ) -> HttpResponse:
bot = access_bot_by_id(user_profile, bot_id) bot = access_bot_by_id(user_profile, bot_id)
@ -468,25 +449,28 @@ def regenerate_bot_api_key(
@require_member_or_admin @require_member_or_admin
@has_request_variables @typed_endpoint
def add_bot_backend( def add_bot_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
full_name_raw: str = REQ("full_name"), *,
short_name_raw: str = REQ("short_name"), full_name_raw: Annotated[str, ApiParamConfig("full_name")],
bot_type: int = REQ(json_validator=check_int, default=UserProfile.DEFAULT_BOT), short_name_raw: Annotated[str, ApiParamConfig("short_name")],
payload_url: str = REQ(json_validator=check_url, default=""), bot_type: Json[int] = UserProfile.DEFAULT_BOT,
service_name: Optional[str] = REQ(default=None), payload_url: Json[Annotated[str, AfterValidator(check_url)]] = "",
config_data: Mapping[str, str] = REQ( service_name: Optional[str] = None,
default={}, json_validator=check_dict(value_validator=check_string) config_data: Optional[Json[Mapping[str, str]]] = None,
), interface_type: Json[int] = Service.GENERIC,
interface_type: int = REQ(json_validator=check_int, default=Service.GENERIC), default_sending_stream_name: Annotated[
default_sending_stream_name: Optional[str] = REQ("default_sending_stream", default=None), Optional[str], ApiParamConfig("default_sending_stream")
default_events_register_stream_name: Optional[str] = REQ( ] = None,
"default_events_register_stream", default=None default_events_register_stream_name: Annotated[
), Optional[str], ApiParamConfig("default_events_register_stream")
default_all_public_streams: Optional[bool] = REQ(json_validator=check_bool, default=None), ] = None,
default_all_public_streams: Optional[Json[bool]] = None,
) -> HttpResponse: ) -> HttpResponse:
if config_data is None:
config_data = {}
short_name = check_short_name(short_name_raw) short_name = check_short_name(short_name_raw)
if bot_type != UserProfile.INCOMING_WEBHOOK_BOT: if bot_type != UserProfile.INCOMING_WEBHOOK_BOT:
service_name = service_name or short_name service_name = service_name or short_name
@ -668,13 +652,14 @@ def get_user_data(
return data return data
@has_request_variables @typed_endpoint
def get_members_backend( def get_members_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
user_id: Optional[int] = None, user_id: Optional[int] = None,
include_custom_profile_fields: bool = REQ(json_validator=check_bool, default=False), *,
client_gravatar: bool = REQ(json_validator=check_bool, default=True), include_custom_profile_fields: Json[bool] = False,
client_gravatar: Json[bool] = True,
) -> HttpResponse: ) -> HttpResponse:
target_user = None target_user = None
if user_id is not None: if user_id is not None:
@ -688,13 +673,14 @@ def get_members_backend(
@require_realm_admin @require_realm_admin
@has_request_variables @typed_endpoint
def create_user_backend( def create_user_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
email: str = REQ(), *,
password: str = REQ(), email: str,
full_name_raw: str = REQ("full_name"), password: str,
full_name_raw: Annotated[str, ApiParamConfig("full_name")],
) -> HttpResponse: ) -> HttpResponse:
if not user_profile.can_create_users: if not user_profile.can_create_users:
raise JsonableError(_("User not authorized to create users")) raise JsonableError(_("User not authorized to create users"))
@ -764,12 +750,13 @@ def get_profile_backend(request: HttpRequest, user_profile: UserProfile) -> Http
return json_success(request, data=result) return json_success(request, data=result)
@has_request_variables @typed_endpoint
def get_subscription_backend( def get_subscription_backend(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
user_id: int = REQ(json_validator=check_int, path_only=True), *,
stream_id: int = REQ(json_validator=check_int, path_only=True), user_id: PathOnly[Json[int]],
stream_id: PathOnly[Json[int]],
) -> HttpResponse: ) -> HttpResponse:
target_user = access_user_by_id(user_profile, user_id, for_admin=False) target_user = access_user_by_id(user_profile, user_id, for_admin=False)
(stream, sub) = access_stream_by_id(user_profile, stream_id, allow_realm_admin=True) (stream, sub) = access_stream_by_id(user_profile, stream_id, allow_realm_admin=True)
@ -779,13 +766,14 @@ def get_subscription_backend(
return json_success(request, data=subscription_status) return json_success(request, data=subscription_status)
@has_request_variables @typed_endpoint
def get_user_by_email( def get_user_by_email(
request: HttpRequest, request: HttpRequest,
user_profile: UserProfile, user_profile: UserProfile,
email: str, *,
include_custom_profile_fields: bool = REQ(json_validator=check_bool, default=False), email: PathOnly[str],
client_gravatar: bool = REQ(json_validator=check_bool, default=True), include_custom_profile_fields: Json[bool] = False,
client_gravatar: Json[bool] = True,
) -> HttpResponse: ) -> HttpResponse:
target_user = access_user_by_email( target_user = access_user_by_email(
user_profile, email, allow_deactivated=True, allow_bots=True, for_admin=False user_profile, email, allow_deactivated=True, allow_bots=True, for_admin=False