# Source: zulip/zerver/tests/test_mirror_users.py
#
# File-viewer metadata carried over with this listing:
# 200 lines, 7.2 KiB, Python

from collections import namedtuple
from typing import Any, List
from unittest import mock
from django.db import IntegrityError
from django.utils.timezone import now as timezone_now
from zerver.lib.actions import create_mirror_user_if_needed, get_client
from zerver.lib.create_user import create_user_profile
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import reset_emails_in_zulip_realm
from zerver.models import UserProfile, get_realm, get_user
from zerver.views.message_send import InvalidMirrorInput, create_mirrored_message_users
class MirroredMessageUsersTest(ZulipTestCase):
    """Tests for create_mirrored_message_users and create_mirror_user_if_needed.

    Each test builds a lightweight stand-in for the HTTP request (a
    namedtuple exposing only the attributes the test supplies) and passes
    it to the function under test.
    """

    def test_invalid_sender(self) -> None:
        """A request whose POST data has no "sender" raises InvalidMirrorInput."""
        hamlet = self.example_user("hamlet")
        no_recipients: List[str] = []

        Request = namedtuple("Request", ["POST"])
        senderless_request = Request(POST={})  # no sender

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(senderless_request, hamlet, no_recipients)

    def test_invalid_client(self) -> None:
        """A request coming from the "banned_mirror" client raises InvalidMirrorInput."""
        banned_client = get_client(name="banned_mirror")  # Invalid!!!
        hamlet = self.example_user("hamlet")
        no_recipients: List[str] = []

        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": hamlet.email, "type": "private"}
        banned_request = Request(POST=post_data, client=banned_client)

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(banned_request, hamlet, no_recipients)

    def test_invalid_email(self) -> None:
        """A malformed recipient address raises InvalidMirrorInput for each mirror client."""
        invalid_email = "alice AT example.com"
        bad_recipients = [invalid_email]

        # We use an MIT user here to maximize code coverage
        starnine = self.mit_user("starnine")

        Request = namedtuple("Request", ["POST", "client"])

        for mirror_name in ["zephyr_mirror", "irc_mirror", "jabber_mirror"]:
            mirror_client = get_client(name=mirror_name)
            post_data = {"sender": starnine.email, "type": "private"}
            mirror_request = Request(POST=post_data, client=mirror_client)

            with self.assertRaises(InvalidMirrorInput):
                create_mirrored_message_users(mirror_request, starnine, bad_recipients)

    @mock.patch(
        "DNS.dnslookup",
        return_value=[["sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh"]],
    )
    def test_zephyr_mirror_new_recipient(self, ignored: object) -> None:
        """Test mirror dummy user creation for PM recipients"""
        zephyr_client = get_client(name="zephyr_mirror")

        starnine = self.mit_user("starnine")
        sipbtest = self.mit_user("sipbtest")
        new_user_email = "bob_the_new_user@mit.edu"
        new_user_realm = get_realm("zephyr")

        pm_recipients = [starnine.email, new_user_email]

        # Now make the request.
        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": sipbtest.email, "type": "private"}
        zephyr_request = Request(POST=post_data, client=zephyr_client)

        mirror_sender = create_mirrored_message_users(zephyr_request, starnine, pm_recipients)

        # The sender already existed, so it is handed back as-is.
        self.assertEqual(mirror_sender, sipbtest)

        # Both recipients must now be present in the sender's realm.
        profiles_in_realm = UserProfile.objects.filter(realm=sipbtest.realm)
        emails_in_realm = {profile.email for profile in profiles_in_realm}
        self.assertIn(starnine.email, emails_in_realm)
        self.assertIn(new_user_email, emails_in_realm)

        # The previously unknown recipient was created as a mirror dummy.
        bob = get_user(new_user_email, new_user_realm)
        self.assertTrue(bob.is_mirror_dummy)

    @mock.patch(
        "DNS.dnslookup",
        return_value=[["sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh"]],
    )
    def test_zephyr_mirror_new_sender(self, ignored: object) -> None:
        """Test mirror dummy user creation for sender when sending to stream"""
        zephyr_client = get_client(name="zephyr_mirror")

        starnine = self.mit_user("starnine")
        sender_email = "new_sender@mit.edu"

        stream_recipients = ["stream_name"]

        # Now make the request.
        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": sender_email, "type": "stream"}
        zephyr_request = Request(POST=post_data, client=zephyr_client)

        mirror_sender = create_mirrored_message_users(zephyr_request, starnine, stream_recipients)

        # Narrow away None before touching attributes.
        assert mirror_sender is not None
        self.assertEqual(mirror_sender.email, sender_email)
        self.assertTrue(mirror_sender.is_mirror_dummy)

    def test_irc_mirror(self) -> None:
        """IRC mirror PM: the sender is returned and bob becomes a mirror dummy."""
        reset_emails_in_zulip_realm()
        irc_client = get_client(name="irc_mirror")

        hamlet = self.example_user("hamlet")

        pm_recipients = [
            self.nonreg_email("alice"),
            "bob@irc.zulip.com",
            self.nonreg_email("cordelia"),
        ]

        # Now make the request.
        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": hamlet.email, "type": "private"}
        irc_request = Request(POST=post_data, client=irc_client)

        mirror_sender = create_mirrored_message_users(irc_request, hamlet, pm_recipients)

        self.assertEqual(mirror_sender, hamlet)

        profiles_in_realm = UserProfile.objects.filter(realm=hamlet.realm)
        emails_in_realm = {profile.email for profile in profiles_in_realm}
        self.assertIn(self.nonreg_email("alice"), emails_in_realm)
        self.assertIn("bob@irc.zulip.com", emails_in_realm)

        bob = get_user("bob@irc.zulip.com", hamlet.realm)
        self.assertTrue(bob.is_mirror_dummy)

    def test_jabber_mirror(self) -> None:
        """Jabber mirror PM: the sender is returned and bob becomes a mirror dummy."""
        reset_emails_in_zulip_realm()
        jabber_client = get_client(name="jabber_mirror")

        hamlet = self.example_user("hamlet")

        pm_recipients = [
            self.nonreg_email("alice"),
            self.nonreg_email("bob"),
            self.nonreg_email("cordelia"),
        ]

        # Now make the request.
        Request = namedtuple("Request", ["POST", "client"])
        post_data = {"sender": hamlet.email, "type": "private"}
        jabber_request = Request(POST=post_data, client=jabber_client)

        mirror_sender = create_mirrored_message_users(jabber_request, hamlet, pm_recipients)

        self.assertEqual(mirror_sender, hamlet)

        profiles_in_realm = UserProfile.objects.filter(realm=hamlet.realm)
        emails_in_realm = {profile.email for profile in profiles_in_realm}
        self.assertIn(self.nonreg_email("alice"), emails_in_realm)
        self.assertIn(self.nonreg_email("bob"), emails_in_realm)

        bob = get_user(self.nonreg_email("bob"), hamlet.realm)
        self.assertTrue(bob.is_mirror_dummy)

    def test_create_mirror_user_despite_race(self) -> None:
        """create_mirror_user_if_needed still yields the user when creation raises.

        The patched create_user saves a real profile and then raises
        IntegrityError, imitating another process winning the race.
        """
        realm = get_realm("zulip")
        email = "fred@example.com"

        def email_to_full_name(email: str) -> str:
            return "fred"

        def racing_create_user(**kwargs: Any) -> UserProfile:
            self.assertEqual(kwargs["full_name"], "fred")
            self.assertEqual(kwargs["email"], email)
            self.assertEqual(kwargs["active"], False)
            self.assertEqual(kwargs["is_mirror_dummy"], True)
            # We create an actual user here to simulate a race.
            # We use the minimal, un-mocked function.
            kwargs["bot_type"] = None
            kwargs["bot_owner"] = None
            kwargs["tos_version"] = None
            # NOTE(review): "timezone" is given the current datetime here;
            # confirm that is what create_user_profile expects.
            kwargs["timezone"] = timezone_now()
            create_user_profile(**kwargs).save()
            raise IntegrityError()

        with mock.patch(
            "zerver.lib.actions.create_user", side_effect=racing_create_user
        ) as create_user_mock:
            mirror_fred_user = create_mirror_user_if_needed(
                realm,
                email,
                email_to_full_name,
            )

        self.assertEqual(mirror_fred_user.delivery_email, email)
        create_user_mock.assert_called()