mirror of https://github.com/zulip/zulip.git
237 lines
11 KiB
Python
237 lines
11 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import print_function
|
|
|
|
import random
|
|
import re
|
|
import ujson
|
|
|
|
from django.conf import settings
|
|
from django.core import mail
|
|
from django.http import HttpResponse
|
|
from django.test import override_settings
|
|
from mock import patch, MagicMock
|
|
from six.moves import range
|
|
from typing import Any, Dict, List, Text
|
|
|
|
from zerver.lib.notifications import handle_missedmessage_emails
|
|
from zerver.lib.actions import render_incoming_message, do_update_message
|
|
from zerver.lib.message import access_message
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.models import get_user_profile_by_email, Recipient, UserMessage, UserProfile
|
|
|
|
class TestMissedMessages(ZulipTestCase):
    """Tests for the emails produced by ``handle_missedmessage_emails``.

    Every scenario sends one or more messages from othello, asks for
    hamlet's missed-message email to be generated, and then inspects
    Django's in-memory ``mail.outbox``.  The underscore-prefixed methods
    hold the scenario bodies; the ``test_*`` methods at the bottom run
    them with and without ``SEND_MISSED_MESSAGE_EMAILS_AS_USER``.
    """

    def normalize_string(self, s):
        # type: (Text) -> Text
        """Strip *s* and collapse every run of whitespace to one space."""
        s = s.strip()
        return re.sub(r'\s+', ' ', s)

    def _get_tokens(self):
        # type: () -> List[str]
        """Return 30 random 32-bit integers rendered as decimal strings."""
        return [str(random.getrandbits(32)) for _ in range(30)]

    def _test_cases(self, tokens, msg_id, body, send_as_user):
        # type: (List[str], int, str, bool) -> None
        """Generate hamlet's missed-message email for *msg_id* and check it.

        tokens: values the mocked ``generate_random_token`` may return;
            they determine the set of acceptable Reply-To addresses.
        msg_id: id of the message hamlet is treated as having missed.
        body: text that must occur in the whitespace-normalized email body.
        send_as_user: when true, the email is expected to be "from"
            othello with a ``Sender`` header naming the noreply address;
            otherwise it is from the noreply address with no ``Sender``.
        """
        othello = get_user_profile_by_email('othello@zulip.com')
        hamlet = get_user_profile_by_email('hamlet@zulip.com')
        handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id}])

        if settings.EMAIL_GATEWAY_PATTERN != "":
            reply_to_addresses = [settings.EMAIL_GATEWAY_PATTERN % (u'mm' + t) for t in tokens]
        else:
            reply_to_addresses = ["noreply@example.com"]

        # Check the count before indexing so that an empty outbox is
        # reported as an assertion failure rather than an IndexError.
        self.assertEqual(len(mail.outbox), 1)
        msg = mail.outbox[0]

        sender = 'Zulip <{}>'.format(settings.NOREPLY_EMAIL_ADDRESS)
        from_email = sender
        if send_as_user:
            from_email = '"%s" <%s>' % (othello.full_name, othello.email)
            self.assertEqual(msg.extra_headers['Sender'], sender)
        else:
            self.assertNotIn("Sender", msg.extra_headers)

        self.assertEqual(msg.from_email, from_email)
        self.assertIn(msg.extra_headers['Reply-To'], reply_to_addresses)
        self.assertIn(body, self.normalize_string(msg.body))

    def _blank_message_as_othello(self, msg_id):
        # type: (int) -> None
        """Log in as othello and PATCH message *msg_id* to content ``' '``.

        The ``_deleted_message_*`` scenarios use this edit as the
        "deletion" of the message.
        """
        othello = get_user_profile_by_email('othello@zulip.com')
        self.login("othello@zulip.com")
        # NOTE(review): 'user_profile' is submitted as an ordinary request
        # field here; whether the endpoint reads it is not visible from
        # this file -- confirm before removing.
        result = self.client_patch('/json/messages/' + str(msg_id),
                                   {'message_id': msg_id, 'content': ' ', 'user_profile': othello})
        self.assert_json_success(result)

    def _assert_no_missed_message_email(self, email, msg_id):
        # type: (str, int) -> None
        """Run missed-message handling for *email*; expect an empty outbox."""
        user_profile = get_user_profile_by_email(email)
        handle_missedmessage_emails(user_profile.id, [{'message_id': msg_id}])
        self.assertEqual(len(mail.outbox), 0)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _extra_context_in_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Stream mention: the email should quote earlier messages as context."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        for i in range(11):
            self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, str(i))
        # A message under a different subject; the expected body below
        # does not include it.
        self.send_message("othello@zulip.com", "Denmark", Recipient.STREAM, '11', subject='test2')
        msg_id = self.send_message("othello@zulip.com", "denmark", Recipient.STREAM, '@**hamlet**')
        # The expected body lists messages 1..10 followed by the mention;
        # message '0' is absent from it.
        body = 'Denmark > test Othello, the Moor of Venice 1 2 3 4 5 6 7 8 9 10 @**hamlet**'
        self._test_cases(tokens, msg_id, body, send_as_user)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _extra_context_in_personal_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Personal message: the email should show the sender and the content."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", "hamlet@zulip.com",
                                   Recipient.PERSONAL,
                                   'Extremely personal message!')
        body = 'You and Othello, the Moor of Venice Extremely personal message!'
        self._test_cases(tokens, msg_id, body, send_as_user)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _reply_to_email_in_personal_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Personal message: the email should invite a reply by email."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", "hamlet@zulip.com",
                                   Recipient.PERSONAL,
                                   'Extremely personal message!')
        body = 'Or just reply to this email.'
        self._test_cases(tokens, msg_id, body, send_as_user)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _reply_warning_in_personal_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Personal message: the email should warn against replying.

        The ``test_*`` caller runs this with ``EMAIL_GATEWAY_PATTERN``
        overridden to the empty string.
        """
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", "hamlet@zulip.com",
                                   Recipient.PERSONAL,
                                   'Extremely personal message!')
        body = 'Please do not reply to this automated message.'
        self._test_cases(tokens, msg_id, body, send_as_user)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _extra_context_in_huddle_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Group personal message: the email should name the other participants."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com",
                                   ["hamlet@zulip.com", "iago@zulip.com"],
                                   Recipient.PERSONAL,
                                   'Group personal message!')

        body = ('You and Iago, Othello, the Moor of Venice Othello,'
                ' the Moor of Venice Group personal message')
        self._test_cases(tokens, msg_id, body, send_as_user)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _deleted_message_in_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Stream mention blanked by its sender: no email should be sent."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", "denmark", Recipient.STREAM,
                                   '@**hamlet** to be deleted')

        self._blank_message_as_othello(msg_id)
        self._assert_no_missed_message_email('hamlet@zulip.com', msg_id)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _deleted_message_in_personal_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Personal message blanked by its sender: no email should be sent."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL,
                                   'Extremely personal message! to be deleted!')

        self._blank_message_as_othello(msg_id)
        self._assert_no_missed_message_email('hamlet@zulip.com', msg_id)

    @patch('zerver.lib.email_mirror.generate_random_token')
    def _deleted_message_in_huddle_missed_stream_messages(self, send_as_user, mock_random_token):
        # type: (bool, MagicMock) -> None
        """Group message blanked by its sender: no recipient should get an email."""
        tokens = self._get_tokens()
        mock_random_token.side_effect = tokens

        msg_id = self.send_message("othello@zulip.com", ["hamlet@zulip.com", "iago@zulip.com"],
                                   Recipient.PERSONAL, 'Group personal message!')

        self._blank_message_as_othello(msg_id)
        self._assert_no_missed_message_email('hamlet@zulip.com', msg_id)
        self._assert_no_missed_message_email('iago@zulip.com', msg_id)

    # Test entry points.  The "_as_user" variants run the same scenario with
    # SEND_MISSED_MESSAGE_EMAILS_AS_USER enabled and pass send_as_user=True.

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_extra_context_in_missed_stream_messages_as_user(self):
        # type: () -> None
        self._extra_context_in_missed_stream_messages(True)

    def test_extra_context_in_missed_stream_messages(self):
        # type: () -> None
        self._extra_context_in_missed_stream_messages(False)

    def test_reply_to_email_in_personal_missed_stream_messages(self):
        # type: () -> None
        self._reply_to_email_in_personal_missed_stream_messages(False)

    # With an empty gateway pattern, _test_cases expects the Reply-To
    # header to be "noreply@example.com".
    @override_settings(EMAIL_GATEWAY_PATTERN="")
    def test_reply_warning_in_personal_missed_stream_messages(self):
        # type: () -> None
        self._reply_warning_in_personal_missed_stream_messages(False)

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_extra_context_in_personal_missed_stream_messages_as_user(self):
        # type: () -> None
        self._extra_context_in_personal_missed_stream_messages(True)

    def test_extra_context_in_personal_missed_stream_messages(self):
        # type: () -> None
        self._extra_context_in_personal_missed_stream_messages(False)

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_extra_context_in_huddle_missed_stream_messages_as_user(self):
        # type: () -> None
        self._extra_context_in_huddle_missed_stream_messages(True)

    def test_extra_context_in_huddle_missed_stream_messages(self):
        # type: () -> None
        self._extra_context_in_huddle_missed_stream_messages(False)

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_deleted_message_in_missed_stream_messages_as_user(self):
        # type: () -> None
        self._deleted_message_in_missed_stream_messages(True)

    def test_deleted_message_in_missed_stream_messages(self):
        # type: () -> None
        self._deleted_message_in_missed_stream_messages(False)

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_deleted_message_in_personal_missed_stream_messages_as_user(self):
        # type: () -> None
        self._deleted_message_in_personal_missed_stream_messages(True)

    def test_deleted_message_in_personal_missed_stream_messages(self):
        # type: () -> None
        self._deleted_message_in_personal_missed_stream_messages(False)

    @override_settings(SEND_MISSED_MESSAGE_EMAILS_AS_USER=True)
    def test_deleted_message_in_huddle_missed_stream_messages_as_user(self):
        # type: () -> None
        self._deleted_message_in_huddle_missed_stream_messages(True)

    def test_deleted_message_in_huddle_missed_stream_messages(self):
        # type: () -> None
        self._deleted_message_in_huddle_missed_stream_messages(False)
|