# -*- coding: utf-8 -*- import datetime from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.contrib.sites.models import Site from django.http import HttpResponse from django.test import TestCase, override_settings from django.utils.timezone import now as timezone_now from mock import patch, MagicMock from zerver.lib.test_helpers import MockLDAP from confirmation.models import Confirmation, create_confirmation_link, MultiuseInvite, \ generate_key, confirmation_url from zerver.forms import HomepageForm, WRONG_SUBDOMAIN_ERROR from zerver.lib.actions import do_change_password, gather_subscriptions from zerver.views.auth import login_or_register_remote_user from zerver.views.invite import get_invitee_emails_set from zerver.views.registration import confirmation_key, \ redirect_and_log_into_subdomain, send_registration_completion_email from zerver.models import ( get_realm, get_prereg_user_by_email, get_user, get_recipient, PreregistrationUser, Realm, RealmDomain, Recipient, Message, ScheduledEmail, UserProfile, UserMessage, Stream, Subscription, flush_per_request_caches ) from zerver.lib.actions import ( set_default_streams, do_change_is_admin, get_stream, do_create_realm, ) from zerver.lib.send_email import send_email, send_future_email, FromAddress from zerver.lib.initial_password import initial_password from zerver.lib.actions import ( do_deactivate_realm, do_deactivate_user, do_set_realm_property, add_new_user_history, ) from zerver.lib.mobile_auth_otp import xor_hex_strings, ascii_to_hex, \ otp_encrypt_api_key, is_valid_otp, hex_to_ascii, otp_decrypt_api_key from zerver.lib.notifications import enqueue_welcome_emails, \ one_click_unsubscribe_link from zerver.lib.subdomains import is_root_domain_available from zerver.lib.test_helpers import find_pattern_in_email, find_key_by_email, queries_captured, \ HostRequestMock, unsign_subdomain_cookie from zerver.lib.test_classes import ( ZulipTestCase, ) from 
zerver.lib.test_runner import slow from zerver.lib.sessions import get_session_dict_user from zerver.context_processors import common_context from collections import defaultdict import re import smtplib import ujson from typing import Any, Dict, List, Optional, Set, Text from six.moves import urllib, range, zip import os class RedirectAndLogIntoSubdomainTestCase(ZulipTestCase): def test_cookie_data(self): # type: () -> None realm = Realm.objects.all().first() name = 'Hamlet' email = self.example_email("hamlet") response = redirect_and_log_into_subdomain(realm, name, email) data = unsign_subdomain_cookie(response) self.assertDictEqual(data, {'name': name, 'email': email, 'subdomain': realm.subdomain, 'is_signup': False}) response = redirect_and_log_into_subdomain(realm, name, email, is_signup=True) data = unsign_subdomain_cookie(response) self.assertDictEqual(data, {'name': name, 'email': email, 'subdomain': realm.subdomain, 'is_signup': True}) class DeactivationNoticeTestCase(ZulipTestCase): def test_redirection_for_deactivated_realm(self): # type: () -> None realm = get_realm("zulip") realm.deactivated = True realm.save(update_fields=["deactivated"]) for url in ('/register/', '/login/'): result = self.client_get(url) self.assertEqual(result.status_code, 302) self.assertIn('deactivated', result.url) def test_redirection_for_active_realm(self): # type: () -> None for url in ('/register/', '/login/'): result = self.client_get(url) self.assertEqual(result.status_code, 200) def test_deactivation_notice_when_realm_is_active(self): # type: () -> None result = self.client_get('/accounts/deactivated/') self.assertEqual(result.status_code, 302) self.assertIn('login', result.url) def test_deactivation_notice_when_deactivated(self): # type: () -> None realm = get_realm("zulip") realm.deactivated = True realm.save(update_fields=["deactivated"]) result = self.client_get('/accounts/deactivated/') self.assertIn("Zulip Dev, has been deactivated.", result.content.decode()) class 
AddNewUserHistoryTest(ZulipTestCase): def test_add_new_user_history_race(self): # type: () -> None """Sends a message during user creation""" # Create a user who hasn't had historical messages added stream_dict = { "Denmark": {"description": "A Scandinavian country", "invite_only": False}, "Verona": {"description": "A city in Italy", "invite_only": False} } # type: Dict[Text, Dict[Text, Any]] realm = get_realm('zulip') set_default_streams(realm, stream_dict) with patch("zerver.lib.actions.add_new_user_history"): self.register(self.nonreg_email('test'), "test") user_profile = self.nonreg_user('test') subs = Subscription.objects.select_related("recipient").filter( user_profile=user_profile, recipient__type=Recipient.STREAM) streams = Stream.objects.filter(id__in=[sub.recipient.type_id for sub in subs]) self.send_message(self.example_email('hamlet'), streams[0].name, Recipient.STREAM, "test") add_new_user_history(user_profile, streams) class PasswordResetTest(ZulipTestCase): """ Log in, reset password, log out, log in with new password. """ def test_password_reset(self): # type: () -> None email = self.example_email("hamlet") old_password = initial_password(email) self.login(email) # test password reset template result = self.client_get('/accounts/password/reset/') self.assert_in_response('Reset your password', result) # start the password reset process by supplying an email address result = self.client_post('/accounts/password/reset/', {'email': email}) # check the redirect link telling you to check mail for password reset link self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/password/reset/done/")) result = self.client_get(result["Location"]) self.assert_in_response("Check your email to finish the process.", result) # Check that the password reset email is from a noreply address. 
from django.core.mail import outbox from_email = outbox[0].from_email self.assertIn("Zulip Account Security", from_email) self.assertIn(FromAddress.NOREPLY, from_email) # Visit the password reset link. password_reset_url = self.get_confirmation_url_from_outbox(email, "(\S+)") result = self.client_get(password_reset_url) self.assertEqual(result.status_code, 200) # Reset your password result = self.client_post(password_reset_url, {'new_password1': 'new_password', 'new_password2': 'new_password'}) # password reset succeeded self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith("/password/done/")) # log back in with new password self.login(email, password='new_password') user_profile = self.example_user('hamlet') self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) # make sure old password no longer works self.login(email, password=old_password, fails=True) def test_invalid_subdomain(self): # type: () -> None email = self.example_email("hamlet") string_id = 'hamlet' name = 'Hamlet' do_create_realm( string_id, name, restricted_to_domain=False, invite_required=False ) with patch('zerver.forms.get_subdomain', return_value=string_id): # start the password reset process by supplying an email address result = self.client_post( '/accounts/password/reset/', {'email': email}) # check the redirect link telling you to check mail for password reset link self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/password/reset/done/")) result = self.client_get(result["Location"]) self.assert_in_response("Check your email to finish the process.", result) from django.core.mail import outbox self.assertEqual(len(outbox), 1) message = outbox.pop() self.assertIn(FromAddress.NOREPLY, message.from_email) self.assertIn("hamlet@zulip.com does not\nhave an active account in http://", message.body) def test_correct_subdomain(self): # type: () -> None email = self.example_email("hamlet") string_id = 
'zulip' with patch('zerver.forms.get_subdomain', return_value=string_id): # start the password reset process by supplying an email address result = self.client_post( '/accounts/password/reset/', {'email': email}) # check the redirect link telling you to check mail for password reset link self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/password/reset/done/")) result = self.client_get(result["Location"]) self.assert_in_response("Check your email to finish the process.", result) from django.core.mail import outbox self.assertEqual(len(outbox), 1) message = outbox.pop() self.assertIn("Zulip Account Security", message.from_email) self.assertIn(FromAddress.NOREPLY, message.from_email) self.assertIn("Psst. Word on the street is that you", message.body) def test_redirect_endpoints(self): # type: () -> None ''' These tests are mostly designed to give us 100% URL coverage in our URL coverage reports. Our mechanism for finding URL coverage doesn't handle redirects, so we just have a few quick tests here. ''' result = self.client_get('/accounts/password/reset/done/') self.assert_in_success_response(["Check your email"], result) result = self.client_get('/accounts/password/done/') self.assert_in_success_response(["We've reset your password!"], result) result = self.client_get('/accounts/send_confirm/alice@example.com') self.assert_in_success_response(["Still no email?"], result) class LoginTest(ZulipTestCase): """ Logging in, registration, and logging out. 
""" def test_login(self): # type: () -> None self.login(self.example_email("hamlet")) user_profile = self.example_user('hamlet') self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) def test_login_bad_password(self): # type: () -> None self.login(self.example_email("hamlet"), password="wrongpassword", fails=True) self.assertIsNone(get_session_dict_user(self.client.session)) def test_login_nonexist_user(self): # type: () -> None result = self.login_with_return("xxx@zulip.com", "xxx") self.assert_in_response("Please enter a correct email and password", result) def test_register(self): # type: () -> None realm = get_realm("zulip") stream_dict = {"stream_"+str(i): {"description": "stream_%s_description" % i, "invite_only": False} for i in range(40)} # type: Dict[Text, Dict[Text, Any]] for stream_name in stream_dict.keys(): self.make_stream(stream_name, realm=realm) set_default_streams(realm, stream_dict) # Clear all the caches. flush_per_request_caches() ContentType.objects.clear_cache() Site.objects.clear_cache() with queries_captured() as queries: self.register(self.nonreg_email('test'), "test") # Ensure the number of queries we make is not O(streams) self.assert_length(queries, 68) user_profile = self.nonreg_user('test') self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) self.assertFalse(user_profile.enable_stream_desktop_notifications) def test_register_deactivated(self): # type: () -> None """ If you try to register for a deactivated realm, you get a clear error page. 
""" realm = get_realm("zulip") realm.deactivated = True realm.save(update_fields=["deactivated"]) result = self.client_post('/accounts/home/', {'email': self.nonreg_email('test')}, subdomain="zulip") self.assertEqual(result.status_code, 302) self.assertEqual('/accounts/deactivated/', result.url) with self.assertRaises(UserProfile.DoesNotExist): self.nonreg_user('test') def test_register_deactivated_partway_through(self): # type: () -> None """ If you try to register for a deactivated realm, you get a clear error page. """ email = self.nonreg_email('test') result = self.client_post('/accounts/home/', {'email': email}, subdomain="zulip") self.assertEqual(result.status_code, 302) print(result.url) self.assertNotIn('deactivated', result.url) realm = get_realm("zulip") realm.deactivated = True realm.save(update_fields=["deactivated"]) result = self.submit_reg_form_for_user(email, "abcd1234", subdomain="zulip") self.assertEqual(result.status_code, 302) self.assertEqual('/accounts/deactivated/', result.url) with self.assertRaises(UserProfile.DoesNotExist): self.nonreg_user('test') def test_login_deactivated(self): # type: () -> None """ If you try to log in to a deactivated realm, you get a clear error page. """ realm = get_realm("zulip") realm.deactivated = True realm.save(update_fields=["deactivated"]) result = self.login_with_return(self.example_email("hamlet"), subdomain="zulip") self.assertEqual(result.status_code, 302) self.assertEqual('/accounts/deactivated/', result.url) def test_logout(self): # type: () -> None self.login(self.example_email("hamlet")) # We use the logout API, not self.logout, to make sure we test # the actual logout code path. self.client_post('/accounts/logout/') self.assertIsNone(get_session_dict_user(self.client.session)) def test_non_ascii_login(self): # type: () -> None """ You can log in even if your password contain non-ASCII characters. """ email = self.nonreg_email('test') password = u"hümbüǵ" # Registering succeeds. 
self.register(email, password) user_profile = self.nonreg_user('test') self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) self.logout() self.assertIsNone(get_session_dict_user(self.client.session)) # Logging in succeeds. self.logout() self.login(email, password) self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) def test_login_page_redirects_logged_in_user(self): # type: () -> None """You will be redirected to the app's main page if you land on the login page when already logged in. """ self.login(self.example_email("cordelia")) response = self.client_get("/login/") self.assertEqual(response["Location"], "http://zulip.testserver") class InviteUserTest(ZulipTestCase): def invite(self, users, streams, body=''): # type: (Text, List[Text], str) -> HttpResponse """ Invites the specified users to Zulip with the specified streams. users should be a string containing the users to invite, comma or newline separated. streams should be a list of strings. """ return self.client_post("/json/invites", {"invitee_emails": users, "stream": streams, "custom_body": body}) def check_sent_emails(self, correct_recipients, custom_body=None, custom_from_name=None): # type: (List[Text], Optional[str], Optional[str]) -> None from django.core.mail import outbox self.assertEqual(len(outbox), len(correct_recipients)) email_recipients = [email.recipients()[0] for email in outbox] self.assertEqual(sorted(email_recipients), sorted(correct_recipients)) if len(outbox) == 0: return if custom_body is None: self.assertNotIn("Message from", outbox[0].body) else: self.assertIn("Message from ", outbox[0].body) self.assertIn(custom_body, outbox[0].body) if custom_from_name is not None: self.assertIn(custom_from_name, outbox[0].from_email) self.assertIn(FromAddress.NOREPLY, outbox[0].from_email) def test_successful_invite_user(self): # type: () -> None """ A call to /json/invites with valid parameters causes an invitation email to be sent. 
""" self.login(self.example_email("hamlet")) invitee = "alice-test@zulip.com" self.assert_json_success(self.invite(invitee, ["Denmark"])) self.assertTrue(find_key_by_email(invitee)) self.check_sent_emails([invitee], custom_from_name="Hamlet") def test_successful_invite_user_with_custom_body(self): # type: () -> None """ A call to /json/invites with valid parameters causes an invitation email to be sent. """ self.login(self.example_email("hamlet")) invitee = "alice-test@zulip.com" body = "Custom Text." self.assert_json_success(self.invite(invitee, ["Denmark"], body)) self.assertTrue(find_pattern_in_email(invitee, body)) self.check_sent_emails([invitee], custom_body=body, custom_from_name="Hamlet") def test_successful_invite_user_with_name(self): # type: () -> None """ A call to /json/invites with valid parameters causes an invitation email to be sent. """ self.login(self.example_email("hamlet")) email = "alice-test@zulip.com" invitee = "Alice Test <{}>".format(email) self.assert_json_success(self.invite(invitee, ["Denmark"])) self.assertTrue(find_key_by_email(email)) self.check_sent_emails([email], custom_from_name="Hamlet") def test_successful_invite_user_with_name_and_normal_one(self): # type: () -> None """ A call to /json/invites with valid parameters causes an invitation email to be sent. """ self.login(self.example_email("hamlet")) email = "alice-test@zulip.com" email2 = "bob-test@zulip.com" invitee = "Alice Test <{}>, {}".format(email, email2) self.assert_json_success(self.invite(invitee, ["Denmark"])) self.assertTrue(find_key_by_email(email)) self.assertTrue(find_key_by_email(email2)) self.check_sent_emails([email, email2], custom_from_name="Hamlet") def test_require_realm_admin(self): # type: () -> None """ The invite_by_admins_only realm setting works properly. 
""" realm = get_realm('zulip') realm.invite_by_admins_only = True realm.save() self.login("hamlet@zulip.com") email = "alice-test@zulip.com" email2 = "bob-test@zulip.com" invitee = "Alice Test <{}>, {}".format(email, email2) self.assert_json_error(self.invite(invitee, ["Denmark"]), "Must be a realm administrator") # Now verify an administrator can do it self.login("iago@zulip.com") self.assert_json_success(self.invite(invitee, ["Denmark"])) self.assertTrue(find_key_by_email(email)) self.assertTrue(find_key_by_email(email2)) self.check_sent_emails([email, email2]) def test_successful_invite_user_with_notifications_stream(self): # type: () -> None """ A call to /json/invites with valid parameters unconditionally subscribes the invitee to the notifications stream if it exists and is public. """ realm = get_realm('zulip') notifications_stream = get_stream('Verona', realm) realm.notifications_stream_id = notifications_stream.id realm.save() self.login(self.example_email("hamlet")) invitee = 'alice-test@zulip.com' self.assert_json_success(self.invite(invitee, ['Denmark'])) self.assertTrue(find_key_by_email(invitee)) self.check_sent_emails([invitee]) prereg_user = get_prereg_user_by_email(invitee) stream_ids = [stream.id for stream in prereg_user.streams.all()] self.assertTrue(notifications_stream.id in stream_ids) def test_invite_user_signup_initial_history(self): # type: () -> None """ Test that a new user invited to a stream receives some initial history but only from public streams. 
""" self.login(self.example_email('hamlet')) user_profile = self.example_user('hamlet') private_stream_name = "Secret" self.make_stream(private_stream_name, invite_only=True) self.subscribe(user_profile, private_stream_name) public_msg_id = self.send_message(self.example_email("hamlet"), "Denmark", Recipient.STREAM, "Public topic", "Public message") secret_msg_id = self.send_message(self.example_email("hamlet"), private_stream_name, Recipient.STREAM, "Secret topic", "Secret message") invitee = self.nonreg_email('alice') self.assert_json_success(self.invite(invitee, [private_stream_name, "Denmark"])) self.assertTrue(find_key_by_email(invitee)) self.submit_reg_form_for_user(invitee, "password") invitee_profile = self.nonreg_user('alice') invitee_msg_ids = [um.message_id for um in UserMessage.objects.filter(user_profile=invitee_profile)] self.assertTrue(public_msg_id in invitee_msg_ids) self.assertFalse(secret_msg_id in invitee_msg_ids) # Test that exactly 2 new Zulip messages were sent, both notifications. last_3_messages = list(reversed(list(Message.objects.all().order_by("-id")[0:3]))) first_msg = last_3_messages[0] self.assertEqual(first_msg.id, secret_msg_id) # The first, from notification-bot to the user who invited the new user. second_msg = last_3_messages[1] self.assertEqual(second_msg.sender.email, "notification-bot@zulip.com") self.assertTrue(second_msg.content.startswith("alice_zulip.com <`alice@zulip.com`> accepted your")) # The second, from welcome-bot to the user who was invited. third_msg = last_3_messages[2] self.assertEqual(third_msg.sender.email, "welcome-bot@zulip.com") self.assertTrue(third_msg.content.startswith("Hello, and welcome to Zulip!")) def test_multi_user_invite(self): # type: () -> None """ Invites multiple users with a variety of delimiters. """ self.login(self.example_email("hamlet")) # Intentionally use a weird string. 
self.assert_json_success(self.invite( """bob-test@zulip.com, carol-test@zulip.com, dave-test@zulip.com earl-test@zulip.com""", ["Denmark"])) for user in ("bob", "carol", "dave", "earl"): self.assertTrue(find_key_by_email("%s-test@zulip.com" % (user,))) self.check_sent_emails(["bob-test@zulip.com", "carol-test@zulip.com", "dave-test@zulip.com", "earl-test@zulip.com"]) def test_missing_or_invalid_params(self): # type: () -> None """ Tests inviting with various missing or invalid parameters. """ self.login(self.example_email("hamlet")) self.assert_json_error( self.client_post("/json/invites", {"invitee_emails": "foo@zulip.com", "custom_body": ''}), "You must specify at least one stream for invitees to join.") for address in ("noatsign.com", "outsideyourdomain@example.net"): self.assert_json_error( self.invite(address, ["Denmark"]), "Some emails did not validate, so we didn't send any invitations.") self.check_sent_emails([]) self.assert_json_error( self.invite("", ["Denmark"]), "You must specify at least one email address.") self.check_sent_emails([]) def test_invalid_stream(self): # type: () -> None """ Tests inviting to a non-existent stream. """ self.login(self.example_email("hamlet")) self.assert_json_error(self.invite("iago-test@zulip.com", ["NotARealStream"]), "Stream does not exist: NotARealStream. No invites were sent.") self.check_sent_emails([]) def test_invite_existing_user(self): # type: () -> None """ If you invite an address already using Zulip, no invitation is sent. 
""" self.login(self.example_email("hamlet")) self.assert_json_error( self.client_post("/json/invites", {"invitee_emails": self.example_email("hamlet"), "stream": ["Denmark"], "custom_body": ''}), "We weren't able to invite anyone.") self.assertRaises(PreregistrationUser.DoesNotExist, lambda: PreregistrationUser.objects.get( email=self.example_email("hamlet"))) self.check_sent_emails([]) def test_invite_some_existing_some_new(self): # type: () -> None """ If you invite a mix of already existing and new users, invitations are only sent to the new users. """ self.login(self.example_email("hamlet")) existing = [self.example_email("hamlet"), u"othello@zulip.com"] new = [u"foo-test@zulip.com", u"bar-test@zulip.com"] result = self.client_post("/json/invites", {"invitee_emails": "\n".join(existing + new), "stream": ["Denmark"], "custom_body": ''}) self.assert_json_error(result, "Some of those addresses are already using Zulip, \ so we didn't send them an invitation. We did send invitations to everyone else!") # We only created accounts for the new users. for email in existing: self.assertRaises(PreregistrationUser.DoesNotExist, lambda: PreregistrationUser.objects.get( email=email)) for email in new: self.assertTrue(PreregistrationUser.objects.get(email=email)) # We only sent emails to the new users. self.check_sent_emails(new) prereg_user = get_prereg_user_by_email('foo-test@zulip.com') self.assertEqual(prereg_user.email, 'foo-test@zulip.com') def test_invite_outside_domain_in_closed_realm(self): # type: () -> None """ In a realm with `restricted_to_domain = True`, you can't invite people with a different domain from that of the realm or your e-mail address. 
""" zulip_realm = get_realm("zulip") zulip_realm.restricted_to_domain = True zulip_realm.save() self.login(self.example_email("hamlet")) external_address = "foo@example.com" self.assert_json_error( self.invite(external_address, ["Denmark"]), "Some emails did not validate, so we didn't send any invitations.") def test_invite_outside_domain_in_open_realm(self): # type: () -> None """ In a realm with `restricted_to_domain = False`, you can invite people with a different domain from that of the realm or your e-mail address. """ zulip_realm = get_realm("zulip") zulip_realm.restricted_to_domain = False zulip_realm.save() self.login(self.example_email("hamlet")) external_address = "foo@example.com" self.assert_json_success(self.invite(external_address, ["Denmark"])) self.check_sent_emails([external_address]) def test_invite_outside_domain_before_closing(self): # type: () -> None """ If you invite someone with a different domain from that of the realm when `restricted_to_domain = False`, but `restricted_to_domain` later changes to true, the invitation should succeed but the invitee's signup attempt should fail. """ zulip_realm = get_realm("zulip") zulip_realm.restricted_to_domain = False zulip_realm.save() self.login(self.example_email("hamlet")) external_address = "foo@example.com" self.assert_json_success(self.invite(external_address, ["Denmark"])) self.check_sent_emails([external_address]) zulip_realm.restricted_to_domain = True zulip_realm.save() result = self.submit_reg_form_for_user("foo@example.com", "password") self.assertEqual(result.status_code, 200) self.assert_in_response("only allows users with e-mail", result) def test_invite_with_non_ascii_streams(self): # type: () -> None """ Inviting someone to streams with non-ASCII characters succeeds. """ self.login(self.example_email("hamlet")) invitee = "alice-test@zulip.com" stream_name = u"hümbüǵ" # Make sure we're subscribed before inviting someone. 
self.subscribe(self.example_user("hamlet"), stream_name) self.assert_json_success(self.invite(invitee, [stream_name])) def test_invitation_reminder_email(self): # type: () -> None from django.core.mail import outbox # All users belong to zulip realm referrer_user = 'hamlet' current_user_email = self.example_email(referrer_user) self.login(current_user_email) invitee_email = self.nonreg_email('alice') self.assert_json_success(self.invite(invitee_email, ["Denmark"])) self.assertTrue(find_key_by_email(invitee_email)) self.check_sent_emails([invitee_email]) data = {"email": invitee_email, "referrer_email": current_user_email} invitee = get_prereg_user_by_email(data["email"]) referrer = self.example_user(referrer_user) link = create_confirmation_link(invitee, referrer.realm.host, Confirmation.INVITATION) context = common_context(referrer) context.update({ 'activate_url': link, 'referrer_name': referrer.full_name, 'referrer_email': referrer.email, 'referrer_realm_name': referrer.realm.name, }) with self.settings(EMAIL_BACKEND='django.core.mail.backends.console.EmailBackend'): send_future_email( "zerver/emails/invitation_reminder", to_email=data["email"], from_address=FromAddress.NOREPLY, context=context) email_jobs_to_deliver = ScheduledEmail.objects.filter( scheduled_timestamp__lte=timezone_now()) self.assertEqual(len(email_jobs_to_deliver), 1) email_count = len(outbox) for job in email_jobs_to_deliver: send_email(**ujson.loads(job.data)) self.assertEqual(len(outbox), email_count + 1) self.assertIn(FromAddress.NOREPLY, outbox[-1].from_email) # Now verify that signing up clears invite_reminder emails email_jobs_to_deliver = ScheduledEmail.objects.filter( scheduled_timestamp__lte=timezone_now()) self.assertEqual(len(email_jobs_to_deliver), 1) self.register(invitee_email, "test") email_jobs_to_deliver = ScheduledEmail.objects.filter( scheduled_timestamp__lte=timezone_now()) self.assertEqual(len(email_jobs_to_deliver), 0) class InviteeEmailsParserTests(TestCase): def 
setUp(self): # type: () -> None self.email1 = "email1@zulip.com" self.email2 = "email2@zulip.com" self.email3 = "email3@zulip.com" def test_if_emails_separated_by_commas_are_parsed_and_striped_correctly(self): # type: () -> None emails_raw = "{} ,{}, {}".format(self.email1, self.email2, self.email3) expected_set = {self.email1, self.email2, self.email3} self.assertEqual(get_invitee_emails_set(emails_raw), expected_set) def test_if_emails_separated_by_newlines_are_parsed_and_striped_correctly(self): # type: () -> None emails_raw = "{}\n {}\n {} ".format(self.email1, self.email2, self.email3) expected_set = {self.email1, self.email2, self.email3} self.assertEqual(get_invitee_emails_set(emails_raw), expected_set) def test_if_emails_from_email_client_separated_by_newlines_are_parsed_correctly(self): # type: () -> None emails_raw = "Email One <{}>\nEmailTwo<{}>\nEmail Three<{}>".format(self.email1, self.email2, self.email3) expected_set = {self.email1, self.email2, self.email3} self.assertEqual(get_invitee_emails_set(emails_raw), expected_set) def test_if_emails_in_mixed_style_are_parsed_correctly(self): # type: () -> None emails_raw = "Email One <{}>,EmailTwo<{}>\n{}".format(self.email1, self.email2, self.email3) expected_set = {self.email1, self.email2, self.email3} self.assertEqual(get_invitee_emails_set(emails_raw), expected_set) class MultiuseInviteTest(ZulipTestCase): def setUp(self): # type: () -> None self.realm = get_realm('zulip') self.realm.invite_required = True self.realm.save() def generate_multiuse_invite_link(self, streams=None, date_sent=None): # type: (List[Stream], Optional[datetime.datetime]) -> Text invite = MultiuseInvite(realm=self.realm, referred_by=self.example_user("iago")) invite.save() if streams is not None: invite.streams = streams invite.save() if date_sent is None: date_sent = timezone_now() key = generate_key() Confirmation.objects.create(content_object=invite, date_sent=date_sent, confirmation_key=key, type=Confirmation.MULTIUSE_INVITE) 
return confirmation_url(key, self.realm.host, Confirmation.MULTIUSE_INVITE) def check_user_able_to_register(self, email, invite_link): # type: (Text, Text) -> None password = "password" result = self.client_post(invite_link, {'email': email}) self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"]) self.assert_in_response("Check your email so we can get started.", result) confirmation_url = self.get_confirmation_url_from_outbox(email) result = self.client_get(confirmation_url) self.assertEqual(result.status_code, 200) result = self.submit_reg_form_for_user(email, password) self.assertEqual(result.status_code, 302) from django.core.mail import outbox outbox.pop() def check_user_subscribed_only_to_streams(self, user_name, streams): # type: (str, List[Stream]) -> None sorted(streams, key=lambda x: x.name) subscribed_streams = gather_subscriptions(self.nonreg_user(user_name))[0] self.assertEqual(len(subscribed_streams), len(streams)) for x, y in zip(subscribed_streams, streams): self.assertEqual(x["name"], y.name) def test_valid_multiuse_link(self): # type: () -> None email1 = self.nonreg_email("test") email2 = self.nonreg_email("test1") email3 = self.nonreg_email("alice") date_sent = timezone_now() - datetime.timedelta(days=settings.INVITATION_LINK_VALIDITY_DAYS - 1) invite_link = self.generate_multiuse_invite_link(date_sent=date_sent) self.check_user_able_to_register(email1, invite_link) self.check_user_able_to_register(email2, invite_link) self.check_user_able_to_register(email3, invite_link) def test_expired_multiuse_link(self): # type: () -> None email = self.nonreg_email('newuser') date_sent = timezone_now() - datetime.timedelta(days=settings.INVITATION_LINK_VALIDITY_DAYS) invite_link = self.generate_multiuse_invite_link(date_sent=date_sent) result = self.client_post(invite_link, {'email': email}) self.assertEqual(result.status_code, 200) 
self.assert_in_response("Whoops. The confirmation link has expired.", result) def test_invalid_multiuse_link(self): # type: () -> None email = self.nonreg_email('newuser') invite_link = "/join/invalid_key/" result = self.client_post(invite_link, {'email': email}) self.assertEqual(result.status_code, 200) self.assert_in_response("Whoops. The confirmation link is malformed.", result) def test_invalid_multiuse_link_in_open_realm(self): # type: () -> None self.realm.invite_required = False self.realm.save() email = self.nonreg_email('newuser') invite_link = "/join/invalid_key/" with patch('zerver.views.registration.get_realm_from_request', return_value=self.realm): with patch('zerver.views.registration.get_realm', return_value=self.realm): self.check_user_able_to_register(email, invite_link) def test_multiuse_link_with_specified_streams(self): # type: () -> None name1 = "newuser" name2 = "bob" email1 = self.nonreg_email(name1) email2 = self.nonreg_email(name2) stream_names = ["Rome", "Scotland", "Venice"] streams = [get_stream(stream_name, self.realm) for stream_name in stream_names] invite_link = self.generate_multiuse_invite_link(streams=streams) self.check_user_able_to_register(email1, invite_link) self.check_user_subscribed_only_to_streams(name1, streams) stream_names = ["Rome", "Verona"] streams = [get_stream(stream_name, self.realm) for stream_name in stream_names] invite_link = self.generate_multiuse_invite_link(streams=streams) self.check_user_able_to_register(email2, invite_link) self.check_user_subscribed_only_to_streams(name2, streams) class EmailUnsubscribeTests(ZulipTestCase): def test_error_unsubscribe(self): # type: () -> None # An invalid insubscribe token "test123" produces an error. result = self.client_get('/accounts/unsubscribe/missed_messages/test123') self.assert_in_response('Unknown email unsubscribe request', result) # An unknown message type "fake" produces an error. 
# NOTE: the first four statements below complete test_error_unsubscribe,
# whose `def` line precedes this span.
        user_profile = self.example_user('hamlet')
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "fake")
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)
        self.assert_in_response('Unknown email unsubscribe request', result)

    def test_missedmessage_unsubscribe(self):
        # type: () -> None
        """
        We provide one-click unsubscribe links in missed message
        e-mails that you can click even when logged out to update your
        email notification settings.
        """
        user_profile = self.example_user('hamlet')
        user_profile.enable_offline_email_notifications = True
        user_profile.save()

        unsubscribe_link = one_click_unsubscribe_link(user_profile,
                                                      "missed_messages")
        # Only the path component of the link is requested from the test client.
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)

        self.assertEqual(result.status_code, 200)

        # Reload from the database to observe the change made by the view.
        user_profile.refresh_from_db()
        self.assertFalse(user_profile.enable_offline_email_notifications)

    def test_welcome_unsubscribe(self):
        # type: () -> None
        """
        We provide one-click unsubscribe links in welcome e-mails that you can
        click even when logged out to stop receiving them.
        """
        user_profile = self.example_user('hamlet')
        # Simulate a new user signing up, which enqueues 2 welcome e-mails.
        enqueue_welcome_emails(user_profile)
        self.assertEqual(2, ScheduledEmail.objects.filter(user=user_profile).count())

        # Simulate unsubscribing from the welcome e-mails.
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)

        # The welcome email jobs are no longer scheduled.
        self.assertEqual(result.status_code, 200)
        self.assertEqual(0, ScheduledEmail.objects.filter(user=user_profile).count())

    def test_digest_unsubscribe(self):
        # type: () -> None
        """
        We provide one-click unsubscribe links in digest e-mails that you can
        click even when logged out to stop receiving them.

        Unsubscribing from these emails also dequeues any digest email jobs that
        have been queued.
        """
        user_profile = self.example_user('hamlet')
        self.assertTrue(user_profile.enable_digest_emails)

        # Enqueue a fake digest email.
        context = {'name': '', 'realm_uri': '', 'unread_pms': [], 'hot_conversations': [],
                   'new_users': [], 'new_streams': {'plain': []}, 'unsubscribe_link': ''}
        send_future_email('zerver/emails/digest', to_user_id=user_profile.id, context=context)

        self.assertEqual(1, ScheduledEmail.objects.filter(user=user_profile).count())

        # Simulate unsubscribing from digest e-mails.
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "digest")
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)

        # The setting is toggled off, and scheduled jobs have been removed.
        self.assertEqual(result.status_code, 200)
        # Circumvent user_profile caching.

        user_profile.refresh_from_db()
        self.assertFalse(user_profile.enable_digest_emails)
        self.assertEqual(0, ScheduledEmail.objects.filter(user=user_profile).count())

