mirror of https://github.com/zulip/zulip.git
tests: Move MessageAccessTests to test_message_flags.py.
This commit is contained in:
parent
096ed90ac2
commit
ce948e4a80
|
@ -3,13 +3,16 @@ from unittest import mock
|
|||
|
||||
import ujson
|
||||
from django.db import connection
|
||||
from django.http import HttpResponse
|
||||
|
||||
from zerver.lib.actions import do_change_stream_invite_only
|
||||
from zerver.lib.fix_unreads import fix, fix_unsubscribed
|
||||
from zerver.lib.message import (
|
||||
MessageDict,
|
||||
UnreadMessagesResult,
|
||||
aggregate_unread_data,
|
||||
apply_unread_message_event,
|
||||
bulk_access_messages,
|
||||
get_raw_unread_data,
|
||||
)
|
||||
from zerver.lib.test_classes import ZulipTestCase
|
||||
|
@ -854,3 +857,363 @@ class GetUnreadMsgsTest(ZulipTestCase):
|
|||
um.save()
|
||||
result = get_unread_data()
|
||||
self.assertEqual(result['mentions'], [])
|
||||
|
||||
class MessageAccessTests(ZulipTestCase):
|
||||
def test_update_invalid_flags(self) -> None:
|
||||
message = self.send_personal_message(
|
||||
self.example_user("cordelia"),
|
||||
self.example_user("hamlet"),
|
||||
"hello",
|
||||
)
|
||||
|
||||
self.login('hamlet')
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "invalid"})
|
||||
self.assert_json_error(result, "Invalid flag: 'invalid'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "is_private"})
|
||||
self.assert_json_error(result, "Invalid flag: 'is_private'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "active_mobile_push_notification"})
|
||||
self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "mentioned"})
|
||||
self.assert_json_error(result, "Flag not editable: 'mentioned'")
|
||||
|
||||
def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse:
|
||||
return self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(messages),
|
||||
"op": "add" if add else "remove",
|
||||
"flag": "starred"},
|
||||
**kwargs)
|
||||
|
||||
def test_change_star(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
self.login('hamlet')
|
||||
message_ids = [self.send_personal_message(self.example_user("hamlet"),
|
||||
self.example_user("hamlet"),
|
||||
"test")]
|
||||
|
||||
# Star a message.
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(msg['flags'], ['starred'])
|
||||
else:
|
||||
self.assertEqual(msg['flags'], ['read'])
|
||||
|
||||
result = self.change_star(message_ids, False)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# Remove the stars.
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(msg['flags'], [])
|
||||
|
||||
def test_change_star_public_stream_historical(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
stream_name = "new_stream"
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
# Send a second message so we can verify it isn't modified
|
||||
other_message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test_unused"),
|
||||
]
|
||||
received_message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("hamlet"),
|
||||
self.example_user("cordelia"),
|
||||
"test_received",
|
||||
),
|
||||
]
|
||||
|
||||
# Now login as another user who wasn't on that stream
|
||||
self.login('cordelia')
|
||||
# Send a message to yourself to make sure we have at least one with the read flag
|
||||
sent_message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("cordelia"),
|
||||
self.example_user("cordelia"),
|
||||
"test_read_message",
|
||||
),
|
||||
]
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(sent_message_ids),
|
||||
"op": "add",
|
||||
"flag": "read"})
|
||||
|
||||
# We can't change flags other than "starred" on historical messages:
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(message_ids),
|
||||
"op": "add",
|
||||
"flag": "read"})
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Trying to change a list of more than one historical message fails
|
||||
result = self.change_star(message_ids * 2)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Confirm that one can change the historical flag now
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'})
|
||||
elif msg['id'] in received_message_ids:
|
||||
self.assertEqual(msg['flags'], [])
|
||||
else:
|
||||
self.assertEqual(msg['flags'], ['read'])
|
||||
self.assertNotIn(msg['id'], other_message_ids)
|
||||
|
||||
result = self.change_star(message_ids, False)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# But it still doesn't work if you're in another realm
|
||||
user = self.mit_user('sipbtest')
|
||||
self.login_user(user)
|
||||
result = self.change_star(message_ids, subdomain="zephyr")
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
def test_change_star_private_message_security(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("hamlet"),
|
||||
self.example_user("hamlet"),
|
||||
"test",
|
||||
),
|
||||
]
|
||||
|
||||
# Starring private messages you didn't receive fails.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
def test_change_star_private_stream_security(self) -> None:
|
||||
stream_name = "private_stream"
|
||||
self.make_stream(stream_name, invite_only=True)
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
|
||||
# Starring private stream messages you received works
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# Starring private stream messages you didn't receive fails.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
stream_name = "private_stream_2"
|
||||
self.make_stream(stream_name, invite_only=True,
|
||||
history_public_to_subscribers=True)
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
|
||||
# With stream.history_public_to_subscribers = True, you still
|
||||
# can't see it if you didn't receive the message and are
|
||||
# not subscribed.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# But if you subscribe, then you can star the message
|
||||
self.subscribe(self.example_user("cordelia"), stream_name)
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_new_message(self) -> None:
|
||||
"""
|
||||
New messages aren't starred.
|
||||
"""
|
||||
sender = self.example_user('hamlet')
|
||||
self.login_user(sender)
|
||||
content = "Test message for star"
|
||||
self.send_stream_message(sender, "Verona",
|
||||
content=content)
|
||||
|
||||
sent_message = UserMessage.objects.filter(
|
||||
user_profile=self.example_user('hamlet'),
|
||||
).order_by("id").reverse()[0]
|
||||
self.assertEqual(sent_message.message.content, content)
|
||||
self.assertFalse(sent_message.flags.starred)
|
||||
|
||||
def test_change_star_public_stream_security_for_guest_user(self) -> None:
|
||||
# Guest user can't access(star) unsubscribed public stream messages
|
||||
normal_user = self.example_user("hamlet")
|
||||
stream_name = "public_stream"
|
||||
self.make_stream(stream_name)
|
||||
self.subscribe(normal_user, stream_name)
|
||||
self.login_user(normal_user)
|
||||
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 1"),
|
||||
]
|
||||
|
||||
guest_user = self.example_user('polonius')
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Subscribed guest users can access public stream messages sent before they join
|
||||
self.subscribe(guest_user, stream_name)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# And messages sent after they join
|
||||
self.login_user(normal_user)
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 2"),
|
||||
]
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_change_star_private_stream_security_for_guest_user(self) -> None:
|
||||
# Guest users can't access(star) unsubscribed private stream messages
|
||||
normal_user = self.example_user("hamlet")
|
||||
stream_name = "private_stream"
|
||||
stream = self.make_stream(stream_name, invite_only=True)
|
||||
self.subscribe(normal_user, stream_name)
|
||||
self.login_user(normal_user)
|
||||
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 1"),
|
||||
]
|
||||
|
||||
guest_user = self.example_user('polonius')
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Guest user can't access messages of subscribed private streams if
|
||||
# history is not public to subscribers
|
||||
self.subscribe(guest_user, stream_name)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Guest user can access messages of subscribed private streams if
|
||||
# history is public to subscribers
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# With history not public to subscribers, they can still see new messages
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=False)
|
||||
self.login_user(normal_user)
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 2"),
|
||||
]
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_bulk_access_messages_private_stream(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
self.login_user(user)
|
||||
|
||||
stream_name = "private_stream"
|
||||
stream = self.make_stream(stream_name, invite_only=True,
|
||||
history_public_to_subscribers=False)
|
||||
|
||||
self.subscribe(user, stream_name)
|
||||
# Send a message before subscribing a new user to stream
|
||||
message_one_id = self.send_stream_message(user,
|
||||
stream_name, "Message one")
|
||||
|
||||
later_subscribed_user = self.example_user("cordelia")
|
||||
# Subscribe a user to private-protected history stream
|
||||
self.subscribe(later_subscribed_user, stream_name)
|
||||
|
||||
# Send a message after subscribing a new user to stream
|
||||
message_two_id = self.send_stream_message(user,
|
||||
stream_name, "Message two")
|
||||
|
||||
message_ids = [message_one_id, message_two_id]
|
||||
messages = [Message.objects.select_related().get(id=message_id)
|
||||
for message_id in message_ids]
|
||||
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
|
||||
# Message sent before subscribing wouldn't be accessible by later
|
||||
# subscribed user as stream has protected history
|
||||
self.assertEqual(len(filtered_messages), 1)
|
||||
self.assertEqual(filtered_messages[0].id, message_two_id)
|
||||
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
|
||||
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
|
||||
# Message sent before subscribing are accessible by 8user as stream
|
||||
# don't have protected history
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
||||
# Testing messages accessiblity for an unsubscribed user
|
||||
unsubscribed_user = self.example_user("ZOE")
|
||||
|
||||
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
|
||||
|
||||
self.assertEqual(len(filtered_messages), 0)
|
||||
|
||||
def test_bulk_access_messages_public_stream(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
self.login_user(user)
|
||||
|
||||
# Testing messages accessiblity including a public stream message
|
||||
stream_name = "public_stream"
|
||||
self.subscribe(user, stream_name)
|
||||
message_one_id = self.send_stream_message(user,
|
||||
stream_name, "Message one")
|
||||
|
||||
later_subscribed_user = self.example_user("cordelia")
|
||||
self.subscribe(later_subscribed_user, stream_name)
|
||||
|
||||
# Send a message after subscribing a new user to stream
|
||||
message_two_id = self.send_stream_message(user,
|
||||
stream_name, "Message two")
|
||||
|
||||
message_ids = [message_one_id, message_two_id]
|
||||
messages = [Message.objects.select_related().get(id=message_id)
|
||||
for message_id in message_ids]
|
||||
|
||||
# All public stream messages are always accessible
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
||||
unsubscribed_user = self.example_user("ZOE")
|
||||
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
|
||||
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import datetime
|
||||
from typing import Any, Dict, List
|
||||
from typing import Dict, List
|
||||
from unittest import mock
|
||||
|
||||
import ujson
|
||||
from django.http import HttpResponse
|
||||
from django.utils.timezone import now as timezone_now
|
||||
|
||||
from analytics.lib.counts import COUNT_STATS
|
||||
|
@ -11,7 +9,6 @@ from analytics.models import RealmCount
|
|||
from zerver.decorator import JsonableError
|
||||
from zerver.lib.actions import (
|
||||
check_message,
|
||||
do_change_stream_invite_only,
|
||||
do_claim_attachments,
|
||||
do_create_user,
|
||||
do_update_message,
|
||||
|
@ -24,7 +21,6 @@ from zerver.lib.actions import (
|
|||
from zerver.lib.addressee import Addressee
|
||||
from zerver.lib.markdown import MentionData
|
||||
from zerver.lib.message import (
|
||||
bulk_access_messages,
|
||||
get_first_visible_message_id,
|
||||
maybe_update_first_visible_message_id,
|
||||
render_markdown,
|
||||
|
@ -45,7 +41,6 @@ from zerver.models import (
|
|||
Message,
|
||||
Recipient,
|
||||
Subscription,
|
||||
UserMessage,
|
||||
UserPresence,
|
||||
UserProfile,
|
||||
bulk_get_huddle_user_ids,
|
||||
|
@ -125,366 +120,6 @@ class PersonalMessagesTest(ZulipTestCase):
|
|||
f'<UserMessage: recip / {user_profile.email} ([])>',
|
||||
)
|
||||
|
||||
class MessageAccessTests(ZulipTestCase):
|
||||
def test_update_invalid_flags(self) -> None:
|
||||
message = self.send_personal_message(
|
||||
self.example_user("cordelia"),
|
||||
self.example_user("hamlet"),
|
||||
"hello",
|
||||
)
|
||||
|
||||
self.login('hamlet')
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "invalid"})
|
||||
self.assert_json_error(result, "Invalid flag: 'invalid'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "is_private"})
|
||||
self.assert_json_error(result, "Invalid flag: 'is_private'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "active_mobile_push_notification"})
|
||||
self.assert_json_error(result, "Invalid flag: 'active_mobile_push_notification'")
|
||||
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps([message]),
|
||||
"op": "add",
|
||||
"flag": "mentioned"})
|
||||
self.assert_json_error(result, "Flag not editable: 'mentioned'")
|
||||
|
||||
def change_star(self, messages: List[int], add: bool=True, **kwargs: Any) -> HttpResponse:
|
||||
return self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(messages),
|
||||
"op": "add" if add else "remove",
|
||||
"flag": "starred"},
|
||||
**kwargs)
|
||||
|
||||
def test_change_star(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
self.login('hamlet')
|
||||
message_ids = [self.send_personal_message(self.example_user("hamlet"),
|
||||
self.example_user("hamlet"),
|
||||
"test")]
|
||||
|
||||
# Star a message.
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(msg['flags'], ['starred'])
|
||||
else:
|
||||
self.assertEqual(msg['flags'], ['read'])
|
||||
|
||||
result = self.change_star(message_ids, False)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# Remove the stars.
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(msg['flags'], [])
|
||||
|
||||
def test_change_star_public_stream_historical(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
stream_name = "new_stream"
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
# Send a second message so we can verify it isn't modified
|
||||
other_message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test_unused"),
|
||||
]
|
||||
received_message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("hamlet"),
|
||||
self.example_user("cordelia"),
|
||||
"test_received",
|
||||
),
|
||||
]
|
||||
|
||||
# Now login as another user who wasn't on that stream
|
||||
self.login('cordelia')
|
||||
# Send a message to yourself to make sure we have at least one with the read flag
|
||||
sent_message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("cordelia"),
|
||||
self.example_user("cordelia"),
|
||||
"test_read_message",
|
||||
),
|
||||
]
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(sent_message_ids),
|
||||
"op": "add",
|
||||
"flag": "read"})
|
||||
|
||||
# We can't change flags other than "starred" on historical messages:
|
||||
result = self.client_post("/json/messages/flags",
|
||||
{"messages": ujson.dumps(message_ids),
|
||||
"op": "add",
|
||||
"flag": "read"})
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Trying to change a list of more than one historical message fails
|
||||
result = self.change_star(message_ids * 2)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Confirm that one can change the historical flag now
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
for msg in self.get_messages():
|
||||
if msg['id'] in message_ids:
|
||||
self.assertEqual(set(msg['flags']), {'starred', 'historical', 'read'})
|
||||
elif msg['id'] in received_message_ids:
|
||||
self.assertEqual(msg['flags'], [])
|
||||
else:
|
||||
self.assertEqual(msg['flags'], ['read'])
|
||||
self.assertNotIn(msg['id'], other_message_ids)
|
||||
|
||||
result = self.change_star(message_ids, False)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# But it still doesn't work if you're in another realm
|
||||
user = self.mit_user('sipbtest')
|
||||
self.login_user(user)
|
||||
result = self.change_star(message_ids, subdomain="zephyr")
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
def test_change_star_private_message_security(self) -> None:
|
||||
"""
|
||||
You can set a message as starred/un-starred through
|
||||
POST /json/messages/flags.
|
||||
"""
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_personal_message(
|
||||
self.example_user("hamlet"),
|
||||
self.example_user("hamlet"),
|
||||
"test",
|
||||
),
|
||||
]
|
||||
|
||||
# Starring private messages you didn't receive fails.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
def test_change_star_private_stream_security(self) -> None:
|
||||
stream_name = "private_stream"
|
||||
self.make_stream(stream_name, invite_only=True)
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
|
||||
# Starring private stream messages you received works
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# Starring private stream messages you didn't receive fails.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
stream_name = "private_stream_2"
|
||||
self.make_stream(stream_name, invite_only=True,
|
||||
history_public_to_subscribers=True)
|
||||
self.subscribe(self.example_user("hamlet"), stream_name)
|
||||
self.login('hamlet')
|
||||
message_ids = [
|
||||
self.send_stream_message(self.example_user("hamlet"), stream_name, "test"),
|
||||
]
|
||||
|
||||
# With stream.history_public_to_subscribers = True, you still
|
||||
# can't see it if you didn't receive the message and are
|
||||
# not subscribed.
|
||||
self.login('cordelia')
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# But if you subscribe, then you can star the message
|
||||
self.subscribe(self.example_user("cordelia"), stream_name)
|
||||
result = self.change_star(message_ids)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_new_message(self) -> None:
|
||||
"""
|
||||
New messages aren't starred.
|
||||
"""
|
||||
sender = self.example_user('hamlet')
|
||||
self.login_user(sender)
|
||||
content = "Test message for star"
|
||||
self.send_stream_message(sender, "Verona",
|
||||
content=content)
|
||||
|
||||
sent_message = UserMessage.objects.filter(
|
||||
user_profile=self.example_user('hamlet'),
|
||||
).order_by("id").reverse()[0]
|
||||
self.assertEqual(sent_message.message.content, content)
|
||||
self.assertFalse(sent_message.flags.starred)
|
||||
|
||||
def test_change_star_public_stream_security_for_guest_user(self) -> None:
|
||||
# Guest user can't access(star) unsubscribed public stream messages
|
||||
normal_user = self.example_user("hamlet")
|
||||
stream_name = "public_stream"
|
||||
self.make_stream(stream_name)
|
||||
self.subscribe(normal_user, stream_name)
|
||||
self.login_user(normal_user)
|
||||
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 1"),
|
||||
]
|
||||
|
||||
guest_user = self.example_user('polonius')
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Subscribed guest users can access public stream messages sent before they join
|
||||
self.subscribe(guest_user, stream_name)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# And messages sent after they join
|
||||
self.login_user(normal_user)
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 2"),
|
||||
]
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_change_star_private_stream_security_for_guest_user(self) -> None:
|
||||
# Guest users can't access(star) unsubscribed private stream messages
|
||||
normal_user = self.example_user("hamlet")
|
||||
stream_name = "private_stream"
|
||||
stream = self.make_stream(stream_name, invite_only=True)
|
||||
self.subscribe(normal_user, stream_name)
|
||||
self.login_user(normal_user)
|
||||
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 1"),
|
||||
]
|
||||
|
||||
guest_user = self.example_user('polonius')
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Guest user can't access messages of subscribed private streams if
|
||||
# history is not public to subscribers
|
||||
self.subscribe(guest_user, stream_name)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_error(result, 'Invalid message(s)')
|
||||
|
||||
# Guest user can access messages of subscribed private streams if
|
||||
# history is public to subscribers
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
# With history not public to subscribers, they can still see new messages
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=False)
|
||||
self.login_user(normal_user)
|
||||
message_id = [
|
||||
self.send_stream_message(normal_user, stream_name, "test 2"),
|
||||
]
|
||||
self.login_user(guest_user)
|
||||
result = self.change_star(message_id)
|
||||
self.assert_json_success(result)
|
||||
|
||||
def test_bulk_access_messages_private_stream(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
self.login_user(user)
|
||||
|
||||
stream_name = "private_stream"
|
||||
stream = self.make_stream(stream_name, invite_only=True,
|
||||
history_public_to_subscribers=False)
|
||||
|
||||
self.subscribe(user, stream_name)
|
||||
# Send a message before subscribing a new user to stream
|
||||
message_one_id = self.send_stream_message(user,
|
||||
stream_name, "Message one")
|
||||
|
||||
later_subscribed_user = self.example_user("cordelia")
|
||||
# Subscribe a user to private-protected history stream
|
||||
self.subscribe(later_subscribed_user, stream_name)
|
||||
|
||||
# Send a message after subscribing a new user to stream
|
||||
message_two_id = self.send_stream_message(user,
|
||||
stream_name, "Message two")
|
||||
|
||||
message_ids = [message_one_id, message_two_id]
|
||||
messages = [Message.objects.select_related().get(id=message_id)
|
||||
for message_id in message_ids]
|
||||
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
|
||||
# Message sent before subscribing wouldn't be accessible by later
|
||||
# subscribed user as stream has protected history
|
||||
self.assertEqual(len(filtered_messages), 1)
|
||||
self.assertEqual(filtered_messages[0].id, message_two_id)
|
||||
|
||||
do_change_stream_invite_only(stream, True, history_public_to_subscribers=True)
|
||||
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
|
||||
# Message sent before subscribing are accessible by 8user as stream
|
||||
# don't have protected history
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
||||
# Testing messages accessiblity for an unsubscribed user
|
||||
unsubscribed_user = self.example_user("ZOE")
|
||||
|
||||
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
|
||||
|
||||
self.assertEqual(len(filtered_messages), 0)
|
||||
|
||||
def test_bulk_access_messages_public_stream(self) -> None:
|
||||
user = self.example_user("hamlet")
|
||||
self.login_user(user)
|
||||
|
||||
# Testing messages accessiblity including a public stream message
|
||||
stream_name = "public_stream"
|
||||
self.subscribe(user, stream_name)
|
||||
message_one_id = self.send_stream_message(user,
|
||||
stream_name, "Message one")
|
||||
|
||||
later_subscribed_user = self.example_user("cordelia")
|
||||
self.subscribe(later_subscribed_user, stream_name)
|
||||
|
||||
# Send a message after subscribing a new user to stream
|
||||
message_two_id = self.send_stream_message(user,
|
||||
stream_name, "Message two")
|
||||
|
||||
message_ids = [message_one_id, message_two_id]
|
||||
messages = [Message.objects.select_related().get(id=message_id)
|
||||
for message_id in message_ids]
|
||||
|
||||
# All public stream messages are always accessible
|
||||
filtered_messages = bulk_access_messages(later_subscribed_user, messages)
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
||||
unsubscribed_user = self.example_user("ZOE")
|
||||
filtered_messages = bulk_access_messages(unsubscribed_user, messages)
|
||||
|
||||
self.assertEqual(len(filtered_messages), 2)
|
||||
|
||||
class MessageHasKeywordsTest(ZulipTestCase):
|
||||
'''Test for keywords like has_link, has_image, has_attachment.'''
|
||||
|
||||
|
|
Loading…
Reference in New Issue