diff --git a/zerver/tests/test_message_flags.py b/zerver/tests/test_message_flags.py index f4b5dbb5ab..8714a3d713 100644 --- a/zerver/tests/test_message_flags.py +++ b/zerver/tests/test_message_flags.py @@ -3,13 +3,16 @@ from unittest import mock import ujson from django.db import connection +from django.http import HttpResponse +from zerver.lib.actions import do_change_stream_invite_only from zerver.lib.fix_unreads import fix, fix_unsubscribed from zerver.lib.message import ( MessageDict, UnreadMessagesResult, aggregate_unread_data, apply_unread_message_event, + bulk_access_messages, get_raw_unread_data, ) from zerver.lib.test_classes import ZulipTestCase @@ -854,3 +857,363 @@ class GetUnreadMsgsTest(ZulipTestCase): um.save() result = get_unread_data() self.assertEqual(result['mentions'], []) + +class MessageAccessTests(ZulipTestCase): + def test_update_invalid_flags(self) -> None: + message = self.send_personal_message( + self.example_user("cordelia"), + self.example_user("hamlet"), + "hello", + ) + + self.login('hamlet') + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps([message]), + "op": "add", + "flag": "invalid"}) + self.assert_json_error(result, "Invalid flag: 'invalid'") + + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps([message]), + "op": "add", + "flag": "is_private"}) + self.assert_json_error(result, "Invalid flag: 'is_private'") + + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps([message]), + "op": "add", + "flag": "active_mobile_push_notification"}) + self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'") + + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps([message]), + "op": "add", + "flag": "mentioned"}) + self.assert_json_error(result, "Flag not editable: 'mentioned'") + + def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse: + return self.client_post("/json/messages/flags", + {"messages": ujson.dumps(messages), + "op": "add" if add else "remove", + "flag": "starred"}, + **kwargs) + + def test_change_star(self) -> None: + """ + You can set a message as starred/un-starred through + POST /json/messages/flags. + """ + self.login('hamlet') + message_ids = [self.send_personal_message(self.example_user("hamlet"), + self.example_user("hamlet"), + "test")] + + # Star a message. + result = self.change_star(message_ids) + self.assert_json_success(result) + + for msg in self.get_messages(): + if msg['id'] in message_ids: + self.assertEqual(msg['flags'], ['starred']) + else: + self.assertEqual(msg['flags'], ['read']) + + result = self.change_star(message_ids, False) + self.assert_json_success(result) + + # Remove the stars. + for msg in self.get_messages(): + if msg['id'] in message_ids: + self.assertEqual(msg['flags'], []) + + def test_change_star_public_stream_historical(self) -> None: + """ + You can set a message as starred/un-starred through + POST /json/messages/flags. + """ + stream_name = "new_stream" + self.subscribe(self.example_user("hamlet"), stream_name) + self.login('hamlet') + message_ids = [ + self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), + ] + # Send a second message so we can verify it isn't modified + other_message_ids = [ + self.send_stream_message(self.example_user("hamlet"), stream_name, "test_unused"), + ] + received_message_ids = [ + self.send_personal_message( + self.example_user("hamlet"), + self.example_user("cordelia"), + "test_received", + ), + ] + + # Now login as another user who wasn't on that stream + self.login('cordelia') + # Send a message to yourself to make sure we have at least one with the read flag + sent_message_ids = [ + self.send_personal_message( + self.example_user("cordelia"), + self.example_user("cordelia"), + "test_read_message", + ), + ] + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps(sent_message_ids), + "op": "add", + "flag": "read"}) + + # We can't change flags other than "starred" on historical messages: + result = self.client_post("/json/messages/flags", + {"messages": ujson.dumps(message_ids), + "op": "add", + "flag": "read"}) + self.assert_json_error(result, 'Invalid message(s)') + + # Trying to change a list of more than one historical message fails + result = self.change_star(message_ids * 2) + self.assert_json_error(result, 'Invalid message(s)') + + # Confirm that one can change the historical flag now + result = self.change_star(message_ids) + self.assert_json_success(result) + + for msg in self.get_messages(): + if msg['id'] in message_ids: + self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'}) + elif msg['id'] in received_message_ids: + self.assertEqual(msg['flags'], []) + else: + self.assertEqual(msg['flags'], ['read']) + self.assertNotIn(msg['id'], other_message_ids) + + result = self.change_star(message_ids, False) + self.assert_json_success(result) + + # But it still doesn't work if you're in another realm + user = self.mit_user('sipbtest') + self.login_user(user) + result = self.change_star(message_ids, subdomain="zephyr") + self.assert_json_error(result, 'Invalid message(s)') + + def test_change_star_private_message_security(self) -> None: + """ + You can set a message as starred/un-starred through + POST /json/messages/flags. + """ + self.login('hamlet') + message_ids = [ + self.send_personal_message( + self.example_user("hamlet"), + self.example_user("hamlet"), + "test", + ), + ] + + # Starring private messages you didn't receive fails. + self.login('cordelia') + result = self.change_star(message_ids) + self.assert_json_error(result, 'Invalid message(s)') + + def test_change_star_private_stream_security(self) -> None: + stream_name = "private_stream" + self.make_stream(stream_name, invite_only=True) + self.subscribe(self.example_user("hamlet"), stream_name) + self.login('hamlet') + message_ids = [ + self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), + ] + + # Starring private stream messages you received works + result = self.change_star(message_ids) + self.assert_json_success(result) + + # Starring private stream messages you didn't receive fails. + self.login('cordelia') + result = self.change_star(message_ids) + self.assert_json_error(result, 'Invalid message(s)') + + stream_name = "private_stream_2" + self.make_stream(stream_name, invite_only=True, + history_public_to_subscribers=True) + self.subscribe(self.example_user("hamlet"), stream_name) + self.login('hamlet') + message_ids = [ + self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), + ] + + # With stream.history_public_to_subscribers = True, you still + # can't see it if you didn't receive the message and are + # not subscribed. + self.login('cordelia') + result = self.change_star(message_ids) + self.assert_json_error(result, 'Invalid message(s)') + + # But if you subscribe, then you can star the message + self.subscribe(self.example_user("cordelia"), stream_name) + result = self.change_star(message_ids) + self.assert_json_success(result) + + def test_new_message(self) -> None: + """ + New messages aren't starred. + """ + sender = self.example_user('hamlet') + self.login_user(sender) + content = "Test message for star" + self.send_stream_message(sender, "Verona", + content=content) + + sent_message = UserMessage.objects.filter( + user_profile=self.example_user('hamlet'), + ).order_by("id").reverse()[0] + self.assertEqual(sent_message.message.content, content) + self.assertFalse(sent_message.flags.starred) + + def test_change_star_public_stream_security_for_guest_user(self) -> None: + # Guest user can't access(star) unsubscribed public stream messages + normal_user = self.example_user("hamlet") + stream_name = "public_stream" + self.make_stream(stream_name) + self.subscribe(normal_user, stream_name) + self.login_user(normal_user) + + message_id = [ + self.send_stream_message(normal_user, stream_name, "test 1"), + ] + + guest_user = self.example_user('polonius') + self.login_user(guest_user) + result = self.change_star(message_id) + self.assert_json_error(result, 'Invalid message(s)') + + # Subscribed guest users can access public stream messages sent before they join + self.subscribe(guest_user, stream_name) + result = self.change_star(message_id) + self.assert_json_success(result) + + # And messages sent after they join + self.login_user(normal_user) + message_id = [ + self.send_stream_message(normal_user, stream_name, "test 2"), + ] + self.login_user(guest_user) + result = self.change_star(message_id) + self.assert_json_success(result) + + def test_change_star_private_stream_security_for_guest_user(self) -> None: + # Guest users can't access(star) unsubscribed private stream messages + normal_user = self.example_user("hamlet") + stream_name = "private_stream" + stream = self.make_stream(stream_name, invite_only=True) + self.subscribe(normal_user, stream_name) + self.login_user(normal_user) + + message_id = [ + self.send_stream_message(normal_user, stream_name, "test 1"), + ] + + guest_user = self.example_user('polonius') + self.login_user(guest_user) + result = self.change_star(message_id) + self.assert_json_error(result, 'Invalid message(s)') + + # Guest user can't access messages of subscribed private streams if + # history is not public to subscribers + self.subscribe(guest_user, stream_name) + result = self.change_star(message_id) + self.assert_json_error(result, 'Invalid message(s)') + + # Guest user can access messages of subscribed private streams if + # history is public to subscribers + do_change_stream_invite_only(stream, True, history_public_to_subscribers=True) + result = self.change_star(message_id) + self.assert_json_success(result) + + # With history not public to subscribers, they can still see new messages + do_change_stream_invite_only(stream, True, history_public_to_subscribers=False) + self.login_user(normal_user) + message_id = [ + self.send_stream_message(normal_user, stream_name, "test 2"), + ] + self.login_user(guest_user) + result = self.change_star(message_id) + self.assert_json_success(result) + + def test_bulk_access_messages_private_stream(self) -> None: + user = self.example_user("hamlet") + self.login_user(user) + + stream_name = "private_stream" + stream = self.make_stream(stream_name, invite_only=True, + history_public_to_subscribers=False) + + self.subscribe(user, stream_name) + # Send a message before subscribing a new user to stream + message_one_id = self.send_stream_message(user, + stream_name, "Message one") + + later_subscribed_user = self.example_user("cordelia") + # Subscribe a user to private-protected history stream + self.subscribe(later_subscribed_user, stream_name) + + # Send a message after subscribing a new user to stream + message_two_id = self.send_stream_message(user, + stream_name, "Message two") + + message_ids = [message_one_id, message_two_id] + messages = [Message.objects.select_related().get(id=message_id) + for message_id in message_ids] + + filtered_messages = bulk_access_messages(later_subscribed_user, messages) + + # Message sent before subscribing wouldn't be accessible by later + # subscribed user as stream has protected history + self.assertEqual(len(filtered_messages), 1) + self.assertEqual(filtered_messages[0].id, message_two_id) + + do_change_stream_invite_only(stream, True, history_public_to_subscribers=True) + + filtered_messages = bulk_access_messages(later_subscribed_user, messages) + + # Message sent before subscribing are accessible by 8user as stream + # don't have protected history + self.assertEqual(len(filtered_messages), 2) + + # Testing messages accessiblity for an unsubscribed user + unsubscribed_user = self.example_user("ZOE") + + filtered_messages = bulk_access_messages(unsubscribed_user, messages) + + self.assertEqual(len(filtered_messages), 0) + + def test_bulk_access_messages_public_stream(self) -> None: + user = self.example_user("hamlet") + self.login_user(user) + + # Testing messages accessiblity including a public stream message + stream_name = "public_stream" + self.subscribe(user, stream_name) + message_one_id = self.send_stream_message(user, + stream_name, "Message one") + + later_subscribed_user = self.example_user("cordelia") + self.subscribe(later_subscribed_user, stream_name) + + # Send a message after subscribing a new user to stream + message_two_id = self.send_stream_message(user, + stream_name, "Message two") + + message_ids = [message_one_id, message_two_id] + messages = [Message.objects.select_related().get(id=message_id) + for message_id in message_ids] + + # All public stream messages are always accessible + filtered_messages = bulk_access_messages(later_subscribed_user, messages) + self.assertEqual(len(filtered_messages), 2) + + unsubscribed_user = self.example_user("ZOE") + filtered_messages = bulk_access_messages(unsubscribed_user, messages) + + self.assertEqual(len(filtered_messages), 2) diff --git a/zerver/tests/test_messages.py b/zerver/tests/test_messages.py index 59f5425757..35da3b2705 100644 --- a/zerver/tests/test_messages.py +++ b/zerver/tests/test_messages.py @@ -1,9 +1,7 @@ import datetime -from typing import Any, Dict, List +from typing import Dict, List from unittest import mock -import ujson -from django.http import HttpResponse from django.utils.timezone import now as timezone_now from analytics.lib.counts import COUNT_STATS @@ -11,7 +9,6 @@ from analytics.models import RealmCount from zerver.decorator import JsonableError from zerver.lib.actions import ( check_message, - do_change_stream_invite_only, do_claim_attachments, do_create_user, do_update_message, @@ -24,7 +21,6 @@ from zerver.lib.actions import ( from zerver.lib.addressee import Addressee from zerver.lib.markdown import MentionData from zerver.lib.message import ( - bulk_access_messages, get_first_visible_message_id, maybe_update_first_visible_message_id, render_markdown, @@ -45,7 +41,6 @@ from zerver.models import ( Message, Recipient, Subscription, - UserMessage, UserPresence, UserProfile, bulk_get_huddle_user_ids, @@ -125,366 +120,6 @@ class PersonalMessagesTest(ZulipTestCase): f'', ) -class MessageAccessTests(ZulipTestCase): - def test_update_invalid_flags(self) -> None: - message = self.send_personal_message( - self.example_user("cordelia"), - self.example_user("hamlet"), - "hello", - ) - - self.login('hamlet') - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps([message]), - "op": "add", - "flag": "invalid"}) - self.assert_json_error(result, "Invalid flag: 'invalid'") - - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps([message]), - "op": "add", - "flag": "is_private"}) - self.assert_json_error(result, "Invalid flag: 'is_private'") - - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps([message]), - "op": "add", - "flag": "active_mobile_push_notification"}) - self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'") - - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps([message]), - "op": "add", - "flag": "mentioned"}) - self.assert_json_error(result, "Flag not editable: 'mentioned'") - - def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse: - return self.client_post("/json/messages/flags", - {"messages": ujson.dumps(messages), - "op": "add" if add else "remove", - "flag": "starred"}, - **kwargs) - - def test_change_star(self) -> None: - """ - You can set a message as starred/un-starred through - POST /json/messages/flags. - """ - self.login('hamlet') - message_ids = [self.send_personal_message(self.example_user("hamlet"), - self.example_user("hamlet"), - "test")] - - # Star a message. - result = self.change_star(message_ids) - self.assert_json_success(result) - - for msg in self.get_messages(): - if msg['id'] in message_ids: - self.assertEqual(msg['flags'], ['starred']) - else: - self.assertEqual(msg['flags'], ['read']) - - result = self.change_star(message_ids, False) - self.assert_json_success(result) - - # Remove the stars. - for msg in self.get_messages(): - if msg['id'] in message_ids: - self.assertEqual(msg['flags'], []) - - def test_change_star_public_stream_historical(self) -> None: - """ - You can set a message as starred/un-starred through - POST /json/messages/flags. - """ - stream_name = "new_stream" - self.subscribe(self.example_user("hamlet"), stream_name) - self.login('hamlet') - message_ids = [ - self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), - ] - # Send a second message so we can verify it isn't modified - other_message_ids = [ - self.send_stream_message(self.example_user("hamlet"), stream_name, "test_unused"), - ] - received_message_ids = [ - self.send_personal_message( - self.example_user("hamlet"), - self.example_user("cordelia"), - "test_received", - ), - ] - - # Now login as another user who wasn't on that stream - self.login('cordelia') - # Send a message to yourself to make sure we have at least one with the read flag - sent_message_ids = [ - self.send_personal_message( - self.example_user("cordelia"), - self.example_user("cordelia"), - "test_read_message", - ), - ] - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps(sent_message_ids), - "op": "add", - "flag": "read"}) - - # We can't change flags other than "starred" on historical messages: - result = self.client_post("/json/messages/flags", - {"messages": ujson.dumps(message_ids), - "op": "add", - "flag": "read"}) - self.assert_json_error(result, 'Invalid message(s)') - - # Trying to change a list of more than one historical message fails - result = self.change_star(message_ids * 2) - self.assert_json_error(result, 'Invalid message(s)') - - # Confirm that one can change the historical flag now - result = self.change_star(message_ids) - self.assert_json_success(result) - - for msg in self.get_messages(): - if msg['id'] in message_ids: - self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'}) - elif msg['id'] in received_message_ids: - self.assertEqual(msg['flags'], []) - else: - self.assertEqual(msg['flags'], ['read']) - self.assertNotIn(msg['id'], other_message_ids) - - result = self.change_star(message_ids, False) - self.assert_json_success(result) - - # But it still doesn't work if you're in another realm - user = self.mit_user('sipbtest') - self.login_user(user) - result = self.change_star(message_ids, subdomain="zephyr") - self.assert_json_error(result, 'Invalid message(s)') - - def test_change_star_private_message_security(self) -> None: - """ - You can set a message as starred/un-starred through - POST /json/messages/flags. - """ - self.login('hamlet') - message_ids = [ - self.send_personal_message( - self.example_user("hamlet"), - self.example_user("hamlet"), - "test", - ), - ] - - # Starring private messages you didn't receive fails. - self.login('cordelia') - result = self.change_star(message_ids) - self.assert_json_error(result, 'Invalid message(s)') - - def test_change_star_private_stream_security(self) -> None: - stream_name = "private_stream" - self.make_stream(stream_name, invite_only=True) - self.subscribe(self.example_user("hamlet"), stream_name) - self.login('hamlet') - message_ids = [ - self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), - ] - - # Starring private stream messages you received works - result = self.change_star(message_ids) - self.assert_json_success(result) - - # Starring private stream messages you didn't receive fails. - self.login('cordelia') - result = self.change_star(message_ids) - self.assert_json_error(result, 'Invalid message(s)') - - stream_name = "private_stream_2" - self.make_stream(stream_name, invite_only=True, - history_public_to_subscribers=True) - self.subscribe(self.example_user("hamlet"), stream_name) - self.login('hamlet') - message_ids = [ - self.send_stream_message(self.example_user("hamlet"), stream_name, "test"), - ] - - # With stream.history_public_to_subscribers = True, you still - # can't see it if you didn't receive the message and are - # not subscribed. - self.login('cordelia') - result = self.change_star(message_ids) - self.assert_json_error(result, 'Invalid message(s)') - - # But if you subscribe, then you can star the message - self.subscribe(self.example_user("cordelia"), stream_name) - result = self.change_star(message_ids) - self.assert_json_success(result) - - def test_new_message(self) -> None: - """ - New messages aren't starred. - """ - sender = self.example_user('hamlet') - self.login_user(sender) - content = "Test message for star" - self.send_stream_message(sender, "Verona", - content=content) - - sent_message = UserMessage.objects.filter( - user_profile=self.example_user('hamlet'), - ).order_by("id").reverse()[0] - self.assertEqual(sent_message.message.content, content) - self.assertFalse(sent_message.flags.starred) - - def test_change_star_public_stream_security_for_guest_user(self) -> None: - # Guest user can't access(star) unsubscribed public stream messages - normal_user = self.example_user("hamlet") - stream_name = "public_stream" - self.make_stream(stream_name) - self.subscribe(normal_user, stream_name) - self.login_user(normal_user) - - message_id = [ - self.send_stream_message(normal_user, stream_name, "test 1"), - ] - - guest_user = self.example_user('polonius') - self.login_user(guest_user) - result = self.change_star(message_id) - self.assert_json_error(result, 'Invalid message(s)') - - # Subscribed guest users can access public stream messages sent before they join - self.subscribe(guest_user, stream_name) - result = self.change_star(message_id) - self.assert_json_success(result) - - # And messages sent after they join - self.login_user(normal_user) - message_id = [ - self.send_stream_message(normal_user, stream_name, "test 2"), - ] - self.login_user(guest_user) - result = self.change_star(message_id) - self.assert_json_success(result) - - def test_change_star_private_stream_security_for_guest_user(self) -> None: - # Guest users can't access(star) unsubscribed private stream messages - normal_user = self.example_user("hamlet") - stream_name = "private_stream" - stream = self.make_stream(stream_name, invite_only=True) - self.subscribe(normal_user, stream_name) - self.login_user(normal_user) - - message_id = [ - self.send_stream_message(normal_user, stream_name, "test 1"), - ] - - guest_user = self.example_user('polonius') - self.login_user(guest_user) - result = self.change_star(message_id) - self.assert_json_error(result, 'Invalid message(s)') - - # Guest user can't access messages of subscribed private streams if - # history is not public to subscribers - self.subscribe(guest_user, stream_name) - result = self.change_star(message_id) - self.assert_json_error(result, 'Invalid message(s)') - - # Guest user can access messages of subscribed private streams if - # history is public to subscribers - do_change_stream_invite_only(stream, True, history_public_to_subscribers=True) - result = self.change_star(message_id) - self.assert_json_success(result) - - # With history not public to subscribers, they can still see new messages - do_change_stream_invite_only(stream, True, history_public_to_subscribers=False) - self.login_user(normal_user) - message_id = [ - self.send_stream_message(normal_user, stream_name, "test 2"), - ] - self.login_user(guest_user) - result = self.change_star(message_id) - self.assert_json_success(result) - - def test_bulk_access_messages_private_stream(self) -> None: - user = self.example_user("hamlet") - self.login_user(user) - - stream_name = "private_stream" - stream = self.make_stream(stream_name, invite_only=True, - history_public_to_subscribers=False) - - self.subscribe(user, stream_name) - # Send a message before subscribing a new user to stream - message_one_id = self.send_stream_message(user, - stream_name, "Message one") - - later_subscribed_user = self.example_user("cordelia") - # Subscribe a user to private-protected history stream - self.subscribe(later_subscribed_user, stream_name) - - # Send a message after subscribing a new user to stream - message_two_id = self.send_stream_message(user, - stream_name, "Message two") - - message_ids = [message_one_id, message_two_id] - messages = [Message.objects.select_related().get(id=message_id) - for message_id in message_ids] - - filtered_messages = bulk_access_messages(later_subscribed_user, messages) - - # Message sent before subscribing wouldn't be accessible by later - # subscribed user as stream has protected history - self.assertEqual(len(filtered_messages), 1) - self.assertEqual(filtered_messages[0].id, message_two_id) - - do_change_stream_invite_only(stream, True, history_public_to_subscribers=True) - - filtered_messages = bulk_access_messages(later_subscribed_user, messages) - - # Message sent before subscribing are accessible by 8user as stream - # don't have protected history - self.assertEqual(len(filtered_messages), 2) - - # Testing messages accessiblity for an unsubscribed user - unsubscribed_user = self.example_user("ZOE") - - filtered_messages = bulk_access_messages(unsubscribed_user, messages) - - self.assertEqual(len(filtered_messages), 0) - - def test_bulk_access_messages_public_stream(self) -> None: - user = self.example_user("hamlet") - self.login_user(user) - - # Testing messages accessiblity including a public stream message - stream_name = "public_stream" - self.subscribe(user, stream_name) - message_one_id = self.send_stream_message(user, - stream_name, "Message one") - - later_subscribed_user = self.example_user("cordelia") - self.subscribe(later_subscribed_user, stream_name) - - # Send a message after subscribing a new user to stream - message_two_id = self.send_stream_message(user, - stream_name, "Message two") - - message_ids = [message_one_id, message_two_id] - messages = [Message.objects.select_related().get(id=message_id) - for message_id in message_ids] - - # All public stream messages are always accessible - filtered_messages = bulk_access_messages(later_subscribed_user, messages) - self.assertEqual(len(filtered_messages), 2) - - unsubscribed_user = self.example_user("ZOE") - filtered_messages = bulk_access_messages(unsubscribed_user, messages) - - self.assertEqual(len(filtered_messages), 2) - class MessageHasKeywordsTest(ZulipTestCase): '''Test for keywords like has_link, has_image, has_attachment.'''