class RealmCreationTest(ZulipTestCase):

    # Full realm-creation flow with OPEN_REALM_CREATION enabled.
    def test_create_realm(self):
        # type: () -> None
        password = "test"
        string_id = "zuliptest"
        email = "user1@test.com"
        realm = get_realm('test')

        # Make sure the realm does not exist
        self.assertIsNone(realm)

        with self.settings(OPEN_REALM_CREATION=True):
            # Create new realm with the email
            result = self.client_post('/create_realm/', {'email': email})
            self.assertEqual(result.status_code, 302)
            self.assertTrue(result["Location"].endswith(
                "/accounts/send_confirm/%s" % (email,)))
            result = self.client_get(result["Location"])
            self.assert_in_response("Check your email so we can get started.", result)

            # Visit the confirmation link.
# NOTE: the statements down to the next `def` complete test_create_realm,
# whose header precedes this span.
# NOTE(review): the collapsed source does not show where its `with` block
# ends; everything is kept inside the settings override here -- confirm.
            confirmation_url = self.get_confirmation_url_from_outbox(email)
            result = self.client_get(confirmation_url)
            self.assertEqual(result.status_code, 200)

            result = self.submit_reg_form_for_user(email, password, realm_subdomain=string_id)
            self.assertEqual(result.status_code, 302)

            # Make sure the realm is created
            realm = get_realm(string_id)
            self.assertIsNotNone(realm)
            self.assertEqual(realm.string_id, string_id)
            self.assertEqual(get_user(email, realm).realm, realm)

            # Check defaults
            self.assertEqual(realm.org_type, Realm.CORPORATE)
            self.assertEqual(realm.restricted_to_domain, False)
            self.assertEqual(realm.invite_required, True)

            self.assertTrue(result["Location"].endswith("/"))

            # Check welcome messages
            # Each tuple is (stream name, text expected in the earliest
            # message of that stream, expected number of messages).
            for stream_name, text, message_count in [
                    ('announce', 'This is', 1),
                    ('core team', 'This is', 1),
                    ('general', 'Welcome to', 1),
                    ('new members', 'stream is', 1),
                    ('zulip', 'Here is', 3)]:
                stream = get_stream(stream_name, realm)
                recipient = get_recipient(Recipient.STREAM, stream.id)
                messages = Message.objects.filter(recipient=recipient).order_by('pub_date')
                self.assertEqual(len(messages), message_count)
                self.assertIn(text, messages[0].content)

    def test_create_realm_existing_email(self):
        # type: () -> None
        """
        Trying to create a realm with an existing email should just redirect to
        a login page.
        """
        with self.settings(OPEN_REALM_CREATION=True):
            email = self.example_email("hamlet")
            result = self.client_post('/create_realm/', {'email': email})
            self.assertEqual(result.status_code, 302)
            self.assertIn('login', result['Location'])

    def test_create_realm_no_creation_key(self):
        # type: () -> None
        """
        Trying to create a realm without a creation_key should fail when
        OPEN_REALM_CREATION is false.
        """
        email = "user1@test.com"
        realm = get_realm('test')

        # Make sure the realm does not exist
        self.assertIsNone(realm)

        with self.settings(OPEN_REALM_CREATION=False):
            # Create new realm with the email, but no creation key.
