from typing import Union, List, Dict, Optional, cast import logging import ujson from django.core.exceptions import ValidationError from django.db import IntegrityError, connection from django.http import HttpRequest, HttpResponse from django.utils.translation import ugettext as _ from zerver.decorator import require_realm_admin, human_users_only from zerver.lib.request import has_request_variables, REQ from zerver.lib.actions import (try_add_realm_custom_profile_field, do_remove_realm_custom_profile_field, try_update_realm_custom_profile_field, do_update_user_custom_profile_data, try_reorder_realm_custom_profile_fields, notify_user_update_custom_profile_data) from zerver.lib.response import json_success, json_error from zerver.lib.types import ProfileFieldData from zerver.lib.validator import (check_dict, check_list, check_int, validate_field_data, check_capped_string) from zerver.models import (custom_profile_fields_for_realm, UserProfile, CustomProfileFieldValue, CustomProfileField, custom_profile_fields_for_realm) from zerver.lib.exceptions import JsonableError def list_realm_custom_profile_fields(request: HttpRequest, user_profile: UserProfile) -> HttpResponse: fields = custom_profile_fields_for_realm(user_profile.realm_id) return json_success({'custom_fields': [f.as_dict() for f in fields]}) hint_validator = check_capped_string(CustomProfileField.HINT_MAX_LENGTH) name_validator = check_capped_string(CustomProfileField.NAME_MAX_LENGTH) def validate_field_name_and_hint(name: str, hint: str) -> None: if not name.strip(): raise JsonableError(_("Name cannot be blank.")) error = hint_validator('hint', hint) if error: raise JsonableError(error) error = name_validator('name', name) if error: raise JsonableError(error) @require_realm_admin @has_request_variables def create_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile, name: str=REQ(), hint: str=REQ(default=''), field_data: ProfileFieldData=REQ(default={}, converter=ujson.loads), field_type: int=REQ(validator=check_int)) -> HttpResponse: validate_field_name_and_hint(name, hint) field_types = [i[0] for i in CustomProfileField.FIELD_TYPE_CHOICES] if field_type not in field_types: return json_error(_("Invalid field type.")) # Choice type field must have at least have one choice if field_type == CustomProfileField.CHOICE and len(field_data) < 1: return json_error(_("Field must have at least one choice.")) error = validate_field_data(field_data) if error: return json_error(error) try: field = try_add_realm_custom_profile_field( realm=user_profile.realm, name=name, field_data=field_data, field_type=field_type, hint=hint, ) return json_success({'id': field.id}) except IntegrityError: return json_error(_("A field with that name already exists.")) @require_realm_admin def delete_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile, field_id: int) -> HttpResponse: try: field = CustomProfileField.objects.get(id=field_id) except CustomProfileField.DoesNotExist: return json_error(_('Field id {id} not found.').format(id=field_id)) do_remove_realm_custom_profile_field(realm=user_profile.realm, field=field) return json_success() @require_realm_admin @has_request_variables def update_realm_custom_profile_field(request: HttpRequest, user_profile: UserProfile, field_id: int, name: str=REQ(), hint: str=REQ(default=''), field_data: ProfileFieldData=REQ(default={}, converter=ujson.loads), ) -> HttpResponse: validate_field_name_and_hint(name, hint) error = validate_field_data(field_data) if error: return json_error(error) realm = user_profile.realm try: field = CustomProfileField.objects.get(realm=realm, id=field_id) except CustomProfileField.DoesNotExist: return json_error(_('Field id {id} not found.').format(id=field_id)) try: try_update_realm_custom_profile_field(realm, field, name, hint=hint, field_data=field_data) except IntegrityError: return json_error(_('A field with that name already exists.')) return json_success() @require_realm_admin @has_request_variables def reorder_realm_custom_profile_fields(request: HttpRequest, user_profile: UserProfile, order: List[int]=REQ(validator=check_list( check_int))) -> HttpResponse: try_reorder_realm_custom_profile_fields(user_profile.realm, order) return json_success() @human_users_only @has_request_variables def remove_user_custom_profile_data(request: HttpRequest, user_profile: UserProfile, data: List[int]=REQ(validator=check_list( check_int))) -> HttpResponse: for field_id in data: try: field = CustomProfileField.objects.get(realm=user_profile.realm, id=field_id) except CustomProfileField.DoesNotExist: return json_error(_('Field id {id} not found.').format(id=field_id)) try: field_value = CustomProfileFieldValue.objects.get(field=field, user_profile=user_profile) except CustomProfileFieldValue.DoesNotExist: continue field_value.delete() notify_user_update_custom_profile_data(user_profile, {'id': field_id, 'value': None, 'type': field.field_type}) return json_success() @human_users_only @has_request_variables def update_user_custom_profile_data( request: HttpRequest, user_profile: UserProfile, data: List[Dict[str, Union[int, str, List[int]]]]=REQ(validator=check_list( check_dict([('id', check_int)])))) -> HttpResponse: for item in data: field_id = item['id'] try: field = CustomProfileField.objects.get(id=field_id) except CustomProfileField.DoesNotExist: return json_error(_('Field id {id} not found.').format(id=field_id)) validators = CustomProfileField.FIELD_VALIDATORS field_type = field.field_type var_name = '{}'.format(field.name) value = item['value'] if field_type in validators: validator = validators[field_type] result = validator(var_name, value) elif field_type == CustomProfileField.CHOICE: choice_field_validator = CustomProfileField.CHOICE_FIELD_VALIDATORS[field_type] field_data = field.field_data result = choice_field_validator(var_name, field_data, value) elif field_type == CustomProfileField.USER: user_field_validator = CustomProfileField.USER_FIELD_VALIDATORS[field_type] result = user_field_validator(user_profile.realm.id, cast(List[int], value), False) else: raise AssertionError("Invalid field type") if result is not None: return json_error(result) do_update_user_custom_profile_data(user_profile, data) # We need to call this explicitly otherwise constraints are not check return json_success()