from collections import namedtuple
from typing import Any, List
from unittest import mock

from django.db import IntegrityError
from django.utils.timezone import now as timezone_now

from zerver.lib.actions import create_mirror_user_if_needed, get_client
from zerver.lib.create_user import create_user_profile
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import reset_emails_in_zulip_realm
from zerver.models import UserProfile, get_realm, get_user
from zerver.views.message_send import InvalidMirrorInput, create_mirrored_message_users

# Canned record returned by the patched ``DNS.dnslookup`` in the Zephyr tests.
_SIPBTEST_DNS_RECORD = "sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh"


class MirroredMessageUsersTest(ZulipTestCase):
    """Exercise ``create_mirrored_message_users`` and ``create_mirror_user_if_needed``."""

    @staticmethod
    def _fake_request(client: Any, sender_email: str, message_type: str) -> Any:
        """Return a namedtuple stand-in for a request.

        The result exposes only the two attributes these tests populate:
        ``POST`` (a dict with ``sender`` and ``type``) and ``client``.
        """
        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": sender_email, "type": message_type}
        return Request(POST=post_data, client=client)

    def _assert_realm_has_emails(self, realm: Any, *expected_emails: str) -> None:
        """Assert that each address belongs to some ``UserProfile`` in ``realm``."""
        known_emails = {profile.email for profile in UserProfile.objects.filter(realm=realm)}
        for address in expected_emails:
            self.assertIn(address, known_emails)

    def test_invalid_sender(self) -> None:
        """A request whose POST data has no sender raises ``InvalidMirrorInput``."""
        hamlet = self.example_user("hamlet")
        no_recipients: List[str] = []

        # This request type carries POST data only, and that data omits "sender".
        Request = namedtuple("Request", ["POST"])
        senderless_request = Request(POST={})

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(senderless_request, hamlet, no_recipients)

    def test_invalid_client(self) -> None:
        """A request made through the "banned_mirror" client raises ``InvalidMirrorInput``."""
        banned_client = get_client(name="banned_mirror")  # deliberately invalid
        hamlet = self.example_user("hamlet")
        no_recipients: List[str] = []

        request = self._fake_request(banned_client, hamlet.email, "private")

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(request, hamlet, no_recipients)

    def test_invalid_email(self) -> None:
        """A malformed recipient address is rejected for each mirror client."""
        malformed_recipients = ["alice AT example.com"]

        # An MIT user is used here to maximize code coverage.
        starnine = self.mit_user("starnine")

        for mirror_name in ("zephyr_mirror", "irc_mirror", "jabber_mirror"):
            mirror_client = get_client(name=mirror_name)
            request = self._fake_request(mirror_client, starnine.email, "private")

            with self.assertRaises(InvalidMirrorInput):
                create_mirrored_message_users(request, starnine, malformed_recipients)

    @mock.patch("DNS.dnslookup", return_value=[[_SIPBTEST_DNS_RECORD]])
    def test_zephyr_mirror_new_recipient(self, ignored: object) -> None:
        """Test mirror dummy user creation for PM recipients"""
        zephyr_client = get_client(name="zephyr_mirror")

        starnine = self.mit_user("starnine")
        sipbtest = self.mit_user("sipbtest")
        newcomer_email = "bob_the_new_user@mit.edu"
        zephyr_realm = get_realm("zephyr")

        # Send to one existing user and one address the test treats as new.
        request = self._fake_request(zephyr_client, sipbtest.email, "private")
        returned_sender = create_mirrored_message_users(
            request, starnine, [starnine.email, newcomer_email]
        )

        self.assertEqual(returned_sender, sipbtest)
        self._assert_realm_has_emails(sipbtest.realm, starnine.email, newcomer_email)

        newcomer = get_user(newcomer_email, zephyr_realm)
        self.assertTrue(newcomer.is_mirror_dummy)

    @mock.patch("DNS.dnslookup", return_value=[[_SIPBTEST_DNS_RECORD]])
    def test_zephyr_mirror_new_sender(self, ignored: object) -> None:
        """Test mirror dummy user creation for sender when sending to stream"""
        zephyr_client = get_client(name="zephyr_mirror")

        starnine = self.mit_user("starnine")
        new_sender_email = "new_sender@mit.edu"

        request = self._fake_request(zephyr_client, new_sender_email, "stream")
        returned_sender = create_mirrored_message_users(request, starnine, ["stream_name"])

        assert returned_sender is not None
        self.assertEqual(returned_sender.email, new_sender_email)
        self.assertTrue(returned_sender.is_mirror_dummy)

    def test_irc_mirror(self) -> None:
        """An IRC-mirrored private message leaves bob@irc.zulip.com as a mirror dummy."""
        reset_emails_in_zulip_realm()
        irc_client = get_client(name="irc_mirror")

        hamlet = self.example_user("hamlet")
        irc_bob_email = "bob@irc.zulip.com"
        recipient_emails = [
            self.nonreg_email("alice"),
            irc_bob_email,
            self.nonreg_email("cordelia"),
        ]

        request = self._fake_request(irc_client, hamlet.email, "private")
        returned_sender = create_mirrored_message_users(request, hamlet, recipient_emails)

        self.assertEqual(returned_sender, hamlet)
        self._assert_realm_has_emails(hamlet.realm, self.nonreg_email("alice"), irc_bob_email)

        irc_bob = get_user(irc_bob_email, hamlet.realm)
        self.assertTrue(irc_bob.is_mirror_dummy)

    def test_jabber_mirror(self) -> None:
        """A Jabber-mirrored private message leaves the "bob" recipient as a mirror dummy."""
        reset_emails_in_zulip_realm()
        jabber_client = get_client(name="jabber_mirror")

        hamlet = self.example_user("hamlet")
        recipient_emails = [
            self.nonreg_email("alice"),
            self.nonreg_email("bob"),
            self.nonreg_email("cordelia"),
        ]

        request = self._fake_request(jabber_client, hamlet.email, "private")
        returned_sender = create_mirrored_message_users(request, hamlet, recipient_emails)

        self.assertEqual(returned_sender, hamlet)
        self._assert_realm_has_emails(
            hamlet.realm, self.nonreg_email("alice"), self.nonreg_email("bob")
        )

        jabber_bob = get_user(self.nonreg_email("bob"), hamlet.realm)
        self.assertTrue(jabber_bob.is_mirror_dummy)

    def test_create_mirror_user_despite_race(self) -> None:
        """``create_mirror_user_if_needed`` still returns the user after an ``IntegrityError``.

        ``create_user`` is patched with a side effect that saves a real
        profile and then raises ``IntegrityError``, simulating a racing creation.
        """
        zulip_realm = get_realm("zulip")
        fred_email = "fred@example.com"

        def email_to_full_name(email: str) -> str:
            return "fred"

        def create_user(**kwargs: Any) -> UserProfile:
            expected_kwargs = (
                ("full_name", "fred"),
                ("email", fred_email),
                ("active", False),
                ("is_mirror_dummy", True),
            )
            for key, expected_value in expected_kwargs:
                self.assertEqual(kwargs[key], expected_value)

            # Simulate the race by saving an actual user through the
            # minimal, un-mocked creation function, then failing.
            kwargs.update(
                bot_type=None,
                bot_owner=None,
                tos_version=None,
                timezone=timezone_now(),
            )
            create_user_profile(**kwargs).save()
            raise IntegrityError()

        with mock.patch(
            "zerver.lib.actions.create_user", side_effect=create_user
        ) as patched_create_user:
            fred = create_mirror_user_if_needed(
                zulip_realm,
                fred_email,
                email_to_full_name,
            )

        self.assertEqual(fred.delivery_email, fred_email)
        patched_create_user.assert_called()