zulip/zerver/tests/test_messages.py

4298 lines
188 KiB
Python

# -*- coding: utf-8 -*-
from django.db import IntegrityError
from django.db.models import Q
from django.conf import settings
from django.http import HttpResponse
from django.test import TestCase, override_settings
from django.utils.timezone import now as timezone_now
from zerver.lib import bugdown
from zerver.decorator import JsonableError
from zerver.lib.test_runner import slow
from zerver.lib.cache import get_stream_cache_key, cache_delete
from zerver.lib.addressee import Addressee
from zerver.lib.actions import (
check_message,
check_send_stream_message,
create_mirror_user_if_needed,
do_add_alert_words,
do_change_stream_invite_only,
do_create_user,
do_deactivate_user,
do_send_messages,
do_update_message,
do_set_realm_property,
extract_recipients,
get_active_presence_idle_user_ids,
get_client,
get_last_message_id,
get_user_info_for_message_updates,
internal_prep_private_message,
internal_prep_stream_message_by_name,
internal_send_huddle_message,
internal_send_message,
internal_send_private_message,
internal_send_stream_message,
internal_send_stream_message_by_name,
send_rate_limited_pm_notification_to_bot_owner,
)
from zerver.lib.create_user import (
create_user_profile,
)
from zerver.lib.message import (
MessageDict,
bulk_access_messages,
get_first_visible_message_id,
get_raw_unread_data,
get_recent_private_conversations,
maybe_update_first_visible_message_id,
messages_for_ids,
sew_messages_and_reactions,
update_first_visible_message_id,
)
from zerver.lib.test_helpers import (
get_user_messages,
make_client,
message_stream_count,
most_recent_message,
most_recent_usermessage,
queries_captured,
get_subscription,
)
from zerver.lib.test_classes import (
ZulipTestCase,
)
from zerver.lib.topic import (
LEGACY_PREV_TOPIC,
DB_TOPIC_NAME,
)
from zerver.lib.soft_deactivation import (
add_missing_messages,
do_soft_activate_users,
do_soft_deactivate_users,
reactivate_user_if_soft_deactivated,
)
from zerver.models import (
MAX_MESSAGE_LENGTH, MAX_TOPIC_NAME_LENGTH,
Message, Realm, Recipient, Stream, UserMessage, UserProfile, Attachment,
RealmAuditLog, RealmDomain, get_realm, UserPresence, Subscription,
get_stream, get_stream_recipient, get_system_bot, get_user, Reaction,
flush_per_request_caches, ScheduledMessage, get_huddle_recipient
)
from zerver.lib.timestamp import convert_to_UTC, datetime_to_timestamp
from zerver.lib.timezone import get_timezone
from zerver.lib.upload import create_attachment
from zerver.lib.url_encoding import near_message_url
from zerver.views.messages import create_mirrored_message_users
from analytics.lib.counts import COUNT_STATS
from analytics.models import RealmCount
import datetime
import mock
import time
import ujson
from typing import Any, Dict, List, Optional, Set
from collections import namedtuple
class MiscMessageTest(ZulipTestCase):
def test_get_last_message_id(self) -> None:
self.assertEqual(
get_last_message_id(),
Message.objects.latest('id').id
)
Message.objects.all().delete()
self.assertEqual(get_last_message_id(), -1)
class TopicHistoryTest(ZulipTestCase):
def test_topics_history_zephyr_mirror(self) -> None:
user_profile = self.mit_user('sipbtest')
stream_name = 'new_stream'
# Send a message to this new stream from another user
self.subscribe(self.mit_user("starnine"), stream_name)
stream = get_stream(stream_name, user_profile.realm)
self.send_stream_message(self.mit_email("starnine"), stream_name,
topic_name="secret topic", sender_realm="zephyr")
# Now subscribe this MIT user to the new stream and verify
# that the new topic is not accessible
self.login(user_profile.email, realm=user_profile.realm)
self.subscribe(user_profile, stream_name)
endpoint = '/json/users/me/%d/topics' % (stream.id,)
result = self.client_get(endpoint, dict(), subdomain="zephyr")
self.assert_json_success(result)
history = result.json()['topics']
self.assertEqual(history, [])
def test_topics_history(self) -> None:
# verified: int(UserMessage.flags.read) == 1
user_profile = self.example_user('iago')
email = user_profile.email
stream_name = 'Verona'
self.login(email)
stream = get_stream(stream_name, user_profile.realm)
recipient = get_stream_recipient(stream.id)
def create_test_message(topic: str) -> int:
# TODO: Clean this up to send messages the normal way.
hamlet = self.example_user('hamlet')
message = Message(
sender=hamlet,
recipient=recipient,
content='whatever',
pub_date=timezone_now(),
sending_client=get_client('whatever'),
)
message.set_topic_name(topic)
message.save()
UserMessage.objects.create(
user_profile=user_profile,
message=message,
flags=0,
)
return message.id
# our most recent topics are topic0, topic1, topic2
# Create old messages with strange spellings.
create_test_message('topic2')
create_test_message('toPIc1')
create_test_message('toPIc0')
create_test_message('topic2')
create_test_message('topic2')
create_test_message('Topic2')
# Create new messages
topic2_msg_id = create_test_message('topic2')
create_test_message('topic1')
create_test_message('topic1')
topic1_msg_id = create_test_message('topic1')
topic0_msg_id = create_test_message('topic0')
endpoint = '/json/users/me/%d/topics' % (stream.id,)
result = self.client_get(endpoint, dict())
self.assert_json_success(result)
history = result.json()['topics']
# We only look at the most recent three topics, because
# the prior fixture data may be unreliable.
history = history[:3]
self.assertEqual([topic['name'] for topic in history], [
'topic0',
'topic1',
'topic2',
])
self.assertEqual([topic['max_id'] for topic in history], [
topic0_msg_id,
topic1_msg_id,
topic2_msg_id,
])
# Now try as cordelia, who we imagine as a totally new user in
# that she doesn't have UserMessage rows. We should see the
# same results for a public stream.
self.login(self.example_email("cordelia"))
result = self.client_get(endpoint, dict())
self.assert_json_success(result)
history = result.json()['topics']
# We only look at the most recent three topics, because
# the prior fixture data may be unreliable.
history = history[:3]
self.assertEqual([topic['name'] for topic in history], [
'topic0',
'topic1',
'topic2',
])
self.assertIn('topic0', [topic['name'] for topic in history])
self.assertEqual([topic['max_id'] for topic in history], [
topic0_msg_id,
topic1_msg_id,
topic2_msg_id,
])
# Now make stream private, but subscribe cordelia
do_change_stream_invite_only(stream, True)
self.subscribe(self.example_user("cordelia"), stream.name)
result = self.client_get(endpoint, dict())
self.assert_json_success(result)
history = result.json()['topics']
history = history[:3]
# Cordelia doesn't have these recent history items when we
# wasn't subscribed in her results.
self.assertNotIn('topic0', [topic['name'] for topic in history])
self.assertNotIn('topic1', [topic['name'] for topic in history])
self.assertNotIn('topic2', [topic['name'] for topic in history])
def test_bad_stream_id(self) -> None:
email = self.example_email("iago")
self.login(email)
# non-sensible stream id
endpoint = '/json/users/me/9999999999/topics'
result = self.client_get(endpoint, dict())
self.assert_json_error(result, 'Invalid stream id')
# out of realm
bad_stream = self.make_stream(
'mit_stream',
realm=get_realm('zephyr')
)
endpoint = '/json/users/me/%s/topics' % (bad_stream.id,)
result = self.client_get(endpoint, dict())
self.assert_json_error(result, 'Invalid stream id')
# private stream to which I am not subscribed
private_stream = self.make_stream(
'private_stream',
invite_only=True
)
endpoint = '/json/users/me/%s/topics' % (private_stream.id,)
result = self.client_get(endpoint, dict())
self.assert_json_error(result, 'Invalid stream id')
class TopicDeleteTest(ZulipTestCase):
def test_topic_delete(self) -> None:
initial_last_msg_id = self.get_last_message().id
stream_name = 'new_stream'
topic_name = 'new topic 2'
# NON-ADMIN USER
user_profile = self.example_user('hamlet')
self.subscribe(user_profile, stream_name)
# Send message
stream = get_stream(stream_name, user_profile.realm)
self.send_stream_message(user_profile.email, stream_name, topic_name=topic_name)
last_msg_id = self.send_stream_message(user_profile.email, stream_name, topic_name=topic_name)
# Deleting the topic
self.login(user_profile.email, realm=user_profile.realm)
endpoint = '/json/streams/' + str(stream.id) + '/delete_topic'
result = self.client_post(endpoint, {
"topic_name": topic_name
})
self.assert_json_error(result, "Must be an organization administrator")
self.assertEqual(self.get_last_message().id, last_msg_id)
# Make stream private with limited history
do_change_stream_invite_only(stream, invite_only=True,
history_public_to_subscribers=False)
# ADMIN USER subscribed now
user_profile = self.example_user('iago')
self.subscribe(user_profile, stream_name)
self.login(user_profile.email, realm=user_profile.realm)
new_last_msg_id = self.send_stream_message(user_profile.email, stream_name, topic_name=topic_name)
# Now admin deletes all messages in topic -- which should only
# delete new_last_msg_id, i.e. the one sent since they joined.
self.assertEqual(self.get_last_message().id, new_last_msg_id)
result = self.client_post(endpoint, {
"topic_name": topic_name
})
self.assert_json_success(result)
self.assertEqual(self.get_last_message().id, last_msg_id)
# Make the stream's history public to subscribers
do_change_stream_invite_only(stream, invite_only=True,
history_public_to_subscribers=True)
# Delete the topic should now remove all messages
result = self.client_post(endpoint, {
"topic_name": topic_name
})
self.assert_json_success(result)
self.assertEqual(self.get_last_message().id, initial_last_msg_id)
class TestCrossRealmPMs(ZulipTestCase):
def make_realm(self, domain: str) -> Realm:
realm = Realm.objects.create(string_id=domain, invite_required=False)
RealmDomain.objects.create(realm=realm, domain=domain)
return realm
def create_user(self, email: str) -> UserProfile:
subdomain = email.split("@")[1]
self.register(email, 'test', subdomain=subdomain)
return get_user(email, get_realm(subdomain))
@slow("Sends a large number of messages")
@override_settings(CROSS_REALM_BOT_EMAILS=['feedback@zulip.com',
'welcome-bot@zulip.com',
'support@3.example.com'])
def test_realm_scenarios(self) -> None:
self.make_realm('1.example.com')
r2 = self.make_realm('2.example.com')
self.make_realm('3.example.com')
def assert_message_received(to_user: UserProfile, from_user: UserProfile) -> None:
messages = get_user_messages(to_user)
self.assertEqual(messages[-1].sender.id, from_user.id)
def assert_invalid_email() -> Any:
return self.assertRaisesRegex(
JsonableError,
'Invalid email ')
user1_email = 'user1@1.example.com'
user1a_email = 'user1a@1.example.com'
user2_email = 'user2@2.example.com'
user3_email = 'user3@3.example.com'
feedback_email = 'feedback@zulip.com'
support_email = 'support@3.example.com' # note: not zulip.com
user1 = self.create_user(user1_email)
user1a = self.create_user(user1a_email)
user2 = self.create_user(user2_email)
self.create_user(user3_email)
feedback_bot = get_system_bot(feedback_email)
with self.settings(CROSS_REALM_BOT_EMAILS=['feedback@zulip.com', 'welcome-bot@zulip.com']):
# HACK: We should probably be creating this "bot" user another
# way, but since you can't register a user with a
# cross-realm email, we need to hide this for now.
support_bot = self.create_user(support_email)
# Users can PM themselves
self.send_personal_message(user1_email, user1_email, sender_realm="1.example.com")
assert_message_received(user1, user1)
# Users on the same realm can PM each other
self.send_personal_message(user1_email, user1a_email, sender_realm="1.example.com")
assert_message_received(user1a, user1)
# Cross-realm bots in the zulip.com realm can PM any realm
# (They need lower level APIs to do this.)
internal_send_private_message(
realm=r2,
sender=get_system_bot(feedback_email),
recipient_user=get_user(user2_email, r2),
content='bla',
)
assert_message_received(user2, feedback_bot)
# All users can PM cross-realm bots in the zulip.com realm
self.send_personal_message(user1_email, feedback_email, sender_realm="1.example.com")
assert_message_received(feedback_bot, user1)
# Users can PM cross-realm bots on non-zulip realms.
# (The support bot represents some theoretical bot that we may
# create in the future that does not have zulip.com as its realm.)
self.send_personal_message(user1_email, support_email, sender_realm="1.example.com")
assert_message_received(support_bot, user1)
# Allow sending PMs to two different cross-realm bots simultaneously.
# (We don't particularly need this feature, but since users can
# already individually send PMs to cross-realm bots, we shouldn't
# prevent them from sending multiple bots at once. We may revisit
# this if it's a nuisance for huddles.)
self.send_huddle_message(user1_email, [feedback_email, support_email],
sender_realm="1.example.com")
assert_message_received(feedback_bot, user1)
assert_message_received(support_bot, user1)
# Prevent old loophole where I could send PMs to other users as long
# as I copied a cross-realm bot from the same realm.
with assert_invalid_email():
self.send_huddle_message(user1_email, [user3_email, support_email],
sender_realm="1.example.com")
# Users on three different realms can't PM each other,
# even if one of the users is a cross-realm bot.
with assert_invalid_email():
self.send_huddle_message(user1_email, [user2_email, feedback_email],
sender_realm="1.example.com")
with assert_invalid_email():
self.send_huddle_message(feedback_email, [user1_email, user2_email])
# Users on the different realms cannot PM each other
with assert_invalid_email():
self.send_personal_message(user1_email, user2_email, sender_realm="1.example.com")
# Users on non-zulip realms can't PM "ordinary" Zulip users
with assert_invalid_email():
self.send_personal_message(user1_email, "hamlet@zulip.com", sender_realm="1.example.com")
# Users on three different realms cannot PM each other
with assert_invalid_email():
self.send_huddle_message(user1_email, [user2_email, user3_email],
sender_realm="1.example.com")
class TestAddressee(ZulipTestCase):
def test_addressee_for_user_ids(self) -> None:
realm = get_realm('zulip')
user_ids = [self.example_user('cordelia').id,
self.example_user('hamlet').id,
self.example_user('othello').id]
result = Addressee.for_user_ids(user_ids=user_ids, realm=realm)
user_profiles = result.user_profiles()
result_user_ids = [user_profiles[0].id, user_profiles[1].id,
user_profiles[2].id]
self.assertEqual(set(result_user_ids), set(user_ids))
def test_addressee_for_user_ids_nonexistent_id(self) -> None:
def assert_invalid_user_id() -> Any:
return self.assertRaisesRegex(
JsonableError,
'Invalid user ID ')
with assert_invalid_user_id():
Addressee.for_user_ids(user_ids=[779], realm=get_realm('zulip'))
def test_addressee_legacy_build_for_user_ids(self) -> None:
realm = get_realm('zulip')
self.login(self.example_email('hamlet'))
user_ids = [self.example_user('cordelia').id,
self.example_user('othello').id]
result = Addressee.legacy_build(
sender=self.example_user('hamlet'), message_type_name='private',
message_to=user_ids, topic_name='random_topic',
realm=realm
)
user_profiles = result.user_profiles()
result_user_ids = [user_profiles[0].id, user_profiles[1].id]
self.assertEqual(set(result_user_ids), set(user_ids))
def test_addressee_legacy_build_for_stream_id(self) -> None:
realm = get_realm('zulip')
self.login(self.example_email('iago'))
sender = self.example_user('iago')
self.subscribe(sender, "Denmark")
stream = get_stream('Denmark', realm)
result = Addressee.legacy_build(
sender=sender, message_type_name='stream',
message_to=[stream.id], topic_name='random_topic',
realm=realm
)
stream_id = result.stream_id()
self.assertEqual(stream.id, stream_id)
class InternalPrepTest(ZulipTestCase):
def test_returns_for_internal_sends(self) -> None:
# For our internal_send_* functions we return
# if the prep stages fail. This is mostly defensive
# code, since we are generally creating the messages
# ourselves, but we want to make sure that the functions
# won't actually explode if we give them bad content.
bad_content = ''
realm = get_realm('zulip')
cordelia = self.example_user('cordelia')
hamlet = self.example_user('hamlet')
othello = self.example_user('othello')
stream = get_stream('Verona', realm)
with mock.patch('logging.exception') as m:
internal_send_private_message(
realm=realm,
sender=cordelia,
recipient_user=hamlet,
content=bad_content,
)
arg = m.call_args_list[0][0][0]
self.assertIn('Message must not be empty', arg)
with mock.patch('logging.exception') as m:
internal_send_huddle_message(
realm=realm,
sender=cordelia,
emails=[hamlet.email, othello.email],
content=bad_content,
)
arg = m.call_args_list[0][0][0]
self.assertIn('Message must not be empty', arg)
with mock.patch('logging.exception') as m:
internal_send_stream_message(
realm=realm,
sender=cordelia,
topic='whatever',
content=bad_content,
stream=stream
)
arg = m.call_args_list[0][0][0]
self.assertIn('Message must not be empty', arg)
with mock.patch('logging.exception') as m:
internal_send_stream_message_by_name(
realm=realm,
sender=cordelia,
stream_name=stream.name,
topic='whatever',
content=bad_content
)
arg = m.call_args_list[0][0][0]
self.assertIn('Message must not be empty', arg)
with mock.patch('logging.exception') as m:
internal_send_message(
realm=realm,
sender_email=settings.ERROR_BOT,
recipient_type_name='stream',
recipients=stream.name,
topic_name='whatever',
content=bad_content,
)
arg = m.call_args_list[0][0][0]
self.assertIn('Message must not be empty', arg)
def test_error_handling(self) -> None:
realm = get_realm('zulip')
sender = self.example_user('cordelia')
recipient_user = self.example_user('hamlet')
content = 'x' * 15000
result = internal_prep_private_message(
realm=realm,
sender=sender,
recipient_user=recipient_user,
content=content)
message = result['message']
self.assertIn('message was too long', message.content)
with self.assertRaises(RuntimeError):
internal_prep_private_message(
realm=None, # should cause error
sender=sender,
recipient_user=recipient_user,
content=content)
# Simulate sending a message to somebody not in the
# realm of the sender.
recipient_user = self.mit_user('starnine')
with mock.patch('logging.exception') as logging_mock:
result = internal_prep_private_message(
realm=realm,
sender=sender,
recipient_user=recipient_user,
content=content)
arg = logging_mock.call_args_list[0][0][0]
prefix = "Error queueing internal message by cordelia@zulip.com: You can't send private messages outside of your organization."
self.assertTrue(arg.startswith(prefix))
def test_ensure_stream_gets_called(self) -> None:
realm = get_realm('zulip')
sender = self.example_user('cordelia')
stream_name = 'test_stream'
topic = 'whatever'
content = 'hello'
internal_prep_stream_message_by_name(
realm=realm,
sender=sender,
stream_name=stream_name,
topic=topic,
content=content)
# This would throw an error if the stream
# wasn't automatically created.
Stream.objects.get(name=stream_name, realm_id=realm.id)
class ExtractedRecipientsTest(TestCase):
def test_extract_recipients_emails(self) -> None:
# JSON list w/dups, empties, and trailing whitespace
s = ujson.dumps([' alice@zulip.com ', ' bob@zulip.com ', ' ', 'bob@zulip.com'])
# sorted() gets confused by extract_recipients' return type
# For testing, ignorance here is better than manual casting
result = sorted(extract_recipients(s)) # type: ignore
self.assertEqual(result, ['alice@zulip.com', 'bob@zulip.com'])
# simple string with one name
s = 'alice@zulip.com '
self.assertEqual(extract_recipients(s), ['alice@zulip.com'])
# JSON-encoded string
s = '"alice@zulip.com"'
self.assertEqual(extract_recipients(s), ['alice@zulip.com'])
# bare comma-delimited string
s = 'bob@zulip.com, alice@zulip.com'
result = sorted(extract_recipients(s)) # type: ignore
self.assertEqual(result, ['alice@zulip.com', 'bob@zulip.com'])
# JSON-encoded, comma-delimited string
s = '"bob@zulip.com,alice@zulip.com"'
result = sorted(extract_recipients(s)) # type: ignore
self.assertEqual(result, ['alice@zulip.com', 'bob@zulip.com'])
# Invalid data
s = ujson.dumps(dict(color='red'))
with self.assertRaisesRegex(ValueError, 'Invalid data type for recipients'):
extract_recipients(s)
# Empty list
self.assertEqual(extract_recipients([]), [])
# Heterogeneous lists are not supported
mixed = ujson.dumps(['eeshan@example.com', 3, 4])
with self.assertRaisesRegex(TypeError, 'Recipient lists may contain emails or user IDs, but not both.'):
extract_recipients(mixed)
def test_extract_recipient_ids(self) -> None:
# JSON list w/dups
s = ujson.dumps([3, 3, 12])
result = sorted(extract_recipients(s)) # type: ignore
self.assertEqual(result, [3, 12])
# Invalid data
ids = ujson.dumps(dict(recipient=12))
with self.assertRaisesRegex(ValueError, 'Invalid data type for recipients'):
extract_recipients(ids)
# Heterogeneous lists are not supported
mixed = ujson.dumps([3, 4, 'eeshan@example.com'])
with self.assertRaisesRegex(TypeError, 'Recipient lists may contain emails or user IDs, but not both.'):
extract_recipients(mixed)
class PersonalMessagesTest(ZulipTestCase):
def test_near_pm_message_url(self) -> None:
realm = get_realm('zulip')
message = dict(
type='personal',
id=555,
display_recipient=[
dict(id=77),
dict(id=80),
],
)
url = near_message_url(
realm=realm,
message=message,
)
self.assertEqual(url, 'http://zulip.testserver/#narrow/pm-with/77,80-pm/near/555')
def test_is_private_flag_not_leaked(self) -> None:
"""
Make sure `is_private` flag is not leaked to the API.
"""
self.login(self.example_email("hamlet"))
self.send_personal_message(self.example_email("hamlet"),
self.example_email("cordelia"),
"test")
for msg in self.get_messages():
self.assertNotIn('is_private', msg['flags'])
def test_auto_subbed_to_personals(self) -> None:
"""
Newly created users are auto-subbed to the ability to receive
personals.
"""
test_email = self.nonreg_email('test')
self.register(test_email, "test")
user_profile = self.nonreg_user('test')
old_messages_count = message_stream_count(user_profile)
self.send_personal_message(test_email, test_email)
new_messages_count = message_stream_count(user_profile)
self.assertEqual(new_messages_count, old_messages_count + 1)
recipient = Recipient.objects.get(type_id=user_profile.id,
type=Recipient.PERSONAL)
message = most_recent_message(user_profile)
self.assertEqual(message.recipient, recipient)
with mock.patch('zerver.models.get_display_recipient', return_value='recip'):
self.assertEqual(str(message),
'<Message: recip / / '
'<UserProfile: test@zulip.com <Realm: zulip 1>>>')
user_message = most_recent_usermessage(user_profile)
self.assertEqual(str(user_message),
'<UserMessage: recip / test@zulip.com ([])>'
)
@slow("checks several profiles")
def test_personal_to_self(self) -> None:
"""
If you send a personal to yourself, only you see it.
"""
old_user_profiles = list(UserProfile.objects.all())
test_email = self.nonreg_email('test1')
self.register(test_email, "test1")
old_messages = []
for user_profile in old_user_profiles:
old_messages.append(message_stream_count(user_profile))
self.send_personal_message(test_email, test_email)
new_messages = []
for user_profile in old_user_profiles:
new_messages.append(message_stream_count(user_profile))
self.assertEqual(old_messages, new_messages)
user_profile = self.nonreg_user('test1')
recipient = Recipient.objects.get(type_id=user_profile.id, type=Recipient.PERSONAL)
self.assertEqual(most_recent_message(user_profile).recipient, recipient)
def assert_personal(self, sender_email: str, receiver_email: str, content: str="testcontent") -> None:
"""
Send a private message from `sender_email` to `receiver_email` and check
that only those two parties actually received the message.
"""
realm = get_realm('zulip') # Assume realm is always 'zulip'
sender = get_user(sender_email, realm)
receiver = get_user(receiver_email, realm)
sender_messages = message_stream_count(sender)
receiver_messages = message_stream_count(receiver)
other_user_profiles = UserProfile.objects.filter(~Q(email=sender_email) &
~Q(email=receiver_email))
old_other_messages = []
for user_profile in other_user_profiles:
old_other_messages.append(message_stream_count(user_profile))
self.send_personal_message(sender_email, receiver_email, content)
# Users outside the conversation don't get the message.
new_other_messages = []
for user_profile in other_user_profiles:
new_other_messages.append(message_stream_count(user_profile))
self.assertEqual(old_other_messages, new_other_messages)
# The personal message is in the streams of both the sender and receiver.
self.assertEqual(message_stream_count(sender),
sender_messages + 1)
self.assertEqual(message_stream_count(receiver),
receiver_messages + 1)
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
self.assertEqual(most_recent_message(sender).recipient, recipient)
self.assertEqual(most_recent_message(receiver).recipient, recipient)
def test_personal(self) -> None:
"""
If you send a personal, only you and the recipient see it.
"""
self.login(self.example_email("hamlet"))
self.assert_personal(self.example_email("hamlet"), self.example_email("othello"))
def test_non_ascii_personal(self) -> None:
"""
Sending a PM containing non-ASCII characters succeeds.
"""
self.login(self.example_email("hamlet"))
self.assert_personal(self.example_email("hamlet"), self.example_email("othello"), u"hümbüǵ")
class StreamMessagesTest(ZulipTestCase):
def assert_stream_message(self, stream_name: str, topic_name: str="test topic",
content: str="test content") -> None:
"""
Check that messages sent to a stream reach all subscribers to that stream.
"""
realm = get_realm('zulip')
subscribers = self.users_subscribed_to_stream(stream_name, realm)
# Outgoing webhook bots don't store UserMessage rows; they will be processed later.
subscribers = [subscriber for subscriber in subscribers
if subscriber.bot_type != UserProfile.OUTGOING_WEBHOOK_BOT]
old_subscriber_messages = []
for subscriber in subscribers:
old_subscriber_messages.append(message_stream_count(subscriber))
non_subscribers = [user_profile for user_profile in UserProfile.objects.all()
if user_profile not in subscribers]
old_non_subscriber_messages = []
for non_subscriber in non_subscribers:
old_non_subscriber_messages.append(message_stream_count(non_subscriber))
non_bot_subscribers = [user_profile for user_profile in subscribers
if not user_profile.is_bot]
a_subscriber_email = non_bot_subscribers[0].email
self.login(a_subscriber_email)
self.send_stream_message(a_subscriber_email, stream_name,
content=content, topic_name=topic_name)
# Did all of the subscribers get the message?
new_subscriber_messages = []
for subscriber in subscribers:
new_subscriber_messages.append(message_stream_count(subscriber))
# Did non-subscribers not get the message?
new_non_subscriber_messages = []
for non_subscriber in non_subscribers:
new_non_subscriber_messages.append(message_stream_count(non_subscriber))
self.assertEqual(old_non_subscriber_messages, new_non_subscriber_messages)
self.assertEqual(new_subscriber_messages, [elt + 1 for elt in old_subscriber_messages])
def test_performance(self) -> None:
'''
This test is part of the automated test suite, but
it is more intended as an aid to measuring the
performance of do_send_messages() with consistent
data setup across different commits. You can modify
the values below and run just this test, and then
comment out the print statement toward the bottom.
'''
num_messages = 2
num_extra_users = 10
sender = self.example_user('cordelia')
realm = sender.realm
message_content = 'whatever'
stream = get_stream('Denmark', realm)
topic_name = 'lunch'
recipient = get_stream_recipient(stream.id)
sending_client = make_client(name="test suite")
for i in range(num_extra_users):
# Make every other user be idle.
long_term_idle = i % 2 > 0
email = 'foo%d@example.com' % (i,)
user = UserProfile.objects.create(
realm=realm,
email=email,
pointer=0,
long_term_idle=long_term_idle,
)
Subscription.objects.create(
user_profile=user,
recipient=recipient
)
def send_test_message() -> None:
message = Message(
sender=sender,
recipient=recipient,
content=message_content,
pub_date=timezone_now(),
sending_client=sending_client,
)
message.set_topic_name(topic_name)
do_send_messages([dict(message=message)])
before_um_count = UserMessage.objects.count()
t = time.time()
for i in range(num_messages):
send_test_message()
delay = time.time() - t
assert(delay) # quiet down lint
# print(delay)
after_um_count = UserMessage.objects.count()
ums_created = after_um_count - before_um_count
num_active_users = num_extra_users / 2
self.assertTrue(ums_created > (num_active_users * num_messages))
def test_not_too_many_queries(self) -> None:
recipient_list = [self.example_user("hamlet"), self.example_user("iago"),
self.example_user("cordelia"), self.example_user("othello")]
for user_profile in recipient_list:
self.subscribe(user_profile, "Denmark")
sender = self.example_user('hamlet')
sending_client = make_client(name="test suite")
stream_name = 'Denmark'
topic_name = 'foo'
content = 'whatever'
realm = sender.realm
# To get accurate count of the queries, we should make sure that
# caches don't come into play. If we count queries while caches are
# filled, we will get a lower count. Caches are not supposed to be
# persistent, so our test can also fail if cache is invalidated
# during the course of the unit test.
flush_per_request_caches()
cache_delete(get_stream_cache_key(stream_name, realm.id))
with queries_captured() as queries:
check_send_stream_message(
sender=sender,
client=sending_client,
stream_name=stream_name,
topic=topic_name,
body=content,
)
self.assert_length(queries, 14)
def test_stream_message_dict(self) -> None:
user_profile = self.example_user('iago')
self.subscribe(user_profile, "Denmark")
self.send_stream_message(self.example_email("hamlet"), "Denmark",
content="whatever", topic_name="my topic")
message = most_recent_message(user_profile)
row = MessageDict.get_raw_db_rows([message.id])[0]
dct = MessageDict.build_dict_from_raw_db_row(row)
MessageDict.post_process_dicts([dct], apply_markdown=True, client_gravatar=False)
self.assertEqual(dct['display_recipient'], 'Denmark')
stream = get_stream('Denmark', user_profile.realm)
self.assertEqual(dct['stream_id'], stream.id)
def test_stream_message_unicode(self) -> None:
user_profile = self.example_user('iago')
self.subscribe(user_profile, "Denmark")
self.send_stream_message(self.example_email("hamlet"), "Denmark",
content="whatever", topic_name="my topic")
message = most_recent_message(user_profile)
self.assertEqual(str(message),
u'<Message: Denmark / my topic / '
'<UserProfile: hamlet@zulip.com <Realm: zulip 1>>>')
def test_message_mentions(self) -> None:
user_profile = self.example_user('iago')
self.subscribe(user_profile, "Denmark")
self.send_stream_message(self.example_email("hamlet"), "Denmark",
content="test @**Iago** rules")
message = most_recent_message(user_profile)
assert(UserMessage.objects.get(user_profile=user_profile, message=message).flags.mentioned.is_set)
def test_is_private_flag(self) -> None:
user_profile = self.example_user('iago')
self.subscribe(user_profile, "Denmark")
self.send_stream_message(self.example_email("hamlet"), "Denmark",
content="test")
message = most_recent_message(user_profile)
self.assertFalse(UserMessage.objects.get(user_profile=user_profile, message=message).flags.is_private.is_set)
self.send_personal_message(self.example_email("hamlet"), user_profile.email,
content="test")
message = most_recent_message(user_profile)
self.assertTrue(UserMessage.objects.get(user_profile=user_profile, message=message).flags.is_private.is_set)
def _send_stream_message(self, email: str, stream_name: str, content: str) -> Set[int]:
with mock.patch('zerver.lib.actions.send_event') as m:
self.send_stream_message(
email,
stream_name,
content=content
)
self.assertEqual(m.call_count, 1)
users = m.call_args[0][2]
user_ids = {u['id'] for u in users}
return user_ids
def test_unsub_mention(self) -> None:
cordelia = self.example_user('cordelia')
hamlet = self.example_user('hamlet')
stream_name = 'Test Stream'
self.subscribe(hamlet, stream_name)
UserMessage.objects.filter(
user_profile=cordelia
).delete()
def mention_cordelia() -> Set[int]:
content = 'test @**Cordelia Lear** rules'
user_ids = self._send_stream_message(
email=hamlet.email,
stream_name=stream_name,
content=content
)
return user_ids
def num_cordelia_messages() -> int:
return UserMessage.objects.filter(
user_profile=cordelia
).count()
user_ids = mention_cordelia()
self.assertEqual(0, num_cordelia_messages())
self.assertNotIn(cordelia.id, user_ids)
# Make sure test isn't too brittle-subscribing
# Cordelia and mentioning her should give her a
# message.
self.subscribe(cordelia, stream_name)
user_ids = mention_cordelia()
self.assertIn(cordelia.id, user_ids)
self.assertEqual(1, num_cordelia_messages())
def test_message_bot_mentions(self) -> None:
cordelia = self.example_user('cordelia')
hamlet = self.example_user('hamlet')
realm = hamlet.realm
stream_name = 'Test Stream'
self.subscribe(hamlet, stream_name)
normal_bot = do_create_user(
email='normal-bot@zulip.com',
password='',
realm=realm,
full_name='Normal Bot',
short_name='',
bot_type=UserProfile.DEFAULT_BOT,
bot_owner=cordelia,
)
content = 'test @**Normal Bot** rules'
user_ids = self._send_stream_message(
email=hamlet.email,
stream_name=stream_name,
content=content
)
self.assertIn(normal_bot.id, user_ids)
user_message = most_recent_usermessage(normal_bot)
self.assertEqual(user_message.message.content, content)
self.assertTrue(user_message.flags.mentioned)
def test_stream_message_mirroring(self) -> None:
from zerver.lib.actions import do_change_is_admin
user_profile = self.example_user('iago')
email = user_profile.email
do_change_is_admin(user_profile, True, 'api_super_user')
result = self.api_post(email, "/api/v1/messages", {"type": "stream",
"to": "Verona",
"sender": self.example_email("cordelia"),
"client": "test suite",
"topic": "announcement",
"content": "Everyone knows Iago rules",
"forged": "true"})
self.assert_json_success(result)
do_change_is_admin(user_profile, False, 'api_super_user')
result = self.api_post(email, "/api/v1/messages", {"type": "stream",
"to": "Verona",
"sender": self.example_email("cordelia"),
"client": "test suite",
"topic": "announcement",
"content": "Everyone knows Iago rules",
"forged": "true"})
self.assert_json_error(result, "User not authorized for this query")
def test_message_to_stream(self) -> None:
"""
If you send a message to a stream, everyone subscribed to the stream
receives the messages.
"""
self.assert_stream_message("Scotland")
def test_non_ascii_stream_message(self) -> None:
"""
Sending a stream message containing non-ASCII characters in the stream
name, topic, or message body succeeds.
"""
self.login(self.example_email("hamlet"))
# Subscribe everyone to a stream with non-ASCII characters.
non_ascii_stream_name = u"hümbüǵ"
realm = get_realm("zulip")
stream = self.make_stream(non_ascii_stream_name)
for user_profile in UserProfile.objects.filter(is_active=True, is_bot=False,
realm=realm)[0:3]:
self.subscribe(user_profile, stream.name)
self.assert_stream_message(non_ascii_stream_name, topic_name=u"hümbüǵ",
content=u"hümbüǵ")
def test_get_raw_unread_data_for_huddle_messages(self) -> None:
users = [
self.example_user('hamlet'),
self.example_user('cordelia'),
self.example_user('iago'),
self.example_user('prospero'),
self.example_user('othello'),
]
message1_id = self.send_huddle_message(users[0].email,
[user.email for user in users],
"test content 1")
message2_id = self.send_huddle_message(users[0].email,
[user.email for user in users],
"test content 2")
msg_data = get_raw_unread_data(users[1])
# both the messages are present in msg_data
self.assertIn(message1_id, msg_data["huddle_dict"].keys())
self.assertIn(message2_id, msg_data["huddle_dict"].keys())
# only these two messages are present in msg_data
self.assertEqual(len(msg_data["huddle_dict"].keys()), 2)
recent_conversations = get_recent_private_conversations(users[1])
self.assertEqual(len(recent_conversations), 1)
recent_conversation = list(recent_conversations.values())[0]
self.assertEqual(set(recent_conversation['user_ids']), set(user.id for user in users if
user != users[1]))
self.assertEqual(recent_conversation['max_message_id'], message2_id)
class MessageDictTest(ZulipTestCase):
@slow('builds lots of messages')
def test_bulk_message_fetching(self) -> None:
sender = self.example_user('othello')
receiver = self.example_user('hamlet')
pm_recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
stream_name = u'Çiğdem'
stream = self.make_stream(stream_name)
stream_recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
sending_client = make_client(name="test suite")
ids = []
for i in range(300):
for recipient in [pm_recipient, stream_recipient]:
message = Message(
sender=sender,
recipient=recipient,
content='whatever %d' % i,
rendered_content='DOES NOT MATTER',
rendered_content_version=bugdown.version,
pub_date=timezone_now(),
sending_client=sending_client,
last_edit_time=timezone_now(),
edit_history='[]'
)
message.set_topic_name('whatever')
message.save()
ids.append(message.id)
Reaction.objects.create(user_profile=sender, message=message,
emoji_name='simple_smile')
num_ids = len(ids)
self.assertTrue(num_ids >= 600)
flush_per_request_caches()
t = time.time()
with queries_captured() as queries:
rows = list(MessageDict.get_raw_db_rows(ids))
objs = [
MessageDict.build_dict_from_raw_db_row(row)
for row in rows
]
MessageDict.post_process_dicts(objs, apply_markdown=False, client_gravatar=False)
delay = time.time() - t
# Make sure we don't take longer than 1.5ms per message to
# extract messages. Note that we increased this from 1ms to
# 1.5ms to handle tests running in parallel being a bit
# slower.
error_msg = "Number of ids: {}. Time delay: {}".format(num_ids, delay)
self.assertTrue(delay < 0.0015 * num_ids, error_msg)
self.assert_length(queries, 7)
self.assertEqual(len(rows), num_ids)
def test_applying_markdown(self) -> None:
sender = self.example_user('othello')
receiver = self.example_user('hamlet')
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
sending_client = make_client(name="test suite")
message = Message(
sender=sender,
recipient=recipient,
content='hello **world**',
pub_date=timezone_now(),
sending_client=sending_client,
last_edit_time=timezone_now(),
edit_history='[]'
)
message.set_topic_name('whatever')
message.save()
# An important part of this test is to get the message through this exact code path,
# because there is an ugly hack we need to cover. So don't just say "row = message".
row = MessageDict.get_raw_db_rows([message.id])[0]
dct = MessageDict.build_dict_from_raw_db_row(row)
expected_content = '<p>hello <strong>world</strong></p>'
self.assertEqual(dct['rendered_content'], expected_content)
message = Message.objects.get(id=message.id)
self.assertEqual(message.rendered_content, expected_content)
self.assertEqual(message.rendered_content_version, bugdown.version)
@mock.patch("zerver.lib.message.bugdown.convert")
def test_applying_markdown_invalid_format(self, convert_mock: Any) -> None:
# pretend the converter returned an invalid message without raising an exception
convert_mock.return_value = None
sender = self.example_user('othello')
receiver = self.example_user('hamlet')
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
sending_client = make_client(name="test suite")
message = Message(
sender=sender,
recipient=recipient,
content='hello **world**',
pub_date=timezone_now(),
sending_client=sending_client,
last_edit_time=timezone_now(),
edit_history='[]'
)
message.set_topic_name('whatever')
message.save()
# An important part of this test is to get the message through this exact code path,
# because there is an ugly hack we need to cover. So don't just say "row = message".
row = MessageDict.get_raw_db_rows([message.id])[0]
dct = MessageDict.build_dict_from_raw_db_row(row)
error_content = '<p>[Zulip note: Sorry, we could not understand the formatting of your message]</p>'
self.assertEqual(dct['rendered_content'], error_content)
def test_reaction(self) -> None:
sender = self.example_user('othello')
receiver = self.example_user('hamlet')
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
sending_client = make_client(name="test suite")
message = Message(
sender=sender,
recipient=recipient,
content='hello **world**',
pub_date=timezone_now(),
sending_client=sending_client,
last_edit_time=timezone_now(),
edit_history='[]'
)
message.set_topic_name('whatever')
message.save()
reaction = Reaction.objects.create(
message=message, user_profile=sender,
emoji_name='simple_smile')
row = MessageDict.get_raw_db_rows([message.id])[0]
msg_dict = MessageDict.build_dict_from_raw_db_row(row)
self.assertEqual(msg_dict['reactions'][0]['emoji_name'],
reaction.emoji_name)
self.assertEqual(msg_dict['reactions'][0]['user']['id'],
sender.id)
self.assertEqual(msg_dict['reactions'][0]['user']['email'],
sender.email)
self.assertEqual(msg_dict['reactions'][0]['user']['full_name'],
sender.full_name)
def test_missing_anchor(self) -> None:
self.login(self.example_email("hamlet"))
result = self.client_get(
'/json/messages?use_first_unread_anchor=false&num_before=1&num_after=1')
self.assert_json_error(
result, "Missing 'anchor' argument (or set 'use_first_unread_anchor'=True).")
class SewMessageAndReactionTest(ZulipTestCase):
def test_sew_messages_and_reaction(self) -> None:
sender = self.example_user('othello')
receiver = self.example_user('hamlet')
pm_recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
stream_name = u'Çiğdem'
stream = self.make_stream(stream_name)
stream_recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
sending_client = make_client(name="test suite")
needed_ids = []
for i in range(5):
for recipient in [pm_recipient, stream_recipient]:
message = Message(
sender=sender,
recipient=recipient,
content='whatever %d' % i,
pub_date=timezone_now(),
sending_client=sending_client,
last_edit_time=timezone_now(),
edit_history='[]'
)
message.set_topic_name('whatever')
message.save()
needed_ids.append(message.id)
reaction = Reaction(user_profile=sender, message=message,
emoji_name='simple_smile')
reaction.save()
messages = Message.objects.filter(id__in=needed_ids).values(
*['id', 'content'])
reactions = Reaction.get_raw_db_rows(needed_ids)
tied_data = sew_messages_and_reactions(messages, reactions)
for data in tied_data:
self.assertEqual(len(data['reactions']), 1)
self.assertEqual(data['reactions'][0]['emoji_name'],
'simple_smile')
self.assertTrue(data['id'])
self.assertTrue(data['content'])
class MessagePOSTTest(ZulipTestCase):
def test_message_to_self(self) -> None:
"""
Sending a message to a stream to which you are subscribed is
successful.
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_success(result)
def test_api_message_to_self(self) -> None:
"""
Same as above, but for the API view
"""
email = self.example_email("hamlet")
result = self.api_post(email, "/api/v1/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_success(result)
def test_message_to_stream_with_nonexistent_id(self) -> None:
cordelia = self.example_user('cordelia')
bot = self.create_test_bot(
short_name='whatever',
user_profile=cordelia,
)
result = self.api_post(
bot.email, "/api/v1/messages",
{
"type": "stream",
"to": ujson.dumps([99999]),
"client": "test suite",
"content": "Stream message by ID.",
"topic": "Test topic for stream ID message"
}
)
self.assert_json_error(result, "Stream with ID '99999' does not exist")
msg = self.get_last_message()
expected = ("Hi there! We thought you'd like to know that your bot **{sender}** just "
"tried to send a message to stream {stream_info}, but that stream does not "
"yet exist. To create it, click the gear in the left-side stream list.")
expected = expected.format(sender=bot.full_name, stream_info='with ID 99999')
self.assertEqual(msg.content, expected)
def test_message_to_stream_by_id(self) -> None:
"""
Sending a message to a stream (by stream ID) to which you are
subscribed is successful.
"""
self.login(self.example_email("hamlet"))
realm = get_realm('zulip')
stream = get_stream('Verona', realm)
result = self.client_post("/json/messages", {"type": "stream",
"to": ujson.dumps([stream.id]),
"client": "test suite",
"content": "Stream message by ID.",
"topic": "Test topic for stream ID message"})
self.assert_json_success(result)
sent_message = self.get_last_message()
self.assertEqual(sent_message.content, "Stream message by ID.")
def test_message_to_announce(self) -> None:
"""
Sending a message to an announcement_only stream by a realm admin
successful.
"""
user_profile = self.example_user("iago")
self.login(user_profile.email)
stream_name = "Verona"
stream = get_stream(stream_name, user_profile.realm)
stream.is_announcement_only = True
stream.save()
result = self.client_post("/json/messages", {"type": "stream",
"to": stream_name,
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_success(result)
# Cross realm bots should be allowed.
notification_bot = get_system_bot("notification-bot@zulip.com")
result = self.api_post(notification_bot.email,
"/json/messages", {"type": "stream",
"to": stream_name,
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_success(result)
def test_message_fail_to_announce(self) -> None:
"""
Sending a message to an announcement_only stream not by a realm
admin fails.
"""
user_profile = self.example_user("hamlet")
self.login(user_profile.email)
stream_name = "Verona"
stream = get_stream(stream_name, user_profile.realm)
stream.is_announcement_only = True
stream.save()
result = self.client_post("/json/messages", {"type": "stream",
"to": stream_name,
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_error(result, "Only organization administrators can send to this stream.")
def test_api_message_with_default_to(self) -> None:
"""
Sending messages without a to field should be sent to the default
stream for the user_profile.
"""
user_profile = self.example_user('hamlet')
email = user_profile.email
user_profile.default_sending_stream_id = get_stream('Verona', user_profile.realm).id
user_profile.save()
result = self.api_post(email, "/api/v1/messages", {"type": "stream",
"client": "test suite",
"content": "Test message no to",
"topic": "Test topic"})
self.assert_json_success(result)
sent_message = self.get_last_message()
self.assertEqual(sent_message.content, "Test message no to")
def test_message_to_nonexistent_stream(self) -> None:
"""
Sending a message to a nonexistent stream fails.
"""
self.login(self.example_email("hamlet"))
self.assertFalse(Stream.objects.filter(name="nonexistent_stream"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "nonexistent_stream",
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_error(result, "Stream 'nonexistent_stream' does not exist")
def test_message_to_nonexistent_stream_with_bad_characters(self) -> None:
"""
Nonexistent stream name with bad characters should be escaped properly.
"""
self.login(self.example_email("hamlet"))
self.assertFalse(Stream.objects.filter(name="""&<"'><non-existent>"""))
result = self.client_post("/json/messages", {"type": "stream",
"to": """&<"'><non-existent>""",
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_error(result, "Stream '&amp;&lt;&quot;&#39;&gt;&lt;non-existent&gt;' does not exist")
def test_personal_message(self) -> None:
"""
Sending a personal message to a valid username is successful.
"""
user_profile = self.example_user("hamlet")
self.login(user_profile.email)
result = self.client_post("/json/messages", {"type": "private",
"content": "Test message",
"client": "test suite",
"to": self.example_email("othello")})
self.assert_json_success(result)
message_id = ujson.loads(result.content.decode())['id']
recent_conversations = get_recent_private_conversations(user_profile)
self.assertEqual(len(recent_conversations), 1)
recent_conversation = list(recent_conversations.values())[0]
recipient_id = list(recent_conversations.keys())[0]
self.assertEqual(set(recent_conversation['user_ids']), set([self.example_user("othello").id]))
self.assertEqual(recent_conversation['max_message_id'], message_id)
# Now send a message to yourself and see how that interacts with the data structure
result = self.client_post("/json/messages", {"type": "private",
"content": "Test message",
"client": "test suite",
"to": self.example_email("hamlet")})
self.assert_json_success(result)
self_message_id = ujson.loads(result.content.decode())['id']
recent_conversations = get_recent_private_conversations(user_profile)
self.assertEqual(len(recent_conversations), 2)
recent_conversation = recent_conversations[recipient_id]
self.assertEqual(set(recent_conversation['user_ids']), set([self.example_user("othello").id]))
self.assertEqual(recent_conversation['max_message_id'], message_id)
# Now verify we have the appropriate self-pm data structure
del recent_conversations[recipient_id]
recent_conversation = list(recent_conversations.values())[0]
recipient_id = list(recent_conversations.keys())[0]
self.assertEqual(set(recent_conversation['user_ids']), set([]))
self.assertEqual(recent_conversation['max_message_id'], self_message_id)
def test_personal_message_by_id(self) -> None:
"""
Sending a personal message to a valid user ID is successful.
"""
self.login(self.example_email("hamlet"))
result = self.client_post(
"/json/messages",
{
"type": "private",
"content": "Test message",
"client": "test suite",
"to": ujson.dumps([self.example_user("othello").id])
}
)
self.assert_json_success(result)
msg = self.get_last_message()
self.assertEqual("Test message", msg.content)
self.assertEqual(msg.recipient_id, self.example_user("othello").id)
def test_group_personal_message_by_id(self) -> None:
"""
Sending a personal message to a valid user ID is successful.
"""
self.login(self.example_email("hamlet"))
result = self.client_post(
"/json/messages",
{
"type": "private",
"content": "Test message",
"client": "test suite",
"to": ujson.dumps([self.example_user("othello").id,
self.example_user("cordelia").id])
}
)
self.assert_json_success(result)
msg = self.get_last_message()
self.assertEqual("Test message", msg.content)
self.assertEqual(msg.recipient_id, get_huddle_recipient(
{self.example_user("hamlet").id,
self.example_user("othello").id,
self.example_user("cordelia").id}).id
)
def test_personal_message_copying_self(self) -> None:
"""
Sending a personal message to yourself plus another user is successful,
and counts as a message just to that user.
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {
"type": "private",
"content": "Test message",
"client": "test suite",
"to": ujson.dumps([self.example_email("othello"),
self.example_email("hamlet")])})
self.assert_json_success(result)
msg = self.get_last_message()
# Verify that we're not actually on the "recipient list"
self.assertNotIn("Hamlet", str(msg.recipient))
def test_personal_message_to_nonexistent_user(self) -> None:
"""
Sending a personal message to an invalid email returns error JSON.
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "private",
"content": "Test message",
"client": "test suite",
"to": "nonexistent"})
self.assert_json_error(result, "Invalid email 'nonexistent'")
def test_personal_message_to_deactivated_user(self) -> None:
"""
Sending a personal message to a deactivated user returns error JSON.
"""
target_user_profile = self.example_user("othello")
do_deactivate_user(target_user_profile)
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {
"type": "private",
"content": "Test message",
"client": "test suite",
"to": self.example_email("othello")})
self.assert_json_error(result, "'othello@zulip.com' is no longer using Zulip.")
result = self.client_post("/json/messages", {
"type": "private",
"content": "Test message",
"client": "test suite",
"to": ujson.dumps([self.example_email("othello"),
self.example_email("cordelia")])})
self.assert_json_error(result, "'othello@zulip.com' is no longer using Zulip.")
def test_invalid_type(self) -> None:
"""
Sending a message of unknown type returns error JSON.
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "invalid type",
"content": "Test message",
"client": "test suite",
"to": self.example_email("othello")})
self.assert_json_error(result, "Invalid message type")
def test_empty_message(self) -> None:
"""
Sending a message that is empty or only whitespace should fail
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "private",
"content": " ",
"client": "test suite",
"to": self.example_email("othello")})
self.assert_json_error(result, "Message must not be empty")
def test_empty_string_topic(self) -> None:
"""
Sending a message that has empty string topic should fail
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": ""})
self.assert_json_error(result, "Topic can't be empty")
def test_missing_topic(self) -> None:
"""
Sending a message without topic should fail
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message"})
self.assert_json_error(result, "Missing topic")
def test_invalid_message_type(self) -> None:
"""
Messages other than the type of "private" or "stream" are considered as invalid
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "invalid",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic"})
self.assert_json_error(result, "Invalid message type")
def test_private_message_without_recipients(self) -> None:
"""
Sending private message without recipients should fail
"""
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "private",
"content": "Test content",
"client": "test suite",
"to": ""})
self.assert_json_error(result, "Message must have recipients")
def test_mirrored_huddle(self) -> None:
"""
Sending a mirrored huddle message works
"""
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": ujson.dumps([self.mit_email("starnine"),
self.mit_email("espuser")])},
subdomain="zephyr")
self.assert_json_success(result)
def test_mirrored_personal(self) -> None:
"""
Sending a mirrored personal message works
"""
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": self.mit_email("starnine")},
subdomain="zephyr")
self.assert_json_success(result)
def test_mirrored_personal_to_someone_else(self) -> None:
"""
Sending a mirrored personal message to someone else is not allowed.
"""
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": self.mit_email("espuser")},
subdomain="zephyr")
self.assert_json_error(result, "User not authorized for this query")
def test_duplicated_mirrored_huddle(self) -> None:
"""
Sending two mirrored huddles in the row return the same ID
"""
msg = {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": ujson.dumps([self.mit_email("espuser"),
self.mit_email("starnine")])}
with mock.patch('DNS.dnslookup', return_value=[['starnine:*:84233:101:Athena Consulting Exchange User,,,:/mit/starnine:/bin/bash']]):
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result1 = self.client_post("/json/messages", msg,
subdomain="zephyr")
with mock.patch('DNS.dnslookup', return_value=[['espuser:*:95494:101:Esp Classroom,,,:/mit/espuser:/bin/athena/bash']]):
self.login(self.mit_email("espuser"), realm=get_realm("zephyr"))
result2 = self.client_post("/json/messages", msg,
subdomain="zephyr")
self.assertEqual(ujson.loads(result1.content)['id'],
ujson.loads(result2.content)['id'])
def test_message_with_null_bytes(self) -> None:
"""
A message with null bytes in it is handled.
"""
self.login(self.example_email("hamlet"))
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": u" I like null bytes \x00 in my content", "topic": "Test topic"}
result = self.client_post("/json/messages", post_data)
self.assert_json_error(result, "Message must not contain null bytes")
def test_strip_message(self) -> None:
"""
A message with mixed whitespace at the end is cleaned up.
"""
self.login(self.example_email("hamlet"))
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": " I like whitespace at the end! \n\n \n", "topic": "Test topic"}
result = self.client_post("/json/messages", post_data)
self.assert_json_success(result)
sent_message = self.get_last_message()
self.assertEqual(sent_message.content, " I like whitespace at the end!")
def test_long_message(self) -> None:
"""
Sending a message longer than the maximum message length succeeds but is
truncated.
"""
self.login(self.example_email("hamlet"))
long_message = "A" * (MAX_MESSAGE_LENGTH + 1)
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": long_message, "topic": "Test topic"}
result = self.client_post("/json/messages", post_data)
self.assert_json_success(result)
sent_message = self.get_last_message()
self.assertEqual(sent_message.content,
"A" * (MAX_MESSAGE_LENGTH - 20) + "\n[message truncated]")
def test_long_topic(self) -> None:
"""
Sending a message with a topic longer than the maximum topic length
succeeds, but the topic is truncated.
"""
self.login(self.example_email("hamlet"))
long_topic = "A" * (MAX_TOPIC_NAME_LENGTH + 1)
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": "test content", "topic": long_topic}
result = self.client_post("/json/messages", post_data)
self.assert_json_success(result)
sent_message = self.get_last_message()
self.assertEqual(sent_message.topic_name(),
"A" * (MAX_TOPIC_NAME_LENGTH - 3) + "...")
def test_send_forged_message_as_not_superuser(self) -> None:
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic",
"forged": True})
self.assert_json_error(result, "User not authorized for this query")
def test_send_message_as_not_superuser_to_different_domain(self) -> None:
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic",
"realm_str": "mit"})
self.assert_json_error(result, "User not authorized for this query")
def test_send_message_as_superuser_to_domain_that_dont_exist(self) -> None:
user = get_system_bot(settings.EMAIL_GATEWAY_BOT)
password = "test_password"
user.set_password(password)
user.is_api_super_user = True
user.save()
self.login(user.email, password)
result = self.client_post("/json/messages", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"topic": "Test topic",
"realm_str": "non-existing"})
user.is_api_super_user = False
user.save()
self.assert_json_error(result, "Unknown organization 'non-existing'")
def test_send_message_when_sender_is_not_set(self) -> None:
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"content": "Test message",
"client": "zephyr_mirror",
"to": self.mit_email("starnine")},
subdomain="zephyr")
self.assert_json_error(result, "Missing sender")
def test_send_message_as_not_superuser_when_type_is_not_private(self) -> None:
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "not-private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": self.mit_email("starnine")},
subdomain="zephyr")
self.assert_json_error(result, "User not authorized for this query")
@mock.patch("zerver.views.messages.create_mirrored_message_users")
def test_send_message_create_mirrored_message_user_returns_invalid_input(
self, create_mirrored_message_users_mock: Any) -> None:
create_mirrored_message_users_mock.return_value = (False, True)
self.login(self.mit_email("starnine"), realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": self.mit_email("starnine")},
subdomain="zephyr")
self.assert_json_error(result, "Invalid mirrored message")
@mock.patch("zerver.views.messages.create_mirrored_message_users")
def test_send_message_when_client_is_zephyr_mirror_but_string_id_is_not_zephyr(
self, create_mirrored_message_users_mock: Any) -> None:
create_mirrored_message_users_mock.return_value = (True, True)
user = self.mit_user("starnine")
email = user.email
user.realm.string_id = 'notzephyr'
user.realm.save()
self.login(email, realm=get_realm("notzephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": email},
subdomain="notzephyr")
self.assert_json_error(result, "Zephyr mirroring is not allowed in this organization")
@mock.patch("zerver.views.messages.create_mirrored_message_users")
def test_send_message_when_client_is_zephyr_mirror_but_recipient_is_user_id(
self, create_mirrored_message_users_mock: Any) -> None:
create_mirrored_message_users_mock.return_value = (True, True)
user = self.mit_user("starnine")
user_id = user.id
user_email = user.email
self.login(user_email, realm=get_realm("zephyr"))
result = self.client_post("/json/messages", {"type": "private",
"sender": self.mit_email("sipbtest"),
"content": "Test message",
"client": "zephyr_mirror",
"to": ujson.dumps([user_id])},
subdomain="zephyr")
self.assert_json_error(result, "Mirroring not allowed with recipient user IDs")
def test_send_message_irc_mirror(self) -> None:
self.login(self.example_email('hamlet'))
bot_info = {
'full_name': 'IRC bot',
'short_name': 'irc',
}
result = self.client_post("/json/bots", bot_info)
self.assert_json_success(result)
email = "irc-bot@zulip.testserver"
user = get_user(email, get_realm('zulip'))
user.is_api_super_user = True
user.save()
user = get_user(email, get_realm('zulip'))
self.subscribe(user, "IRCland")
# Simulate a mirrored message with a slightly old timestamp.
fake_pub_date = timezone_now() - datetime.timedelta(minutes=37)
fake_pub_time = datetime_to_timestamp(fake_pub_date)
result = self.api_post(email, "/api/v1/messages", {"type": "stream",
"forged": "true",
"time": fake_pub_time,
"sender": "irc-user@irc.zulip.com",
"content": "Test message",
"client": "irc_mirror",
"topic": "from irc",
"to": "IRCLand"})
self.assert_json_success(result)
msg = self.get_last_message()
self.assertEqual(int(datetime_to_timestamp(msg.pub_date)), int(fake_pub_time))
def test_unsubscribed_api_super_user(self) -> None:
cordelia = self.example_user('cordelia')
stream_name = 'private_stream'
self.make_stream(stream_name, invite_only=True)
self.unsubscribe(cordelia, stream_name)
# As long as Cordelia is a super_user, she can send messages
# to ANY stream, even one she is not unsubscribed to, and
# she can do it for herself or on behalf of a mirrored user.
def test_with(sender_email: str, client: str, forged: bool) -> None:
payload = dict(
type="stream",
to=stream_name,
sender=sender_email,
client=client,
topic='whatever',
content='whatever',
forged=ujson.dumps(forged),
)
cordelia.is_api_super_user = False
cordelia.save()
result = self.api_post(cordelia.email, "/api/v1/messages", payload)
self.assert_json_error_contains(result, 'authorized')
cordelia.is_api_super_user = True
cordelia.save()
result = self.api_post(cordelia.email, "/api/v1/messages", payload)
self.assert_json_success(result)
test_with(
sender_email=cordelia.email,
client='test suite',
forged=False,
)
test_with(
sender_email='irc_person@zulip.com',
client='irc_mirror',
forged=True,
)
def test_bot_can_send_to_owner_stream(self) -> None:
cordelia = self.example_user('cordelia')
bot = self.create_test_bot(
short_name='whatever',
user_profile=cordelia,
)
stream_name = 'private_stream'
self.make_stream(stream_name, invite_only=True)
payload = dict(
type="stream",
to=stream_name,
sender=bot.email,
client='test suite',
topic='whatever',
content='whatever',
)
result = self.api_post(bot.email, "/api/v1/messages", payload)
self.assert_json_error_contains(result, 'Not authorized to send')
# We subscribe the bot owner! (aka cordelia)
self.subscribe(bot.bot_owner, stream_name)
result = self.api_post(bot.email, "/api/v1/messages", payload)
self.assert_json_success(result)
def test_notification_bot(self) -> None:
sender = self.notification_bot()
stream_name = 'private_stream'
self.make_stream(stream_name, invite_only=True)
payload = dict(
type="stream",
to=stream_name,
sender=sender.email,
client='test suite',
topic='whatever',
content='whatever',
)
result = self.api_post(sender.email, "/api/v1/messages", payload)
self.assert_json_success(result)
def test_create_mirror_user_despite_race(self) -> None:
realm = get_realm('zulip')
email = 'fred@example.com'
email_to_full_name = lambda email: 'fred'
def create_user(**kwargs: Any) -> UserProfile:
self.assertEqual(kwargs['full_name'], 'fred')
self.assertEqual(kwargs['email'], email)
self.assertEqual(kwargs['active'], False)
self.assertEqual(kwargs['is_mirror_dummy'], True)
# We create an actual user here to simulate a race.
# We use the minimal, un-mocked function.
kwargs['bot_type'] = None
kwargs['bot_owner'] = None
kwargs['tos_version'] = None
kwargs['timezone'] = timezone_now()
create_user_profile(**kwargs).save()
raise IntegrityError()
with mock.patch('zerver.lib.actions.create_user',
side_effect=create_user) as m:
mirror_fred_user = create_mirror_user_if_needed(
realm,
email,
email_to_full_name,
)
self.assertEqual(mirror_fred_user.email, email)
m.assert_called()
def test_guest_user(self) -> None:
sender = self.example_user('polonius')
stream_name = 'public stream'
self.make_stream(stream_name, invite_only=False)
payload = dict(
type="stream",
to=stream_name,
sender=sender.email,
client='test suite',
topic='whatever',
content='whatever',
)
# Guest user can't send message to unsubscribed public streams
result = self.api_post(sender.email, "/api/v1/messages", payload)
self.assert_json_error(result, "Not authorized to send to stream 'public stream'")
self.subscribe(sender, stream_name)
# Guest user can send message to subscribed public streams
result = self.api_post(sender.email, "/api/v1/messages", payload)
self.assert_json_success(result)
class ScheduledMessageTest(ZulipTestCase):
def last_scheduled_message(self) -> ScheduledMessage:
return ScheduledMessage.objects.all().order_by('-id')[0]
def do_schedule_message(self, msg_type: str, to: str, msg: str,
defer_until: str='', tz_guess: str='',
delivery_type: str='send_later',
realm_str: str='zulip') -> HttpResponse:
self.login(self.example_email("hamlet"))
topic_name = ''
if msg_type == 'stream':
topic_name = 'Test topic'
payload = {"type": msg_type,
"to": to,
"client": "test suite",
"content": msg,
"topic": topic_name,
"realm_str": realm_str,
"delivery_type": delivery_type,
"tz_guess": tz_guess}
if defer_until:
payload["deliver_at"] = defer_until
result = self.client_post("/json/messages", payload)
return result
def test_schedule_message(self) -> None:
content = "Test message"
defer_until = timezone_now().replace(tzinfo=None) + datetime.timedelta(days=1)
defer_until_str = str(defer_until)
# Scheduling a message to a stream you are subscribed is successful.
result = self.do_schedule_message('stream', 'Verona',
content + ' 1', defer_until_str)
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.content, 'Test message 1')
self.assertEqual(message.topic_name(), 'Test topic')
self.assertEqual(message.scheduled_timestamp, convert_to_UTC(defer_until))
self.assertEqual(message.delivery_type, ScheduledMessage.SEND_LATER)
# Scheduling a message for reminders.
result = self.do_schedule_message('stream', 'Verona',
content + ' 2', defer_until_str,
delivery_type='remind')
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.delivery_type, ScheduledMessage.REMIND)
# Scheduling a private message is successful.
result = self.do_schedule_message('private', self.example_email("othello"),
content + ' 3', defer_until_str)
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.content, 'Test message 3')
self.assertEqual(message.scheduled_timestamp, convert_to_UTC(defer_until))
self.assertEqual(message.delivery_type, ScheduledMessage.SEND_LATER)
# Setting a reminder in PM's to other users causes a error.
result = self.do_schedule_message('private', self.example_email("othello"),
content + ' 4', defer_until_str,
delivery_type='remind')
self.assert_json_error(result, 'Reminders can only be set for streams.')
# Setting a reminder in PM's to ourself is successful.
# Required by reminders from message actions popover caret feature.
result = self.do_schedule_message('private', self.example_email("hamlet"),
content + ' 5', defer_until_str,
delivery_type='remind')
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.content, 'Test message 5')
self.assertEqual(message.delivery_type, ScheduledMessage.REMIND)
# Scheduling a message while guessing timezone.
tz_guess = 'Asia/Kolkata'
result = self.do_schedule_message('stream', 'Verona', content + ' 6',
defer_until_str, tz_guess=tz_guess)
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.content, 'Test message 6')
local_tz = get_timezone(tz_guess)
# Since mypy is not able to recognize localize and normalize as attributes of tzinfo we use ignore.
utz_defer_until = local_tz.normalize(local_tz.localize(defer_until)) # type: ignore # Reason in comment on previous line.
self.assertEqual(message.scheduled_timestamp,
convert_to_UTC(utz_defer_until))
self.assertEqual(message.delivery_type, ScheduledMessage.SEND_LATER)
# Test with users timezone setting as set to some timezone rather than
# empty. This will help interpret timestamp in users local timezone.
user = self.example_user("hamlet")
user.timezone = 'US/Pacific'
user.save(update_fields=['timezone'])
result = self.do_schedule_message('stream', 'Verona',
content + ' 7', defer_until_str)
message = self.last_scheduled_message()
self.assert_json_success(result)
self.assertEqual(message.content, 'Test message 7')
local_tz = get_timezone(user.timezone)
# Since mypy is not able to recognize localize and normalize as attributes of tzinfo we use ignore.
utz_defer_until = local_tz.normalize(local_tz.localize(defer_until)) # type: ignore # Reason in comment on previous line.
self.assertEqual(message.scheduled_timestamp,
convert_to_UTC(utz_defer_until))
self.assertEqual(message.delivery_type, ScheduledMessage.SEND_LATER)
def test_scheduling_in_past(self) -> None:
# Scheduling a message in past should fail.
content = "Test message"
defer_until = timezone_now()
defer_until_str = str(defer_until)
result = self.do_schedule_message('stream', 'Verona',
content + ' 1', defer_until_str)
self.assert_json_error(result, 'Time must be in the future.')
def test_invalid_timestamp(self) -> None:
# Scheduling a message from which timestamp couldn't be parsed
# successfully should fail.
content = "Test message"
defer_until = 'Missed the timestamp'
result = self.do_schedule_message('stream', 'Verona',
content + ' 1', defer_until)
self.assert_json_error(result, 'Invalid time format')
def test_missing_deliver_at(self) -> None:
content = "Test message"
result = self.do_schedule_message('stream', 'Verona',
content + ' 1')
self.assert_json_error(result, 'Missing deliver_at in a request for delayed message delivery')
class EditMessageTest(ZulipTestCase):
def check_message(self, msg_id: int, topic_name: Optional[str]=None,
content: Optional[str]=None) -> Message:
msg = Message.objects.get(id=msg_id)
cached = MessageDict.wide_dict(msg)
MessageDict.finalize_payload(cached, apply_markdown=False, client_gravatar=False)
uncached = MessageDict.to_dict_uncached_helper(msg)
MessageDict.post_process_dicts([uncached], apply_markdown=False, client_gravatar=False)
self.assertEqual(cached, uncached)
if topic_name:
self.assertEqual(msg.topic_name(), topic_name)
if content:
self.assertEqual(msg.content, content)
return msg
def test_save_message(self) -> None:
"""This is also tested by a client test, but here we can verify
the cache against the database"""
self.login(self.example_email("hamlet"))
msg_id = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="editing", content="before edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': 'after edit'
})
self.assert_json_success(result)
self.check_message(msg_id, content="after edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'topic': 'edited'
})
self.assert_json_success(result)
self.check_message(msg_id, topic_name="edited")
def test_fetch_raw_message(self) -> None:
self.login(self.example_email("hamlet"))
msg_id = self.send_personal_message(
from_email=self.example_email("hamlet"),
to_email=self.example_email("cordelia"),
content="**before** edit",
)
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_success(result)
self.assertEqual(result.json()['raw_content'], '**before** edit')
# Test error cases
result = self.client_get('/json/messages/999999')
self.assert_json_error(result, 'Invalid message(s)')
self.login(self.example_email("cordelia"))
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_success(result)
self.login(self.example_email("othello"))
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_error(result, 'Invalid message(s)')
def test_fetch_raw_message_stream_wrong_realm(self) -> None:
user_profile = self.example_user("hamlet")
self.login(user_profile.email)
stream = self.make_stream('public_stream')
self.subscribe(user_profile, stream.name)
msg_id = self.send_stream_message(user_profile.email, stream.name,
topic_name="test", content="test")
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_success(result)
self.login(self.mit_email("sipbtest"), realm=get_realm("zephyr"))
result = self.client_get('/json/messages/' + str(msg_id), subdomain="zephyr")
self.assert_json_error(result, 'Invalid message(s)')
def test_fetch_raw_message_private_stream(self) -> None:
user_profile = self.example_user("hamlet")
self.login(user_profile.email)
stream = self.make_stream('private_stream', invite_only=True)
self.subscribe(user_profile, stream.name)
msg_id = self.send_stream_message(user_profile.email, stream.name,
topic_name="test", content="test")
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_success(result)
self.login(self.example_email("othello"))
result = self.client_get('/json/messages/' + str(msg_id))
self.assert_json_error(result, 'Invalid message(s)')
def test_edit_message_no_permission(self) -> None:
self.login(self.example_email("hamlet"))
msg_id = self.send_stream_message(self.example_email("iago"), "Scotland",
topic_name="editing", content="before edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': 'content after edit',
})
self.assert_json_error(result, "You don't have permission to edit this message")
def test_edit_message_no_changes(self) -> None:
self.login(self.example_email("hamlet"))
msg_id = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="editing", content="before edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
})
self.assert_json_error(result, "Nothing to change")
def test_edit_message_no_topic(self) -> None:
self.login(self.example_email("hamlet"))
msg_id = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="editing", content="before edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'topic': ' '
})
self.assert_json_error(result, "Topic can't be empty")
def test_edit_message_no_content(self) -> None:
self.login(self.example_email("hamlet"))
msg_id = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="editing", content="before edit")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': ' '
})
self.assert_json_success(result)
content = Message.objects.filter(id=msg_id).values_list('content', flat = True)[0]
self.assertEqual(content, "(deleted)")
def test_edit_message_history_disabled(self) -> None:
user_profile = self.example_user("hamlet")
do_set_realm_property(user_profile.realm, "allow_edit_history", False)
self.login(self.example_email("hamlet"))
# Single-line edit
msg_id_1 = self.send_stream_message(self.example_email("hamlet"),
"Denmark",
topic_name="editing",
content="content before edit")
new_content_1 = 'content after edit'
result_1 = self.client_patch("/json/messages/" + str(msg_id_1), {
'message_id': msg_id_1, 'content': new_content_1
})
self.assert_json_success(result_1)
result = self.client_get(
"/json/messages/" + str(msg_id_1) + "/history")
self.assert_json_error(result, "Message edit history is disabled in this organization")
# Now verify that if we fetch the message directly, there's no
# edit history data attached.
messages_result = self.client_get("/json/messages",
{"anchor": msg_id_1, "num_before": 0, "num_after": 10})
self.assert_json_success(messages_result)
json_messages = ujson.loads(
messages_result.content.decode('utf-8'))
for msg in json_messages['messages']:
self.assertNotIn("edit_history", msg)
def test_edit_message_history(self) -> None:
self.login(self.example_email("hamlet"))
# Single-line edit
msg_id_1 = self.send_stream_message(
self.example_email("hamlet"),
"Scotland",
topic_name="editing",
content="content before edit")
new_content_1 = 'content after edit'
result_1 = self.client_patch("/json/messages/" + str(msg_id_1), {
'message_id': msg_id_1, 'content': new_content_1
})
self.assert_json_success(result_1)
message_edit_history_1 = self.client_get(
"/json/messages/" + str(msg_id_1) + "/history")
json_response_1 = ujson.loads(
message_edit_history_1.content.decode('utf-8'))
message_history_1 = json_response_1['message_history']
# Check content of message after edit.
self.assertEqual(message_history_1[0]['rendered_content'],
'<p>content before edit</p>')
self.assertEqual(message_history_1[1]['rendered_content'],
'<p>content after edit</p>')
self.assertEqual(message_history_1[1]['content_html_diff'],
('<p>content '
'<span class="highlight_text_inserted">after</span> '
'<span class="highlight_text_deleted">before</span>'
' edit</p>'))
# Check content of message before edit.
self.assertEqual(message_history_1[1]['prev_rendered_content'],
'<p>content before edit</p>')
# Edits on new lines
msg_id_2 = self.send_stream_message(
self.example_email("hamlet"),
"Scotland",
topic_name="editing",
content=('content before edit, line 1\n'
'\n'
'content before edit, line 3'))
new_content_2 = ('content before edit, line 1\n'
'content after edit, line 2\n'
'content before edit, line 3')
result_2 = self.client_patch("/json/messages/" + str(msg_id_2), {
'message_id': msg_id_2, 'content': new_content_2
})
self.assert_json_success(result_2)
message_edit_history_2 = self.client_get(
"/json/messages/" + str(msg_id_2) + "/history")
json_response_2 = ujson.loads(
message_edit_history_2.content.decode('utf-8'))
message_history_2 = json_response_2['message_history']
self.assertEqual(message_history_2[0]['rendered_content'],
('<p>content before edit, line 1</p>\n'
'<p>content before edit, line 3</p>'))
self.assertEqual(message_history_2[1]['rendered_content'],
('<p>content before edit, line 1<br>\n'
'content after edit, line 2<br>\n'
'content before edit, line 3</p>'))
self.assertEqual(message_history_2[1]['content_html_diff'],
('<p>content before edit, line 1<br> '
'content <span class="highlight_text_inserted">after edit, line 2<br> '
'content</span> before edit, line 3</p>'))
self.assertEqual(message_history_2[1]['prev_rendered_content'],
('<p>content before edit, line 1</p>\n'
'<p>content before edit, line 3</p>'))
def test_edit_link(self) -> None:
# Link editing
self.login(self.example_email("hamlet"))
msg_id_1 = self.send_stream_message(
self.example_email("hamlet"),
"Scotland",
topic_name="editing",
content="Here is a link to [zulip](www.zulip.org).")
new_content_1 = 'Here is a link to [zulip](www.zulipchat.com).'
result_1 = self.client_patch("/json/messages/" + str(msg_id_1), {
'message_id': msg_id_1, 'content': new_content_1
})
self.assert_json_success(result_1)
message_edit_history_1 = self.client_get(
"/json/messages/" + str(msg_id_1) + "/history")
json_response_1 = ujson.loads(
message_edit_history_1.content.decode('utf-8'))
message_history_1 = json_response_1['message_history']
# Check content of message after edit.
self.assertEqual(message_history_1[0]['rendered_content'],
'<p>Here is a link to '
'<a href="http://www.zulip.org" target="_blank" title="http://www.zulip.org">zulip</a>.</p>')
self.assertEqual(message_history_1[1]['rendered_content'],
'<p>Here is a link to '
'<a href="http://www.zulipchat.com" target="_blank" title="http://www.zulipchat.com">zulip</a>.</p>')
self.assertEqual(message_history_1[1]['content_html_diff'],
('<p>Here is a link to <a href="http://www.zulipchat.com" '
'target="_blank" title="http://www.zulipchat.com">zulip '
'<span class="highlight_text_inserted"> Link: http://www.zulipchat.com .'
'</span> <span class="highlight_text_deleted"> Link: http://www.zulip.org .'
'</span> </a></p>'))
def test_edit_history_unedited(self) -> None:
self.login(self.example_email('hamlet'))
msg_id = self.send_stream_message(
self.example_email('hamlet'),
'Scotland',
topic_name='editing',
content='This message has not been edited.')
result = self.client_get('/json/messages/{}/history'.format(msg_id))
self.assert_json_success(result)
message_history = result.json()['message_history']
self.assert_length(message_history, 1)
def test_user_info_for_updates(self) -> None:
hamlet = self.example_user('hamlet')
cordelia = self.example_user('cordelia')
self.login(hamlet.email)
self.subscribe(hamlet, 'Scotland')
self.subscribe(cordelia, 'Scotland')
msg_id = self.send_stream_message(hamlet.email, 'Scotland',
content='@**Cordelia Lear**')
user_info = get_user_info_for_message_updates(msg_id)
message_user_ids = user_info['message_user_ids']
self.assertIn(hamlet.id, message_user_ids)
self.assertIn(cordelia.id, message_user_ids)
mention_user_ids = user_info['mention_user_ids']
self.assertEqual(mention_user_ids, {cordelia.id})
def test_edit_cases(self) -> None:
"""This test verifies the accuracy of construction of Zulip's edit
history data structures."""
self.login(self.example_email("hamlet"))
hamlet = self.example_user('hamlet')
msg_id = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic 1", content="content 1")
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': 'content 2',
})
self.assert_json_success(result)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0]['prev_content'], 'content 1')
self.assertEqual(history[0]['user_id'], hamlet.id)
self.assertEqual(set(history[0].keys()),
{u'timestamp', u'prev_content', u'user_id',
u'prev_rendered_content', u'prev_rendered_content_version'})
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'topic': 'topic 2',
})
self.assert_json_success(result)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0][LEGACY_PREV_TOPIC], 'topic 1')
self.assertEqual(history[0]['user_id'], hamlet.id)
self.assertEqual(set(history[0].keys()), {'timestamp', LEGACY_PREV_TOPIC, 'user_id'})
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': 'content 3',
'topic': 'topic 3',
})
self.assert_json_success(result)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0]['prev_content'], 'content 2')
self.assertEqual(history[0][LEGACY_PREV_TOPIC], 'topic 2')
self.assertEqual(history[0]['user_id'], hamlet.id)
self.assertEqual(set(history[0].keys()),
{'timestamp', LEGACY_PREV_TOPIC, 'prev_content', 'user_id',
'prev_rendered_content', 'prev_rendered_content_version'})
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'content': 'content 4',
})
self.assert_json_success(result)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0]['prev_content'], 'content 3')
self.assertEqual(history[0]['user_id'], hamlet.id)
self.login(self.example_email("iago"))
result = self.client_patch("/json/messages/" + str(msg_id), {
'message_id': msg_id,
'topic': 'topic 4',
})
self.assert_json_success(result)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0][LEGACY_PREV_TOPIC], 'topic 3')
self.assertEqual(history[0]['user_id'], self.example_user('iago').id)
history = ujson.loads(Message.objects.get(id=msg_id).edit_history)
self.assertEqual(history[0][LEGACY_PREV_TOPIC], 'topic 3')
self.assertEqual(history[2][LEGACY_PREV_TOPIC], 'topic 2')
self.assertEqual(history[3][LEGACY_PREV_TOPIC], 'topic 1')
self.assertEqual(history[1]['prev_content'], 'content 3')
self.assertEqual(history[2]['prev_content'], 'content 2')
self.assertEqual(history[4]['prev_content'], 'content 1')
# Now, we verify that the edit history data sent back has the
# correct filled-out fields
message_edit_history = self.client_get("/json/messages/" + str(msg_id) + "/history")
json_response = ujson.loads(message_edit_history.content.decode('utf-8'))
# We reverse the message history view output so that the IDs line up with the above.
message_history = list(reversed(json_response['message_history']))
i = 0
for entry in message_history:
expected_entries = {u'content', u'rendered_content', u'topic', u'timestamp', u'user_id'}
if i in {0, 2, 3}:
expected_entries.add('prev_topic')
if i in {1, 2, 4}:
expected_entries.add('prev_content')
expected_entries.add('prev_rendered_content')
expected_entries.add('content_html_diff')
i += 1
self.assertEqual(expected_entries, set(entry.keys()))
self.assertEqual(len(message_history), 6)
self.assertEqual(message_history[0]['prev_topic'], 'topic 3')
self.assertEqual(message_history[0]['topic'], 'topic 4')
self.assertEqual(message_history[1]['topic'], 'topic 3')
self.assertEqual(message_history[2]['topic'], 'topic 3')
self.assertEqual(message_history[2]['prev_topic'], 'topic 2')
self.assertEqual(message_history[3]['topic'], 'topic 2')
self.assertEqual(message_history[3]['prev_topic'], 'topic 1')
self.assertEqual(message_history[4]['topic'], 'topic 1')
self.assertEqual(message_history[0]['content'], 'content 4')
self.assertEqual(message_history[1]['content'], 'content 4')
self.assertEqual(message_history[1]['prev_content'], 'content 3')
self.assertEqual(message_history[2]['content'], 'content 3')
self.assertEqual(message_history[2]['prev_content'], 'content 2')
self.assertEqual(message_history[3]['content'], 'content 2')
self.assertEqual(message_history[4]['content'], 'content 2')
self.assertEqual(message_history[4]['prev_content'], 'content 1')
self.assertEqual(message_history[5]['content'], 'content 1')
self.assertEqual(message_history[5]['topic'], 'topic 1')
def test_edit_message_content_limit(self) -> None:
def set_message_editing_params(allow_message_editing: bool,
message_content_edit_limit_seconds: int,
allow_community_topic_editing: bool) -> None:
result = self.client_patch("/json/realm", {
'allow_message_editing': ujson.dumps(allow_message_editing),
'message_content_edit_limit_seconds': message_content_edit_limit_seconds,
'allow_community_topic_editing': ujson.dumps(allow_community_topic_editing),
})
self.assert_json_success(result)
def do_edit_message_assert_success(id_: int, unique_str: str, topic_only: bool=False) -> None:
new_topic = 'topic' + unique_str
new_content = 'content' + unique_str
params_dict = {'message_id': id_, 'topic': new_topic}
if not topic_only:
params_dict['content'] = new_content
result = self.client_patch("/json/messages/" + str(id_), params_dict)
self.assert_json_success(result)
if topic_only:
self.check_message(id_, topic_name=new_topic)
else:
self.check_message(id_, topic_name=new_topic, content=new_content)
def do_edit_message_assert_error(id_: int, unique_str: str, error: str,
topic_only: bool=False) -> None:
message = Message.objects.get(id=id_)
old_topic = message.topic_name()
old_content = message.content
new_topic = 'topic' + unique_str
new_content = 'content' + unique_str
params_dict = {'message_id': id_, 'topic': new_topic}
if not topic_only:
params_dict['content'] = new_content
result = self.client_patch("/json/messages/" + str(id_), params_dict)
message = Message.objects.get(id=id_)
self.assert_json_error(result, error)
self.check_message(id_, topic_name=old_topic, content=old_content)
self.login(self.example_email("iago"))
# send a message in the past
id_ = self.send_stream_message(self.example_email("iago"), "Scotland",
content="content", topic_name="topic")
message = Message.objects.get(id=id_)
message.pub_date = message.pub_date - datetime.timedelta(seconds=180)
message.save()
# test the various possible message editing settings
# high enough time limit, all edits allowed
set_message_editing_params(True, 240, False)
do_edit_message_assert_success(id_, 'A')
# out of time, only topic editing allowed
set_message_editing_params(True, 120, False)
do_edit_message_assert_success(id_, 'B', True)
do_edit_message_assert_error(id_, 'C', "The time limit for editing this message has passed")
# infinite time, all edits allowed
set_message_editing_params(True, 0, False)
do_edit_message_assert_success(id_, 'D')
# without allow_message_editing, nothing is allowed
set_message_editing_params(False, 240, False)
do_edit_message_assert_error(id_, 'E', "Your organization has turned off message editing", True)
set_message_editing_params(False, 120, False)
do_edit_message_assert_error(id_, 'F', "Your organization has turned off message editing", True)
set_message_editing_params(False, 0, False)
do_edit_message_assert_error(id_, 'G', "Your organization has turned off message editing", True)
def test_allow_community_topic_editing(self) -> None:
def set_message_editing_params(allow_message_editing,
message_content_edit_limit_seconds,
allow_community_topic_editing):
# type: (bool, int, bool) -> None
result = self.client_patch("/json/realm", {
'allow_message_editing': ujson.dumps(allow_message_editing),
'message_content_edit_limit_seconds': message_content_edit_limit_seconds,
'allow_community_topic_editing': ujson.dumps(allow_community_topic_editing),
})
self.assert_json_success(result)
def do_edit_message_assert_success(id_, unique_str):
# type: (int, str) -> None
new_topic = 'topic' + unique_str
params_dict = {'message_id': id_, 'topic': new_topic}
result = self.client_patch("/json/messages/" + str(id_), params_dict)
self.assert_json_success(result)
self.check_message(id_, topic_name=new_topic)
def do_edit_message_assert_error(id_, unique_str, error):
# type: (int, str, str) -> None
message = Message.objects.get(id=id_)
old_topic = message.topic_name()
old_content = message.content
new_topic = 'topic' + unique_str
params_dict = {'message_id': id_, 'topic': new_topic}
result = self.client_patch("/json/messages/" + str(id_), params_dict)
message = Message.objects.get(id=id_)
self.assert_json_error(result, error)
self.check_message(id_, topic_name=old_topic, content=old_content)
self.login(self.example_email("iago"))
# send a message in the past
id_ = self.send_stream_message(self.example_email("hamlet"), "Scotland",
content="content", topic_name="topic")
message = Message.objects.get(id=id_)
message.pub_date = message.pub_date - datetime.timedelta(seconds=180)
message.save()
# any user can edit the topic of a message
set_message_editing_params(True, 0, True)
# log in as a new user
self.login(self.example_email("cordelia"))
do_edit_message_assert_success(id_, 'A')
# only admins can edit the topics of messages
self.login(self.example_email("iago"))
set_message_editing_params(True, 0, False)
do_edit_message_assert_success(id_, 'B')
self.login(self.example_email("cordelia"))
do_edit_message_assert_error(id_, 'C', "You don't have permission to edit this message")
# users cannot edit topics if allow_message_editing is False
self.login(self.example_email("iago"))
set_message_editing_params(False, 0, True)
self.login(self.example_email("cordelia"))
do_edit_message_assert_error(id_, 'D', "Your organization has turned off message editing")
# non-admin users cannot edit topics sent > 24 hrs ago
message.pub_date = message.pub_date - datetime.timedelta(seconds=90000)
message.save()
self.login(self.example_email("iago"))
set_message_editing_params(True, 0, True)
do_edit_message_assert_success(id_, 'E')
self.login(self.example_email("cordelia"))
do_edit_message_assert_error(id_, 'F', "The time limit for editing this message has passed")
# anyone should be able to edit "no topic" indefinitely
message.set_topic_name("(no topic)")
message.save()
self.login(self.example_email("cordelia"))
do_edit_message_assert_success(id_, 'D')
@mock.patch("zerver.lib.actions.send_event")
def test_edit_topic_public_history_stream(self, mock_send_event: mock.MagicMock) -> None:
stream_name = "Macbeth"
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
self.make_stream(stream_name, history_public_to_subscribers=True)
self.subscribe(hamlet, stream_name)
self.login(hamlet.email)
message_id = self.send_stream_message(hamlet.email, stream_name, "Where am I?")
self.login(cordelia.email)
self.subscribe(cordelia, stream_name)
message = Message.objects.get(id=message_id)
def do_update_message_topic_success(user_profile: UserProfile, message: Message,
topic_name: str, users_to_be_notified: List[Dict[str, Any]]) -> None:
do_update_message(
user_profile=user_profile,
message=message,
topic_name=topic_name,
propagate_mode="change_later",
content=None,
rendered_content=None,
prior_mention_user_ids=set(),
mention_user_ids=set()
)
mock_send_event.assert_called_with(mock.ANY, mock.ANY, users_to_be_notified)
# Returns the users that need to be notified when a message topic is changed
def notify(user_id: int) -> Dict[str, Any]:
um = UserMessage.objects.get(message=message_id)
if um.user_profile_id == user_id:
return {
"id": user_id,
"flags": um.flags_list()
}
else:
return {
"id": user_id,
"flags": ["read"]
}
users_to_be_notified = list(map(notify, [hamlet.id, cordelia.id]))
# Edit topic of a message sent before Cordelia subscribed the stream
do_update_message_topic_success(cordelia, message, "Othello eats apple", users_to_be_notified)
# If Cordelia is long-term idle, she doesn't get a notification.
cordelia.long_term_idle = True
cordelia.save()
users_to_be_notified = list(map(notify, [hamlet.id]))
do_update_message_topic_success(cordelia, message, "Another topic idle", users_to_be_notified)
cordelia.long_term_idle = False
cordelia.save()
# Even if Hamlet unsubscribes the stream, he should be notified when the topic is changed
# because he has a UserMessage row.
self.unsubscribe(hamlet, stream_name)
users_to_be_notified = list(map(notify, [hamlet.id, cordelia.id]))
do_update_message_topic_success(cordelia, message, "Another topic", users_to_be_notified)
# Hamlet subscribes to the stream again and Cordelia unsubscribes, then Hamlet changes
# the message topic. Cordelia won't receive any updates when a message on that stream is
# changed because she is not a subscriber and doesn't have a UserMessage row.
self.subscribe(hamlet, stream_name)
self.unsubscribe(cordelia, stream_name)
self.login(hamlet.email)
users_to_be_notified = list(map(notify, [hamlet.id]))
do_update_message_topic_success(hamlet, message, "Change again", users_to_be_notified)
def test_propagate_topic_forward(self) -> None:
self.login(self.example_email("hamlet"))
id1 = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic1")
id2 = self.send_stream_message(self.example_email("iago"), "Scotland",
topic_name="topic1")
id3 = self.send_stream_message(self.example_email("iago"), "Rome",
topic_name="topic1")
id4 = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic2")
id5 = self.send_stream_message(self.example_email("iago"), "Scotland",
topic_name="topic1")
result = self.client_patch("/json/messages/" + str(id1), {
'message_id': id1,
'topic': 'edited',
'propagate_mode': 'change_later'
})
self.assert_json_success(result)
self.check_message(id1, topic_name="edited")
self.check_message(id2, topic_name="edited")
self.check_message(id3, topic_name="topic1")
self.check_message(id4, topic_name="topic2")
self.check_message(id5, topic_name="edited")
def test_propagate_all_topics(self) -> None:
self.login(self.example_email("hamlet"))
id1 = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic1")
id2 = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic1")
id3 = self.send_stream_message(self.example_email("iago"), "Rome",
topic_name="topic1")
id4 = self.send_stream_message(self.example_email("hamlet"), "Scotland",
topic_name="topic2")
id5 = self.send_stream_message(self.example_email("iago"), "Scotland",
topic_name="topic1")
id6 = self.send_stream_message(self.example_email("iago"), "Scotland",
topic_name="topic3")
result = self.client_patch("/json/messages/" + str(id2), {
'message_id': id2,
'topic': 'edited',
'propagate_mode': 'change_all'
})
self.assert_json_success(result)
self.check_message(id1, topic_name="edited")
self.check_message(id2, topic_name="edited")
self.check_message(id3, topic_name="topic1")
self.check_message(id4, topic_name="topic2")
self.check_message(id5, topic_name="edited")
self.check_message(id6, topic_name="topic3")
class MirroredMessageUsersTest(ZulipTestCase):
def test_invalid_sender(self) -> None:
user = self.example_user('hamlet')
recipients = [] # type: List[str]
Request = namedtuple('Request', ['POST'])
request = Request(POST=dict()) # no sender
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertEqual(valid_input, False)
self.assertEqual(mirror_sender, None)
def test_invalid_client(self) -> None:
client = get_client(name='banned_mirror') # Invalid!!!
user = self.example_user('hamlet')
sender = user
recipients = [] # type: List[str]
Request = namedtuple('Request', ['POST', 'client'])
request = Request(POST = dict(sender=sender.email, type='private'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertEqual(valid_input, False)
self.assertEqual(mirror_sender, None)
def test_invalid_email(self) -> None:
invalid_email = 'alice AT example.com'
recipients = [invalid_email]
# We use an MIT user here to maximize code coverage
user = self.mit_user('starnine')
sender = user
Request = namedtuple('Request', ['POST', 'client'])
for client_name in ['zephyr_mirror', 'irc_mirror', 'jabber_mirror']:
client = get_client(name=client_name)
request = Request(POST = dict(sender=sender.email, type='private'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertEqual(valid_input, False)
self.assertEqual(mirror_sender, None)
@mock.patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']])
def test_zephyr_mirror_new_recipient(self, ignored: Any) -> None:
"""Test mirror dummy user creation for PM recipients"""
client = get_client(name='zephyr_mirror')
user = self.mit_user('starnine')
sender = self.mit_user('sipbtest')
new_user_email = 'bob_the_new_user@mit.edu'
new_user_realm = get_realm("zephyr")
recipients = [user.email, new_user_email]
# Now make the request.
Request = namedtuple('Request', ['POST', 'client'])
request = Request(POST = dict(sender=sender.email, type='private'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertTrue(valid_input)
self.assertEqual(mirror_sender, sender)
realm_users = UserProfile.objects.filter(realm=sender.realm)
realm_emails = {user.email for user in realm_users}
self.assertIn(user.email, realm_emails)
self.assertIn(new_user_email, realm_emails)
bob = get_user(new_user_email, new_user_realm)
self.assertTrue(bob.is_mirror_dummy)
@mock.patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']])
def test_zephyr_mirror_new_sender(self, ignored: Any) -> None:
"""Test mirror dummy user creation for sender when sending to stream"""
client = get_client(name='zephyr_mirror')
user = self.mit_user('starnine')
sender_email = 'new_sender@mit.edu'
recipients = ['stream_name']
# Now make the request.
Request = namedtuple('Request', ['POST', 'client'])
request = Request(POST = dict(sender=sender_email, type='stream'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
assert(mirror_sender is not None)
self.assertTrue(valid_input)
self.assertEqual(mirror_sender.email, sender_email)
self.assertTrue(mirror_sender.is_mirror_dummy)
def test_irc_mirror(self) -> None:
client = get_client(name='irc_mirror')
sender = self.example_user('hamlet')
user = sender
recipients = [self.nonreg_email('alice'), 'bob@irc.zulip.com', self.nonreg_email('cordelia')]
# Now make the request.
Request = namedtuple('Request', ['POST', 'client'])
request = Request(POST = dict(sender=sender.email, type='private'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertEqual(valid_input, True)
self.assertEqual(mirror_sender, sender)
realm_users = UserProfile.objects.filter(realm=sender.realm)
realm_emails = {user.email for user in realm_users}
self.assertIn(self.nonreg_email('alice'), realm_emails)
self.assertIn('bob@irc.zulip.com', realm_emails)
bob = get_user('bob@irc.zulip.com', sender.realm)
self.assertTrue(bob.is_mirror_dummy)
def test_jabber_mirror(self) -> None:
client = get_client(name='jabber_mirror')
sender = self.example_user('hamlet')
user = sender
recipients = [self.nonreg_email('alice'), self.nonreg_email('bob'), self.nonreg_email('cordelia')]
# Now make the request.
Request = namedtuple('Request', ['POST', 'client'])
request = Request(POST = dict(sender=sender.email, type='private'),
client = client)
(valid_input, mirror_sender) = \
create_mirrored_message_users(request, user, recipients)
self.assertEqual(valid_input, True)
self.assertEqual(mirror_sender, sender)
realm_users = UserProfile.objects.filter(realm=sender.realm)
realm_emails = {user.email for user in realm_users}
self.assertIn(self.nonreg_email('alice'), realm_emails)
self.assertIn(self.nonreg_email('bob'), realm_emails)
bob = get_user(self.nonreg_email('bob'), sender.realm)
self.assertTrue(bob.is_mirror_dummy)
class MessageAccessTests(ZulipTestCase):
def test_update_invalid_flags(self) -> None:
message = self.send_personal_message(
self.example_email("cordelia"),
self.example_email("hamlet"),
"hello",
)
self.login(self.example_email("hamlet"))
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "invalid"})
self.assert_json_error(result, "Invalid flag: 'invalid'")
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "is_private"})
self.assert_json_error(result, "Invalid flag: 'is_private'")
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps([message]),
"op": "add",
"flag": "active_mobile_push_notification"})
self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'")
def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse:
return self.client_post("/json/messages/flags",
{"messages": ujson.dumps(messages),
"op": "add" if add else "remove",
"flag": "starred"},
**kwargs)
def test_change_star(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
self.login(self.example_email("hamlet"))
message_ids = [self.send_personal_message(self.example_email("hamlet"),
self.example_email("hamlet"),
"test")]
# Star a message.
result = self.change_star(message_ids)
self.assert_json_success(result)
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], ['starred'])
else:
self.assertEqual(msg['flags'], ['read'])
result = self.change_star(message_ids, False)
self.assert_json_success(result)
# Remove the stars.
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], [])
def test_change_star_public_stream_historical(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
stream_name = "new_stream"
self.subscribe(self.example_user("hamlet"), stream_name)
self.login(self.example_email("hamlet"))
message_ids = [
self.send_stream_message(self.example_email("hamlet"), stream_name, "test"),
]
# Send a second message so we can verify it isn't modified
other_message_ids = [
self.send_stream_message(self.example_email("hamlet"), stream_name, "test_unused"),
]
received_message_ids = [
self.send_personal_message(
self.example_email("hamlet"),
self.example_email("cordelia"),
"test_received"
),
]
# Now login as another user who wasn't on that stream
self.login(self.example_email("cordelia"))
# Send a message to yourself to make sure we have at least one with the read flag
sent_message_ids = [
self.send_personal_message(
self.example_email("cordelia"),
self.example_email("cordelia"),
"test_read_message",
),
]
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps(sent_message_ids),
"op": "add",
"flag": "read"})
# We can't change flags other than "starred" on historical messages:
result = self.client_post("/json/messages/flags",
{"messages": ujson.dumps(message_ids),
"op": "add",
"flag": "read"})
self.assert_json_error(result, 'Invalid message(s)')
# Trying to change a list of more than one historical message fails
result = self.change_star(message_ids * 2)
self.assert_json_error(result, 'Invalid message(s)')
# Confirm that one can change the historical flag now
result = self.change_star(message_ids)
self.assert_json_success(result)
for msg in self.get_messages():
if msg['id'] in message_ids:
self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'})
elif msg['id'] in received_message_ids:
self.assertEqual(msg['flags'], [])
else:
self.assertEqual(msg['flags'], ['read'])
self.assertNotIn(msg['id'], other_message_ids)
result = self.change_star(message_ids, False)
self.assert_json_success(result)
# But it still doesn't work if you're in another realm
self.login(self.mit_email("sipbtest"), realm=get_realm("zephyr"))
result = self.change_star(message_ids, subdomain="zephyr")
self.assert_json_error(result, 'Invalid message(s)')
def test_change_star_private_message_security(self) -> None:
"""
You can set a message as starred/un-starred through
POST /json/messages/flags.
"""
self.login(self.example_email("hamlet"))
message_ids = [
self.send_personal_message(
self.example_email("hamlet"),
self.example_email("hamlet"),
"test",
),
]
# Starring private messages you didn't receive fails.
self.login(self.example_email("cordelia"))
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
def test_change_star_private_stream_security(self) -> None:
stream_name = "private_stream"
self.make_stream(stream_name, invite_only=True)
self.subscribe(self.example_user("hamlet"), stream_name)
self.login(self.example_email("hamlet"))
message_ids = [
self.send_stream_message(self.example_email("hamlet"), stream_name, "test"),
]
# Starring private stream messages you received works
result = self.change_star(message_ids)
self.assert_json_success(result)
# Starring private stream messages you didn't receive fails.
self.login(self.example_email("cordelia"))
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
stream_name = "private_stream_2"
self.make_stream(stream_name, invite_only=True,
history_public_to_subscribers=True)
self.subscribe(self.example_user("hamlet"), stream_name)
self.login(self.example_email("hamlet"))
message_ids = [
self.send_stream_message(self.example_email("hamlet"), stream_name, "test"),
]
# With stream.history_public_to_subscribers = True, you still
# can't see it if you didn't receive the message and are
# not subscribed.
self.login(self.example_email("cordelia"))
result = self.change_star(message_ids)
self.assert_json_error(result, 'Invalid message(s)')
# But if you subscribe, then you can star the message
self.subscribe(self.example_user("cordelia"), stream_name)
result = self.change_star(message_ids)
self.assert_json_success(result)
def test_new_message(self) -> None:
"""
New messages aren't starred.
"""
test_email = self.example_email('hamlet')
self.login(test_email)
content = "Test message for star"
self.send_stream_message(test_email, "Verona",
content=content)
sent_message = UserMessage.objects.filter(
user_profile=self.example_user('hamlet')
).order_by("id").reverse()[0]
self.assertEqual(sent_message.message.content, content)
self.assertFalse(sent_message.flags.starred)
def test_change_star_public_stream_security_for_guest_user(self) -> None:
# Guest user can't access(star) unsubscribed public stream messages
normal_user = self.example_user("hamlet")
stream_name = "public_stream"
self.make_stream(stream_name)
self.subscribe(normal_user, stream_name)
self.login(normal_user.email)
message_id = [
self.send_stream_message(normal_user.email, stream_name, "test 1")
]
guest_user = self.example_user('polonius')
self.login(guest_user.email)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Subscribed guest users can access public stream messages sent before they join
self.subscribe(guest_user, stream_name)
result = self.change_star(message_id)
self.assert_json_success(result)
# And messages sent after they join
self.login(normal_user.email)
message_id = [
self.send_stream_message(normal_user.email, stream_name, "test 2")
]
self.login(guest_user.email)
result = self.change_star(message_id)
self.assert_json_success(result)
def test_change_star_private_stream_security_for_guest_user(self) -> None:
# Guest users can't access(star) unsubscribed private stream messages
normal_user = self.example_user("hamlet")
stream_name = "private_stream"
stream = self.make_stream(stream_name, invite_only=True)
self.subscribe(normal_user, stream_name)
self.login(normal_user.email)
message_id = [
self.send_stream_message(normal_user.email, stream_name, "test 1")
]
guest_user = self.example_user('polonius')
self.login(guest_user.email)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Guest user can't access messages of subscribed private streams if
# history is not public to subscribers
self.subscribe(guest_user, stream_name)
result = self.change_star(message_id)
self.assert_json_error(result, 'Invalid message(s)')
# Guest user can access messages of subscribed private streams if
# history is public to subscribers
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
result = self.change_star(message_id)
self.assert_json_success(result)
# With history not public to subscribers, they can still see new messages
do_change_stream_invite_only(stream, True, history_public_to_subscribers=False)
self.login(normal_user.email)
message_id = [
self.send_stream_message(normal_user.email, stream_name, "test 2")
]
self.login(guest_user.email)
result = self.change_star(message_id)
self.assert_json_success(result)
def test_bulk_access_messages_private_stream(self) -> None:
user = self.example_user("hamlet")
self.login(user.email)
stream_name = "private_stream"
stream = self.make_stream(stream_name, invite_only=True,
history_public_to_subscribers=False)
self.subscribe(user, stream_name)
# Send a message before subscribing a new user to stream
message_one_id = self.send_stream_message(user.email,
stream_name, "Message one")
later_subscribed_user = self.example_user("cordelia")
# Subscribe a user to private-protected history stream
self.subscribe(later_subscribed_user, stream_name)
# Send a message after subscribing a new user to stream
message_two_id = self.send_stream_message(user.email,
stream_name, "Message two")
message_ids = [message_one_id, message_two_id]
messages = [Message.objects.select_related().get(id=message_id)
for message_id in message_ids]
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
# Message sent before subscribing wouldn't be accessible by later
# subscribed user as stream has protected history
self.assertEqual(len(filtered_messages), 1)
self.assertEqual(filtered_messages[0].id, message_two_id)
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
# Message sent before subscribing are accessible by 8user as stream
# don't have protected history
self.assertEqual(len(filtered_messages), 2)
# Testing messages accessiblity for an unsubscribed user
unsubscribed_user = self.example_user("ZOE")
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
self.assertEqual(len(filtered_messages), 0)
def test_bulk_access_messages_public_stream(self) -> None:
user = self.example_user("hamlet")
self.login(user.email)
# Testing messages accessiblity including a public stream message
stream_name = "public_stream"
self.subscribe(user, stream_name)
message_one_id = self.send_stream_message(user.email,
stream_name, "Message one")
later_subscribed_user = self.example_user("cordelia")
self.subscribe(later_subscribed_user, stream_name)
# Send a message after subscribing a new user to stream
message_two_id = self.send_stream_message(user.email,
stream_name, "Message two")
message_ids = [message_one_id, message_two_id]
messages = [Message.objects.select_related().get(id=message_id)
for message_id in message_ids]
# All public stream messages are always accessible
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
self.assertEqual(len(filtered_messages), 2)
unsubscribed_user = self.example_user("ZOE")
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
self.assertEqual(len(filtered_messages), 2)
class AttachmentTest(ZulipTestCase):
def test_basics(self) -> None:
self.assertFalse(Message.content_has_attachment('whatever'))
self.assertFalse(Message.content_has_attachment('yo http://foo.com'))
self.assertTrue(Message.content_has_attachment('yo\n https://staging.zulip.com/user_uploads/'))
self.assertTrue(Message.content_has_attachment('yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.jpg foo'))
self.assertFalse(Message.content_has_image('whatever'))
self.assertFalse(Message.content_has_image('yo http://foo.com'))
self.assertFalse(Message.content_has_image('yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.pdf foo'))
for ext in [".bmp", ".gif", ".jpg", "jpeg", ".png", ".webp", ".JPG"]:
content = 'yo\n /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.%s foo' % (ext,)
self.assertTrue(Message.content_has_image(content))
self.assertFalse(Message.content_has_link('whatever'))
self.assertTrue(Message.content_has_link('yo\n http://foo.com'))
self.assertTrue(Message.content_has_link('yo\n https://example.com?spam=1&eggs=2'))
self.assertTrue(Message.content_has_link('yo /user_uploads/1/wEAnI-PEmVmCjo15xxNaQbnj/photo-10.pdf foo'))
def test_claim_attachment(self) -> None:
# Create dummy DB entry
user_profile = self.example_user('hamlet')
sample_size = 10
dummy_files = [
('zulip.txt', '1/31/4CBjtTLYZhk66pZrF8hnYGwc/zulip.txt', sample_size),
('temp_file.py', '1/31/4CBjtTLYZhk66pZrF8hnYGwc/temp_file.py', sample_size),
('abc.py', '1/31/4CBjtTLYZhk66pZrF8hnYGwc/abc.py', sample_size)
]
for file_name, path_id, size in dummy_files:
create_attachment(file_name, path_id, user_profile, size)
# Send message referring the attachment
self.subscribe(user_profile, "Denmark")
body = "Some files here ...[zulip.txt](http://localhost:9991/user_uploads/1/31/4CBjtTLYZhk66pZrF8hnYGwc/zulip.txt)" + \
"http://localhost:9991/user_uploads/1/31/4CBjtTLYZhk66pZrF8hnYGwc/temp_file.py.... Some more...." + \
"http://localhost:9991/user_uploads/1/31/4CBjtTLYZhk66pZrF8hnYGwc/abc.py"
self.send_stream_message(user_profile.email, "Denmark", body, "test")
for file_name, path_id, size in dummy_files:
attachment = Attachment.objects.get(path_id=path_id)
self.assertTrue(attachment.is_claimed())
class MissedMessageTest(ZulipTestCase):
def test_presence_idle_user_ids(self) -> None:
UserPresence.objects.all().delete()
sender = self.example_user('cordelia')
realm = sender.realm
hamlet = self.example_user('hamlet')
othello = self.example_user('othello')
recipient_ids = {hamlet.id, othello.id}
message_type = 'stream'
user_flags = {} # type: Dict[int, List[str]]
def assert_missing(user_ids: List[int]) -> None:
presence_idle_user_ids = get_active_presence_idle_user_ids(
realm=realm,
sender_id=sender.id,
message_type=message_type,
active_user_ids=recipient_ids,
user_flags=user_flags,
)
self.assertEqual(sorted(user_ids), sorted(presence_idle_user_ids))
def set_presence(user_id: int, client_name: str, ago: int) -> None:
when = timezone_now() - datetime.timedelta(seconds=ago)
UserPresence.objects.create(
user_profile_id=user_id,
client=get_client(client_name),
timestamp=when,
)
message_type = 'private'
assert_missing([hamlet.id, othello.id])
message_type = 'stream'
user_flags[hamlet.id] = ['mentioned']
assert_missing([hamlet.id])
set_presence(hamlet.id, 'iPhone', ago=5000)
assert_missing([hamlet.id])
set_presence(hamlet.id, 'webapp', ago=15)
assert_missing([])
message_type = 'private'
assert_missing([othello.id])
class LogDictTest(ZulipTestCase):
def test_to_log_dict(self) -> None:
email = self.example_email('hamlet')
stream_name = 'Denmark'
topic_name = 'Copenhagen'
content = 'find me some good coffee shops'
# self.login(self.example_email("hamlet"))
message_id = self.send_stream_message(email, stream_name,
topic_name=topic_name,
content=content)
message = Message.objects.get(id=message_id)
dct = message.to_log_dict()
self.assertTrue('timestamp' in dct)
self.assertEqual(dct['content'], 'find me some good coffee shops')
self.assertEqual(dct['id'], message.id)
self.assertEqual(dct['recipient'], 'Denmark')
self.assertEqual(dct['sender_realm_str'], 'zulip')
self.assertEqual(dct['sender_email'], self.example_email("hamlet"))
self.assertEqual(dct['sender_full_name'], 'King Hamlet')
self.assertEqual(dct['sender_id'], self.example_user('hamlet').id)
self.assertEqual(dct['sender_short_name'], 'hamlet')
self.assertEqual(dct['sending_client'], 'test suite')
self.assertEqual(dct[DB_TOPIC_NAME], 'Copenhagen')
self.assertEqual(dct['type'], 'stream')
class CheckMessageTest(ZulipTestCase):
def test_basic_check_message_call(self) -> None:
sender = self.example_user('othello')
client = make_client(name="test suite")
stream_name = u'España y Francia'
self.make_stream(stream_name)
topic_name = 'issue'
message_content = 'whatever'
addressee = Addressee.for_stream_name(stream_name, topic_name)
ret = check_message(sender, client, addressee, message_content)
self.assertEqual(ret['message'].sender.email, self.example_email("othello"))
def test_bot_pm_feature(self) -> None:
"""We send a PM to a bot's owner if their bot sends a message to
an unsubscribed stream"""
parent = self.example_user('othello')
bot = do_create_user(
email='othello-bot@zulip.com',
password='',
realm=parent.realm,
full_name='',
short_name='',
bot_type=UserProfile.DEFAULT_BOT,
bot_owner=parent
)
bot.last_reminder = None
sender = bot
client = make_client(name="test suite")
stream_name = u'Россия'
topic_name = 'issue'
addressee = Addressee.for_stream_name(stream_name, topic_name)
message_content = 'whatever'
old_count = message_stream_count(parent)
# Try sending to stream that doesn't exist sends a reminder to
# the sender
with self.assertRaises(JsonableError):
check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 1)
self.assertIn("that stream does not yet exist.", most_recent_message(parent).content)
# Try sending to stream that exists with no subscribers soon
# after; due to rate-limiting, this should send nothing.
self.make_stream(stream_name)
ret = check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 1)
# Try sending to stream that exists with no subscribers longer
# after; this should send an error to the bot owner that the
# stream doesn't exist
assert(sender.last_reminder is not None)
sender.last_reminder = sender.last_reminder - datetime.timedelta(hours=1)
sender.save(update_fields=["last_reminder"])
ret = check_message(sender, client, addressee, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 2)
self.assertEqual(ret['message'].sender.email, 'othello-bot@zulip.com')
self.assertIn("there are no subscribers to that stream", most_recent_message(parent).content)
def test_bot_pm_error_handling(self) -> None:
# This just test some defensive code.
cordelia = self.example_user('cordelia')
test_bot = self.create_test_bot(
short_name='test',
user_profile=cordelia,
)
content = 'whatever'
good_realm = test_bot.realm
wrong_realm = get_realm('mit')
wrong_sender = cordelia
send_rate_limited_pm_notification_to_bot_owner(test_bot, wrong_realm, content)
self.assertEqual(test_bot.last_reminder, None)
send_rate_limited_pm_notification_to_bot_owner(wrong_sender, good_realm, content)
self.assertEqual(test_bot.last_reminder, None)
test_bot.realm.deactivated = True
send_rate_limited_pm_notification_to_bot_owner(test_bot, good_realm, content)
self.assertEqual(test_bot.last_reminder, None)
class DeleteMessageTest(ZulipTestCase):
def test_delete_message_by_user(self) -> None:
def set_message_deleting_params(allow_message_deleting: bool,
message_content_delete_limit_seconds: int) -> None:
self.login("iago@zulip.com")
result = self.client_patch("/json/realm", {
'allow_message_deleting': ujson.dumps(allow_message_deleting),
'message_content_delete_limit_seconds': message_content_delete_limit_seconds
})
self.assert_json_success(result)
def test_delete_message_by_admin(msg_id: int) -> HttpResponse:
self.login("iago@zulip.com")
result = self.client_delete('/json/messages/{msg_id}'.format(msg_id=msg_id))
return result
def test_delete_message_by_owner(msg_id: int) -> HttpResponse:
self.login("hamlet@zulip.com")
result = self.client_delete('/json/messages/{msg_id}'.format(msg_id=msg_id))
return result
def test_delete_message_by_other_user(msg_id: int) -> HttpResponse:
self.login("cordelia@zulip.com")
result = self.client_delete('/json/messages/{msg_id}'.format(msg_id=msg_id))
return result
# Test if message deleting is not allowed(default).
set_message_deleting_params(False, 0)
self.login("hamlet@zulip.com")
msg_id = self.send_stream_message("hamlet@zulip.com", "Scotland")
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "You don't have permission to delete this message")
result = test_delete_message_by_other_user(msg_id=msg_id)
self.assert_json_error(result, "You don't have permission to delete this message")
result = test_delete_message_by_admin(msg_id=msg_id)
self.assert_json_success(result)
# Test if message deleting is allowed.
# Test if time limit is zero(no limit).
set_message_deleting_params(True, 0)
msg_id = self.send_stream_message("hamlet@zulip.com", "Scotland")
message = Message.objects.get(id=msg_id)
message.pub_date = message.pub_date - datetime.timedelta(seconds=600)
message.save()
result = test_delete_message_by_other_user(msg_id=msg_id)
self.assert_json_error(result, "You don't have permission to delete this message")
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_success(result)
# Test if time limit is non-zero.
set_message_deleting_params(True, 240)
msg_id_1 = self.send_stream_message("hamlet@zulip.com", "Scotland")
message = Message.objects.get(id=msg_id_1)
message.pub_date = message.pub_date - datetime.timedelta(seconds=120)
message.save()
msg_id_2 = self.send_stream_message("hamlet@zulip.com", "Scotland")
message = Message.objects.get(id=msg_id_2)
message.pub_date = message.pub_date - datetime.timedelta(seconds=360)
message.save()
result = test_delete_message_by_other_user(msg_id=msg_id_1)
self.assert_json_error(result, "You don't have permission to delete this message")
result = test_delete_message_by_owner(msg_id=msg_id_1)
self.assert_json_success(result)
result = test_delete_message_by_owner(msg_id=msg_id_2)
self.assert_json_error(result, "The time limit for deleting this message has passed")
# No limit for admin.
result = test_delete_message_by_admin(msg_id=msg_id_2)
self.assert_json_success(result)
# Test mulitple delete requests with no latency issues
msg_id = self.send_stream_message("hamlet@zulip.com", "Scotland")
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_success(result)
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "Invalid message(s)")
# Test handling of 500 error caused by mulitple delete requests due to latency.
# see issue #11219.
with mock.patch("zerver.views.messages.do_delete_messages") as m, \
mock.patch("zerver.views.messages.validate_can_delete_message", return_value=None), \
mock.patch("zerver.views.messages.access_message", return_value=(None, None)):
m.side_effect = IntegrityError()
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "Message already deleted")
m.side_effect = Message.DoesNotExist()
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "Message already deleted")
class SoftDeactivationMessageTest(ZulipTestCase):
def test_reactivate_user_if_soft_deactivated(self) -> None:
recipient_list = [self.example_user("hamlet"), self.example_user("iago")]
for user_profile in recipient_list:
self.subscribe(user_profile, "Denmark")
sender = self.example_email('iago')
stream_name = 'Denmark'
topic_name = 'foo'
def last_realm_audit_log_entry(event_type: str) -> RealmAuditLog:
return RealmAuditLog.objects.filter(
event_type=event_type
).order_by('-event_time')[0]
long_term_idle_user = self.example_user('hamlet')
# We are sending this message to ensure that long_term_idle_user has
# at least one UserMessage row.
self.send_stream_message(long_term_idle_user.email, stream_name)
do_soft_deactivate_users([long_term_idle_user])
message = 'Test Message 1'
message_id = self.send_stream_message(sender, stream_name,
message, topic_name)
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
self.assertNotEqual(idle_user_msg_list[-1].content, message)
with queries_captured() as queries:
reactivate_user_if_soft_deactivated(long_term_idle_user)
self.assert_length(queries, 8)
self.assertFalse(long_term_idle_user.long_term_idle)
self.assertEqual(last_realm_audit_log_entry(
RealmAuditLog.USER_SOFT_ACTIVATED).modified_user, long_term_idle_user)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 1)
self.assertEqual(idle_user_msg_list[-1].content, message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, message_id)
def test_add_missing_messages(self) -> None:
recipient_list = [self.example_user("hamlet"), self.example_user("iago")]
for user_profile in recipient_list:
self.subscribe(user_profile, "Denmark")
sender = self.example_user('iago')
realm = sender.realm
sending_client = make_client(name="test suite")
stream_name = 'Denmark'
stream = get_stream(stream_name, realm)
topic_name = 'foo'
def send_fake_message(message_content: str, stream: Stream) -> Message:
recipient = get_stream_recipient(stream.id)
message = Message(sender = sender,
recipient = recipient,
content = message_content,
pub_date = timezone_now(),
sending_client = sending_client)
message.set_topic_name(topic_name)
message.save()
return message
long_term_idle_user = self.example_user('hamlet')
self.send_stream_message(long_term_idle_user.email, stream_name)
do_soft_deactivate_users([long_term_idle_user])
# Test that add_missing_messages() in simplest case of adding a
# message for which UserMessage row doesn't exist for this user.
sent_message = send_fake_message('Test Message 1', stream)
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
self.assertNotEqual(idle_user_msg_list[-1], sent_message)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 6)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 1)
self.assertEqual(idle_user_msg_list[-1], sent_message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, sent_message.id)
# Test that add_missing_messages() only adds messages that aren't
# already present in the UserMessage table. This test works on the
# fact that previous test just above this added a message but didn't
# updated the last_active_message_id field for the user.
sent_message = send_fake_message('Test Message 2', stream)
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
self.assertNotEqual(idle_user_msg_list[-1], sent_message)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 6)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 1)
self.assertEqual(idle_user_msg_list[-1], sent_message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, sent_message.id)
# Test UserMessage rows are created correctly in case of stream
# Subscription was altered by admin while user was away.
# Test for a public stream.
sent_message_list = []
sent_message_list.append(send_fake_message('Test Message 3', stream))
# Alter subscription to stream.
self.unsubscribe(long_term_idle_user, stream_name)
send_fake_message('Test Message 4', stream)
self.subscribe(long_term_idle_user, stream_name)
sent_message_list.append(send_fake_message('Test Message 5', stream))
sent_message_list.reverse()
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
for sent_message in sent_message_list:
self.assertNotEqual(idle_user_msg_list.pop(), sent_message)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 6)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 2)
for sent_message in sent_message_list:
self.assertEqual(idle_user_msg_list.pop(), sent_message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, sent_message_list[0].id)
# Test consecutive subscribe/unsubscribe in a public stream
sent_message_list = []
sent_message_list.append(send_fake_message('Test Message 6', stream))
# Unsubscribe from stream and then immediately subscribe back again.
self.unsubscribe(long_term_idle_user, stream_name)
self.subscribe(long_term_idle_user, stream_name)
sent_message_list.append(send_fake_message('Test Message 7', stream))
# Again unsubscribe from stream and send a message.
# This will make sure that if initially in a unsubscribed state
# a consecutive subscribe/unsubscribe doesn't misbehave.
self.unsubscribe(long_term_idle_user, stream_name)
send_fake_message('Test Message 8', stream)
# Do a subscribe and unsubscribe immediately.
self.subscribe(long_term_idle_user, stream_name)
self.unsubscribe(long_term_idle_user, stream_name)
sent_message_list.reverse()
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
for sent_message in sent_message_list:
self.assertNotEqual(idle_user_msg_list.pop(), sent_message)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 6)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 2)
for sent_message in sent_message_list:
self.assertEqual(idle_user_msg_list.pop(), sent_message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, sent_message_list[0].id)
# Test for when user unsubscribes before soft deactivation
# (must reactivate them in order to do this).
do_soft_activate_users([long_term_idle_user])
self.subscribe(long_term_idle_user, stream_name)
# Send a real message to update last_active_message_id
sent_message_id = self.send_stream_message(
sender.email, stream_name, 'Test Message 9')
self.unsubscribe(long_term_idle_user, stream_name)
# Soft deactivate and send another message to the unsubscribed stream.
do_soft_deactivate_users([long_term_idle_user])
send_fake_message('Test Message 10', stream)
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
self.assertEqual(idle_user_msg_list[-1].id, sent_message_id)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
# There are no streams to fetch missing messages from, so
# the Message.objects query will be avoided.
self.assert_length(queries, 4)
idle_user_msg_list = get_user_messages(long_term_idle_user)
# No new UserMessage rows should have been created.
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count)
# Note: At this point in this test we have long_term_idle_user
# unsubscribed from the 'Denmark' stream.
# Test for a Private Stream.
stream_name = "Core"
private_stream = self.make_stream('Core', invite_only=True)
self.subscribe(self.example_user("iago"), stream_name)
sent_message_list = []
send_fake_message('Test Message 11', private_stream)
self.subscribe(self.example_user("hamlet"), stream_name)
sent_message_list.append(send_fake_message('Test Message 12', private_stream))
self.unsubscribe(long_term_idle_user, stream_name)
send_fake_message('Test Message 13', private_stream)
self.subscribe(long_term_idle_user, stream_name)
sent_message_list.append(send_fake_message('Test Message 14', private_stream))
sent_message_list.reverse()
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
for sent_message in sent_message_list:
self.assertNotEqual(idle_user_msg_list.pop(), sent_message)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 6)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + 2)
for sent_message in sent_message_list:
self.assertEqual(idle_user_msg_list.pop(), sent_message)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, sent_message_list[0].id)
@mock.patch('zerver.lib.soft_deactivation.BULK_CREATE_BATCH_SIZE', 2)
def test_add_missing_messages_pagination(self) -> None:
recipient_list = [self.example_user("hamlet"), self.example_user("iago")]
stream_name = 'Denmark'
for user_profile in recipient_list:
self.subscribe(user_profile, stream_name)
sender = self.example_user('iago')
long_term_idle_user = self.example_user('hamlet')
self.send_stream_message(long_term_idle_user.email, stream_name)
do_soft_deactivate_users([long_term_idle_user])
num_new_messages = 5
message_ids = []
for _ in range(num_new_messages):
message_id = self.send_stream_message(sender.email, stream_name)
message_ids.append(message_id)
idle_user_msg_list = get_user_messages(long_term_idle_user)
idle_user_msg_count = len(idle_user_msg_list)
with queries_captured() as queries:
add_missing_messages(long_term_idle_user)
self.assert_length(queries, 10)
idle_user_msg_list = get_user_messages(long_term_idle_user)
self.assertEqual(len(idle_user_msg_list), idle_user_msg_count + num_new_messages)
long_term_idle_user.refresh_from_db()
self.assertEqual(long_term_idle_user.last_active_message_id, message_ids[-1])
def test_user_message_filter(self) -> None:
# In this test we are basically testing out the logic used out in
# do_send_messages() in action.py for filtering the messages for which
# UserMessage rows should be created for a soft-deactivated user.
recipient_list = [
self.example_user("hamlet"),
self.example_user("iago"),
self.example_user('cordelia')
]
for user_profile in recipient_list:
self.subscribe(user_profile, "Denmark")
cordelia = self.example_user('cordelia')
sender = self.example_email('iago')
stream_name = 'Denmark'
topic_name = 'foo'
def send_stream_message(content: str) -> None:
self.send_stream_message(sender, stream_name,
content, topic_name)
def send_personal_message(content: str) -> None:
self.send_personal_message(sender, self.example_email("hamlet"), content)
long_term_idle_user = self.example_user('hamlet')
self.send_stream_message(long_term_idle_user.email, stream_name)
do_soft_deactivate_users([long_term_idle_user])
def assert_um_count(user: UserProfile, count: int) -> None:
user_messages = get_user_messages(user)
self.assertEqual(len(user_messages), count)
def assert_last_um_content(user: UserProfile, content: str, negate: bool=False) -> None:
user_messages = get_user_messages(user)
if negate:
self.assertNotEqual(user_messages[-1].content, content)
else:
self.assertEqual(user_messages[-1].content, content)
# Test that sending a message to a stream with soft deactivated user
# doesn't end up creating UserMessage row for deactivated user.
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test Message 1'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message, negate=True)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
# Test that sending a message to a stream with soft deactivated user
# and push/email notifications on creates a UserMessage row for the
# deactivated user.
sub = get_subscription(stream_name, long_term_idle_user)
sub.push_notifications = True
sub.save()
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test private stream message'
send_stream_message(message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_last_um_content(long_term_idle_user, message)
sub.push_notifications = False
sub.save()
# Test sending a private message to soft deactivated user creates
# UserMessage row.
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test PM'
send_personal_message(message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_last_um_content(long_term_idle_user, message)
# Test UserMessage row is created while user is deactivated if
# user itself is mentioned.
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test @**King Hamlet** mention'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
# Test UserMessage row is not created while user is deactivated if
# anyone is mentioned but the user.
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test @**Cordelia Lear** mention'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message, negate=True)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
# Test UserMessage row is created while user is deactivated if
# there is a wildcard mention such as @all or @everyone
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test @**all** mention'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test @**everyone** mention'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Test @**stream** mention'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
# Test UserMessage row is not created while user is deactivated if there
# is a alert word in message.
do_add_alert_words(long_term_idle_user, ['test_alert_word'])
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = 'Testing test_alert_word'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count + 1)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
# Test UserMessage row is created while user is deactivated if
# message is a me message.
general_user_msg_count = len(get_user_messages(cordelia))
soft_deactivated_user_msg_count = len(get_user_messages(long_term_idle_user))
message = '/me says test'
send_stream_message(message)
assert_last_um_content(long_term_idle_user, message, negate=True)
assert_um_count(long_term_idle_user, soft_deactivated_user_msg_count)
assert_um_count(cordelia, general_user_msg_count + 1)
assert_last_um_content(cordelia, message)
class MessageHydrationTest(ZulipTestCase):
def test_hydrate_stream_recipient_info(self) -> None:
realm = get_realm('zulip')
cordelia = self.example_user('cordelia')
stream_id = get_stream('Verona', realm).id
obj = dict(
raw_display_recipient='Verona',
recipient_type=Recipient.STREAM,
recipient_type_id=stream_id,
sender_is_mirror_dummy=False,
sender_email=cordelia.email,
sender_full_name=cordelia.full_name,
sender_short_name=cordelia.short_name,
sender_id=cordelia.id,
)
MessageDict.hydrate_recipient_info(obj)
self.assertEqual(obj['display_recipient'], 'Verona')
self.assertEqual(obj['type'], 'stream')
def test_hydrate_pm_recipient_info(self) -> None:
cordelia = self.example_user('cordelia')
obj = dict(
raw_display_recipient=[
dict(
email='aaron@example.com',
full_name='Aaron Smith',
),
],
recipient_type=Recipient.PERSONAL,
recipient_type_id=None,
sender_is_mirror_dummy=False,
sender_email=cordelia.email,
sender_full_name=cordelia.full_name,
sender_short_name=cordelia.short_name,
sender_id=cordelia.id,
)
MessageDict.hydrate_recipient_info(obj)
self.assertEqual(
obj['display_recipient'],
[
dict(
email='aaron@example.com',
full_name='Aaron Smith',
),
dict(
email=cordelia.email,
full_name=cordelia.full_name,
id=cordelia.id,
short_name=cordelia.short_name,
is_mirror_dummy=False,
),
],
)
self.assertEqual(obj['type'], 'private')
def test_messages_for_ids(self) -> None:
hamlet = self.example_user('hamlet')
cordelia = self.example_user('cordelia')
stream_name = 'test stream'
self.subscribe(cordelia, stream_name)
old_message_id = self.send_stream_message(cordelia.email, stream_name, content='foo')
self.subscribe(hamlet, stream_name)
content = 'hello @**King Hamlet**'
new_message_id = self.send_stream_message(cordelia.email, stream_name, content=content)
user_message_flags = {
old_message_id: ['read', 'historical'],
new_message_id: ['mentioned'],
}
messages = messages_for_ids(
message_ids=[old_message_id, new_message_id],
user_message_flags=user_message_flags,
search_fields={},
apply_markdown=True,
client_gravatar=True,
allow_edit_history=False,
)
self.assertEqual(len(messages), 2)
for message in messages:
if message['id'] == old_message_id:
old_message = message
elif message['id'] == new_message_id:
new_message = message
self.assertEqual(old_message['content'], '<p>foo</p>')
self.assertEqual(old_message['flags'], ['read', 'historical'])
self.assertIn('class="user-mention"', new_message['content'])
self.assertEqual(new_message['flags'], ['mentioned'])
class MessageVisibilityTest(ZulipTestCase):
def test_update_first_visible_message_id(self) -> None:
Message.objects.all().delete()
message_ids = [self.send_stream_message(self.example_email("othello"), "Scotland") for i in range(15)]
# If message_visibility_limit is None update_first_visible_message_id
# should set first_visible_message_id to 0
realm = get_realm("zulip")
realm.message_visibility_limit = None
# Setting to a random value other than 0 as the default value of
# first_visible_message_id is 0
realm.first_visible_message_id = 5
realm.save()
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), 0)
realm.message_visibility_limit = 10
realm.save()
expected_message_id = message_ids[5]
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), expected_message_id)
# If the message_visibility_limit is greater than number of messages
# get_first_visible_message_id should return 0
realm.message_visibility_limit = 50
realm.save()
update_first_visible_message_id(realm)
self.assertEqual(get_first_visible_message_id(realm), 0)
def test_maybe_update_first_visible_message_id(self) -> None:
realm = get_realm("zulip")
lookback_hours = 30
realm.message_visibility_limit = None
realm.save()
end_time = timezone_now() - datetime.timedelta(hours=lookback_hours - 5)
stat = COUNT_STATS['messages_sent:is_bot:hour']
RealmCount.objects.create(realm=realm, property=stat.property,
end_time=end_time, value=5)
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_not_called()
realm.message_visibility_limit = 10
realm.save()
RealmCount.objects.all().delete()
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_not_called()
RealmCount.objects.create(realm=realm, property=stat.property,
end_time=end_time, value=5)
with mock.patch("zerver.lib.message.update_first_visible_message_id") as m:
maybe_update_first_visible_message_id(realm, lookback_hours)
m.assert_called_once_with(realm)