# NOTE: the first three statements below complete
# test_create_realm_no_creation_key, whose `def` line precedes this span.
            result = self.client_post('/create_realm/', {'email': email})
            self.assertEqual(result.status_code, 200)
            self.assert_in_response('New organization creation disabled.', result)

    # Realm creation where the final form is posted with the new
    # subdomain's HTTP_HOST.
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_realm_with_subdomain(self):
        # type: () -> None
        password = "test"
        string_id = "zuliptest"
        email = "user1@test.com"
        realm_name = "Test"

        # Make sure the realm does not exist
        self.assertIsNone(get_realm('test'))

        # Create new realm with the email
        result = self.client_post('/create_realm/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        result = self.submit_reg_form_for_user(email, password,
                                               realm_subdomain = string_id,
                                               realm_name=realm_name,
                                               # Pass HTTP_HOST for the target subdomain
                                               HTTP_HOST=string_id + ".testserver")
        self.assertEqual(result.status_code, 302)

        # Make sure the realm is created
        realm = get_realm(string_id)
        self.assertIsNotNone(realm)
        self.assertEqual(realm.string_id, string_id)
        self.assertEqual(get_user(email, realm).realm, realm)

        self.assertEqual(realm.name, realm_name)
        self.assertEqual(realm.subdomain, string_id)

    # A mailinator.com address is rejected when creating a realm.
    @override_settings(OPEN_REALM_CREATION=True)
    def test_mailinator_signup(self):
        # type: () -> None
        result = self.client_post('/create_realm/', {'email': "hi@mailinator.com"})
        self.assert_in_response('Please use your real email address.', result)

    # Each rejected subdomain must produce its specific error message.
    @override_settings(OPEN_REALM_CREATION=True)
    def test_subdomain_restrictions(self):
        # type: () -> None
        password = "test"
        email = "user1@test.com"
        realm_name = "Test"

        result = self.client_post('/create_realm/', {'email': email})
        self.client_get(result["Location"])
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        self.client_get(confirmation_url)

        # Maps a candidate subdomain to a fragment of the expected error text.
        errors = {'id': "length 3 or greater",
                  '-id': "cannot start or end with a",
                  'string-ID': "lowercase letters",
                  'string_id': "lowercase letters",
                  'stream': "unavailable",
                  'streams': "unavailable",
                  'about': "unavailable",
                  'abouts': "unavailable",
                  'zephyr': "unavailable"}
        for string_id, error_msg in errors.items():
            result = self.submit_reg_form_for_user(email, password,
                                                   realm_subdomain = string_id,
                                                   realm_name = realm_name)
            self.assert_in_response(error_msg, result)

        # test valid subdomain
        result = self.submit_reg_form_for_user(email, password,
                                               realm_subdomain = 'a-0',
                                               realm_name = realm_name)
        self.assertEqual(result.status_code, 302)

    # The root domain is unavailable when ROOT_DOMAIN_LANDING_PAGE is set,
    # and also once a realm uses Realm.SUBDOMAIN_FOR_ROOT_DOMAIN.
    def test_is_root_domain_available(self):
        # type: () -> None
        self.assertTrue(is_root_domain_available())
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            self.assertFalse(is_root_domain_available())
        realm = get_realm("zulip")
        realm.string_id = Realm.SUBDOMAIN_FOR_ROOT_DOMAIN
        realm.save()
        self.assertFalse(is_root_domain_available())

