mirror of https://github.com/zulip/zulip.git
309 lines
13 KiB
Python
309 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import absolute_import
|
|
|
|
import ujson
|
|
from typing import Any, Dict, List
|
|
from six import string_types
|
|
|
|
from zerver.lib.test_helpers import tornado_redirected_to_list, get_display_recipient, \
|
|
get_test_image_file
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.models import get_realm, RealmEmoji, Recipient, UserMessage
|
|
|
|
class ReactionEmojiTest(ZulipTestCase):
    """
    Exercise the emoji_reactions endpoint with missing, unknown,
    deactivated, built-in and realm-uploaded emoji names.
    """

    def test_missing_emoji(self):
        # type: () -> None
        """
        Sending reaction without emoji fails
        """
        sender = self.example_email("hamlet")
        result = self.client_put('/api/v1/messages/1/emoji_reactions/',
                                 **self.api_auth(sender))
        self.assertEqual(result.status_code, 400)

    def test_add_invalid_emoji(self):
        # type: () -> None
        """
        Sending invalid emoji fails
        """
        sender = self.example_email("hamlet")
        result = self.client_put('/api/v1/messages/1/emoji_reactions/foo',
                                 **self.api_auth(sender))
        self.assert_json_error(result, "Emoji 'foo' does not exist")

    def test_remove_invalid_emoji(self):
        # type: () -> None
        """
        Removing invalid emoji fails
        """
        sender = self.example_email("hamlet")
        result = self.client_delete('/api/v1/messages/1/emoji_reactions/foo',
                                    **self.api_auth(sender))
        self.assert_json_error(result, "Emoji 'foo' does not exist")

    def test_add_deactivated_realm_emoji(self):
        # type: () -> None
        """
        Sending deactivated realm emoji fails.
        """
        emoji_name = 'my_emoji'

        # Upload the realm emoji through the session-authenticated endpoint.
        # The example_email helper is used for consistency with the other
        # tests in this file (presumably it resolves to the same
        # @zulip.com test addresses that were hard-coded before).
        self.login(self.example_email("iago"))
        with get_test_image_file('img.png') as fp1:
            emoji_data = {'f1': fp1}
            result = self.client_post('/json/realm/emoji/%s' % (emoji_name,),
                                      info=emoji_data)
        self.assert_json_success(result)

        # Deactivate the freshly uploaded emoji directly in the database.
        emoji = RealmEmoji.objects.get(name=emoji_name)
        emoji.deactivated = True
        emoji.save(update_fields=['deactivated'])

        sender = self.example_email("hamlet")
        result = self.client_put('/api/v1/messages/1/emoji_reactions/%s' % (emoji_name,),
                                 **self.api_auth(sender))
        self.assert_json_error(result, "Emoji '%s' does not exist" % (emoji_name,))

    def test_valid_emoji(self):
        # type: () -> None
        """
        Reacting with valid emoji succeeds
        """
        sender = self.example_email("hamlet")
        result = self.client_put('/api/v1/messages/1/emoji_reactions/smile',
                                 **self.api_auth(sender))
        self.assert_json_success(result)
        self.assertEqual(200, result.status_code)

    def test_zulip_emoji(self):
        # type: () -> None
        """
        Reacting with zulip emoji succeeds
        """
        sender = self.example_email("hamlet")
        result = self.client_put('/api/v1/messages/1/emoji_reactions/zulip',
                                 **self.api_auth(sender))
        self.assert_json_success(result)
        self.assertEqual(200, result.status_code)

    def test_valid_emoji_react_historical(self):
        # type: () -> None
        """
        Reacting with valid emoji on a historical message succeeds
        """
        realm = get_realm("zulip")
        stream_name = "Saxony"
        self.subscribe_to_stream(self.example_email("cordelia"), stream_name, realm=realm)
        message_id = self.send_message(self.example_email("cordelia"), stream_name, Recipient.STREAM)

        user_profile = self.example_user('hamlet')
        sender = user_profile.email

        # Verify that hamlet did not receive the message.
        self.assertFalse(UserMessage.objects.filter(user_profile=user_profile,
                                                    message_id=message_id).exists())

        # Have hamlet react to the message
        result = self.client_put('/api/v1/messages/%s/emoji_reactions/smile' % (message_id,),
                                 **self.api_auth(sender))
        self.assert_json_success(result)

        # Fetch the now-created UserMessage object to confirm it exists and is historical
        user_message = UserMessage.objects.get(user_profile=user_profile, message_id=message_id)
        self.assertTrue(user_message.flags.historical)
        self.assertTrue(user_message.flags.read)
        self.assertFalse(user_message.flags.starred)

    def test_valid_realm_emoji(self):
        # type: () -> None
        """
        Reacting with valid realm emoji succeeds
        """
        sender = self.example_email("hamlet")
        emoji_name = 'my_emoji'

        # Upload the realm emoji; the file must stay open for the request.
        with get_test_image_file('img.png') as fp1:
            emoji_data = {'f1': fp1}
            result = self.client_post('/json/realm/emoji/%s' % (emoji_name,),
                                      info=emoji_data,
                                      **self.api_auth(sender))
        self.assert_json_success(result)
        self.assertEqual(200, result.status_code)

        # Confirm the emoji is listed before trying to react with it.
        # Check success first so a failed request is reported as such
        # rather than as a JSON parsing or key error.
        result = self.client_get("/json/realm/emoji", **self.api_auth(sender))
        self.assert_json_success(result)
        content = ujson.loads(result.content)
        self.assertIn(emoji_name, content["emoji"])

        result = self.client_put('/api/v1/messages/1/emoji_reactions/%s' % (emoji_name,),
                                 **self.api_auth(sender))
        self.assert_json_success(result)
