zulip/zerver/views/user_settings.py

386 lines
16 KiB
Python
Raw Normal View History

from typing import Any, Dict, Optional
from django.conf import settings
from django.contrib.auth import authenticate, update_session_auth_hash
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
2016-06-05 02:15:26 +02:00
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render
from django.utils.html import escape
from django.utils.safestring import SafeString
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy
from confirmation.models import (
Confirmation,
ConfirmationKeyException,
get_object_from_key,
render_confirmation_key_error,
)
from zerver.actions.user_settings import (
check_change_full_name,
do_change_avatar_fields,
do_change_password,
do_change_user_delivery_email,
do_change_user_setting,
do_regenerate_api_key,
do_start_email_change_process,
)
from zerver.decorator import human_users_only
from zerver.lib.avatar import avatar_url
from zerver.lib.email_validation import (
get_realm_email_validator,
validate_email_is_valid,
validate_email_not_already_in_realm,
)
from zerver.lib.exceptions import JsonableError, RateLimited, UserDeactivatedError
from zerver.lib.i18n import get_available_language_codes
from zerver.lib.rate_limiter import RateLimitedUser
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.send_email import FromAddress, send_email
from zerver.lib.sounds import get_available_notification_sounds
from zerver.lib.upload import upload_avatar_image
from zerver.lib.validator import (
check_bool,
check_int,
check_int_in,
check_string_in,
check_timezone,
)
from zerver.models import (
EmailChangeStatus,
UserProfile,
avatar_changes_disabled,
name_changes_disabled,
)
from zerver.views.auth import redirect_to_deactivation_notice
from zproject.backends import check_password_strength, email_belongs_to_ldap
AVATAR_CHANGES_DISABLED_ERROR = gettext_lazy("Avatar changes are disabled in this organization.")
def confirm_email_change(request: HttpRequest, confirmation_key: str) -> HttpResponse:
try:
email_change_object = get_object_from_key(
confirmation_key, [Confirmation.EMAIL_CHANGE], mark_object_used=True
)
except ConfirmationKeyException as exception:
return render_confirmation_key_error(request, exception)
assert isinstance(email_change_object, EmailChangeStatus)
new_email = email_change_object.new_email
old_email = email_change_object.old_email
user_profile = email_change_object.user_profile
if user_profile.realm.deactivated:
return redirect_to_deactivation_notice()
if not user_profile.is_active:
# TODO: Make this into a user-facing error, not JSON
raise UserDeactivatedError()
if user_profile.realm.email_changes_disabled and not user_profile.is_realm_admin:
raise JsonableError(_("Email address changes are disabled in this organization."))
do_change_user_delivery_email(user_profile, new_email)
context = {"realm_name": user_profile.realm.name, "new_email": new_email}
language = user_profile.default_language
send_email(
"zerver/emails/notify_change_in_email",
to_emails=[old_email],
from_name=FromAddress.security_email_from_name(user_profile=user_profile),
from_address=FromAddress.SUPPORT,
language=language,
context=context,
realm=user_profile.realm,
)
ctx = {
"new_email_html_tag": SafeString(
f'<a href="mailto:{escape(new_email)}">{escape(new_email)}</a>'
),
"old_email_html_tag": SafeString(
f'<a href="mailto:{escape(old_email)}">{escape(old_email)}</a>'
),
}
return render(request, "confirmation/confirm_email_change.html", context=ctx)
emojiset_choices = {emojiset["key"] for emojiset in UserProfile.emojiset_choices()}
default_view_options = ["recent_topics", "all_messages"]
def check_settings_values(
notification_sound: Optional[str],
email_notifications_batching_period_seconds: Optional[int],
default_language: Optional[str] = None,
) -> None:
# We can't use REQ for this widget because
# get_available_language_codes requires provisioning to be
# complete.
if default_language is not None and default_language not in get_available_language_codes():
raise JsonableError(_("Invalid default_language"))
if (
notification_sound is not None
and notification_sound not in get_available_notification_sounds()
and notification_sound != "none"
):
raise JsonableError(_("Invalid notification sound '{}'").format(notification_sound))
if email_notifications_batching_period_seconds is not None and (
email_notifications_batching_period_seconds <= 0
or email_notifications_batching_period_seconds > 7 * 24 * 60 * 60
):
# We set a limit of one week for the batching period
raise JsonableError(
_("Invalid email batching period: {} seconds").format(
email_notifications_batching_period_seconds
)
)
@human_users_only
@has_request_variables
def json_change_settings(
request: HttpRequest,
user_profile: UserProfile,
full_name: Optional[str] = REQ(default=None),
email: Optional[str] = REQ(default=None),
old_password: Optional[str] = REQ(default=None),
new_password: Optional[str] = REQ(default=None),
twenty_four_hour_time: Optional[bool] = REQ(json_validator=check_bool, default=None),
dense_mode: Optional[bool] = REQ(json_validator=check_bool, default=None),
starred_message_counts: Optional[bool] = REQ(json_validator=check_bool, default=None),
fluid_layout_width: Optional[bool] = REQ(json_validator=check_bool, default=None),
high_contrast_mode: Optional[bool] = REQ(json_validator=check_bool, default=None),
color_scheme: Optional[int] = REQ(
json_validator=check_int_in(UserProfile.COLOR_SCHEME_CHOICES), default=None
),
translate_emoticons: Optional[bool] = REQ(json_validator=check_bool, default=None),
display_emoji_reaction_users: Optional[bool] = REQ(json_validator=check_bool, default=None),
default_language: Optional[str] = REQ(default=None),
default_view: Optional[str] = REQ(
str_validator=check_string_in(default_view_options), default=None
),
escape_navigates_to_default_view: Optional[bool] = REQ(json_validator=check_bool, default=None),
left_side_userlist: Optional[bool] = REQ(json_validator=check_bool, default=None),
emojiset: Optional[str] = REQ(str_validator=check_string_in(emojiset_choices), default=None),
demote_inactive_streams: Optional[int] = REQ(
json_validator=check_int_in(UserProfile.DEMOTE_STREAMS_CHOICES), default=None
),
timezone: Optional[str] = REQ(str_validator=check_timezone, default=None),
email_notifications_batching_period_seconds: Optional[int] = REQ(
json_validator=check_int, default=None
),
enable_drafts_synchronization: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_stream_desktop_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
enable_stream_email_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
enable_stream_push_notifications: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_stream_audible_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
wildcard_mentions_notify: Optional[bool] = REQ(json_validator=check_bool, default=None),
notification_sound: Optional[str] = REQ(default=None),
enable_desktop_notifications: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_sounds: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_offline_email_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
enable_offline_push_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
enable_online_push_notifications: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_digest_emails: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_login_emails: Optional[bool] = REQ(json_validator=check_bool, default=None),
enable_marketing_emails: Optional[bool] = REQ(json_validator=check_bool, default=None),
message_content_in_email_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
pm_content_in_desktop_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
desktop_icon_count_display: Optional[int] = REQ(
json_validator=check_int_in(UserProfile.DESKTOP_ICON_COUNT_DISPLAY_CHOICES), default=None
),
realm_name_in_notifications: Optional[bool] = REQ(json_validator=check_bool, default=None),
presence_enabled: Optional[bool] = REQ(json_validator=check_bool, default=None),
enter_sends: Optional[bool] = REQ(json_validator=check_bool, default=None),
send_private_typing_notifications: Optional[bool] = REQ(
json_validator=check_bool, default=None
),
send_stream_typing_notifications: Optional[bool] = REQ(json_validator=check_bool, default=None),
send_read_receipts: Optional[bool] = REQ(json_validator=check_bool, default=None),
) -> HttpResponse:
if (
default_language is not None
or notification_sound is not None
or email_notifications_batching_period_seconds is not None
):
check_settings_values(
notification_sound, email_notifications_batching_period_seconds, default_language
)
if new_password is not None:
python: Convert assignment type annotations to Python 3.6 style. This commit was split by tabbott; this piece covers the vast majority of files in Zulip, but excludes scripts/, tools/, and puppet/ to help ensure we at least show the right error messages for Xenial systems. We can likely further refine the remaining pieces with some testing. Generated by com2ann, with whitespace fixes and various manual fixes for runtime issues: - invoiced_through: Optional[LicenseLedger] = models.ForeignKey( + invoiced_through: Optional["LicenseLedger"] = models.ForeignKey( -_apns_client: Optional[APNsClient] = None +_apns_client: Optional["APNsClient"] = None - notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) + author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) - bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) + bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) - default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) - default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) -descriptors_by_handler_id: Dict[int, ClientDescriptor] = {} +descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {} -worker_classes: Dict[str, Type[QueueProcessingWorker]] = {} -queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {} +worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} +queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {} -AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None +AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
return_data: Dict[str, Any] = {}
if email_belongs_to_ldap(user_profile.realm, user_profile.delivery_email):
raise JsonableError(_("Your Zulip password is managed in LDAP"))
try:
if not authenticate(
request,
username=user_profile.delivery_email,
password=old_password,
realm=user_profile.realm,
return_data=return_data,
):
raise JsonableError(_("Wrong password!"))
except RateLimited as e:
assert e.secs_to_freedom is not None
secs_to_freedom = int(e.secs_to_freedom)
raise JsonableError(
_("You're making too many attempts! Try again in {} seconds.").format(
secs_to_freedom
),
)
if not check_password_strength(new_password):
raise JsonableError(_("New password is too weak!"))
do_change_password(user_profile, new_password)
# Password changes invalidates sessions, see
# https://docs.djangoproject.com/en/3.2/topics/auth/default/#session-invalidation-on-password-change
# for details. To avoid this logging the user out of their own
# session (which would provide a confusing UX at best), we
# update the session hash here.
update_session_auth_hash(request, user_profile)
# We also save the session to the DB immediately to mitigate
# race conditions. In theory, there is still a race condition
# and to completely avoid it we will have to use some kind of
# mutex lock in `django.contrib.auth.get_user` where session
# is verified. To make that lock work we will have to control
# the AuthenticationMiddleware which is currently controlled
# by Django,
request.session.save()
python: Convert assignment type annotations to Python 3.6 style. This commit was split by tabbott; this piece covers the vast majority of files in Zulip, but excludes scripts/, tools/, and puppet/ to help ensure we at least show the right error messages for Xenial systems. We can likely further refine the remaining pieces with some testing. Generated by com2ann, with whitespace fixes and various manual fixes for runtime issues: - invoiced_through: Optional[LicenseLedger] = models.ForeignKey( + invoiced_through: Optional["LicenseLedger"] = models.ForeignKey( -_apns_client: Optional[APNsClient] = None +_apns_client: Optional["APNsClient"] = None - notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) + author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) - bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) + bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) - default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) - default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) -descriptors_by_handler_id: Dict[int, ClientDescriptor] = {} +descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {} -worker_classes: Dict[str, Type[QueueProcessingWorker]] = {} -queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {} +worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} +queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {} -AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None +AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
result: Dict[str, Any] = {}
if email is not None:
new_email = email.strip()
if user_profile.delivery_email != new_email:
if user_profile.realm.email_changes_disabled and not user_profile.is_realm_admin:
raise JsonableError(_("Email address changes are disabled in this organization."))
error = validate_email_is_valid(
new_email,
get_realm_email_validator(user_profile.realm),
)
if error:
raise JsonableError(error)
try:
validate_email_not_already_in_realm(
user_profile.realm,
new_email,
verbose=False,
)
except ValidationError as e:
raise JsonableError(e.message)
ratelimited, time_until_free = RateLimitedUser(
user_profile, domain="email_change_by_user"
).rate_limit()
if ratelimited:
raise RateLimited(time_until_free)
do_start_email_change_process(user_profile, new_email)
if full_name is not None and user_profile.full_name != full_name:
if name_changes_disabled(user_profile.realm) and not user_profile.is_realm_admin:
# Failingly silently is fine -- they can't do it through the UI, so
# they'd have to be trying to break the rules.
pass
else:
# Note that check_change_full_name strips the passed name automatically
check_change_full_name(user_profile, full_name, user_profile)
# Loop over user_profile.property_types
request_settings = {k: v for k, v in list(locals().items()) if k in user_profile.property_types}
for k, v in list(request_settings.items()):
if v is not None and getattr(user_profile, k) != v:
do_change_user_setting(user_profile, k, v, acting_user=user_profile)
if timezone is not None and user_profile.timezone != timezone:
do_change_user_setting(user_profile, "timezone", timezone, acting_user=user_profile)
# TODO: Do this more generally.
from zerver.lib.request import RequestNotes
request_notes = RequestNotes.get_notes(request)
for req_var in request.POST:
if req_var not in request_notes.processed_parameters:
request_notes.ignored_parameters.add(req_var)
if len(request_notes.ignored_parameters) > 0:
result["ignored_parameters_unsupported"] = list(request_notes.ignored_parameters)
return json_success(request, data=result)
def set_avatar_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
if len(request.FILES) != 1:
raise JsonableError(_("You must upload exactly one avatar."))
if avatar_changes_disabled(user_profile.realm) and not user_profile.is_realm_admin:
raise JsonableError(str(AVATAR_CHANGES_DISABLED_ERROR))
user_file = list(request.FILES.values())[0]
assert isinstance(user_file, UploadedFile)
assert user_file.size is not None
if (settings.MAX_AVATAR_FILE_SIZE_MIB * 1024 * 1024) < user_file.size:
raise JsonableError(
_("Uploaded file is larger than the allowed limit of {} MiB").format(
settings.MAX_AVATAR_FILE_SIZE_MIB,
)
)
upload_avatar_image(user_file, user_profile, user_profile)
do_change_avatar_fields(user_profile, UserProfile.AVATAR_FROM_USER, acting_user=user_profile)
user_avatar_url = avatar_url(user_profile)
json_result = dict(
avatar_url=user_avatar_url,
)
return json_success(request, data=json_result)
def delete_avatar_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
if avatar_changes_disabled(user_profile.realm) and not user_profile.is_realm_admin:
raise JsonableError(str(AVATAR_CHANGES_DISABLED_ERROR))
do_change_avatar_fields(
user_profile, UserProfile.AVATAR_FROM_GRAVATAR, acting_user=user_profile
)
gravatar_url = avatar_url(user_profile)
json_result = dict(
avatar_url=gravatar_url,
)
return json_success(request, data=json_result)
# We don't use @human_users_only here, because there are use cases for
# a bot regenerating its own API key.
@has_request_variables
def regenerate_api_key(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
new_api_key = do_regenerate_api_key(user_profile, user_profile)
json_result = dict(
api_key=new_api_key,
)
return json_success(request, data=json_result)