class UserSignUpTest(ZulipTestCase):

    # Helper: assert `result` is a 302 whose LOCATION header equals `url`.
    def _assert_redirected_to(self, result, url):
        # type: (HttpResponse, Text) -> None
        self.assertEqual(result.status_code, 302)
        self.assertEqual(result['LOCATION'], url)

    def test_bad_email_configuration_for_accounts_home(self):
        # type: () -> None
        """
        Make sure we redirect for SMTP errors.
        """
        email = self.nonreg_email('newguy')

        # Sending the registration email raises SMTPException.
        smtp_mock = patch(
            'zerver.views.registration.send_registration_completion_email',
            side_effect=smtplib.SMTPException('uh oh')
        )

        error_mock = patch('logging.error')

        with smtp_mock, error_mock as err:
            result = self.client_post('/accounts/home/', {'email': email})

        self._assert_redirected_to(result, '/config-error/smtp')

        # First positional argument of the first logging.error call.
        self.assertEqual(
            err.call_args_list[0][0][0],
            'Error in accounts_home: uh oh'
        )

    def test_bad_email_configuration_for_create_realm(self):
        # type: () -> None
        """
        Make sure we redirect for SMTP errors.
        """
        email = self.nonreg_email('newguy')

        # Sending the registration email raises SMTPException.
        smtp_mock = patch(
            'zerver.views.registration.send_registration_completion_email',
            side_effect=smtplib.SMTPException('uh oh')
        )

        error_mock = patch('logging.error')

        with smtp_mock, error_mock as err:
            result = self.client_post('/create_realm/', {'email': email})

        self._assert_redirected_to(result, '/config-error/smtp')

        # First positional argument of the first logging.error call.
        self.assertEqual(
            err.call_args_list[0][0][0],
            'Error in create_realm: uh oh'
        )

    def test_user_default_language_and_timezone(self):
        # type: () -> None
        """
        Check if the default language of new user is the default language
        of the realm.
        """
        email = self.nonreg_email('newguy')
        password = "newpassword"
        timezone = "US/Mountain"
        realm = get_realm('zulip')
        do_set_realm_property(realm, 'default_language', u"de")

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        # Pick a password and agree to the ToS.
        result = self.submit_reg_form_for_user(email, password, timezone=timezone)
        self.assertEqual(result.status_code, 302)

        user_profile = self.nonreg_user('newguy')
        self.assertEqual(user_profile.default_language, realm.default_language)
        self.assertEqual(user_profile.timezone, timezone)
        # Drop the most recent message from the outbox.
        from django.core.mail import outbox
        outbox.pop()

    def test_signup_already_active(self):
        # type: () -> None
        """
        Check if signing up with an active email redirects to a login page.
        """
        email = self.example_email("hamlet")
        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertIn('login', result['Location'])
        result = self.client_get(result.url)
        self.assert_in_response("You've already registered", result)

    def test_signup_invalid_name(self):
        # type: () -> None
        """
        Check if an invalid name during signup is handled properly.
        """
        email = "newguy@zulip.com"
        password = "newpassword"

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        # Pick a password and agree to the ToS.
        # NOTE(review): an empty full_name is submitted here, yet the next
        # assertion expects the "Invalid characters" error -- confirm the
        # intended name value.
        result = self.submit_reg_form_for_user(email, password, full_name="")
        self.assert_in_success_response(["Invalid characters in name!"], result)

        # Verify that the user is asked for name and password
        self.assert_in_success_response(['id_password', 'id_full_name'], result)

    def test_signup_without_password(self):
        # type: () -> None
        """
        Check if signing up without a password works properly when
        password_auth_enabled is False.
        """

        email = self.nonreg_email('newuser')

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
# NOTE: the statements down to the next `def` complete
# test_signup_without_password, whose header precedes this span.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        # No 'password' field is posted; password auth is reported disabled.
        with patch('zerver.views.registration.password_auth_enabled', return_value=False):
            result = self.client_post(
                '/accounts/register/',
                {'full_name': 'New User',
                 'key': find_key_by_email(email),
                 'terms': True})

        # User should now be logged in.
        self.assertEqual(result.status_code, 302)
        user_profile = self.nonreg_user('newuser')
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)

    def test_signup_without_full_name(self):
        # type: () -> None
        """
        Check if signing up without a full name redirects to a registration
        form.
        """
        email = "newguy@zulip.com"
        password = "newpassword"

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        # No 'full_name' field is posted.
        result = self.client_post(
            '/accounts/register/',
            {'password': password,
             'key': find_key_by_email(email),
             'terms': True,
             'from_confirmation': '1'})
        self.assert_in_success_response(["You're almost there."], result)

        # Verify that the user is asked for name and password
        self.assert_in_success_response(['id_password', 'id_full_name'], result)

    def test_signup_with_full_name(self):
        # type: () -> None
        """
        Check if signing up with a full name and from_confirmation set still
        renders the registration form.
        """
        email = "newguy@zulip.com"
        password = "newpassword"

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        result = self.client_get(confirmation_url)
        self.assertEqual(result.status_code, 200)

        result = self.client_post(
            '/accounts/register/',
            {'password': password,
             'key': find_key_by_email(email),
             'terms': True,
             'full_name': "New Guy",
             'from_confirmation': '1'})
        self.assert_in_success_response(["You're almost there."], result)

    def test_signup_invalid_subdomain(self):
        # type: () -> None
        """
        Check if attempting to authenticate to the wrong subdomain logs an
        error and redirects.
        """
        email = "newuser@zulip.com"
        password = "newpassword"

        result = self.client_post('/accounts/home/', {'email': email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        # Visit the confirmation link.
confirmation_url = self.get_confirmation_url_from_outbox(email) result = self.client_get(confirmation_url) self.assertEqual(result.status_code, 200) def invalid_subdomain(**kwargs): # type: (**Any) -> Any return_data = kwargs.get('return_data', {}) return_data['invalid_subdomain'] = True with patch('zerver.views.registration.authenticate', side_effect=invalid_subdomain): with patch('logging.error') as mock_error: result = self.client_post( '/accounts/register/', {'password': password, 'full_name': 'New User', 'key': find_key_by_email(email), 'terms': True}) mock_error.assert_called_once() self.assertEqual(result.status_code, 302) def test_failed_signup_due_to_restricted_domain(self): # type: () -> None realm = get_realm('zulip') realm.invite_required = False realm.save() request = HostRequestMock(host = realm.host) request.session = {} # type: ignore email = 'user@acme.com' form = HomepageForm({'email': email}, realm=realm) self.assertIn("Your email address, {}, is not in one of the domains".format(email), form.errors['email'][0]) def test_failed_signup_due_to_invite_required(self): # type: () -> None realm = get_realm('zulip') realm.invite_required = True realm.save() request = HostRequestMock(host = realm.host) request.session = {} # type: ignore email = 'user@zulip.com' form = HomepageForm({'email': email}, realm=realm) self.assertIn("Please request an invite for {} from".format(email), form.errors['email'][0]) def test_failed_signup_due_to_nonexistent_realm(self): # type: () -> None request = HostRequestMock(host = 'acme.' 
+ settings.EXTERNAL_HOST) request.session = {} # type: ignore email = 'user@acme.com' form = HomepageForm({'email': email}, realm=None) self.assertIn("organization you are trying to join using {} does " "not exist".format(email), form.errors['email'][0]) def test_ldap_registration_from_confirmation(self): # type: () -> None password = "testing" email = "newuser@zulip.com" subdomain = "zulip" ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'} ldap_patcher = patch('django_auth_ldap.config.ldap.initialize') mock_initialize = ldap_patcher.start() mock_ldap = MockLDAP() mock_initialize.return_value = mock_ldap mock_ldap.directory = { 'uid=newuser,ou=users,dc=zulip,dc=com': { 'userPassword': 'testing', 'fn': ['New LDAP fullname'] } } with patch('zerver.views.registration.get_subdomain', return_value=subdomain): result = self.client_post('/register/', {'email': email}) self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"]) self.assert_in_response("Check your email so we can get started.", result) # Visit the confirmation link. 
from django.core.mail import outbox for message in reversed(outbox): if email in message.to: confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>") confirmation_url = confirmation_link_pattern.search( message.body).groups()[0] break else: raise AssertionError("Couldn't find a confirmation email.") with self.settings( POPULATE_PROFILE_VIA_LDAP=True, LDAP_APPEND_DOMAIN='zulip.com', AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map, AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',), AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): result = self.client_get(confirmation_url) self.assertEqual(result.status_code, 200) # Full name should be set from LDAP result = self.submit_reg_form_for_user(email, password, full_name="Ignore", from_confirmation="1", # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assert_in_success_response(["You're almost there.", "New LDAP fullname", "newuser@zulip.com"], result) # Verify that the user is asked for name self.assert_in_success_response(['id_full_name'], result) # TODO: Ideally, we wouldn't ask for a password if LDAP is # enabled, in which case this assert should be invertedq. 
self.assert_in_success_response(['id_password'], result) # Test the TypeError exception handler mock_ldap.directory = { 'uid=newuser,ou=users,dc=zulip,dc=com': { 'userPassword': 'testing', 'fn': None # This will raise TypeError } } result = self.submit_reg_form_for_user(email, password, from_confirmation='1', # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assert_in_success_response(["You're almost there.", "newuser@zulip.com"], result) def test_ldap_registration_end_to_end(self): # type: () -> None password = "testing" email = "newuser@zulip.com" subdomain = "zulip" ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'} ldap_patcher = patch('django_auth_ldap.config.ldap.initialize') mock_initialize = ldap_patcher.start() mock_ldap = MockLDAP() mock_initialize.return_value = mock_ldap full_name = 'New LDAP fullname' mock_ldap.directory = { 'uid=newuser,ou=users,dc=zulip,dc=com': { 'userPassword': 'testing', 'fn': [full_name] } } with patch('zerver.views.registration.get_subdomain', return_value=subdomain): result = self.client_post('/register/', {'email': email}) self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"]) self.assert_in_response("Check your email so we can get started.", result) with self.settings( POPULATE_PROFILE_VIA_LDAP=True, LDAP_APPEND_DOMAIN='zulip.com', AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map, AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',), AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): # Click confirmation link result = self.submit_reg_form_for_user(email, password, full_name="Ignore", from_confirmation="1", # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") # Full name should be set from LDAP self.assert_in_success_response(["You're almost there.", full_name, "newuser@zulip.com"], 
result) # Submit the final form. result = self.submit_reg_form_for_user(email, password, full_name=full_name, # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") user_profile = UserProfile.objects.get(email=email) # Name comes from form which was set by LDAP. self.assertEqual(user_profile.full_name, full_name) def test_ldap_registration_when_names_changes_are_disabled(self): # type: () -> None password = "testing" email = "newuser@zulip.com" subdomain = "zulip" ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'} ldap_patcher = patch('django_auth_ldap.config.ldap.initialize') mock_initialize = ldap_patcher.start() mock_ldap = MockLDAP() mock_initialize.return_value = mock_ldap mock_ldap.directory = { 'uid=newuser,ou=users,dc=zulip,dc=com': { 'userPassword': 'testing', 'fn': ['New LDAP fullname'] } } with patch('zerver.views.registration.get_subdomain', return_value=subdomain): result = self.client_post('/register/', {'email': email}) self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"]) self.assert_in_response("Check your email so we can get started.", result) with self.settings( POPULATE_PROFILE_VIA_LDAP=True, LDAP_APPEND_DOMAIN='zulip.com', AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map, AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',), AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): # Click confirmation link. This will 'authenticated_full_name' # session variable which will be used to set the fullname of # the user. 
# NOTE: the statements down to the next `def` complete
# test_ldap_registration_when_names_changes_are_disabled, whose header
# precedes this span.
            result = self.submit_reg_form_for_user(email,
                                                   password,
                                                   full_name="Ignore",
                                                   from_confirmation="1",
                                                   # Pass HTTP_HOST for the target subdomain
                                                   HTTP_HOST=subdomain + ".testserver")

            # Final submission carries no full_name while name changes are
            # reported as disabled.
            with patch('zerver.views.registration.name_changes_disabled', return_value=True):
                result = self.submit_reg_form_for_user(email,
                                                       password,
                                                       # Pass HTTP_HOST for the target subdomain
                                                       HTTP_HOST=subdomain + ".testserver")
            user_profile = UserProfile.objects.get(email=email)
            # Name comes from LDAP session.
            self.assertEqual(user_profile.full_name, 'New LDAP fullname')

    def test_registration_when_name_changes_are_disabled(self):
        # type: () -> None
        """
        Test `name_changes_disabled` when we are not running under LDAP.
        """
        password = "testing"
        email = "newuser@zulip.com"
        subdomain = "zulip"

        with patch('zerver.views.registration.get_subdomain', return_value=subdomain):
            result = self.client_post('/register/', {'email': email})

        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith(
            "/accounts/send_confirm/%s" % (email,)))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email so we can get started.", result)

        with patch('zerver.views.registration.name_changes_disabled', return_value=True):
            result = self.submit_reg_form_for_user(email,
                                                   password,
                                                   full_name="New Name",
                                                   # Pass HTTP_HOST for the target subdomain
                                                   HTTP_HOST=subdomain + ".testserver")
            user_profile = UserProfile.objects.get(email=email)
            # 'New Name' comes from POST data; not from LDAP session.
