2020-07-04 22:16:52 +02:00
|
|
|
from collections import namedtuple
|
2020-07-05 20:22:29 +02:00
|
|
|
from typing import Any, List
|
2020-07-04 22:16:52 +02:00
|
|
|
from unittest import mock
|
|
|
|
|
2020-07-05 20:22:29 +02:00
|
|
|
from django.db import IntegrityError
|
|
|
|
from django.utils.timezone import now as timezone_now
|
|
|
|
|
|
|
|
from zerver.lib.actions import create_mirror_user_if_needed, get_client
|
|
|
|
from zerver.lib.create_user import create_user_profile
|
2020-07-04 22:16:52 +02:00
|
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
from zerver.lib.test_helpers import reset_emails_in_zulip_realm
|
|
|
|
from zerver.models import UserProfile, get_realm, get_user
|
|
|
|
from zerver.views.message_send import InvalidMirrorInput, create_mirrored_message_users
|
|
|
|
|
|
|
|
|
|
|
|
class MirroredMessageUsersTest(ZulipTestCase):
    """Tests for mirror-user handling when sending mirrored messages.

    Most tests call ``create_mirrored_message_users`` with a lightweight
    namedtuple standing in for the HTTP request (only the ``POST`` and
    ``client`` attributes are supplied).  The final test calls
    ``create_mirror_user_if_needed`` directly.
    """

    def test_invalid_sender(self) -> None:
        """A request whose POST data has no sender raises InvalidMirrorInput."""
        hamlet = self.example_user('hamlet')
        no_recipients: List[str] = []

        Request = namedtuple('Request', ['POST'])
        senderless_request = Request(POST={})  # no sender

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(senderless_request, hamlet, no_recipients)

    def test_invalid_client(self) -> None:
        """A request from the 'banned_mirror' client raises InvalidMirrorInput."""
        banned_client = get_client(name='banned_mirror')  # Invalid!!!

        # Hamlet acts both as the requesting user and as the claimed sender.
        hamlet = self.example_user('hamlet')
        no_recipients: List[str] = []

        Request = namedtuple('Request', ['POST', 'client'])
        request = Request(
            POST={'sender': hamlet.email, 'type': 'private'},
            client=banned_client,
        )

        with self.assertRaises(InvalidMirrorInput):
            create_mirrored_message_users(request, hamlet, no_recipients)

    def test_invalid_email(self) -> None:
        """A malformed recipient address raises InvalidMirrorInput for each mirror client."""
        malformed_email = 'alice AT example.com'
        recipients = [malformed_email]

        # We use an MIT user here to maximize code coverage
        mit_user = self.mit_user('starnine')

        Request = namedtuple('Request', ['POST', 'client'])

        for client_name in ['zephyr_mirror', 'irc_mirror', 'jabber_mirror']:
            mirror_client = get_client(name=client_name)
            request = Request(
                POST={'sender': mit_user.email, 'type': 'private'},
                client=mirror_client,
            )

            with self.assertRaises(InvalidMirrorInput):
                create_mirrored_message_users(request, mit_user, recipients)

    @mock.patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']])
    def test_zephyr_mirror_new_recipient(self, ignored: object) -> None:
        """Test mirror dummy user creation for PM recipients.

        ``ignored`` is the mock injected by the ``DNS.dnslookup`` patch; the
        test never inspects it.
        """
        zephyr_client = get_client(name='zephyr_mirror')

        requester = self.mit_user('starnine')
        sender = self.mit_user('sipbtest')
        new_user_email = 'bob_the_new_user@mit.edu'
        new_user_realm = get_realm("zephyr")

        recipients = [requester.email, new_user_email]

        # Build the stand-in request and run the function under test.
        Request = namedtuple('Request', ['POST', 'client'])
        request = Request(
            POST={'sender': sender.email, 'type': 'private'},
            client=zephyr_client,
        )

        mirror_sender = create_mirrored_message_users(request, requester, recipients)
        self.assertEqual(mirror_sender, sender)

        # Both the existing and the previously unknown address must now
        # belong to a user in the sender's realm.
        realm_emails = {
            profile.email
            for profile in UserProfile.objects.filter(realm=sender.realm)
        }
        self.assertIn(requester.email, realm_emails)
        self.assertIn(new_user_email, realm_emails)

        bob = get_user(new_user_email, new_user_realm)
        self.assertTrue(bob.is_mirror_dummy)

    @mock.patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']])
    def test_zephyr_mirror_new_sender(self, ignored: object) -> None:
        """Test mirror dummy user creation for sender when sending to stream.

        ``ignored`` is the mock injected by the ``DNS.dnslookup`` patch; the
        test never inspects it.
        """
        zephyr_client = get_client(name='zephyr_mirror')

        requester = self.mit_user('starnine')
        sender_email = 'new_sender@mit.edu'

        recipients = ['stream_name']

        # Build the stand-in request and run the function under test.
        Request = namedtuple('Request', ['POST', 'client'])
        request = Request(
            POST={'sender': sender_email, 'type': 'stream'},
            client=zephyr_client,
        )

        mirror_sender = create_mirrored_message_users(request, requester, recipients)

        # Fail early (and narrow the type) before reading attributes.
        assert mirror_sender is not None
        self.assertEqual(mirror_sender.email, sender_email)
        self.assertTrue(mirror_sender.is_mirror_dummy)

    def test_irc_mirror(self) -> None:
        """An IRC-mirrored PM leaves its recipients as users in the sender's realm.

        Checks that 'bob@irc.zulip.com' ends up as a mirror dummy user.
        """
        reset_emails_in_zulip_realm()
        irc_client = get_client(name='irc_mirror')

        sender = self.example_user('hamlet')

        recipients = [
            self.nonreg_email('alice'),
            'bob@irc.zulip.com',
            self.nonreg_email('cordelia'),
        ]

        # Build the stand-in request and run the function under test.
        Request = namedtuple('Request', ['POST', 'client'])
        request = Request(
            POST={'sender': sender.email, 'type': 'private'},
            client=irc_client,
        )

        mirror_sender = create_mirrored_message_users(request, sender, recipients)
        self.assertEqual(mirror_sender, sender)

        realm_emails = {
            profile.email
            for profile in UserProfile.objects.filter(realm=sender.realm)
        }
        self.assertIn(self.nonreg_email('alice'), realm_emails)
        self.assertIn('bob@irc.zulip.com', realm_emails)

        bob = get_user('bob@irc.zulip.com', sender.realm)
        self.assertTrue(bob.is_mirror_dummy)

    def test_jabber_mirror(self) -> None:
        """A Jabber-mirrored PM leaves its recipients as users in the sender's realm.

        Checks that the non-registered 'bob' address ends up as a mirror
        dummy user.
        """
        reset_emails_in_zulip_realm()
        jabber_client = get_client(name='jabber_mirror')

        # Hamlet acts both as the requesting user and as the claimed sender.
        sender = self.example_user('hamlet')

        recipients = [
            self.nonreg_email('alice'),
            self.nonreg_email('bob'),
            self.nonreg_email('cordelia'),
        ]

        # Build the stand-in request and run the function under test.
        Request = namedtuple('Request', ['POST', 'client'])
        request = Request(
            POST={'sender': sender.email, 'type': 'private'},
            client=jabber_client,
        )

        mirror_sender = create_mirrored_message_users(request, sender, recipients)
        self.assertEqual(mirror_sender, sender)

        realm_emails = {
            profile.email
            for profile in UserProfile.objects.filter(realm=sender.realm)
        }
        self.assertIn(self.nonreg_email('alice'), realm_emails)
        self.assertIn(self.nonreg_email('bob'), realm_emails)

        bob = get_user(self.nonreg_email('bob'), sender.realm)
        self.assertTrue(bob.is_mirror_dummy)

    def test_create_mirror_user_despite_race(self) -> None:
        """create_mirror_user_if_needed still returns the user when creation loses a race.

        ``zerver.lib.actions.create_user`` is patched with a stand-in that
        saves a real profile and then raises ``IntegrityError``, imitating a
        concurrent request that created the same user first.
        """
        realm = get_realm('zulip')
        email = 'fred@example.com'

        def email_to_full_name(email: str) -> str:
            # The full name is fixed regardless of the address supplied.
            return 'fred'

        def create_user(**kwargs: Any) -> UserProfile:
            # Verify the arguments the code under test passes to create_user.
            self.assertEqual(kwargs['full_name'], 'fred')
            self.assertEqual(kwargs['email'], email)
            self.assertEqual(kwargs['active'], False)
            self.assertEqual(kwargs['is_mirror_dummy'], True)
            # We create an actual user here to simulate a race.
            # We use the minimal, un-mocked function.
            # NOTE(review): 'timezone' is given the datetime returned by
            # timezone_now(); confirm that is the value create_user_profile
            # is meant to receive.
            kwargs.update(
                bot_type=None,
                bot_owner=None,
                tos_version=None,
                timezone=timezone_now(),
            )
            create_user_profile(**kwargs).save()
            # Report the conflict a losing racer would see.
            raise IntegrityError()

        with mock.patch('zerver.lib.actions.create_user',
                        side_effect=create_user) as patched_create_user:
            mirror_fred_user = create_mirror_user_if_needed(
                realm,
                email,
                email_to_full_name,
            )

        self.assertEqual(mirror_fred_user.delivery_email, email)
        patched_create_user.assert_called()
|