|
|
|
|
class ReactionMessageIDTest(ZulipTestCase):
    """
    Check how the emoji_reactions endpoint responds to missing,
    invalid and inaccessible message ids.
    """

    def test_missing_message_id(self):
        # type: () -> None
        """
        A reaction URL with an empty message id segment yields a 404
        """
        hamlet_email = self.example_email("hamlet")
        auth_headers = self.api_auth(hamlet_email)
        response = self.client_put('/api/v1/messages//emoji_reactions/smile',
                                   **auth_headers)
        self.assertEqual(response.status_code, 404)

    def test_invalid_message_id(self):
        # type: () -> None
        """
        A reaction URL with a negative message id yields a 404
        """
        hamlet_email = self.example_email("hamlet")
        auth_headers = self.api_auth(hamlet_email)
        response = self.client_put('/api/v1/messages/-1/emoji_reactions/smile',
                                   **auth_headers)
        self.assertEqual(response.status_code, 404)

    def test_inaccessible_message_id(self):
        # type: () -> None
        """
        Reacting to an inaccessible (for instance, private) message fails
        """
        hamlet_email = self.example_email("hamlet")
        othello_email = self.example_email("othello")
        outsider_email = self.example_email("iago")

        # hamlet sends othello a private message through the API.
        message_payload = {"type": "private",
                           "content": "Test message",
                           "to": othello_email}
        send_response = self.client_post("/api/v1/messages", message_payload,
                                         **self.api_auth(hamlet_email))
        self.assert_json_success(send_response)
        pm_id = ujson.loads(send_response.content)['id']

        # iago is neither sender nor recipient of that message.
        reaction_url = '/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,)
        reaction_response = self.client_put(reaction_url,
                                            **self.api_auth(outsider_email))
        self.assert_json_error(reaction_response, "Invalid message(s)")
