tests: Fix clumsy narrow test.

We now explicitly write messages to three different streams,
as well a DM, to make sure each narrow result filters out
all the noise.
This commit is contained in:
Steve Howell 2023-07-16 10:41:09 +00:00 committed by Tim Abbott
parent 860eee94fd
commit 538f498447
1 changed files with 37 additions and 21 deletions

View File

@ -51,6 +51,7 @@ from zerver.models import (
Attachment,
Message,
Realm,
Recipient,
Subscription,
UserMessage,
UserProfile,
@ -2280,30 +2281,45 @@ class GetOldMessagesTest(ZulipTestCase):
self.assertEqual(new_message["flags"], ["mentioned"])
def test_get_messages_with_narrow_stream(self) -> None:
"""
A request for old messages with a narrow by stream only returns
messages for that stream.
"""
self.login("hamlet")
# We need to subscribe to a stream and then send a message to
# it to ensure that we actually have a stream message in this
# narrow view.
self.subscribe(self.example_user("hamlet"), "Scotland")
self.send_stream_message(self.example_user("hamlet"), "Scotland")
messages = get_user_messages(self.example_user("hamlet"))
hamlet = self.example_user("hamlet")
self.login_user(hamlet)
realm = hamlet.realm
num_messages_per_stream = 5
stream_names = ["Scotland", "Verona", "Venice"]
def send_messages_to_all_streams() -> None:
Message.objects.filter(recipient__type=Recipient.STREAM).delete()
for stream_name in stream_names:
self.subscribe(hamlet, stream_name)
for i in range(num_messages_per_stream):
message_id = self.send_stream_message(hamlet, stream_name, content=f"test {i}")
message = Message.objects.get(id=message_id)
self.assertEqual(get_display_recipient(message.recipient), stream_name)
send_messages_to_all_streams()
self.send_personal_message(hamlet, hamlet)
messages = get_user_messages(hamlet)
stream_messages = [msg for msg in messages if msg.is_stream_message()]
stream_name = get_display_recipient(stream_messages[0].recipient)
assert isinstance(stream_name, str)
stream_id = get_stream(stream_name, stream_messages[0].get_realm()).id
stream_recipient_id = stream_messages[0].recipient.id
self.assertGreater(len(messages), len(stream_messages))
self.assert_length(stream_messages, num_messages_per_stream * len(stream_names))
for operand in [stream_name, stream_id]:
narrow = [dict(operator="stream", operand=operand)]
result = self.get_and_check_messages(dict(narrow=orjson.dumps(narrow).decode()))
for stream_name in stream_names:
stream = get_stream(stream_name, realm)
for operand in [stream.name, stream.id]:
narrow = [dict(operator="stream", operand=operand)]
result = self.get_and_check_messages(
dict(narrow=orjson.dumps(narrow).decode(), num_after=100)
)
fetched_messages: List[Dict[str, object]] = result["messages"]
self.assert_length(fetched_messages, num_messages_per_stream)
for message in result["messages"]:
self.assertEqual(message["type"], "stream")
self.assertEqual(message["recipient_id"], stream_recipient_id)
for message_dict in fetched_messages:
self.assertEqual(message_dict["type"], "stream")
self.assertEqual(message_dict["display_recipient"], stream_name)
self.assertEqual(message_dict["recipient_id"], stream.recipient_id)
def test_get_visible_messages_with_narrow_stream(self) -> None:
self.login("hamlet")