from datetime import timedelta
from email.headerregistry import Address

import time_machine
from django.conf import settings
from django.core import mail
from django.utils.html import escape
from django.utils.timezone import now

from confirmation.models import (
    Confirmation,
    confirmation_url,
    create_confirmation_link,
    generate_key,
)
from zerver.actions.create_user import do_reactivate_user
from zerver.actions.realm_settings import do_deactivate_realm, do_set_realm_property
from zerver.actions.user_settings import do_change_user_setting, do_start_email_change_process
from zerver.actions.users import do_deactivate_user
from zerver.lib.test_classes import ZulipTestCase
from zerver.models import EmailChangeStatus, UserProfile
from zerver.models.realms import get_realm
from zerver.models.users import get_user, get_user_by_delivery_email, get_user_profile_by_id


class EmailChangeTestCase(ZulipTestCase):
    """Tests for requesting and confirming a change of a user's email address."""

    def generate_email_change_link(self, new_email: str) -> str:
        """Request a change to ``new_email`` and return the link from the email.

        PATCHes ``/json/settings`` using the test client's current session,
        checks that exactly one email with the expected subject and body text
        was sent, removes that email from ``mail.outbox`` (so the helper can
        be called again), and returns the link found in the body.
        """
        data = {"email": new_email}
        url = "/json/settings"
        self.assert_length(mail.outbox, 0)
        result = self.client_patch(url, data)
        self.assert_length(mail.outbox, 1)
        self.assert_json_success(result)
        email_message = mail.outbox[0]
        self.assertEqual(
            email_message.subject,
            "Verify your new email address for zulip.testserver",
        )
        body = email_message.body
        self.assertIn("We received a request to change the email", body)

        # Leave the outbox empty for the next call's "0 messages" check.
        mail.outbox.pop()

        # The link is taken to be the third non-empty line of the plain-text body.
        activation_url = [s for s in body.split("\n") if s][2]
        return activation_url

    def test_confirm_email_change_with_non_existent_key(self) -> None:
        """A freshly generated key that was never stored yields a 404 page."""
        self.login("hamlet")
        key = generate_key()
        url = confirmation_url(key, None, Confirmation.EMAIL_CHANGE)
        response = self.client_get(url)
        self.assertEqual(response.status_code, 404)
        self.assert_in_response(
            "Whoops. We couldn't find your confirmation link in the system.", response
        )

    def test_confirm_email_change_with_invalid_key(self) -> None:
        """A key that is not in the expected format yields a "malformed" 404 page."""
        self.login("hamlet")
        key = "invalid_key"
        url = confirmation_url(key, None, Confirmation.EMAIL_CHANGE)
        response = self.client_get(url)
        self.assertEqual(response.status_code, 404)
        self.assert_in_response("Whoops. The confirmation link is malformed.", response)

    def test_confirm_email_change_when_time_exceeded(self) -> None:
        """A link created two days in the past is rejected as expired."""
        user_profile = self.example_user("hamlet")
        old_email = user_profile.email
        new_email = "hamlet-new@zulip.com"
        self.login("hamlet")
        obj = EmailChangeStatus.objects.create(
            new_email=new_email,
            old_email=old_email,
            user_profile=user_profile,
            realm=user_profile.realm,
        )
        date_sent = now() - timedelta(days=2)
        # Create the confirmation with the clock frozen in the past, then
        # follow the link at the real current time.
        with time_machine.travel(date_sent, tick=False):
            url = create_confirmation_link(obj, Confirmation.EMAIL_CHANGE)

        response = self.client_get(url)
        self.assertEqual(response.status_code, 404)
        self.assert_in_response("The confirmation link has expired or been deactivated.", response)

    def test_confirm_email_change(self) -> None:
        """Following a valid link changes the email and shows the escaped address."""
        user_profile = self.example_user("hamlet")
        old_email = user_profile.delivery_email
        # The quoted local part carries HTML markup so that the escaped form
        # asserted below differs from the raw address.
        # NOTE(review): this literal was restored from a garbled bullet-list
        # rendering of the address; confirm the exact tags against upstream.
        new_email = '"<li>hamlet-new<li>"@zulip.com'
        new_email_address = Address(addr_spec=new_email)
        new_realm = get_realm("zulip")
        self.login("hamlet")
        obj = EmailChangeStatus.objects.create(
            new_email=new_email,
            old_email=old_email,
            user_profile=user_profile,
            realm=user_profile.realm,
        )
        url = create_confirmation_link(obj, Confirmation.EMAIL_CHANGE)
        response = self.client_get(url)

        self.assertEqual(response.status_code, 200)
        self.assert_in_success_response(
            [
                "This confirms that the email address for your Zulip",
                f'{escape(new_email_address.username)}@{escape(new_email_address.domain)}',
            ],
            response,
        )
        user_profile = get_user_by_delivery_email(new_email, new_realm)
        self.assertTrue(bool(user_profile))
        obj.refresh_from_db()
        self.assertEqual(obj.status, 1)

    def test_change_email_link_cant_be_reused(self) -> None:
        """A confirmation link works once; a second visit returns 404."""
        new_email = "hamlet-new@zulip.com"
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)

        activation_url = self.generate_email_change_link(new_email)
        response = self.client_get(activation_url)
        self.assertEqual(response.status_code, 200)

        user_profile.refresh_from_db()
        self.assertEqual(user_profile.delivery_email, new_email)

        response = self.client_get(activation_url)
        self.assertEqual(response.status_code, 404)

    def test_change_email_revokes(self) -> None:
        """Requesting a second change invalidates the first request's link."""
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)
        old_email = user_profile.delivery_email

        first_email = "hamlet-newer@zulip.com"
        first_url = self.generate_email_change_link(first_email)
        second_email = "hamlet-newest@zulip.com"
        second_url = self.generate_email_change_link(second_email)

        # The older link no longer works and leaves the email untouched.
        response = self.client_get(first_url)
        self.assertEqual(response.status_code, 404)
        user_profile.refresh_from_db()
        self.assertEqual(user_profile.delivery_email, old_email)

        # The newer link still applies its change.
        response = self.client_get(second_url)
        self.assertEqual(response.status_code, 200)
        user_profile.refresh_from_db()
        self.assertEqual(user_profile.delivery_email, second_email)

    def test_change_email_deactivated_user_realm(self) -> None:
        """Links are refused once the user, or the user's realm, is deactivated."""
        new_email = "hamlet-new@zulip.com"
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)

        # Deactivated user: following the link gives 401.
        activation_url = self.generate_email_change_link(new_email)

        do_deactivate_user(user_profile, acting_user=None)
        response = self.client_get(activation_url)
        self.assertEqual(response.status_code, 401)

        # Deactivated realm: following the link redirects to the
        # "deactivated" page.
        do_reactivate_user(user_profile, acting_user=None)
        self.login_user(user_profile)
        activation_url = self.generate_email_change_link(new_email)

        do_deactivate_realm(
            user_profile.realm,
            acting_user=None,
            deactivation_reason="owner_request",
            email_owners=False,
        )

        response = self.client_get(activation_url)
        self.assertEqual(response.status_code, 302)
        self.assertTrue(response["Location"].endswith("/accounts/deactivated/"))

    def test_start_email_change_process(self) -> None:
        """Starting the process records exactly one EmailChangeStatus row."""
        user_profile = self.example_user("hamlet")
        do_start_email_change_process(user_profile, "hamlet-new@zulip.com")
        self.assertEqual(EmailChangeStatus.objects.count(), 1)

    def test_end_to_end_flow(self) -> None:
        """Request a change through the API, check the email, and follow its link."""
        data = {"email": "hamlet-new@zulip.com"}
        self.login("hamlet")
        url = "/json/settings"
        self.assert_length(mail.outbox, 0)
        result = self.client_patch(url, data)
        self.assert_json_success(result)
        self.assert_length(mail.outbox, 1)
        email_message = mail.outbox[0]
        self.assertEqual(
            email_message.subject,
            "Verify your new email address for zulip.testserver",
        )
        body = email_message.body
        self.assertIn("We received a request to change the email", body)
        self.assertEqual(self.email_envelope_from(email_message), settings.NOREPLY_EMAIL_ADDRESS)
        self.assertRegex(
            self.email_display_from(email_message),
            rf"^testserver account security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
        )

        # NOTE(review): the expected value ends in a space; an angle-bracketed
        # list identifier may have been lost from this literal -- confirm.
        self.assertEqual(email_message.extra_headers["List-Id"], "Zulip Dev ")

        activation_url = [s for s in body.split("\n") if s][2]
        response = self.client_get(activation_url)

        self.assert_in_success_response(["This confirms that the email address"], response)

        # Now confirm trying to change your email back doesn't throw an immediate error
        result = self.client_patch(url, {"email": "hamlet@zulip.com"})
        self.assert_json_success(result)

    def test_unauthorized_email_change(self) -> None:
        """With email changes disabled, hamlet is refused but iago is not."""
        data = {"email": "hamlet-new@zulip.com"}
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)
        do_set_realm_property(
            user_profile.realm,
            "email_changes_disabled",
            True,
            acting_user=None,
        )
        url = "/json/settings"
        result = self.client_patch(url, data)
        self.assert_length(mail.outbox, 0)
        self.assertEqual(result.status_code, 400)
        self.assert_in_response("Email address changes are disabled in this organization.", result)
        # Realm admins can change their email address even if the setting is disabled.
        data = {"email": "iago-new@zulip.com"}
        self.login("iago")
        url = "/json/settings"
        result = self.client_patch(url, data)
        self.assert_json_success(result)

    def test_email_change_already_taken(self) -> None:
        """Requesting another user's address fails with 400 and sends no email."""
        data = {"email": "cordelia@zulip.com"}
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)

        url = "/json/settings"
        result = self.client_patch(url, data)
        self.assert_length(mail.outbox, 0)
        self.assertEqual(result.status_code, 400)
        self.assert_in_response("Already has an account", result)

    def test_email_change_already_taken_later(self) -> None:
        """If two users request the same address, only the first to confirm gets it."""
        conflict_email = "conflict@zulip.com"
        hamlet = self.example_user("hamlet")
        cordelia = self.example_user("cordelia")

        # Both users obtain a link for the same, currently unused, address.
        self.login_user(hamlet)
        hamlet_url = self.generate_email_change_link(conflict_email)
        self.logout()

        self.login_user(cordelia)
        cordelia_url = self.generate_email_change_link(conflict_email)

        # Cordelia confirms first and takes the address.
        response = self.client_get(cordelia_url)
        self.assertEqual(response.status_code, 200)
        cordelia.refresh_from_db()
        self.assertEqual(cordelia.delivery_email, conflict_email)
        self.logout()

        # Hamlet's link is now rejected because the address is in use.
        self.login_user(hamlet)
        response = self.client_get(hamlet_url)
        self.assertEqual(response.status_code, 400)
        self.assert_in_response("Already has an account", response)

    def test_change_email_to_disposable_email(self) -> None:
        """Realm email restrictions are re-checked when the link is followed."""
        hamlet = self.example_user("hamlet")

        # Make the realm allow permissive email changes, create a
        # link, and then lock the permissions down -- the change
        # should not be allowed when the link is followed.
        realm = hamlet.realm
        realm.disallow_disposable_email_addresses = False
        realm.emails_restricted_to_domains = False
        realm.save()

        self.login_user(hamlet)
        confirmation_url = self.generate_email_change_link("hamlet@mailnator.com")

        realm.disallow_disposable_email_addresses = True
        realm.save()

        response = self.client_get(confirmation_url)
        self.assertEqual(response.status_code, 400)
        self.assert_in_response("Please use your real email address.", response)

    def test_unauthorized_email_change_from_email_confirmation_link(self) -> None:
        """A link issued before email changes were disabled is refused afterwards."""
        new_email = "hamlet-new@zulip.com"
        user_profile = self.example_user("hamlet")
        self.login_user(user_profile)

        activation_url = self.generate_email_change_link(new_email)

        do_set_realm_property(
            user_profile.realm,
            "email_changes_disabled",
            True,
            acting_user=None,
        )

        response = self.client_get(activation_url)

        self.assertEqual(response.status_code, 400)
        self.assert_in_response(
            "Email address changes are disabled in this organization.", response
        )

    def test_post_invalid_email(self) -> None:
        """An empty address and one without a domain are both reported as invalid."""
        invalid_emails = ["", "hamlet-new"]
        for email in invalid_emails:
            data = {"email": email}
            self.login("hamlet")
            url = "/json/settings"
            result = self.client_patch(url, data)
            self.assert_in_response("Invalid address", result)

    def test_post_same_email(self) -> None:
        """Submitting the current address succeeds with an empty message."""
        data = {"email": self.example_email("hamlet")}
        self.login("hamlet")
        url = "/json/settings"
        result = self.client_patch(url, data)
        response_dict = self.assert_json_success(result)
        self.assertEqual("success", response_dict["result"])
        self.assertEqual("", response_dict["msg"])

    def test_change_delivery_email_end_to_end_with_admins_visibility(self) -> None:
        """With admins-only visibility, only ``delivery_email`` takes the new address."""
        user_profile = self.example_user("hamlet")
        do_change_user_setting(
            user_profile,
            "email_address_visibility",
            UserProfile.EMAIL_ADDRESS_VISIBILITY_ADMINS,
            acting_user=None,
        )

        self.login_user(user_profile)
        old_email = user_profile.delivery_email
        new_email = "hamlet-new@zulip.com"
        obj = EmailChangeStatus.objects.create(
            new_email=new_email,
            old_email=old_email,
            user_profile=user_profile,
            realm=user_profile.realm,
        )
        url = create_confirmation_link(obj, Confirmation.EMAIL_CHANGE)
        response = self.client_get(url)

        self.assertEqual(response.status_code, 200)
        self.assert_in_success_response(
            ["This confirms that the email address for your Zulip"], response
        )
        user_profile = get_user_profile_by_id(user_profile.id)
        self.assertEqual(user_profile.delivery_email, new_email)
        # The ``email`` field holds a generated placeholder, not the new address.
        self.assertEqual(user_profile.email, f"user{user_profile.id}@zulip.testserver")
        obj.refresh_from_db()
        self.assertEqual(obj.status, 1)
        # The old address no longer resolves through either lookup.
        with self.assertRaises(UserProfile.DoesNotExist):
            get_user(old_email, user_profile.realm)
        with self.assertRaises(UserProfile.DoesNotExist):
            get_user_by_delivery_email(old_email, user_profile.realm)
        self.assertEqual(get_user_by_delivery_email(new_email, user_profile.realm), user_profile)

    def test_configure_demo_organization_owner_email(self) -> None:
        """A demo-organization owner with no email can add one and is sent on to set a password."""
        desdemona = self.example_user("desdemona")
        # Mark the realm as a demo organization by giving it a deletion date.
        desdemona.realm.demo_organization_scheduled_deletion_date = now() + timedelta(days=30)
        desdemona.realm.save()
        assert desdemona.realm.demo_organization_scheduled_deletion_date is not None

        self.login("desdemona")
        desdemona.delivery_email = ""
        desdemona.save()
        self.assertEqual(desdemona.delivery_email, "")

        data = {"email": "desdemona-new@zulip.com"}
        url = "/json/settings"
        self.assert_length(mail.outbox, 0)
        result = self.client_patch(url, data)
        self.assert_json_success(result)
        self.assert_length(mail.outbox, 1)

        email_message = mail.outbox[0]
        self.assertEqual(
            email_message.subject,
            "Verify your new email address for your demo Zulip organization",
        )
        body = email_message.body
        self.assertIn(
            "We received a request to add the email address",
            body,
        )
        self.assertEqual(self.email_envelope_from(email_message), settings.NOREPLY_EMAIL_ADDRESS)
        self.assertRegex(
            self.email_display_from(email_message),
            rf"^testserver account security <{self.TOKENIZED_NOREPLY_REGEX}>\Z",
        )
        # NOTE(review): the expected value ends in a space; an angle-bracketed
        # list identifier may have been lost from this literal -- confirm.
        self.assertEqual(email_message.extra_headers["List-Id"], "Zulip Dev ")

        confirmation_url = [s for s in body.split("\n") if s][2]
        # Follow redirects so the final page can be checked.
        response = self.client_get(confirmation_url, follow=True)

        self.assertEqual(response.status_code, 200)
        self.assert_in_success_response(["Set a new password"], response)

        user_profile = get_user_profile_by_id(desdemona.id)
        self.assertEqual(user_profile.delivery_email, "desdemona-new@zulip.com")