|
|
|
|
class ReactionTest(ZulipTestCase):
    """
    Check that duplicate additions and duplicate removals of a
    reaction are rejected.
    """

    def test_add_existing_reaction(self):
        # type: () -> None
        """
        Adding an identical reaction a second time is rejected
        """
        hamlet_email = self.example_email("hamlet")
        othello_email = self.example_email("othello")
        # The PM recipient is also the user who reacts.
        reactor_email = othello_email

        message_payload = {"type": "private",
                           "content": "Test message",
                           "to": othello_email}
        send_response = self.client_post("/api/v1/messages", message_payload,
                                         **self.api_auth(hamlet_email))
        self.assert_json_success(send_response)
        pm_id = ujson.loads(send_response.content)['id']

        reaction_url = '/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,)

        initial_add = self.client_put(reaction_url, **self.api_auth(reactor_email))
        self.assert_json_success(initial_add)

        repeated_add = self.client_put(reaction_url, **self.api_auth(reactor_email))
        self.assert_json_error(repeated_add, "Reaction already exists")

    def test_remove_nonexisting_reaction(self):
        # type: () -> None
        """
        Removing a reaction that was already removed is rejected
        """
        hamlet_email = self.example_email("hamlet")
        othello_email = self.example_email("othello")
        # The PM recipient is also the user who reacts.
        reactor_email = othello_email

        message_payload = {"type": "private",
                           "content": "Test message",
                           "to": othello_email}
        send_response = self.client_post("/api/v1/messages", message_payload,
                                         **self.api_auth(hamlet_email))
        self.assert_json_success(send_response)
        pm_id = ujson.loads(send_response.content)['id']

        reaction_url = '/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,)

        # Create the reaction so there is something to remove.
        creation = self.client_put(reaction_url, **self.api_auth(reactor_email))
        self.assert_json_success(creation)

        initial_removal = self.client_delete(reaction_url,
                                             **self.api_auth(reactor_email))
        self.assert_json_success(initial_removal)

        repeated_removal = self.client_delete(reaction_url,
                                              **self.api_auth(reactor_email))
        self.assert_json_error(repeated_removal, "Reaction does not exist")
|
|
|
|
|
|
class ReactionEventTest(ZulipTestCase):
    """
    Check the events captured when a reaction is added to, or removed
    from, a private message.
    """

    def _send_test_pm(self, pm_sender, pm_recipient):
        # type: (Any, Any) -> Any
        """
        Send a private message from pm_sender to pm_recipient through the
        API, assert that the request succeeded, and return the 'id' field
        of the JSON response.
        """
        result = self.client_post("/api/v1/messages", {"type": "private",
                                                       "content": "Test message",
                                                       "to": pm_recipient.email},
                                  **self.api_auth(pm_sender.email))
        self.assert_json_success(result)
        content = ujson.loads(result.content)
        return content['id']

    def _assert_single_reaction_event(self, events, op, reaction_sender,
                                      pm_id, expected_recipient_ids):
        # type: (List[Dict[str, Any]], str, Any, Any, Any) -> None
        """
        Assert that exactly one event was captured, that it was addressed
        to expected_recipient_ids, and that it describes a 'smile'
        reaction with the given op by reaction_sender on message pm_id.
        """
        self.assertEqual(len(events), 1)

        event = events[0]['event']
        event_user_ids = set(events[0]['users'])

        self.assertEqual(expected_recipient_ids, event_user_ids)
        self.assertEqual(event['user']['email'], reaction_sender.email)
        self.assertEqual(event['type'], 'reaction')
        self.assertEqual(event['op'], op)
        self.assertEqual(event['emoji_name'], 'smile')
        self.assertEqual(event['message_id'], pm_id)

    def test_add_event(self):
        # type: () -> None
        """
        Recipients of the message receive the reaction event
        and event contains relevant data
        """
        pm_sender = self.example_user('hamlet')
        pm_recipient = self.example_user('othello')
        reaction_sender = pm_recipient

        pm_id = self._send_test_pm(pm_sender, pm_recipient)
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}

        # Only the reaction request runs under the event capture.
        events = []  # type: List[Dict[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_put('/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,),
                                     **self.api_auth(reaction_sender.email))
        self.assert_json_success(result)

        self._assert_single_reaction_event(events, 'add', reaction_sender,
                                           pm_id, expected_recipient_ids)

    def test_remove_event(self):
        # type: () -> None
        """
        Recipients of the message receive the reaction event
        and event contains relevant data
        """
        pm_sender = self.example_user('hamlet')
        pm_recipient = self.example_user('othello')
        reaction_sender = pm_recipient

        pm_id = self._send_test_pm(pm_sender, pm_recipient)
        expected_recipient_ids = {pm_sender.id, pm_recipient.id}

        # Add the reaction outside the capture so that only the removal
        # is recorded in events.
        add = self.client_put('/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,),
                              **self.api_auth(reaction_sender.email))
        self.assert_json_success(add)

        events = []  # type: List[Dict[str, Any]]
        with tornado_redirected_to_list(events):
            result = self.client_delete('/api/v1/messages/%s/emoji_reactions/smile' % (pm_id,),
                                        **self.api_auth(reaction_sender.email))
        self.assert_json_success(result)

        self._assert_single_reaction_event(events, 'remove', reaction_sender,
                                           pm_id, expected_recipient_ids)
|