zulip/zerver/test_messages.py

1225 lines
58 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from django.db.models import Q
from zerver.lib import bugdown
from zerver.decorator import JsonableError
from zerver.lib.test_runner import slow
from zerver.views.messages import (
get_old_messages_backend, ok_to_include_history,
)
from zilencer.models import Deployment
from zerver.lib.test_helpers import (
AuthedTestCase, POSTRequestMock,
get_user_messages,
message_ids, message_stream_count,
most_recent_message,
queries_captured,
)
from zerver.models import (
MAX_MESSAGE_LENGTH, MAX_SUBJECT_LENGTH,
Client, Message, Realm, Recipient, Stream, UserMessage, UserProfile,
get_display_recipient, get_recipient, get_realm, get_stream, get_user_profile_by_email,
)
from zerver.lib.actions import (
check_message, check_send_message,
create_stream_if_needed,
do_add_subscription, do_create_user,
)
import datetime
import time
import ujson
class IncludeHistoryTest(AuthedTestCase):
def test_ok_to_include_history(self):
realm = get_realm('zulip.com')
create_stream_if_needed(realm, 'public_stream')
# Negated stream searches should not include history.
narrow = [
dict(operator='stream', operand='public_stream', negated=True),
]
self.assertFalse(ok_to_include_history(narrow, realm))
# Definitely forbid seeing history on private streams.
narrow = [
dict(operator='stream', operand='private_stream'),
]
self.assertFalse(ok_to_include_history(narrow, realm))
# History doesn't apply to PMs.
narrow = [
dict(operator='is', operand='private'),
]
self.assertFalse(ok_to_include_history(narrow, realm))
# If we are looking for something like starred messages, there is
# no point in searching historical messages.
narrow = [
dict(operator='stream', operand='public_stream'),
dict(operator='is', operand='starred'),
]
self.assertFalse(ok_to_include_history(narrow, realm))
# simple True case
narrow = [
dict(operator='stream', operand='public_stream'),
]
self.assertTrue(ok_to_include_history(narrow, realm))
narrow = [
dict(operator='stream', operand='public_stream'),
dict(operator='topic', operand='whatever'),
dict(operator='search', operand='needle in haystack'),
]
self.assertTrue(ok_to_include_history(narrow, realm))
class TestCrossRealmPMs(AuthedTestCase):
def create_user(self, email):
username, domain = email.split('@')
self.register(username, 'test', domain=domain)
return get_user_profile_by_email(email)
def test_same_realm(self):
"""Users on the same realm can PM each other"""
r1 = Realm.objects.create(domain='1.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
user1_email = 'user1@1.example.com'
user1 = self.create_user(user1_email)
user2_email = 'user2@1.example.com'
user2 = self.create_user(user2_email)
self.send_message(user1_email, user2_email, Recipient.PERSONAL)
messages = get_user_messages(user2)
self.assertEqual(len(messages), 1)
self.assertEquals(messages[0].sender.pk, user1.pk)
def test_diffrent_realms(self):
"""Users on the different realms can not PM each other"""
r1 = Realm.objects.create(domain='1.example.com')
r2 = Realm.objects.create(domain='2.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
deployment.realms.add(r2)
user1_email = 'user1@1.example.com'
self.create_user(user1_email)
user2_email = 'user2@2.example.com'
self.create_user(user2_email)
with self.assertRaisesRegexp(JsonableError,
'You can\'t send private messages outside of your organization.'):
self.send_message(user1_email, user2_email, Recipient.PERSONAL)
def test_three_diffrent_realms(self):
"""Users on three different realms can not PM each other"""
r1 = Realm.objects.create(domain='1.example.com')
r2 = Realm.objects.create(domain='2.example.com')
r3 = Realm.objects.create(domain='3.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
deployment.realms.add(r2)
deployment.realms.add(r3)
user1_email = 'user1@1.example.com'
self.create_user(user1_email)
user2_email = 'user2@2.example.com'
self.create_user(user2_email)
user3_email = 'user3@2.example.com'
self.create_user(user3_email)
with self.assertRaisesRegexp(JsonableError,
'You can\'t send private messages outside of your organization.'):
self.send_message(user1_email, [user2_email, user3_email], Recipient.PERSONAL)
def test_from_zulip_realm(self):
"""Users in the zulip.com realm can PM any realm"""
r1 = Realm.objects.create(domain='1.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
user1_email = 'user1@zulip.com'
user1 = self.create_user(user1_email)
user2_email = 'user2@1.example.com'
user2 = self.create_user(user2_email)
self.send_message(user1_email, user2_email, Recipient.PERSONAL)
messages = get_user_messages(user2)
self.assertEqual(len(messages), 1)
self.assertEquals(messages[0].sender.pk, user1.pk)
def test_to_zulip_realm(self):
"""All users can PM users in the zulip.com realm"""
r1 = Realm.objects.create(domain='1.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
user1_email = 'user1@1.example.com'
user1 = self.create_user(user1_email)
user2_email = 'user2@zulip.com'
user2 = self.create_user(user2_email)
self.send_message(user1_email, user2_email, Recipient.PERSONAL)
messages = get_user_messages(user2)
self.assertEqual(len(messages), 1)
self.assertEquals(messages[0].sender.pk, user1.pk)
def test_zulip_realm_can_not_join_realms(self):
"""Adding a zulip.com user to a PM will not let you cross realms"""
r1 = Realm.objects.create(domain='1.example.com')
r2 = Realm.objects.create(domain='2.example.com')
deployment = Deployment.objects.filter()[0]
deployment.realms.add(r1)
deployment.realms.add(r2)
user1_email = 'user1@1.example.com'
self.create_user(user1_email)
user2_email = 'user2@2.example.com'
self.create_user(user2_email)
user3_email = 'user3@zulip.com'
self.create_user(user3_email)
with self.assertRaisesRegexp(JsonableError,
'You can\'t send private messages outside of your organization.'):
self.send_message(user1_email, [user2_email, user3_email],
Recipient.PERSONAL)
class PersonalMessagesTest(AuthedTestCase):
def test_auto_subbed_to_personals(self):
"""
Newly created users are auto-subbed to the ability to receive
personals.
"""
self.register("test", "test")
user_profile = get_user_profile_by_email('test@zulip.com')
old_messages_count = message_stream_count(user_profile)
self.send_message("test@zulip.com", "test@zulip.com", Recipient.PERSONAL)
new_messages_count = message_stream_count(user_profile)
self.assertEqual(new_messages_count, old_messages_count + 1)
recipient = Recipient.objects.get(type_id=user_profile.id,
type=Recipient.PERSONAL)
self.assertEqual(most_recent_message(user_profile).recipient, recipient)
@slow(0.36, "checks several profiles")
def test_personal_to_self(self):
"""
If you send a personal to yourself, only you see it.
"""
old_user_profiles = list(UserProfile.objects.all())
self.register("test1", "test1")
old_messages = []
for user_profile in old_user_profiles:
old_messages.append(message_stream_count(user_profile))
self.send_message("test1@zulip.com", "test1@zulip.com", Recipient.PERSONAL)
new_messages = []
for user_profile in old_user_profiles:
new_messages.append(message_stream_count(user_profile))
self.assertEqual(old_messages, new_messages)
user_profile = get_user_profile_by_email("test1@zulip.com")
recipient = Recipient.objects.get(type_id=user_profile.id, type=Recipient.PERSONAL)
self.assertEqual(most_recent_message(user_profile).recipient, recipient)
def assert_personal(self, sender_email, receiver_email, content="test content"):
"""
Send a private message from `sender_email` to `receiver_email` and check
that only those two parties actually received the message.
"""
sender = get_user_profile_by_email(sender_email)
receiver = get_user_profile_by_email(receiver_email)
sender_messages = message_stream_count(sender)
receiver_messages = message_stream_count(receiver)
other_user_profiles = UserProfile.objects.filter(~Q(email=sender_email) &
~Q(email=receiver_email))
old_other_messages = []
for user_profile in other_user_profiles:
old_other_messages.append(message_stream_count(user_profile))
self.send_message(sender_email, receiver_email, Recipient.PERSONAL, content)
# Users outside the conversation don't get the message.
new_other_messages = []
for user_profile in other_user_profiles:
new_other_messages.append(message_stream_count(user_profile))
self.assertEqual(old_other_messages, new_other_messages)
# The personal message is in the streams of both the sender and receiver.
self.assertEqual(message_stream_count(sender),
sender_messages + 1)
self.assertEqual(message_stream_count(receiver),
receiver_messages + 1)
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
self.assertEqual(most_recent_message(sender).recipient, recipient)
self.assertEqual(most_recent_message(receiver).recipient, recipient)
@slow(0.28, "assert_personal checks several profiles")
def test_personal(self):
"""
If you send a personal, only you and the recipient see it.
"""
self.login("hamlet@zulip.com")
self.assert_personal("hamlet@zulip.com", "othello@zulip.com")
@slow(0.28, "assert_personal checks several profiles")
def test_non_ascii_personal(self):
"""
Sending a PM containing non-ASCII characters succeeds.
"""
self.login("hamlet@zulip.com")
self.assert_personal("hamlet@zulip.com", "othello@zulip.com", u"hümbüǵ")
class StreamMessagesTest(AuthedTestCase):
def assert_stream_message(self, stream_name, subject="test subject",
content="test content"):
"""
Check that messages sent to a stream reach all subscribers to that stream.
"""
subscribers = self.users_subscribed_to_stream(stream_name, "zulip.com")
old_subscriber_messages = []
for subscriber in subscribers:
old_subscriber_messages.append(message_stream_count(subscriber))
non_subscribers = [user_profile for user_profile in UserProfile.objects.all()
if user_profile not in subscribers]
old_non_subscriber_messages = []
for non_subscriber in non_subscribers:
old_non_subscriber_messages.append(message_stream_count(non_subscriber))
a_subscriber_email = subscribers[0].email
self.login(a_subscriber_email)
self.send_message(a_subscriber_email, stream_name, Recipient.STREAM,
subject, content)
# Did all of the subscribers get the message?
new_subscriber_messages = []
for subscriber in subscribers:
new_subscriber_messages.append(message_stream_count(subscriber))
# Did non-subscribers not get the message?
new_non_subscriber_messages = []
for non_subscriber in non_subscribers:
new_non_subscriber_messages.append(message_stream_count(non_subscriber))
self.assertEqual(old_non_subscriber_messages, new_non_subscriber_messages)
self.assertEqual(new_subscriber_messages, [elt + 1 for elt in old_subscriber_messages])
def test_not_too_many_queries(self):
recipient_list = ['hamlet@zulip.com', 'iago@zulip.com', 'cordelia@zulip.com', 'othello@zulip.com']
for email in recipient_list:
self.subscribe_to_stream(email, "Denmark")
sender_email = 'hamlet@zulip.com'
sender = get_user_profile_by_email(sender_email)
message_type_name = "stream"
(sending_client, _) = Client.objects.get_or_create(name="test suite")
stream = 'Denmark'
subject = 'foo'
content = 'whatever'
realm = sender.realm
def send_message():
check_send_message(sender, sending_client, message_type_name, [stream],
subject, content, forwarder_user_profile=sender, realm=realm)
send_message() # prime the caches
with queries_captured() as queries:
send_message()
self.assert_length(queries, 5)
def test_message_mentions(self):
user_profile = get_user_profile_by_email("iago@zulip.com")
self.subscribe_to_stream(user_profile.email, "Denmark")
self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM,
content="test @**Iago** rules")
message = most_recent_message(user_profile)
assert(UserMessage.objects.get(user_profile=user_profile, message=message).flags.mentioned.is_set)
@slow(0.28, 'checks all users')
def test_message_to_stream(self):
"""
If you send a message to a stream, everyone subscribed to the stream
receives the messages.
"""
self.assert_stream_message("Scotland")
@slow(0.37, 'checks all users')
def test_non_ascii_stream_message(self):
"""
Sending a stream message containing non-ASCII characters in the stream
name, subject, or message body succeeds.
"""
self.login("hamlet@zulip.com")
# Subscribe everyone to a stream with non-ASCII characters.
non_ascii_stream_name = u"hümbüǵ"
realm = Realm.objects.get(domain="zulip.com")
stream, _ = create_stream_if_needed(realm, non_ascii_stream_name)
for user_profile in UserProfile.objects.filter(realm=realm):
do_add_subscription(user_profile, stream, no_log=True)
self.assert_stream_message(non_ascii_stream_name, subject=u"hümbüǵ",
content=u"hümbüǵ")
class MessageDictTest(AuthedTestCase):
@slow(1.6, 'builds lots of messages')
def test_bulk_message_fetching(self):
realm = Realm.objects.get(domain="zulip.com")
sender = get_user_profile_by_email('othello@zulip.com')
receiver = get_user_profile_by_email('hamlet@zulip.com')
pm_recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
stream, _ = create_stream_if_needed(realm, 'devel')
stream_recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
sending_client, _ = Client.objects.get_or_create(name="test suite")
for i in range(300):
for recipient in [pm_recipient, stream_recipient]:
message = Message(
sender=sender,
recipient=recipient,
subject='whatever',
content='whatever %d' % i,
pub_date=datetime.datetime.now(),
sending_client=sending_client,
last_edit_time=datetime.datetime.now(),
edit_history='[]'
)
message.save()
ids = [row['id'] for row in Message.objects.all().values('id')]
num_ids = len(ids)
self.assertTrue(num_ids >= 600)
t = time.time()
with queries_captured() as queries:
rows = list(Message.get_raw_db_rows(ids))
for row in rows:
Message.build_dict_from_raw_db_row(row, False)
delay = time.time() - t
# Make sure we don't take longer than 1ms per message to extract messages.
self.assertTrue(delay < 0.001 * num_ids)
self.assert_length(queries, 7)
self.assertEqual(len(rows), num_ids)
def test_applying_markdown(self):
sender = get_user_profile_by_email('othello@zulip.com')
receiver = get_user_profile_by_email('hamlet@zulip.com')
recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
sending_client, _ = Client.objects.get_or_create(name="test suite")
message = Message(
sender=sender,
recipient=recipient,
subject='whatever',
content='hello **world**',
pub_date=datetime.datetime.now(),
sending_client=sending_client,
last_edit_time=datetime.datetime.now(),
edit_history='[]'
)
message.save()
# An important part of this test is to get the message through this exact code path,
# because there is an ugly hack we need to cover. So don't just say "row = message".
row = Message.get_raw_db_rows([message.id])[0]
dct = Message.build_dict_from_raw_db_row(row, apply_markdown=True)
expected_content = '<p>hello <strong>world</strong></p>'
self.assertEqual(dct['content'], expected_content)
message = Message.objects.get(id=message.id)
self.assertEqual(message.rendered_content, expected_content)
self.assertEqual(message.rendered_content_version, bugdown.version)
class MessagePOSTTest(AuthedTestCase):
def test_message_to_self(self):
"""
Sending a message to a stream to which you are subscribed is
successful.
"""
self.login("hamlet@zulip.com")
result = self.client.post("/json/send_message", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"subject": "Test subject"})
self.assert_json_success(result)
def test_api_message_to_self(self):
"""
Same as above, but for the API view
"""
email = "hamlet@zulip.com"
api_key = self.get_api_key(email)
result = self.client.post("/api/v1/send_message", {"type": "stream",
"to": "Verona",
"client": "test suite",
"content": "Test message",
"subject": "Test subject",
"email": email,
"api-key": api_key})
self.assert_json_success(result)
def test_message_to_nonexistent_stream(self):
"""
Sending a message to a nonexistent stream fails.
"""
self.login("hamlet@zulip.com")
self.assertFalse(Stream.objects.filter(name="nonexistent_stream"))
result = self.client.post("/json/send_message", {"type": "stream",
"to": "nonexistent_stream",
"client": "test suite",
"content": "Test message",
"subject": "Test subject"})
self.assert_json_error(result, "Stream does not exist")
def test_personal_message(self):
"""
Sending a personal message to a valid username is successful.
"""
self.login("hamlet@zulip.com")
result = self.client.post("/json/send_message", {"type": "private",
"content": "Test message",
"client": "test suite",
"to": "othello@zulip.com"})
self.assert_json_success(result)
def test_personal_message_to_nonexistent_user(self):
"""
Sending a personal message to an invalid email returns error JSON.
"""
self.login("hamlet@zulip.com")
result = self.client.post("/json/send_message", {"type": "private",
"content": "Test message",
"client": "test suite",
"to": "nonexistent"})
self.assert_json_error(result, "Invalid email 'nonexistent'")
def test_invalid_type(self):
"""
Sending a message of unknown type returns error JSON.
"""
self.login("hamlet@zulip.com")
result = self.client.post("/json/send_message", {"type": "invalid type",
"content": "Test message",
"client": "test suite",
"to": "othello@zulip.com"})
self.assert_json_error(result, "Invalid message type")
def test_empty_message(self):
"""
Sending a message that is empty or only whitespace should fail
"""
self.login("hamlet@zulip.com")
result = self.client.post("/json/send_message", {"type": "private",
"content": " ",
"client": "test suite",
"to": "othello@zulip.com"})
self.assert_json_error(result, "Message must not be empty")
def test_mirrored_huddle(self):
"""
Sending a mirrored huddle message works
"""
self.login("starnine@mit.edu")
result = self.client.post("/json/send_message", {"type": "private",
"sender": "sipbtest@mit.edu",
"content": "Test message",
"client": "zephyr_mirror",
"to": ujson.dumps(["starnine@mit.edu",
"espuser@mit.edu"])})
self.assert_json_success(result)
def test_mirrored_personal(self):
"""
Sending a mirrored personal message works
"""
self.login("starnine@mit.edu")
result = self.client.post("/json/send_message", {"type": "private",
"sender": "sipbtest@mit.edu",
"content": "Test message",
"client": "zephyr_mirror",
"to": "starnine@mit.edu"})
self.assert_json_success(result)
def test_duplicated_mirrored_huddle(self):
"""
Sending two mirrored huddles in the row return the same ID
"""
msg = {"type": "private",
"sender": "sipbtest@mit.edu",
"content": "Test message",
"client": "zephyr_mirror",
"to": ujson.dumps(["sipbcert@mit.edu",
"starnine@mit.edu"])}
self.login("starnine@mit.edu")
result1 = self.client.post("/json/send_message", msg)
self.login("sipbcert@mit.edu")
result2 = self.client.post("/json/send_message", msg)
self.assertEqual(ujson.loads(result1.content)['id'],
ujson.loads(result2.content)['id'])
def test_long_message(self):
"""
Sending a message longer than the maximum message length succeeds but is
truncated.
"""
self.login("hamlet@zulip.com")
long_message = "A" * (MAX_MESSAGE_LENGTH + 1)
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": long_message, "subject": "Test subject"}
result = self.client.post("/json/send_message", post_data)
self.assert_json_success(result)
sent_message = Message.objects.all().order_by('-id')[0]
self.assertEquals(sent_message.content,
"A" * (MAX_MESSAGE_LENGTH - 3) + "...")
def test_long_topic(self):
"""
Sending a message with a topic longer than the maximum topic length
succeeds, but the topic is truncated.
"""
self.login("hamlet@zulip.com")
long_topic = "A" * (MAX_SUBJECT_LENGTH + 1)
post_data = {"type": "stream", "to": "Verona", "client": "test suite",
"content": "test content", "subject": long_topic}
result = self.client.post("/json/send_message", post_data)
self.assert_json_success(result)
sent_message = Message.objects.all().order_by('-id')[0]
self.assertEquals(sent_message.subject,
"A" * (MAX_SUBJECT_LENGTH - 3) + "...")
class GetOldMessagesTest(AuthedTestCase):
def post_with_params(self, modified_params):
post_params = {"anchor": 1, "num_before": 1, "num_after": 1}
post_params.update(modified_params)
result = self.client.post("/json/get_old_messages", dict(post_params))
self.assert_json_success(result)
return ujson.loads(result.content)
def check_well_formed_messages_response(self, result):
self.assertIn("messages", result)
self.assertIsInstance(result["messages"], list)
for message in result["messages"]:
for field in ("content", "content_type", "display_recipient",
"avatar_url", "recipient_id", "sender_full_name",
"sender_short_name", "timestamp"):
self.assertIn(field, message)
# TODO: deprecate soon in favor of avatar_url
self.assertIn('gravatar_hash', message)
def get_query_ids(self):
realm = get_user_profile_by_email('hamlet@zulip.com').realm
query_ids = {}
scotland_stream = get_stream('Scotland', realm)
query_ids['scotland_recipient'] = get_recipient(Recipient.STREAM, scotland_stream.id).id
return query_ids
def test_successful_get_old_messages(self):
"""
A call to /json/get_old_messages with valid parameters returns a list of
messages.
"""
self.login("hamlet@zulip.com")
result = self.post_with_params(dict())
self.check_well_formed_messages_response(result)
# We have to support the legacy tuple style while there are old
# clients around, which might include third party home-grown bots.
narrow = [['pm-with', 'othello@zulip.com']]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
narrow = [dict(operator='pm-with', operand='othello@zulip.com')]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
def test_get_old_messages_with_narrow_pm_with(self):
"""
A request for old messages with a narrow by pm-with only returns
conversations with that user.
"""
me = 'hamlet@zulip.com'
def dr_emails(dr):
return ','.join(sorted(set([r['email'] for r in dr] + [me])))
personals = [m for m in get_user_messages(get_user_profile_by_email(me))
if m.recipient.type == Recipient.PERSONAL
or m.recipient.type == Recipient.HUDDLE]
if not personals:
# FIXME: This is bad. We should use test data that is guaranteed
# to contain some personals for every user. See #617.
return
emails = dr_emails(get_display_recipient(personals[0].recipient))
self.login(me)
narrow = [dict(operator='pm-with', operand=emails)]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
for message in result["messages"]:
self.assertEqual(dr_emails(message['display_recipient']), emails)
def test_get_old_messages_with_narrow_stream(self):
"""
A request for old messages with a narrow by stream only returns
messages for that stream.
"""
self.login("hamlet@zulip.com")
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm.objects.get(domain="zulip.com")
stream, _ = create_stream_if_needed(realm, "Scotland")
do_add_subscription(get_user_profile_by_email("hamlet@zulip.com"),
stream, no_log=True)
self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM)
messages = get_user_messages(get_user_profile_by_email("hamlet@zulip.com"))
stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
messages)
stream_name = get_display_recipient(stream_messages[0].recipient)
stream_id = stream_messages[0].recipient.id
narrow = [dict(operator='stream', operand=stream_name)]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
for message in result["messages"]:
self.assertEqual(message["type"], "stream")
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_stream_mit_unicode_regex(self):
"""
A request for old messages for a user in the mit.edu relam with unicode
stream name should be correctly escaped in the database query.
"""
self.login("starnine@mit.edu")
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm.objects.get(domain="mit.edu")
lambda_stream, _ = create_stream_if_needed(realm, u"\u03bb-stream")
do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
lambda_stream, no_log=True)
lambda_stream_d, _ = create_stream_if_needed(realm, u"\u03bb-stream.d")
do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
lambda_stream_d, no_log=True)
self.send_message("starnine@mit.edu", u"\u03bb-stream", Recipient.STREAM)
self.send_message("starnine@mit.edu", u"\u03bb-stream.d", Recipient.STREAM)
narrow = [dict(operator='stream', operand=u'\u03bb-stream')]
result = self.post_with_params(dict(num_after=2, narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
messages = get_user_messages(get_user_profile_by_email("starnine@mit.edu"))
stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
messages)
self.assertEqual(len(result["messages"]), 2)
for i, message in enumerate(result["messages"]):
self.assertEqual(message["type"], "stream")
stream_id = stream_messages[i].recipient.id
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_topic_mit_unicode_regex(self):
"""
A request for old messages for a user in the mit.edu relam with unicode
topic name should be correctly escaped in the database query.
"""
self.login("starnine@mit.edu")
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm.objects.get(domain="mit.edu")
stream, _ = create_stream_if_needed(realm, "Scotland")
do_add_subscription(get_user_profile_by_email("starnine@mit.edu"),
stream, no_log=True)
self.send_message("starnine@mit.edu", "Scotland", Recipient.STREAM,
subject=u"\u03bb-topic")
self.send_message("starnine@mit.edu", "Scotland", Recipient.STREAM,
subject=u"\u03bb-topic.d")
narrow = [dict(operator='topic', operand=u'\u03bb-topic')]
result = self.post_with_params(dict(num_after=2, narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
messages = get_user_messages(get_user_profile_by_email("starnine@mit.edu"))
stream_messages = filter(lambda msg: msg.recipient.type == Recipient.STREAM,
messages)
self.assertEqual(len(result["messages"]), 2)
for i, message in enumerate(result["messages"]):
self.assertEqual(message["type"], "stream")
stream_id = stream_messages[i].recipient.id
self.assertEqual(message["recipient_id"], stream_id)
def test_get_old_messages_with_narrow_sender(self):
"""
A request for old messages with a narrow by sender only returns
messages sent by that person.
"""
self.login("hamlet@zulip.com")
# We need to send a message here to ensure that we actually
# have a stream message in this narrow view.
self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM)
self.send_message("othello@zulip.com", "Scotland", Recipient.STREAM)
self.send_message("othello@zulip.com", "hamlet@zulip.com", Recipient.PERSONAL)
self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM)
narrow = [dict(operator='sender', operand='othello@zulip.com')]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow)))
self.check_well_formed_messages_response(result)
for message in result["messages"]:
self.assertEqual(message["sender_email"], "othello@zulip.com")
def test_get_old_messages_with_only_searching_anchor(self):
"""
Test that specifying an anchor but 0 for num_before and num_after
returns at most 1 message.
"""
self.login("cordelia@zulip.com")
anchor = self.send_message("cordelia@zulip.com", "Scotland", Recipient.STREAM)
narrow = [dict(operator='sender', operand='cordelia@zulip.com')]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow),
anchor=anchor, num_before=0,
num_after=0))
self.check_well_formed_messages_response(result)
self.assertEqual(len(result['messages']), 1)
narrow = [dict(operator='is', operand='mentioned')]
result = self.post_with_params(dict(narrow=ujson.dumps(narrow),
anchor=anchor, num_before=0,
num_after=0))
self.check_well_formed_messages_response(result)
self.assertEqual(len(result['messages']), 0)
def test_missing_params(self):
"""
anchor, num_before, and num_after are all required
POST parameters for get_old_messages.
"""
self.login("hamlet@zulip.com")
required_args = (("anchor", 1), ("num_before", 1), ("num_after", 1))
for i in range(len(required_args)):
post_params = dict(required_args[:i] + required_args[i + 1:])
result = self.client.post("/json/get_old_messages", post_params)
self.assert_json_error(result,
"Missing '%s' argument" % (required_args[i][0],))
def test_bad_int_params(self):
"""
num_before, num_after, and narrow must all be non-negative
integers or strings that can be converted to non-negative integers.
"""
self.login("hamlet@zulip.com")
other_params = [("narrow", {}), ("anchor", 0)]
int_params = ["num_before", "num_after"]
bad_types = (False, "", "-1", -1)
for idx, param in enumerate(int_params):
for type in bad_types:
# Rotate through every bad type for every integer
# parameter, one at a time.
post_params = dict(other_params + [(param, type)] + \
[(other_param, 0) for other_param in \
int_params[:idx] + int_params[idx + 1:]]
)
result = self.client.post("/json/get_old_messages", post_params)
self.assert_json_error(result,
"Bad value for '%s': %s" % (param, type))
def test_bad_narrow_type(self):
"""
narrow must be a list of string pairs.
"""
self.login("hamlet@zulip.com")
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
bad_types = (False, 0, '', '{malformed json,',
'{foo: 3}', '[1,2]', '[["x","y","z"]]')
for type in bad_types:
post_params = dict(other_params + [("narrow", type)])
result = self.client.post("/json/get_old_messages", post_params)
self.assert_json_error(result,
"Bad value for 'narrow': %s" % (type,))
def test_old_empty_narrow(self):
"""
'{}' is accepted to mean 'no narrow', for use by old mobile clients.
"""
self.login("hamlet@zulip.com")
all_result = self.post_with_params({})
narrow_result = self.post_with_params({'narrow': '{}'})
for r in (all_result, narrow_result):
self.check_well_formed_messages_response(r)
self.assertEqual(message_ids(all_result), message_ids(narrow_result))
def test_bad_narrow_operator(self):
"""
Unrecognized narrow operators are rejected.
"""
self.login("hamlet@zulip.com")
for operator in ['', 'foo', 'stream:verona', '__init__']:
narrow = [dict(operator=operator, operand='')]
params = dict(anchor=0, num_before=0, num_after=0, narrow=ujson.dumps(narrow))
result = self.client.post("/json/get_old_messages", params)
self.assert_json_error_contains(result,
"Invalid narrow operator: unknown operator")
def exercise_bad_narrow_operand(self, operator, operands, error_msg):
other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
for operand in operands:
post_params = dict(other_params + [
("narrow", ujson.dumps([[operator, operand]]))])
result = self.client.post("/json/get_old_messages", post_params)
self.assert_json_error_contains(result, error_msg)
def test_bad_narrow_stream_content(self):
"""
If an invalid stream name is requested in get_old_messages, an error is
returned.
"""
self.login("hamlet@zulip.com")
bad_stream_content = (0, [], ["x", "y"])
self.exercise_bad_narrow_operand("stream", bad_stream_content,
"Bad value for 'narrow'")
def test_bad_narrow_one_on_one_email_content(self):
"""
If an invalid 'pm-with' is requested in get_old_messages, an
error is returned.
"""
self.login("hamlet@zulip.com")
bad_stream_content = (0, [], ["x","y"])
self.exercise_bad_narrow_operand("pm-with", bad_stream_content,
"Bad value for 'narrow'")
def test_bad_narrow_nonexistent_stream(self):
self.login("hamlet@zulip.com")
self.exercise_bad_narrow_operand("stream", ['non-existent stream'],
"Invalid narrow operator: unknown stream")
def test_bad_narrow_nonexistent_email(self):
self.login("hamlet@zulip.com")
self.exercise_bad_narrow_operand("pm-with", ['non-existent-user@zulip.com'],
"Invalid narrow operator: unknown user")
def test_message_without_rendered_content(self):
"""Older messages may not have rendered_content in the database"""
m = Message.objects.all().order_by('-id')[0]
m.rendered_content = m.rendered_content_version = None
m.content = 'test content'
# Use to_dict_uncached directly to avoid having to deal with memcached
d = m.to_dict_uncached(True)
self.assertEqual(d['content'], '<p>test content</p>')
def common_check_get_old_messages_query(self, query_params, expected):
user_profile = get_user_profile_by_email("hamlet@zulip.com")
request = POSTRequestMock(query_params, user_profile)
with queries_captured() as queries:
get_old_messages_backend(request, user_profile)
for query in queries:
if "/* get_old_messages */" in query['sql']:
sql = query['sql'].replace(" /* get_old_messages */", '')
self.assertEqual(sql, expected)
return
self.fail("get_old_messages query not found")
def test_get_old_messages_queries(self):
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10},
'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC')
self.common_check_get_old_messages_query({'anchor': 100, 'num_before': 10, 'num_after': 0},
'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id <= 100 ORDER BY message_id DESC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC')
self.common_check_get_old_messages_query({'anchor': 100, 'num_before': 10, 'num_after': 10},
'SELECT anon_1.message_id, anon_1.flags \nFROM ((SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id <= 99 ORDER BY message_id DESC \n LIMIT 10) UNION ALL (SELECT message_id, flags \nFROM zerver_usermessage \nWHERE user_profile_id = 4 AND message_id >= 100 ORDER BY message_id ASC \n LIMIT 11)) AS anon_1 ORDER BY message_id ASC')
def test_get_old_messages_with_narrow_queries(self):
query_ids = self.get_query_ids()
sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (sender_id = 3 AND recipient_id = 4 OR sender_id = 4 AND recipient_id = 3) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["pm-with", "othello@zulip.com"]]'},
sql)
sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["is", "starred"]]'},
sql)
sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND sender_id = 3 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["sender", "othello@zulip.com"]]'},
sql)
sql_template = 'SELECT anon_1.message_id \nFROM (SELECT id AS message_id \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
sql = sql_template.format(**query_ids)
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["stream", "Scotland"]]'},
sql)
sql = "SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND upper(subject) = upper('blah') AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["topic", "blah"]]'},
sql)
sql_template = "SELECT anon_1.message_id \nFROM (SELECT id AS message_id \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND upper(subject) = upper('blah') AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
sql = sql_template.format(**query_ids)
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["stream", "Scotland"], ["topic", "blah"]]'},
sql)
# Narrow to pms with yourself
sql = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND sender_id = 4 AND recipient_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["pm-with", "hamlet@zulip.com"]]'},
sql)
sql_template = 'SELECT anon_1.message_id, anon_1.flags \nFROM (SELECT message_id, flags \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND recipient_id = {scotland_recipient} AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
sql = sql_template.format(**query_ids)
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["stream", "Scotland"], ["is", "starred"]]'},
sql)
def test_get_old_messages_with_search_queries(self):
query_ids = self.get_query_ids()
sql = "SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["search", "jumping"]]'},
sql)
sql_template = "SELECT anon_1.message_id, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT id AS message_id, subject, rendered_content, ts_match_locs_array('zulip.english_us_search', rendered_content, plainto_tsquery('zulip.english_us_search', 'jumping')) AS content_matches, ts_match_locs_array('zulip.english_us_search', escape_html(subject), plainto_tsquery('zulip.english_us_search', 'jumping')) AS subject_matches \nFROM zerver_message \nWHERE recipient_id = {scotland_recipient} AND (search_tsvector @@ plainto_tsquery('zulip.english_us_search', 'jumping')) AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC"
sql = sql_template.format(**query_ids)
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["stream", "Scotland"], ["search", "jumping"]]'},
sql)
sql = 'SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \nFROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array(\'zulip.english_us_search\', rendered_content, plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AS content_matches, ts_match_locs_array(\'zulip.english_us_search\', escape_html(subject), plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AS subject_matches \nFROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \nWHERE user_profile_id = 4 AND (content ILIKE \'%jumping%\' OR subject ILIKE \'%jumping%\') AND (search_tsvector @@ plainto_tsquery(\'zulip.english_us_search\', \'"jumping" quickly\')) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC'
self.common_check_get_old_messages_query({'anchor': 0, 'num_before': 0, 'num_after': 10,
'narrow': '[["search", "\\"jumping\\" quickly"]]'},
sql)
class EditMessageTest(AuthedTestCase):
def check_message(self, msg_id, subject=None, content=None):
msg = Message.objects.get(id=msg_id)
cached = msg.to_dict(False)
uncached = msg.to_dict_uncached(False)
self.assertEqual(cached, uncached)
if subject:
self.assertEqual(msg.subject, subject)
if content:
self.assertEqual(msg.content, content)
return msg
def test_save_message(self):
# This is also tested by a client test, but here we can verify
# the cache against the database
self.login("hamlet@zulip.com")
msg_id = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="editing", content="before edit")
result = self.client.post("/json/update_message", {
'message_id': msg_id,
'content': 'after edit'
})
self.assert_json_success(result)
self.check_message(msg_id, content="after edit")
result = self.client.post("/json/update_message", {
'message_id': msg_id,
'subject': 'edited'
})
self.assert_json_success(result)
self.check_message(msg_id, subject="edited")
def test_propagate_topic_forward(self):
self.login("hamlet@zulip.com")
id1 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
id2 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
id3 = self.send_message("iago@zulip.com", "Rome", Recipient.STREAM,
subject="topic1")
id4 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="topic2")
id5 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
result = self.client.post("/json/update_message", {
'message_id': id1,
'subject': 'edited',
'propagate_mode': 'change_later'
})
self.assert_json_success(result)
self.check_message(id1, subject="edited")
self.check_message(id2, subject="edited")
self.check_message(id3, subject="topic1")
self.check_message(id4, subject="topic2")
self.check_message(id5, subject="edited")
def test_propagate_all_topics(self):
self.login("hamlet@zulip.com")
id1 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
id2 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
id3 = self.send_message("iago@zulip.com", "Rome", Recipient.STREAM,
subject="topic1")
id4 = self.send_message("hamlet@zulip.com", "Scotland", Recipient.STREAM,
subject="topic2")
id5 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
subject="topic1")
id6 = self.send_message("iago@zulip.com", "Scotland", Recipient.STREAM,
subject="topic3")
result = self.client.post("/json/update_message", {
'message_id': id2,
'subject': 'edited',
'propagate_mode': 'change_all'
})
self.assert_json_success(result)
self.check_message(id1, subject="edited")
self.check_message(id2, subject="edited")
self.check_message(id3, subject="topic1")
self.check_message(id4, subject="topic2")
self.check_message(id5, subject="edited")
self.check_message(id6, subject="topic3")
class StarTests(AuthedTestCase):
def change_star(self, messages, add=True):
return self.client.post("/json/update_message_flags",
{"messages": ujson.dumps(messages),
"op": "add" if add else "remove",
"flag": "starred"})
def test_change_star(self):
"""
You can set a message as starred/un-starred through
/json/update_message_flags.
"""
self.login("hamlet@zulip.com")
message_ids = [self.send_message("hamlet@zulip.com", "hamlet@zulip.com",
Recipient.PERSONAL, "test")]
# Star a message.
result = self.change_star(message_ids)
self.assert_json_success(result)
for msg in self.get_old_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], ['starred'])
else:
self.assertEqual(msg['flags'], ['read'])
result = self.change_star(message_ids, False)
self.assert_json_success(result)
# Remove the stars.
for msg in self.get_old_messages():
if msg['id'] in message_ids:
self.assertEqual(msg['flags'], [])
def test_new_message(self):
"""
New messages aren't starred.
"""
test_email = "hamlet@zulip.com"
self.login(test_email)
content = "Test message for star"
self.send_message(test_email, "Verona", Recipient.STREAM,
content=content)
sent_message = UserMessage.objects.filter(
user_profile=get_user_profile_by_email(test_email)
).order_by("id").reverse()[0]
self.assertEqual(sent_message.message.content, content)
self.assertFalse(sent_message.flags.starred)
class CheckMessageTest(AuthedTestCase):
def test_basic_check_message_call(self):
sender = get_user_profile_by_email('othello@zulip.com')
client, _ = Client.objects.get_or_create(name="test suite")
stream_name = 'integration'
stream, _ = create_stream_if_needed(Realm.objects.get(domain="zulip.com"), stream_name)
message_type_name = 'stream'
message_to = None
message_to = [stream_name]
subject_name = 'issue'
message_content = 'whatever'
ret = check_message(sender, client, message_type_name, message_to,
subject_name, message_content)
self.assertEqual(ret['message'].sender.email, 'othello@zulip.com')
def test_bot_pm_feature(self):
# We send a PM to a bot's owner if their bot sends a message to
# an unsubscribed stream
parent = get_user_profile_by_email('othello@zulip.com')
bot = do_create_user(
email='othello-bot@zulip.com',
password='',
realm=parent.realm,
full_name='',
short_name='',
active=True,
bot=True,
bot_owner=parent
)
bot.last_reminder = None
sender = bot
client, _ = Client.objects.get_or_create(name="test suite")
stream_name = 'integration'
stream, _ = create_stream_if_needed(Realm.objects.get(domain="zulip.com"), stream_name)
message_type_name = 'stream'
message_to = None
message_to = [stream_name]
subject_name = 'issue'
message_content = 'whatever'
old_count = message_stream_count(parent)
ret = check_message(sender, client, message_type_name, message_to,
subject_name, message_content)
new_count = message_stream_count(parent)
self.assertEqual(new_count, old_count + 1)
self.assertEqual(ret['message'].sender.email, 'othello-bot@zulip.com')