self.assertEqual(user_profile.full_name, 'New Name') def test_realm_creation_through_ldap(self): # type: () -> None password = "testing" email = "newuser@zulip.com" subdomain = "zulip" realm_name = "Zulip" ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'} ldap_patcher = patch('django_auth_ldap.config.ldap.initialize') mock_initialize = ldap_patcher.start() mock_ldap = MockLDAP() mock_initialize.return_value = mock_ldap mock_ldap.directory = { 'uid=newuser,ou=users,dc=zulip,dc=com': { 'userPassword': 'testing', 'fn': ['New User Name'] } } with patch('zerver.views.registration.get_subdomain', return_value=subdomain): result = self.client_post('/register/', {'email': email}) self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"]) self.assert_in_response("Check your email so we can get started.", result) # Visit the confirmation link. from django.core.mail import outbox for message in reversed(outbox): if email in message.to: confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>") confirmation_url = confirmation_link_pattern.search( message.body).groups()[0] break else: raise AssertionError("Couldn't find a confirmation email.") with self.settings( POPULATE_PROFILE_VIA_LDAP=True, LDAP_APPEND_DOMAIN='zulip.com', AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map, AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',), AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com', TERMS_OF_SERVICE=False, ): result = self.client_get(confirmation_url) self.assertEqual(result.status_code, 200) key = find_key_by_email(email), confirmation = Confirmation.objects.get(confirmation_key=key[0]) prereg_user = confirmation.content_object prereg_user.realm_creation = True prereg_user.save() result = self.submit_reg_form_for_user(email, password, realm_name=realm_name, realm_subdomain=subdomain, 
from_confirmation='1', # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assert_in_success_response(["You're almost there.", "newuser@zulip.com"], result) mock_ldap.reset() mock_initialize.stop() @patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']]) def test_registration_of_mirror_dummy_user(self, ignored): # type: (Any) -> None password = "test" subdomain = "zephyr" user_profile = self.mit_user("sipbtest") email = user_profile.email user_profile.is_mirror_dummy = True user_profile.is_active = False user_profile.save() result = self.client_post('/register/', {'email': email}, subdomain="zephyr") self.assertEqual(result.status_code, 302) self.assertTrue(result["Location"].endswith( "/accounts/send_confirm/%s" % (email,))) result = self.client_get(result["Location"], subdomain="zephyr") self.assert_in_response("Check your email so we can get started.", result) # Visit the confirmation link. from django.core.mail import outbox for message in reversed(outbox): if email in message.to: confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>") confirmation_url = confirmation_link_pattern.search( message.body).groups()[0] break else: raise AssertionError("Couldn't find a confirmation email.") result = self.client_get(confirmation_url, subdomain="zephyr") self.assertEqual(result.status_code, 200) # If the mirror dummy user is already active, attempting to submit the # registration form should just redirect to a login page. 
user_profile.is_active = True user_profile.save() result = self.submit_reg_form_for_user(email, password, from_confirmation='1', # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assertEqual(result.status_code, 302) self.assertIn('login', result['Location']) user_profile.is_active = False user_profile.save() result = self.submit_reg_form_for_user(email, password, from_confirmation='1', # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assertEqual(result.status_code, 200) result = self.submit_reg_form_for_user(email, password, # Pass HTTP_HOST for the target subdomain HTTP_HOST=subdomain + ".testserver") self.assertEqual(result.status_code, 302) self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) def test_registration_of_active_mirror_dummy_user(self): # type: (Any) -> None """ Trying to activate an already-active mirror dummy user should just redirect to a login page. """ user_profile = self.mit_user("sipbtest") email = user_profile.email user_profile.is_mirror_dummy = True user_profile.is_active = True user_profile.save() result = self.client_post('/register/', {'email': email}) self.assertEqual(result.status_code, 302) self.assertIn('login', result['Location']) class DeactivateUserTest(ZulipTestCase): def test_deactivate_user(self): # type: () -> None email = self.example_email("hamlet") self.login(email) user = self.example_user('hamlet') self.assertTrue(user.is_active) result = self.client_delete('/json/users/me') self.assert_json_success(result) user = self.example_user('hamlet') self.assertFalse(user.is_active) self.login(email, fails=True) def test_do_not_deactivate_final_admin(self): # type: () -> None email = self.example_email("iago") self.login(email) user = self.example_user('iago') self.assertTrue(user.is_active) result = self.client_delete('/json/users/me') self.assert_json_error(result, "Cannot deactivate the only organization administrator") user = 
self.example_user('iago') self.assertTrue(user.is_active) self.assertTrue(user.is_realm_admin) email = self.example_email("hamlet") user_2 = self.example_user('hamlet') do_change_is_admin(user_2, True) self.assertTrue(user_2.is_realm_admin) result = self.client_delete('/json/users/me') self.assert_json_success(result) do_change_is_admin(user, True) class TestLoginPage(ZulipTestCase): def test_login_page_wrong_subdomain_error(self): # type: () -> None result = self.client_get("/login/?subdomain=1") self.assertIn(WRONG_SUBDOMAIN_ERROR, result.content.decode('utf8')) @patch('django.http.HttpRequest.get_host') def test_login_page_redirects_for_root_alias(self, mock_get_host): # type: (MagicMock) -> None mock_get_host.return_value = 'www.testserver' with self.settings(ROOT_DOMAIN_LANDING_PAGE=True): result = self.client_get("/en/login/") self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/') @patch('django.http.HttpRequest.get_host') def test_login_page_redirects_for_root_domain(self, mock_get_host): # type: (MagicMock) -> None mock_get_host.return_value = 'testserver' with self.settings(ROOT_DOMAIN_LANDING_PAGE=True): result = self.client_get("/en/login/") self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/') mock_get_host.return_value = 'www.testserver.com' with self.settings(ROOT_DOMAIN_LANDING_PAGE=True, EXTERNAL_HOST='www.testserver.com', ROOT_SUBDOMAIN_ALIASES=['test']): result = self.client_get("/en/login/") self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/') @patch('django.http.HttpRequest.get_host') def test_login_page_works_without_subdomains(self, mock_get_host): # type: (MagicMock) -> None mock_get_host.return_value = 'www.testserver' with self.settings(ROOT_SUBDOMAIN_ALIASES=['www']): result = self.client_get("/en/login/") self.assertEqual(result.status_code, 200) mock_get_host.return_value = 'testserver' with 
self.settings(ROOT_SUBDOMAIN_ALIASES=['www']): result = self.client_get("/en/login/") self.assertEqual(result.status_code, 200) class TestFindMyTeam(ZulipTestCase): def test_template(self): # type: () -> None result = self.client_get('/accounts/find/') self.assertIn("Find your Zulip accounts", result.content.decode('utf8')) def test_result(self): # type: () -> None result = self.client_post('/accounts/find/', dict(emails="iago@zulip.com,cordelia@zulip.com")) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, "/accounts/find/?emails=iago%40zulip.com%2Ccordelia%40zulip.com") result = self.client_get(result.url) content = result.content.decode('utf8') self.assertIn("Emails sent! You will only receive emails", content) self.assertIn(self.example_email("iago"), content) self.assertIn(self.example_email("cordelia"), content) from django.core.mail import outbox self.assertEqual(len(outbox), 2) def test_find_team_ignore_invalid_email(self): # type: () -> None result = self.client_post('/accounts/find/', dict(emails="iago@zulip.com,invalid_email@zulip.com")) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, "/accounts/find/?emails=iago%40zulip.com%2Cinvalid_email%40zulip.com") result = self.client_get(result.url) content = result.content.decode('utf8') self.assertIn("Emails sent! You will only receive emails", content) self.assertIn(self.example_email("iago"), content) self.assertIn("invalid_email@", content) from django.core.mail import outbox self.assertEqual(len(outbox), 1) def test_find_team_reject_invalid_email(self): # type: () -> None result = self.client_post('/accounts/find/', dict(emails="invalid_string")) self.assertEqual(result.status_code, 200) self.assertIn(b"Enter a valid email", result.content) from django.core.mail import outbox self.assertEqual(len(outbox), 0) # Just for coverage on perhaps-unnecessary validation code. 
result = self.client_get('/accounts/find/?emails=invalid') self.assertEqual(result.status_code, 200) def test_find_team_zero_emails(self): # type: () -> None data = {'emails': ''} result = self.client_post('/accounts/find/', data) self.assertIn('This field is required', result.content.decode('utf8')) self.assertEqual(result.status_code, 200) from django.core.mail import outbox self.assertEqual(len(outbox), 0) def test_find_team_one_email(self): # type: () -> None data = {'emails': self.example_email("hamlet")} result = self.client_post('/accounts/find/', data) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/?emails=hamlet%40zulip.com') from django.core.mail import outbox self.assertEqual(len(outbox), 1) def test_find_team_deactivated_user(self): # type: () -> None do_deactivate_user(self.example_user("hamlet")) data = {'emails': self.example_email("hamlet")} result = self.client_post('/accounts/find/', data) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/?emails=hamlet%40zulip.com') from django.core.mail import outbox self.assertEqual(len(outbox), 0) def test_find_team_deactivated_realm(self): # type: () -> None do_deactivate_realm(get_realm("zulip")) data = {'emails': self.example_email("hamlet")} result = self.client_post('/accounts/find/', data) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/?emails=hamlet%40zulip.com') from django.core.mail import outbox self.assertEqual(len(outbox), 0) def test_find_team_bot_email(self): # type: () -> None data = {'emails': self.example_email("webhook_bot")} result = self.client_post('/accounts/find/', data) self.assertEqual(result.status_code, 302) self.assertEqual(result.url, '/accounts/find/?emails=webhook-bot%40zulip.com') from django.core.mail import outbox self.assertEqual(len(outbox), 0) def test_find_team_more_than_ten_emails(self): # type: () -> None data = {'emails': 
','.join(['hamlet-{}@zulip.com'.format(i) for i in range(11)])} result = self.client_post('/accounts/find/', data) self.assertEqual(result.status_code, 200) self.assertIn("Please enter at most 10", result.content.decode('utf8')) from django.core.mail import outbox self.assertEqual(len(outbox), 0) class ConfirmationKeyTest(ZulipTestCase): def test_confirmation_key(self): # type: () -> None request = MagicMock() request.session = { 'confirmation_key': {'confirmation_key': 'xyzzy'} } result = confirmation_key(request) self.assert_json_success(result) self.assert_in_response('xyzzy', result) class MobileAuthOTPTest(ZulipTestCase): def test_xor_hex_strings(self): # type: () -> None self.assertEqual(xor_hex_strings('1237c81ab', '18989fd12'), '0aaf57cb9') with self.assertRaises(AssertionError): xor_hex_strings('1', '31') def test_is_valid_otp(self): # type: () -> None self.assertEqual(is_valid_otp('1234'), False) self.assertEqual(is_valid_otp('1234abcd' * 8), True) self.assertEqual(is_valid_otp('1234abcZ' * 8), False) def test_ascii_to_hex(self): # type: () -> None self.assertEqual(ascii_to_hex('ZcdR1234'), '5a63645231323334') self.assertEqual(hex_to_ascii('5a63645231323334'), 'ZcdR1234') def test_otp_encrypt_api_key(self): # type: () -> None hamlet = self.example_user('hamlet') hamlet.api_key = '12ac' * 8 otp = '7be38894' * 8 result = otp_encrypt_api_key(hamlet, otp) self.assertEqual(result, '4ad1e9f7' * 8) decryped = otp_decrypt_api_key(result, otp) self.assertEqual(decryped, hamlet.api_key) class LoginOrAskForRegistrationTestCase(ZulipTestCase): def test_confirm(self): # type: () -> None request = HostRequestMock() email = 'new@zulip.com' user_profile = None # type: Optional[UserProfile] full_name = 'New User' invalid_subdomain = False result = login_or_register_remote_user( request, email, user_profile, full_name=full_name, invalid_subdomain=invalid_subdomain) self.assert_in_response('No account found for', result) self.assert_in_response('new@zulip.com. 
Would you like to register instead?', result) def test_invalid_subdomain(self): # type: () -> None request = HostRequestMock() email = 'new@zulip.com' user_profile = None # type: Optional[UserProfile] full_name = 'New User' invalid_subdomain = True response = login_or_register_remote_user( request, email, user_profile, full_name=full_name, invalid_subdomain=invalid_subdomain) self.assertEqual(response.status_code, 302) self.assertIn('/accounts/login/?subdomain=1', response.url) def test_invalid_email(self): # type: () -> None request = HostRequestMock() email = None # type: Optional[Text] user_profile = None # type: Optional[UserProfile] full_name = 'New User' invalid_subdomain = False response = login_or_register_remote_user( request, email, user_profile, full_name=full_name, invalid_subdomain=invalid_subdomain) self.assert_in_response('Please click the following button if ' 'you wish to register', response) def test_login_under_subdomains(self): # type: () -> None request = HostRequestMock() setattr(request, 'session', self.client.session) user_profile = self.example_user('hamlet') user_profile.backend = 'zproject.backends.GitHubAuthBackend' full_name = 'Hamlet' invalid_subdomain = False response = login_or_register_remote_user( request, user_profile.email, user_profile, full_name=full_name, invalid_subdomain=invalid_subdomain) user_id = get_session_dict_user(getattr(request, 'session')) self.assertEqual(user_id, user_profile.id) self.assertEqual(response.status_code, 302) self.assertIn('http://zulip.testserver', response.url)