2014-01-31 16:44:45 +01:00
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from django . db . models import Q
2014-02-25 21:25:02 +01:00
from sqlalchemy . sql import (
select , column , compiler
)
2014-01-31 16:44:45 +01:00
from zerver . lib import bugdown
from zerver . decorator import JsonableError
from zerver . lib . test_runner import slow
2014-02-13 16:24:06 +01:00
from zerver . views . messages import (
2014-02-25 21:25:02 +01:00
exclude_muting_conditions , get_sqlalchemy_connection ,
2014-02-13 16:24:06 +01:00
get_old_messages_backend , ok_to_include_history ,
)
2014-01-31 16:44:45 +01:00
from zilencer . models import Deployment
from zerver . lib . test_helpers import (
AuthedTestCase , POSTRequestMock ,
get_user_messages ,
message_ids , message_stream_count ,
most_recent_message ,
queries_captured ,
)
from zerver . models import (
MAX_MESSAGE_LENGTH , MAX_SUBJECT_LENGTH ,
Client , Message , Realm , Recipient , Stream , UserMessage , UserProfile ,
2014-02-13 16:24:06 +01:00
get_display_recipient , get_recipient , get_realm , get_stream , get_user_profile_by_email ,
2014-01-31 16:44:45 +01:00
)
from zerver . lib . actions import (
check_message , check_send_message ,
create_stream_if_needed ,
do_add_subscription , do_create_user ,
)
import datetime
import time
2014-02-25 21:25:02 +01:00
import re
2014-01-31 16:44:45 +01:00
import ujson
2014-02-25 21:25:02 +01:00
def get_sqlalchemy_query_params ( query ) :
dialect = get_sqlalchemy_connection ( ) . dialect
comp = compiler . SQLCompiler ( dialect , query )
comp . compile ( )
return comp . params
def fix_ws ( s ) :
return re . sub ( ' \ s+ ' , ' ' , str ( s ) ) . strip ( )
def get_recipient_id_for_stream_name ( realm , stream_name ) :
stream = get_stream ( stream_name , realm )
return get_recipient ( Recipient . STREAM , stream . id ) . id
2014-02-13 16:24:06 +01:00
class IncludeHistoryTest ( AuthedTestCase ) :
def test_ok_to_include_history ( self ) :
realm = get_realm ( ' zulip.com ' )
create_stream_if_needed ( realm , ' public_stream ' )
2014-02-13 18:53:51 +01:00
# Negated stream searches should not include history.
narrow = [
dict ( operator = ' stream ' , operand = ' public_stream ' , negated = True ) ,
]
self . assertFalse ( ok_to_include_history ( narrow , realm ) )
2014-02-13 16:24:06 +01:00
# Definitely forbid seeing history on private streams.
narrow = [
dict ( operator = ' stream ' , operand = ' private_stream ' ) ,
]
self . assertFalse ( ok_to_include_history ( narrow , realm ) )
# History doesn't apply to PMs.
narrow = [
dict ( operator = ' is ' , operand = ' private ' ) ,
]
self . assertFalse ( ok_to_include_history ( narrow , realm ) )
# If we are looking for something like starred messages, there is
# no point in searching historical messages.
narrow = [
dict ( operator = ' stream ' , operand = ' public_stream ' ) ,
dict ( operator = ' is ' , operand = ' starred ' ) ,
]
self . assertFalse ( ok_to_include_history ( narrow , realm ) )
# simple True case
narrow = [
dict ( operator = ' stream ' , operand = ' public_stream ' ) ,
]
self . assertTrue ( ok_to_include_history ( narrow , realm ) )
narrow = [
dict ( operator = ' stream ' , operand = ' public_stream ' ) ,
dict ( operator = ' topic ' , operand = ' whatever ' ) ,
dict ( operator = ' search ' , operand = ' needle in haystack ' ) ,
]
self . assertTrue ( ok_to_include_history ( narrow , realm ) )
2014-01-31 16:44:45 +01:00
class TestCrossRealmPMs ( AuthedTestCase ) :
def create_user ( self , email ) :
username , domain = email . split ( ' @ ' )
self . register ( username , ' test ' , domain = domain )
return get_user_profile_by_email ( email )
def test_same_realm ( self ) :
""" Users on the same realm can PM each other """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
user1_email = ' user1@1.example.com '
user1 = self . create_user ( user1_email )
user2_email = ' user2@1.example.com '
user2 = self . create_user ( user2_email )
self . send_message ( user1_email , user2_email , Recipient . PERSONAL )
messages = get_user_messages ( user2 )
self . assertEqual ( len ( messages ) , 1 )
self . assertEquals ( messages [ 0 ] . sender . pk , user1 . pk )
def test_diffrent_realms ( self ) :
""" Users on the different realms can not PM each other """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
r2 = Realm . objects . create ( domain = ' 2.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
deployment . realms . add ( r2 )
user1_email = ' user1@1.example.com '
self . create_user ( user1_email )
user2_email = ' user2@2.example.com '
self . create_user ( user2_email )
with self . assertRaisesRegexp ( JsonableError ,
' You can \' t send private messages outside of your organization. ' ) :
self . send_message ( user1_email , user2_email , Recipient . PERSONAL )
def test_three_diffrent_realms ( self ) :
""" Users on three different realms can not PM each other """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
r2 = Realm . objects . create ( domain = ' 2.example.com ' )
r3 = Realm . objects . create ( domain = ' 3.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
deployment . realms . add ( r2 )
deployment . realms . add ( r3 )
user1_email = ' user1@1.example.com '
self . create_user ( user1_email )
user2_email = ' user2@2.example.com '
self . create_user ( user2_email )
user3_email = ' user3@2.example.com '
self . create_user ( user3_email )
with self . assertRaisesRegexp ( JsonableError ,
' You can \' t send private messages outside of your organization. ' ) :
self . send_message ( user1_email , [ user2_email , user3_email ] , Recipient . PERSONAL )
def test_from_zulip_realm ( self ) :
""" Users in the zulip.com realm can PM any realm """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
user1_email = ' user1@zulip.com '
user1 = self . create_user ( user1_email )
user2_email = ' user2@1.example.com '
user2 = self . create_user ( user2_email )
self . send_message ( user1_email , user2_email , Recipient . PERSONAL )
messages = get_user_messages ( user2 )
self . assertEqual ( len ( messages ) , 1 )
self . assertEquals ( messages [ 0 ] . sender . pk , user1 . pk )
def test_to_zulip_realm ( self ) :
""" All users can PM users in the zulip.com realm """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
user1_email = ' user1@1.example.com '
user1 = self . create_user ( user1_email )
user2_email = ' user2@zulip.com '
user2 = self . create_user ( user2_email )
self . send_message ( user1_email , user2_email , Recipient . PERSONAL )
messages = get_user_messages ( user2 )
self . assertEqual ( len ( messages ) , 1 )
self . assertEquals ( messages [ 0 ] . sender . pk , user1 . pk )
def test_zulip_realm_can_not_join_realms ( self ) :
""" Adding a zulip.com user to a PM will not let you cross realms """
r1 = Realm . objects . create ( domain = ' 1.example.com ' )
r2 = Realm . objects . create ( domain = ' 2.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
deployment . realms . add ( r2 )
user1_email = ' user1@1.example.com '
self . create_user ( user1_email )
user2_email = ' user2@2.example.com '
self . create_user ( user2_email )
user3_email = ' user3@zulip.com '
self . create_user ( user3_email )
with self . assertRaisesRegexp ( JsonableError ,
' You can \' t send private messages outside of your organization. ' ) :
self . send_message ( user1_email , [ user2_email , user3_email ] ,
Recipient . PERSONAL )
class PersonalMessagesTest ( AuthedTestCase ) :
def test_auto_subbed_to_personals ( self ) :
"""
Newly created users are auto - subbed to the ability to receive
personals .
"""
self . register ( " test " , " test " )
user_profile = get_user_profile_by_email ( ' test@zulip.com ' )
old_messages_count = message_stream_count ( user_profile )
self . send_message ( " test@zulip.com " , " test@zulip.com " , Recipient . PERSONAL )
new_messages_count = message_stream_count ( user_profile )
self . assertEqual ( new_messages_count , old_messages_count + 1 )
recipient = Recipient . objects . get ( type_id = user_profile . id ,
type = Recipient . PERSONAL )
self . assertEqual ( most_recent_message ( user_profile ) . recipient , recipient )
@slow ( 0.36 , " checks several profiles " )
def test_personal_to_self ( self ) :
"""
If you send a personal to yourself , only you see it .
"""
old_user_profiles = list ( UserProfile . objects . all ( ) )
self . register ( " test1 " , " test1 " )
old_messages = [ ]
for user_profile in old_user_profiles :
old_messages . append ( message_stream_count ( user_profile ) )
self . send_message ( " test1@zulip.com " , " test1@zulip.com " , Recipient . PERSONAL )
new_messages = [ ]
for user_profile in old_user_profiles :
new_messages . append ( message_stream_count ( user_profile ) )
self . assertEqual ( old_messages , new_messages )
user_profile = get_user_profile_by_email ( " test1@zulip.com " )
recipient = Recipient . objects . get ( type_id = user_profile . id , type = Recipient . PERSONAL )
self . assertEqual ( most_recent_message ( user_profile ) . recipient , recipient )
def assert_personal ( self , sender_email , receiver_email , content = " test content " ) :
"""
Send a private message from ` sender_email ` to ` receiver_email ` and check
that only those two parties actually received the message .
"""
sender = get_user_profile_by_email ( sender_email )
receiver = get_user_profile_by_email ( receiver_email )
sender_messages = message_stream_count ( sender )
receiver_messages = message_stream_count ( receiver )
other_user_profiles = UserProfile . objects . filter ( ~ Q ( email = sender_email ) &
~ Q ( email = receiver_email ) )
old_other_messages = [ ]
for user_profile in other_user_profiles :
old_other_messages . append ( message_stream_count ( user_profile ) )
self . send_message ( sender_email , receiver_email , Recipient . PERSONAL , content )
# Users outside the conversation don't get the message.
new_other_messages = [ ]
for user_profile in other_user_profiles :
new_other_messages . append ( message_stream_count ( user_profile ) )
self . assertEqual ( old_other_messages , new_other_messages )
# The personal message is in the streams of both the sender and receiver.
self . assertEqual ( message_stream_count ( sender ) ,
sender_messages + 1 )
self . assertEqual ( message_stream_count ( receiver ) ,
receiver_messages + 1 )
recipient = Recipient . objects . get ( type_id = receiver . id , type = Recipient . PERSONAL )
self . assertEqual ( most_recent_message ( sender ) . recipient , recipient )
self . assertEqual ( most_recent_message ( receiver ) . recipient , recipient )
@slow ( 0.28 , " assert_personal checks several profiles " )
def test_personal ( self ) :
"""
If you send a personal , only you and the recipient see it .
"""
self . login ( " hamlet@zulip.com " )
self . assert_personal ( " hamlet@zulip.com " , " othello@zulip.com " )
@slow ( 0.28 , " assert_personal checks several profiles " )
def test_non_ascii_personal ( self ) :
"""
Sending a PM containing non - ASCII characters succeeds .
"""
self . login ( " hamlet@zulip.com " )
self . assert_personal ( " hamlet@zulip.com " , " othello@zulip.com " , u " hümbüǵ " )
class StreamMessagesTest ( AuthedTestCase ) :
def assert_stream_message ( self , stream_name , subject = " test subject " ,
content = " test content " ) :
"""
Check that messages sent to a stream reach all subscribers to that stream .
"""
subscribers = self . users_subscribed_to_stream ( stream_name , " zulip.com " )
old_subscriber_messages = [ ]
for subscriber in subscribers :
old_subscriber_messages . append ( message_stream_count ( subscriber ) )
non_subscribers = [ user_profile for user_profile in UserProfile . objects . all ( )
if user_profile not in subscribers ]
old_non_subscriber_messages = [ ]
for non_subscriber in non_subscribers :
old_non_subscriber_messages . append ( message_stream_count ( non_subscriber ) )
a_subscriber_email = subscribers [ 0 ] . email
self . login ( a_subscriber_email )
self . send_message ( a_subscriber_email , stream_name , Recipient . STREAM ,
subject , content )
# Did all of the subscribers get the message?
new_subscriber_messages = [ ]
for subscriber in subscribers :
new_subscriber_messages . append ( message_stream_count ( subscriber ) )
# Did non-subscribers not get the message?
new_non_subscriber_messages = [ ]
for non_subscriber in non_subscribers :
new_non_subscriber_messages . append ( message_stream_count ( non_subscriber ) )
self . assertEqual ( old_non_subscriber_messages , new_non_subscriber_messages )
self . assertEqual ( new_subscriber_messages , [ elt + 1 for elt in old_subscriber_messages ] )
def test_not_too_many_queries ( self ) :
recipient_list = [ ' hamlet@zulip.com ' , ' iago@zulip.com ' , ' cordelia@zulip.com ' , ' othello@zulip.com ' ]
for email in recipient_list :
self . subscribe_to_stream ( email , " Denmark " )
sender_email = ' hamlet@zulip.com '
sender = get_user_profile_by_email ( sender_email )
message_type_name = " stream "
( sending_client , _ ) = Client . objects . get_or_create ( name = " test suite " )
stream = ' Denmark '
subject = ' foo '
content = ' whatever '
realm = sender . realm
def send_message ( ) :
check_send_message ( sender , sending_client , message_type_name , [ stream ] ,
subject , content , forwarder_user_profile = sender , realm = realm )
send_message ( ) # prime the caches
with queries_captured ( ) as queries :
send_message ( )
self . assert_length ( queries , 5 )
def test_message_mentions ( self ) :
user_profile = get_user_profile_by_email ( " iago@zulip.com " )
self . subscribe_to_stream ( user_profile . email , " Denmark " )
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM ,
content = " test @**Iago** rules " )
message = most_recent_message ( user_profile )
assert ( UserMessage . objects . get ( user_profile = user_profile , message = message ) . flags . mentioned . is_set )
@slow ( 0.28 , ' checks all users ' )
def test_message_to_stream ( self ) :
"""
If you send a message to a stream , everyone subscribed to the stream
receives the messages .
"""
self . assert_stream_message ( " Scotland " )
@slow ( 0.37 , ' checks all users ' )
def test_non_ascii_stream_message ( self ) :
"""
Sending a stream message containing non - ASCII characters in the stream
name , subject , or message body succeeds .
"""
self . login ( " hamlet@zulip.com " )
# Subscribe everyone to a stream with non-ASCII characters.
non_ascii_stream_name = u " hümbüǵ "
realm = Realm . objects . get ( domain = " zulip.com " )
stream , _ = create_stream_if_needed ( realm , non_ascii_stream_name )
for user_profile in UserProfile . objects . filter ( realm = realm ) :
do_add_subscription ( user_profile , stream , no_log = True )
self . assert_stream_message ( non_ascii_stream_name , subject = u " hümbüǵ " ,
content = u " hümbüǵ " )
class MessageDictTest ( AuthedTestCase ) :
@slow ( 1.6 , ' builds lots of messages ' )
def test_bulk_message_fetching ( self ) :
realm = Realm . objects . get ( domain = " zulip.com " )
sender = get_user_profile_by_email ( ' othello@zulip.com ' )
receiver = get_user_profile_by_email ( ' hamlet@zulip.com ' )
pm_recipient = Recipient . objects . get ( type_id = receiver . id , type = Recipient . PERSONAL )
stream , _ = create_stream_if_needed ( realm , ' devel ' )
stream_recipient = Recipient . objects . get ( type_id = stream . id , type = Recipient . STREAM )
sending_client , _ = Client . objects . get_or_create ( name = " test suite " )
for i in range ( 300 ) :
for recipient in [ pm_recipient , stream_recipient ] :
message = Message (
sender = sender ,
recipient = recipient ,
subject = ' whatever ' ,
content = ' whatever %d ' % i ,
pub_date = datetime . datetime . now ( ) ,
sending_client = sending_client ,
last_edit_time = datetime . datetime . now ( ) ,
edit_history = ' [] '
)
message . save ( )
ids = [ row [ ' id ' ] for row in Message . objects . all ( ) . values ( ' id ' ) ]
num_ids = len ( ids )
self . assertTrue ( num_ids > = 600 )
t = time . time ( )
with queries_captured ( ) as queries :
rows = list ( Message . get_raw_db_rows ( ids ) )
for row in rows :
Message . build_dict_from_raw_db_row ( row , False )
delay = time . time ( ) - t
# Make sure we don't take longer than 1ms per message to extract messages.
self . assertTrue ( delay < 0.001 * num_ids )
self . assert_length ( queries , 7 )
self . assertEqual ( len ( rows ) , num_ids )
def test_applying_markdown ( self ) :
sender = get_user_profile_by_email ( ' othello@zulip.com ' )
receiver = get_user_profile_by_email ( ' hamlet@zulip.com ' )
recipient = Recipient . objects . get ( type_id = receiver . id , type = Recipient . PERSONAL )
sending_client , _ = Client . objects . get_or_create ( name = " test suite " )
message = Message (
sender = sender ,
recipient = recipient ,
subject = ' whatever ' ,
content = ' hello **world** ' ,
pub_date = datetime . datetime . now ( ) ,
sending_client = sending_client ,
last_edit_time = datetime . datetime . now ( ) ,
edit_history = ' [] '
)
message . save ( )
# An important part of this test is to get the message through this exact code path,
# because there is an ugly hack we need to cover. So don't just say "row = message".
row = Message . get_raw_db_rows ( [ message . id ] ) [ 0 ]
dct = Message . build_dict_from_raw_db_row ( row , apply_markdown = True )
expected_content = ' <p>hello <strong>world</strong></p> '
self . assertEqual ( dct [ ' content ' ] , expected_content )
message = Message . objects . get ( id = message . id )
self . assertEqual ( message . rendered_content , expected_content )
self . assertEqual ( message . rendered_content_version , bugdown . version )
class MessagePOSTTest ( AuthedTestCase ) :
def test_message_to_self ( self ) :
"""
Sending a message to a stream to which you are subscribed is
successful .
"""
self . login ( " hamlet@zulip.com " )
result = self . client . post ( " /json/send_message " , { " type " : " stream " ,
" to " : " Verona " ,
" client " : " test suite " ,
" content " : " Test message " ,
" subject " : " Test subject " } )
self . assert_json_success ( result )
def test_api_message_to_self ( self ) :
"""
Same as above , but for the API view
"""
email = " hamlet@zulip.com "
api_key = self . get_api_key ( email )
result = self . client . post ( " /api/v1/send_message " , { " type " : " stream " ,
" to " : " Verona " ,
" client " : " test suite " ,
" content " : " Test message " ,
" subject " : " Test subject " ,
" email " : email ,
" api-key " : api_key } )
self . assert_json_success ( result )
def test_message_to_nonexistent_stream ( self ) :
"""
Sending a message to a nonexistent stream fails .
"""
self . login ( " hamlet@zulip.com " )
self . assertFalse ( Stream . objects . filter ( name = " nonexistent_stream " ) )
result = self . client . post ( " /json/send_message " , { " type " : " stream " ,
" to " : " nonexistent_stream " ,
" client " : " test suite " ,
" content " : " Test message " ,
" subject " : " Test subject " } )
self . assert_json_error ( result , " Stream does not exist " )
def test_personal_message ( self ) :
"""
Sending a personal message to a valid username is successful .
"""
self . login ( " hamlet@zulip.com " )
result = self . client . post ( " /json/send_message " , { " type " : " private " ,
" content " : " Test message " ,
" client " : " test suite " ,
" to " : " othello@zulip.com " } )
self . assert_json_success ( result )
def test_personal_message_to_nonexistent_user ( self ) :
"""
Sending a personal message to an invalid email returns error JSON .
"""
self . login ( " hamlet@zulip.com " )
result = self . client . post ( " /json/send_message " , { " type " : " private " ,
" content " : " Test message " ,
" client " : " test suite " ,
" to " : " nonexistent " } )
self . assert_json_error ( result , " Invalid email ' nonexistent ' " )
def test_invalid_type ( self ) :
"""
Sending a message of unknown type returns error JSON .
"""
self . login ( " hamlet@zulip.com " )
result = self . client . post ( " /json/send_message " , { " type " : " invalid type " ,
" content " : " Test message " ,
" client " : " test suite " ,
" to " : " othello@zulip.com " } )
self . assert_json_error ( result , " Invalid message type " )
def test_empty_message ( self ) :
"""
Sending a message that is empty or only whitespace should fail
"""
self . login ( " hamlet@zulip.com " )
result = self . client . post ( " /json/send_message " , { " type " : " private " ,
" content " : " " ,
" client " : " test suite " ,
" to " : " othello@zulip.com " } )
self . assert_json_error ( result , " Message must not be empty " )
def test_mirrored_huddle ( self ) :
"""
Sending a mirrored huddle message works
"""
self . login ( " starnine@mit.edu " )
result = self . client . post ( " /json/send_message " , { " type " : " private " ,
" sender " : " sipbtest@mit.edu " ,
" content " : " Test message " ,
" client " : " zephyr_mirror " ,
" to " : ujson . dumps ( [ " starnine@mit.edu " ,
" espuser@mit.edu " ] ) } )
self . assert_json_success ( result )
def test_mirrored_personal ( self ) :
"""
Sending a mirrored personal message works
"""
self . login ( " starnine@mit.edu " )
result = self . client . post ( " /json/send_message " , { " type " : " private " ,
" sender " : " sipbtest@mit.edu " ,
" content " : " Test message " ,
" client " : " zephyr_mirror " ,
" to " : " starnine@mit.edu " } )
self . assert_json_success ( result )
def test_duplicated_mirrored_huddle ( self ) :
"""
Sending two mirrored huddles in the row return the same ID
"""
msg = { " type " : " private " ,
" sender " : " sipbtest@mit.edu " ,
" content " : " Test message " ,
" client " : " zephyr_mirror " ,
" to " : ujson . dumps ( [ " sipbcert@mit.edu " ,
" starnine@mit.edu " ] ) }
self . login ( " starnine@mit.edu " )
result1 = self . client . post ( " /json/send_message " , msg )
self . login ( " sipbcert@mit.edu " )
result2 = self . client . post ( " /json/send_message " , msg )
self . assertEqual ( ujson . loads ( result1 . content ) [ ' id ' ] ,
ujson . loads ( result2 . content ) [ ' id ' ] )
def test_long_message ( self ) :
"""
Sending a message longer than the maximum message length succeeds but is
truncated .
"""
self . login ( " hamlet@zulip.com " )
long_message = " A " * ( MAX_MESSAGE_LENGTH + 1 )
post_data = { " type " : " stream " , " to " : " Verona " , " client " : " test suite " ,
" content " : long_message , " subject " : " Test subject " }
result = self . client . post ( " /json/send_message " , post_data )
self . assert_json_success ( result )
sent_message = Message . objects . all ( ) . order_by ( ' -id ' ) [ 0 ]
self . assertEquals ( sent_message . content ,
" A " * ( MAX_MESSAGE_LENGTH - 3 ) + " ... " )
def test_long_topic ( self ) :
"""
Sending a message with a topic longer than the maximum topic length
succeeds , but the topic is truncated .
"""
self . login ( " hamlet@zulip.com " )
long_topic = " A " * ( MAX_SUBJECT_LENGTH + 1 )
post_data = { " type " : " stream " , " to " : " Verona " , " client " : " test suite " ,
" content " : " test content " , " subject " : long_topic }
result = self . client . post ( " /json/send_message " , post_data )
self . assert_json_success ( result )
sent_message = Message . objects . all ( ) . order_by ( ' -id ' ) [ 0 ]
self . assertEquals ( sent_message . subject ,
" A " * ( MAX_SUBJECT_LENGTH - 3 ) + " ... " )
class GetOldMessagesTest ( AuthedTestCase ) :
def post_with_params ( self , modified_params ) :
post_params = { " anchor " : 1 , " num_before " : 1 , " num_after " : 1 }
post_params . update ( modified_params )
result = self . client . post ( " /json/get_old_messages " , dict ( post_params ) )
self . assert_json_success ( result )
return ujson . loads ( result . content )
def check_well_formed_messages_response ( self , result ) :
self . assertIn ( " messages " , result )
self . assertIsInstance ( result [ " messages " ] , list )
for message in result [ " messages " ] :
for field in ( " content " , " content_type " , " display_recipient " ,
" avatar_url " , " recipient_id " , " sender_full_name " ,
" sender_short_name " , " timestamp " ) :
self . assertIn ( field , message )
# TODO: deprecate soon in favor of avatar_url
self . assertIn ( ' gravatar_hash ' , message )
def get_query_ids ( self ) :
realm = get_user_profile_by_email ( ' hamlet@zulip.com ' ) . realm
query_ids = { }
scotland_stream = get_stream ( ' Scotland ' , realm )
query_ids [ ' scotland_recipient ' ] = get_recipient ( Recipient . STREAM , scotland_stream . id ) . id
return query_ids
def test_successful_get_old_messages ( self ) :
"""
A call to / json / get_old_messages with valid parameters returns a list of
messages .
"""
self . login ( " hamlet@zulip.com " )
2014-02-11 00:31:26 +01:00
result = self . post_with_params ( dict ( ) )
self . check_well_formed_messages_response ( result )
# We have to support the legacy tuple style while there are old
# clients around, which might include third party home-grown bots.
narrow = [ [ ' pm-with ' , ' othello@zulip.com ' ] ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ) )
self . check_well_formed_messages_response ( result )
narrow = [ dict ( operator = ' pm-with ' , operand = ' othello@zulip.com ' ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ) )
self . check_well_formed_messages_response ( result )
2014-01-31 16:44:45 +01:00
def test_get_old_messages_with_narrow_pm_with ( self ) :
"""
A request for old messages with a narrow by pm - with only returns
conversations with that user .
"""
me = ' hamlet@zulip.com '
def dr_emails ( dr ) :
return ' , ' . join ( sorted ( set ( [ r [ ' email ' ] for r in dr ] + [ me ] ) ) )
personals = [ m for m in get_user_messages ( get_user_profile_by_email ( me ) )
if m . recipient . type == Recipient . PERSONAL
or m . recipient . type == Recipient . HUDDLE ]
if not personals :
# FIXME: This is bad. We should use test data that is guaranteed
# to contain some personals for every user. See #617.
return
emails = dr_emails ( get_display_recipient ( personals [ 0 ] . recipient ) )
self . login ( me )
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = ' pm-with ' , operand = emails ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ) )
2014-01-31 16:44:45 +01:00
self . check_well_formed_messages_response ( result )
for message in result [ " messages " ] :
self . assertEqual ( dr_emails ( message [ ' display_recipient ' ] ) , emails )
def test_get_old_messages_with_narrow_stream ( self ) :
"""
A request for old messages with a narrow by stream only returns
messages for that stream .
"""
self . login ( " hamlet@zulip.com " )
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm . objects . get ( domain = " zulip.com " )
stream , _ = create_stream_if_needed ( realm , " Scotland " )
do_add_subscription ( get_user_profile_by_email ( " hamlet@zulip.com " ) ,
stream , no_log = True )
self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM )
messages = get_user_messages ( get_user_profile_by_email ( " hamlet@zulip.com " ) )
stream_messages = filter ( lambda msg : msg . recipient . type == Recipient . STREAM ,
messages )
stream_name = get_display_recipient ( stream_messages [ 0 ] . recipient )
stream_id = stream_messages [ 0 ] . recipient . id
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = ' stream ' , operand = stream_name ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ) )
2014-01-31 16:44:45 +01:00
self . check_well_formed_messages_response ( result )
for message in result [ " messages " ] :
self . assertEqual ( message [ " type " ] , " stream " )
self . assertEqual ( message [ " recipient_id " ] , stream_id )
def test_get_old_messages_with_narrow_stream_mit_unicode_regex ( self ) :
"""
A request for old messages for a user in the mit . edu relam with unicode
stream name should be correctly escaped in the database query .
"""
self . login ( " starnine@mit.edu " )
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm . objects . get ( domain = " mit.edu " )
lambda_stream , _ = create_stream_if_needed ( realm , u " \u03bb -stream " )
do_add_subscription ( get_user_profile_by_email ( " starnine@mit.edu " ) ,
lambda_stream , no_log = True )
lambda_stream_d , _ = create_stream_if_needed ( realm , u " \u03bb -stream.d " )
do_add_subscription ( get_user_profile_by_email ( " starnine@mit.edu " ) ,
lambda_stream_d , no_log = True )
self . send_message ( " starnine@mit.edu " , u " \u03bb -stream " , Recipient . STREAM )
self . send_message ( " starnine@mit.edu " , u " \u03bb -stream.d " , Recipient . STREAM )
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = ' stream ' , operand = u ' \u03bb -stream ' ) ]
result = self . post_with_params ( dict ( num_after = 2 , narrow = ujson . dumps ( narrow ) ) )
2014-01-31 16:44:45 +01:00
self . check_well_formed_messages_response ( result )
messages = get_user_messages ( get_user_profile_by_email ( " starnine@mit.edu " ) )
stream_messages = filter ( lambda msg : msg . recipient . type == Recipient . STREAM ,
messages )
self . assertEqual ( len ( result [ " messages " ] ) , 2 )
for i , message in enumerate ( result [ " messages " ] ) :
self . assertEqual ( message [ " type " ] , " stream " )
stream_id = stream_messages [ i ] . recipient . id
self . assertEqual ( message [ " recipient_id " ] , stream_id )
def test_get_old_messages_with_narrow_topic_mit_unicode_regex ( self ) :
"""
A request for old messages for a user in the mit . edu relam with unicode
topic name should be correctly escaped in the database query .
"""
self . login ( " starnine@mit.edu " )
# We need to susbcribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
realm = Realm . objects . get ( domain = " mit.edu " )
stream , _ = create_stream_if_needed ( realm , " Scotland " )
do_add_subscription ( get_user_profile_by_email ( " starnine@mit.edu " ) ,
stream , no_log = True )
self . send_message ( " starnine@mit.edu " , " Scotland " , Recipient . STREAM ,
subject = u " \u03bb -topic " )
self . send_message ( " starnine@mit.edu " , " Scotland " , Recipient . STREAM ,
subject = u " \u03bb -topic.d " )
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = ' topic ' , operand = u ' \u03bb -topic ' ) ]
result = self . post_with_params ( dict ( num_after = 2 , narrow = ujson . dumps ( narrow ) ) )
2014-01-31 16:44:45 +01:00
self . check_well_formed_messages_response ( result )
messages = get_user_messages ( get_user_profile_by_email ( " starnine@mit.edu " ) )
stream_messages = filter ( lambda msg : msg . recipient . type == Recipient . STREAM ,
messages )
self . assertEqual ( len ( result [ " messages " ] ) , 2 )
for i , message in enumerate ( result [ " messages " ] ) :
self . assertEqual ( message [ " type " ] , " stream " )
stream_id = stream_messages [ i ] . recipient . id
self . assertEqual ( message [ " recipient_id " ] , stream_id )
def test_get_old_messages_with_narrow_sender ( self ) :
"""
A request for old messages with a narrow by sender only returns
messages sent by that person .
"""
self . login ( " hamlet@zulip.com " )
# We need to send a message here to ensure that we actually
# have a stream message in this narrow view.
self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM )
self . send_message ( " othello@zulip.com " , " Scotland " , Recipient . STREAM )
self . send_message ( " othello@zulip.com " , " hamlet@zulip.com " , Recipient . PERSONAL )
self . send_message ( " iago@zulip.com " , " Scotland " , Recipient . STREAM )
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = ' sender ' , operand = ' othello@zulip.com ' ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ) )
2014-01-31 16:44:45 +01:00
self . check_well_formed_messages_response ( result )
for message in result [ " messages " ] :
self . assertEqual ( message [ " sender_email " ] , " othello@zulip.com " )
2014-02-19 00:35:43 +01:00
def test_get_old_messages_with_only_searching_anchor ( self ) :
"""
Test that specifying an anchor but 0 for num_before and num_after
returns at most 1 message .
"""
self . login ( " cordelia@zulip.com " )
anchor = self . send_message ( " cordelia@zulip.com " , " Scotland " , Recipient . STREAM )
narrow = [ dict ( operator = ' sender ' , operand = ' cordelia@zulip.com ' ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ,
anchor = anchor , num_before = 0 ,
num_after = 0 ) )
self . check_well_formed_messages_response ( result )
self . assertEqual ( len ( result [ ' messages ' ] ) , 1 )
narrow = [ dict ( operator = ' is ' , operand = ' mentioned ' ) ]
result = self . post_with_params ( dict ( narrow = ujson . dumps ( narrow ) ,
anchor = anchor , num_before = 0 ,
num_after = 0 ) )
self . check_well_formed_messages_response ( result )
self . assertEqual ( len ( result [ ' messages ' ] ) , 0 )
2014-01-31 16:44:45 +01:00
def test_missing_params ( self ) :
"""
anchor , num_before , and num_after are all required
POST parameters for get_old_messages .
"""
self . login ( " hamlet@zulip.com " )
required_args = ( ( " anchor " , 1 ) , ( " num_before " , 1 ) , ( " num_after " , 1 ) )
for i in range ( len ( required_args ) ) :
post_params = dict ( required_args [ : i ] + required_args [ i + 1 : ] )
result = self . client . post ( " /json/get_old_messages " , post_params )
self . assert_json_error ( result ,
" Missing ' %s ' argument " % ( required_args [ i ] [ 0 ] , ) )
def test_bad_int_params ( self ) :
"""
num_before , num_after , and narrow must all be non - negative
integers or strings that can be converted to non - negative integers .
"""
self . login ( " hamlet@zulip.com " )
other_params = [ ( " narrow " , { } ) , ( " anchor " , 0 ) ]
int_params = [ " num_before " , " num_after " ]
bad_types = ( False , " " , " -1 " , - 1 )
for idx , param in enumerate ( int_params ) :
for type in bad_types :
# Rotate through every bad type for every integer
# parameter, one at a time.
post_params = dict ( other_params + [ ( param , type ) ] + \
[ ( other_param , 0 ) for other_param in \
int_params [ : idx ] + int_params [ idx + 1 : ] ]
)
result = self . client . post ( " /json/get_old_messages " , post_params )
self . assert_json_error ( result ,
" Bad value for ' %s ' : %s " % ( param , type ) )
def test_bad_narrow_type ( self ) :
"""
narrow must be a list of string pairs .
"""
self . login ( " hamlet@zulip.com " )
other_params = [ ( " anchor " , 0 ) , ( " num_before " , 0 ) , ( " num_after " , 0 ) ]
bad_types = ( False , 0 , ' ' , ' { malformed json, ' ,
' {foo: 3} ' , ' [1,2] ' , ' [[ " x " , " y " , " z " ]] ' )
for type in bad_types :
post_params = dict ( other_params + [ ( " narrow " , type ) ] )
result = self . client . post ( " /json/get_old_messages " , post_params )
self . assert_json_error ( result ,
" Bad value for ' narrow ' : %s " % ( type , ) )
def test_old_empty_narrow ( self ) :
"""
' {} ' is accepted to mean ' no narrow ' , for use by old mobile clients .
"""
self . login ( " hamlet@zulip.com " )
all_result = self . post_with_params ( { } )
narrow_result = self . post_with_params ( { ' narrow ' : ' {} ' } )
for r in ( all_result , narrow_result ) :
self . check_well_formed_messages_response ( r )
self . assertEqual ( message_ids ( all_result ) , message_ids ( narrow_result ) )
def test_bad_narrow_operator ( self ) :
"""
Unrecognized narrow operators are rejected .
"""
self . login ( " hamlet@zulip.com " )
for operator in [ ' ' , ' foo ' , ' stream:verona ' , ' __init__ ' ] :
2014-02-11 00:31:26 +01:00
narrow = [ dict ( operator = operator , operand = ' ' ) ]
params = dict ( anchor = 0 , num_before = 0 , num_after = 0 , narrow = ujson . dumps ( narrow ) )
2014-01-31 16:44:45 +01:00
result = self . client . post ( " /json/get_old_messages " , params )
self . assert_json_error_contains ( result ,
" Invalid narrow operator: unknown operator " )
def exercise_bad_narrow_operand ( self , operator , operands , error_msg ) :
other_params = [ ( " anchor " , 0 ) , ( " num_before " , 0 ) , ( " num_after " , 0 ) ]
for operand in operands :
post_params = dict ( other_params + [
( " narrow " , ujson . dumps ( [ [ operator , operand ] ] ) ) ] )
result = self . client . post ( " /json/get_old_messages " , post_params )
self . assert_json_error_contains ( result , error_msg )
def test_bad_narrow_stream_content ( self ) :
"""
If an invalid stream name is requested in get_old_messages , an error is
returned .
"""
self . login ( " hamlet@zulip.com " )
bad_stream_content = ( 0 , [ ] , [ " x " , " y " ] )
self . exercise_bad_narrow_operand ( " stream " , bad_stream_content ,
" Bad value for ' narrow ' " )
def test_bad_narrow_one_on_one_email_content ( self ) :
"""
If an invalid ' pm-with ' is requested in get_old_messages , an
error is returned .
"""
self . login ( " hamlet@zulip.com " )
bad_stream_content = ( 0 , [ ] , [ " x " , " y " ] )
self . exercise_bad_narrow_operand ( " pm-with " , bad_stream_content ,
" Bad value for ' narrow ' " )
def test_bad_narrow_nonexistent_stream ( self ) :
self . login ( " hamlet@zulip.com " )
self . exercise_bad_narrow_operand ( " stream " , [ ' non-existent stream ' ] ,
" Invalid narrow operator: unknown stream " )
def test_bad_narrow_nonexistent_email ( self ) :
self . login ( " hamlet@zulip.com " )
self . exercise_bad_narrow_operand ( " pm-with " , [ ' non-existent-user@zulip.com ' ] ,
" Invalid narrow operator: unknown user " )
def test_message_without_rendered_content ( self ) :
""" Older messages may not have rendered_content in the database """
m = Message . objects . all ( ) . order_by ( ' -id ' ) [ 0 ]
m . rendered_content = m . rendered_content_version = None
m . content = ' test content '
# Use to_dict_uncached directly to avoid having to deal with memcached
d = m . to_dict_uncached ( True )
self . assertEqual ( d [ ' content ' ] , ' <p>test content</p> ' )
def common_check_get_old_messages_query ( self , query_params , expected ) :
user_profile = get_user_profile_by_email ( " hamlet@zulip.com " )
request = POSTRequestMock ( query_params , user_profile )
with queries_captured ( ) as queries :
get_old_messages_backend ( request , user_profile )
for query in queries :
if " /* get_old_messages */ " in query [ ' sql ' ] :
sql = query [ ' sql ' ] . replace ( " /* get_old_messages */ " , ' ' )
self . assertEqual ( sql , expected )
return
self . fail ( " get_old_messages query not found " )
2014-02-24 21:59:54 +01:00
def test_use_first_unread_anchor ( self ) :
realm = get_realm ( ' zulip.com ' )
create_stream_if_needed ( realm , ' devel ' )
user_profile = get_user_profile_by_email ( " hamlet@zulip.com " )
2014-02-24 23:11:27 +01:00
user_profile . muted_topics = ujson . dumps ( [ [ ' Scotland ' , ' golf ' ] , [ ' devel ' , ' css ' ] , [ ' bogus ' , ' bogus ' ] ] )
2014-02-24 21:59:54 +01:00
user_profile . save ( )
query_params = dict (
use_first_unread_anchor = ' true ' ,
anchor = 0 ,
num_before = 0 ,
num_after = 0 ,
narrow = ' [[ " stream " , " Scotland " ]] '
)
request = POSTRequestMock ( query_params , user_profile )
with queries_captured ( ) as queries :
get_old_messages_backend ( request , user_profile )
queries = filter ( lambda q : q [ ' sql ' ] . startswith ( " SELECT message_id, flags " ) , queries )
ids = { }
2014-02-25 15:41:32 +01:00
for stream_name in [ ' Scotland ' ] :
2014-02-24 21:59:54 +01:00
stream = get_stream ( stream_name , realm )
ids [ stream_name ] = get_recipient ( Recipient . STREAM , stream . id ) . id
2014-02-25 15:41:32 +01:00
cond = ''' AND NOT (recipient_id = {Scotland} AND upper(subject) = upper( ' golf ' )) '''
2014-02-24 21:59:54 +01:00
cond = cond . format ( * * ids )
self . assertTrue ( cond in queries [ 0 ] [ ' sql ' ] )
2014-02-25 21:25:02 +01:00
def test_exclude_muting_conditions ( self ) :
realm = get_realm ( ' zulip.com ' )
create_stream_if_needed ( realm , ' devel ' )
user_profile = get_user_profile_by_email ( " hamlet@zulip.com " )
user_profile . muted_topics = ujson . dumps ( [ [ ' Scotland ' , ' golf ' ] , [ ' devel ' , ' css ' ] , [ ' bogus ' , ' bogus ' ] ] )
user_profile . save ( )
narrow = [
dict ( operator = ' stream ' , operand = ' Scotland ' ) ,
]
muting_conditions = exclude_muting_conditions ( user_profile , narrow )
query = select ( [ column ( " id " ) . label ( " message_id " ) ] , None , " zerver_message " )
query = query . where ( * muting_conditions )
expected_query = '''
SELECT id AS message_id
FROM zerver_message
WHERE NOT ( recipient_id = : recipient_id_1 AND upper ( subject ) = upper ( : upper_1 ) )
'''
self . assertEqual ( fix_ws ( query ) , fix_ws ( expected_query ) )
params = get_sqlalchemy_query_params ( query )
self . assertEqual ( params [ ' recipient_id_1 ' ] , get_recipient_id_for_stream_name ( realm , ' Scotland ' ) )
self . assertEqual ( params [ ' upper_1 ' ] , ' golf ' )
2014-01-31 16:44:45 +01:00
def test_get_old_messages_queries ( self ) :
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 } ,
' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage \n WHERE user_profile_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC ' )
self . common_check_get_old_messages_query ( { ' anchor ' : 100 , ' num_before ' : 10 , ' num_after ' : 0 } ,
' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage \n WHERE user_profile_id = 4 AND message_id <= 100 ORDER BY message_id DESC \n LIMIT 11) AS anon_1 ORDER BY message_id ASC ' )
self . common_check_get_old_messages_query ( { ' anchor ' : 100 , ' num_before ' : 10 , ' num_after ' : 10 } ,
' SELECT anon_1.message_id, anon_1.flags \n FROM ((SELECT message_id, flags \n FROM zerver_usermessage \n WHERE user_profile_id = 4 AND message_id <= 99 ORDER BY message_id DESC \n LIMIT 10) UNION ALL (SELECT message_id, flags \n FROM zerver_usermessage \n WHERE user_profile_id = 4 AND message_id >= 100 ORDER BY message_id ASC \n LIMIT 11)) AS anon_1 ORDER BY message_id ASC ' )
def test_get_old_messages_with_narrow_queries ( self ) :
query_ids = self . get_query_ids ( )
sql = ' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND (sender_id = 3 AND recipient_id = 4 OR sender_id = 4 AND recipient_id = 3) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " pm-with " , " othello@zulip.com " ]] ' } ,
sql )
sql = ' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " is " , " starred " ]] ' } ,
sql )
sql = ' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND sender_id = 3 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " sender " , " othello@zulip.com " ]] ' } ,
sql )
sql_template = ' SELECT anon_1.message_id \n FROM (SELECT id AS message_id \n FROM zerver_message \n WHERE recipient_id = {scotland_recipient} AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
sql = sql_template . format ( * * query_ids )
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " stream " , " Scotland " ]] ' } ,
sql )
sql = " SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND upper(subject) = upper( ' blah ' ) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC "
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " topic " , " blah " ]] ' } ,
sql )
sql_template = " SELECT anon_1.message_id \n FROM (SELECT id AS message_id \n FROM zerver_message \n WHERE recipient_id = {scotland_recipient} AND upper(subject) = upper( ' blah ' ) AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC "
sql = sql_template . format ( * * query_ids )
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " stream " , " Scotland " ], [ " topic " , " blah " ]] ' } ,
sql )
# Narrow to pms with yourself
sql = ' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND sender_id = 4 AND recipient_id = 4 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " pm-with " , " hamlet@zulip.com " ]] ' } ,
sql )
sql_template = ' SELECT anon_1.message_id, anon_1.flags \n FROM (SELECT message_id, flags \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND recipient_id = {scotland_recipient} AND (flags & 2) != 0 AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
sql = sql_template . format ( * * query_ids )
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " stream " , " Scotland " ], [ " is " , " starred " ]] ' } ,
sql )
def test_get_old_messages_with_search_queries ( self ) :
query_ids = self . get_query_ids ( )
sql = " SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \n FROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array( ' zulip.english_us_search ' , rendered_content, plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AS content_matches, ts_match_locs_array( ' zulip.english_us_search ' , escape_html(subject), plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AS subject_matches \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND (search_tsvector @@ plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC "
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " search " , " jumping " ]] ' } ,
sql )
sql_template = " SELECT anon_1.message_id, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \n FROM (SELECT id AS message_id, subject, rendered_content, ts_match_locs_array( ' zulip.english_us_search ' , rendered_content, plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AS content_matches, ts_match_locs_array( ' zulip.english_us_search ' , escape_html(subject), plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AS subject_matches \n FROM zerver_message \n WHERE recipient_id = {scotland_recipient} AND (search_tsvector @@ plainto_tsquery( ' zulip.english_us_search ' , ' jumping ' )) AND zerver_message.id >= 0 ORDER BY zerver_message.id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC "
sql = sql_template . format ( * * query_ids )
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " stream " , " Scotland " ], [ " search " , " jumping " ]] ' } ,
sql )
sql = ' SELECT anon_1.message_id, anon_1.flags, anon_1.subject, anon_1.rendered_content, anon_1.content_matches, anon_1.subject_matches \n FROM (SELECT message_id, flags, subject, rendered_content, ts_match_locs_array( \' zulip.english_us_search \' , rendered_content, plainto_tsquery( \' zulip.english_us_search \' , \' " jumping " quickly \' )) AS content_matches, ts_match_locs_array( \' zulip.english_us_search \' , escape_html(subject), plainto_tsquery( \' zulip.english_us_search \' , \' " jumping " quickly \' )) AS subject_matches \n FROM zerver_usermessage JOIN zerver_message ON zerver_usermessage.message_id = zerver_message.id \n WHERE user_profile_id = 4 AND (content ILIKE \' % jumping % \' OR subject ILIKE \' % jumping % \' ) AND (search_tsvector @@ plainto_tsquery( \' zulip.english_us_search \' , \' " jumping " quickly \' )) AND message_id >= 0 ORDER BY message_id ASC \n LIMIT 10) AS anon_1 ORDER BY message_id ASC '
self . common_check_get_old_messages_query ( { ' anchor ' : 0 , ' num_before ' : 0 , ' num_after ' : 10 ,
' narrow ' : ' [[ " search " , " \\ " jumping \\ " quickly " ]] ' } ,
sql )
class EditMessageTest ( AuthedTestCase ) :
def check_message ( self , msg_id , subject = None , content = None ) :
msg = Message . objects . get ( id = msg_id )
cached = msg . to_dict ( False )
uncached = msg . to_dict_uncached ( False )
self . assertEqual ( cached , uncached )
if subject :
self . assertEqual ( msg . subject , subject )
if content :
self . assertEqual ( msg . content , content )
return msg
def test_save_message ( self ) :
# This is also tested by a client test, but here we can verify
# the cache against the database
self . login ( " hamlet@zulip.com " )
msg_id = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " editing " , content = " before edit " )
result = self . client . post ( " /json/update_message " , {
' message_id ' : msg_id ,
' content ' : ' after edit '
} )
self . assert_json_success ( result )
self . check_message ( msg_id , content = " after edit " )
result = self . client . post ( " /json/update_message " , {
' message_id ' : msg_id ,
' subject ' : ' edited '
} )
self . assert_json_success ( result )
self . check_message ( msg_id , subject = " edited " )
def test_propagate_topic_forward ( self ) :
self . login ( " hamlet@zulip.com " )
id1 = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
id2 = self . send_message ( " iago@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
id3 = self . send_message ( " iago@zulip.com " , " Rome " , Recipient . STREAM ,
subject = " topic1 " )
id4 = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic2 " )
id5 = self . send_message ( " iago@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
result = self . client . post ( " /json/update_message " , {
' message_id ' : id1 ,
' subject ' : ' edited ' ,
' propagate_mode ' : ' change_later '
} )
self . assert_json_success ( result )
self . check_message ( id1 , subject = " edited " )
self . check_message ( id2 , subject = " edited " )
self . check_message ( id3 , subject = " topic1 " )
self . check_message ( id4 , subject = " topic2 " )
self . check_message ( id5 , subject = " edited " )
def test_propagate_all_topics ( self ) :
self . login ( " hamlet@zulip.com " )
id1 = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
id2 = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
id3 = self . send_message ( " iago@zulip.com " , " Rome " , Recipient . STREAM ,
subject = " topic1 " )
id4 = self . send_message ( " hamlet@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic2 " )
id5 = self . send_message ( " iago@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic1 " )
id6 = self . send_message ( " iago@zulip.com " , " Scotland " , Recipient . STREAM ,
subject = " topic3 " )
result = self . client . post ( " /json/update_message " , {
' message_id ' : id2 ,
' subject ' : ' edited ' ,
' propagate_mode ' : ' change_all '
} )
self . assert_json_success ( result )
self . check_message ( id1 , subject = " edited " )
self . check_message ( id2 , subject = " edited " )
self . check_message ( id3 , subject = " topic1 " )
self . check_message ( id4 , subject = " topic2 " )
self . check_message ( id5 , subject = " edited " )
self . check_message ( id6 , subject = " topic3 " )
class StarTests ( AuthedTestCase ) :
def change_star ( self , messages , add = True ) :
return self . client . post ( " /json/update_message_flags " ,
{ " messages " : ujson . dumps ( messages ) ,
" op " : " add " if add else " remove " ,
" flag " : " starred " } )
def test_change_star ( self ) :
"""
You can set a message as starred / un - starred through
/ json / update_message_flags .
"""
self . login ( " hamlet@zulip.com " )
message_ids = [ self . send_message ( " hamlet@zulip.com " , " hamlet@zulip.com " ,
Recipient . PERSONAL , " test " ) ]
# Star a message.
result = self . change_star ( message_ids )
self . assert_json_success ( result )
for msg in self . get_old_messages ( ) :
if msg [ ' id ' ] in message_ids :
self . assertEqual ( msg [ ' flags ' ] , [ ' starred ' ] )
else :
self . assertEqual ( msg [ ' flags ' ] , [ ' read ' ] )
result = self . change_star ( message_ids , False )
self . assert_json_success ( result )
# Remove the stars.
for msg in self . get_old_messages ( ) :
if msg [ ' id ' ] in message_ids :
self . assertEqual ( msg [ ' flags ' ] , [ ] )
def test_new_message ( self ) :
"""
New messages aren ' t starred.
"""
test_email = " hamlet@zulip.com "
self . login ( test_email )
content = " Test message for star "
self . send_message ( test_email , " Verona " , Recipient . STREAM ,
content = content )
sent_message = UserMessage . objects . filter (
user_profile = get_user_profile_by_email ( test_email )
) . order_by ( " id " ) . reverse ( ) [ 0 ]
self . assertEqual ( sent_message . message . content , content )
self . assertFalse ( sent_message . flags . starred )
class CheckMessageTest ( AuthedTestCase ) :
def test_basic_check_message_call ( self ) :
sender = get_user_profile_by_email ( ' othello@zulip.com ' )
client , _ = Client . objects . get_or_create ( name = " test suite " )
stream_name = ' integration '
stream , _ = create_stream_if_needed ( Realm . objects . get ( domain = " zulip.com " ) , stream_name )
message_type_name = ' stream '
message_to = None
message_to = [ stream_name ]
subject_name = ' issue '
message_content = ' whatever '
ret = check_message ( sender , client , message_type_name , message_to ,
subject_name , message_content )
self . assertEqual ( ret [ ' message ' ] . sender . email , ' othello@zulip.com ' )
def test_bot_pm_feature ( self ) :
# We send a PM to a bot's owner if their bot sends a message to
# an unsubscribed stream
parent = get_user_profile_by_email ( ' othello@zulip.com ' )
bot = do_create_user (
email = ' othello-bot@zulip.com ' ,
password = ' ' ,
realm = parent . realm ,
full_name = ' ' ,
short_name = ' ' ,
active = True ,
bot = True ,
bot_owner = parent
)
bot . last_reminder = None
sender = bot
client , _ = Client . objects . get_or_create ( name = " test suite " )
stream_name = ' integration '
stream , _ = create_stream_if_needed ( Realm . objects . get ( domain = " zulip.com " ) , stream_name )
message_type_name = ' stream '
message_to = None
message_to = [ stream_name ]
subject_name = ' issue '
message_content = ' whatever '
old_count = message_stream_count ( parent )
ret = check_message ( sender , client , message_type_name , message_to ,
subject_name , message_content )
new_count = message_stream_count ( parent )
self . assertEqual ( new_count , old_count + 1 )
self . assertEqual ( ret [ ' message ' ] . sender . email , ' othello-bot@zulip.com ' )