message_send: Update do_send_messages codepath to send event on commit.

Earlier, we were using 'send_event' & 'queue_json_publish' in
'do_send_messages' which can lead to a situation where we enqueue
events but the transaction fails at a later stage.

Events should not be sent until we know we're not rolling back.
This commit is contained in:
Prakhar Pratyush 2024-05-15 22:54:37 +05:30 committed by Tim Abbott
parent 96c9950115
commit e726012244
15 changed files with 473 additions and 270 deletions

View File

@ -68,7 +68,7 @@ from zerver.lib.notification_data import (
user_allows_notifications_in_StreamTopic,
)
from zerver.lib.query_helpers import query_for_ids
from zerver.lib.queue import queue_json_publish
from zerver.lib.queue import queue_event_on_commit
from zerver.lib.recipient_users import recipient_for_user_profiles
from zerver.lib.stream_subscription import (
get_subscriptions_for_send_message,
@ -108,7 +108,7 @@ from zerver.models.recipients import get_huddle_user_ids
from zerver.models.scheduled_jobs import NotificationTriggers
from zerver.models.streams import get_stream, get_stream_by_id_in_realm
from zerver.models.users import get_system_bot, get_user_by_delivery_email, is_cross_realm_bot_email
from zerver.tornado.django_api import send_event
from zerver.tornado.django_api import send_event_on_commit
def compute_irc_user_fullname(email: str) -> str:
@ -843,6 +843,7 @@ def get_active_presence_idle_user_ids(
return filter_presence_idle_user_ids(user_ids)
@transaction.atomic(savepoint=False)
def do_send_messages(
send_message_requests_maybe_none: Sequence[Optional[SendMessageRequest]],
*,
@ -862,65 +863,65 @@ def do_send_messages(
# Save the message receipts in the database
user_message_flags: Dict[int, Dict[int, List[str]]] = defaultdict(dict)
with transaction.atomic(savepoint=False):
Message.objects.bulk_create(send_request.message for send_request in send_message_requests)
# Claim attachments in message
for send_request in send_message_requests:
if do_claim_attachments(
send_request.message, send_request.rendering_result.potential_attachment_path_ids
):
send_request.message.has_attachment = True
send_request.message.save(update_fields=["has_attachment"])
Message.objects.bulk_create(send_request.message for send_request in send_message_requests)
ums: List[UserMessageLite] = []
for send_request in send_message_requests:
# Service bots (outgoing webhook bots and embedded bots) don't store UserMessage rows;
# they will be processed later.
mentioned_user_ids = send_request.rendering_result.mentions_user_ids
# Claim attachments in message
for send_request in send_message_requests:
if do_claim_attachments(
send_request.message, send_request.rendering_result.potential_attachment_path_ids
):
send_request.message.has_attachment = True
send_request.message.save(update_fields=["has_attachment"])
# Extend the set with users who have muted the sender.
mark_as_read_user_ids = send_request.muted_sender_user_ids
mark_as_read_user_ids.update(mark_as_read)
ums: List[UserMessageLite] = []
for send_request in send_message_requests:
# Service bots (outgoing webhook bots and embedded bots) don't store UserMessage rows;
# they will be processed later.
mentioned_user_ids = send_request.rendering_result.mentions_user_ids
user_messages = create_user_messages(
message=send_request.message,
rendering_result=send_request.rendering_result,
um_eligible_user_ids=send_request.um_eligible_user_ids,
long_term_idle_user_ids=send_request.long_term_idle_user_ids,
stream_push_user_ids=send_request.stream_push_user_ids,
stream_email_user_ids=send_request.stream_email_user_ids,
mentioned_user_ids=mentioned_user_ids,
followed_topic_push_user_ids=send_request.followed_topic_push_user_ids,
followed_topic_email_user_ids=send_request.followed_topic_email_user_ids,
mark_as_read_user_ids=mark_as_read_user_ids,
limit_unread_user_ids=send_request.limit_unread_user_ids,
topic_participant_user_ids=send_request.topic_participant_user_ids,
)
# Extend the set with users who have muted the sender.
mark_as_read_user_ids = send_request.muted_sender_user_ids
mark_as_read_user_ids.update(mark_as_read)
for um in user_messages:
user_message_flags[send_request.message.id][um.user_profile_id] = um.flags_list()
user_messages = create_user_messages(
message=send_request.message,
rendering_result=send_request.rendering_result,
um_eligible_user_ids=send_request.um_eligible_user_ids,
long_term_idle_user_ids=send_request.long_term_idle_user_ids,
stream_push_user_ids=send_request.stream_push_user_ids,
stream_email_user_ids=send_request.stream_email_user_ids,
mentioned_user_ids=mentioned_user_ids,
followed_topic_push_user_ids=send_request.followed_topic_push_user_ids,
followed_topic_email_user_ids=send_request.followed_topic_email_user_ids,
mark_as_read_user_ids=mark_as_read_user_ids,
limit_unread_user_ids=send_request.limit_unread_user_ids,
topic_participant_user_ids=send_request.topic_participant_user_ids,
)
ums.extend(user_messages)
for um in user_messages:
user_message_flags[send_request.message.id][um.user_profile_id] = um.flags_list()
send_request.service_queue_events = get_service_bot_events(
sender=send_request.message.sender,
service_bot_tuples=send_request.service_bot_tuples,
mentioned_user_ids=mentioned_user_ids,
active_user_ids=send_request.active_user_ids,
recipient_type=send_request.message.recipient.type,
)
ums.extend(user_messages)
bulk_insert_ums(ums)
send_request.service_queue_events = get_service_bot_events(
sender=send_request.message.sender,
service_bot_tuples=send_request.service_bot_tuples,
mentioned_user_ids=mentioned_user_ids,
active_user_ids=send_request.active_user_ids,
recipient_type=send_request.message.recipient.type,
)
for send_request in send_message_requests:
do_widget_post_save_actions(send_request)
bulk_insert_ums(ums)
for send_request in send_message_requests:
do_widget_post_save_actions(send_request)
# This next loop is responsible for notifying other parts of the
# Zulip system about the messages we just committed to the database:
# * Sender automatically follows or unmutes the topic depending on 'automatically_follow_topics_policy'
# and 'automatically_unmute_topics_in_muted_streams_policy' user settings.
# * Notifying clients via send_event
# * Notifying clients via send_event_on_commit
# * Triggering outgoing webhooks via the service event queue.
# * Updating the `first_message_id` field for streams without any message history.
# * Implementing the Welcome Bot reply hack
@ -1175,7 +1176,7 @@ def do_send_messages(
event["local_id"] = send_request.local_id
if send_request.sender_queue_id is not None:
event["sender_queue_id"] = send_request.sender_queue_id
send_event(send_request.realm, event, users)
send_event_on_commit(send_request.realm, event, users)
if send_request.links_for_embed:
event_data = {
@ -1184,7 +1185,7 @@ def do_send_messages(
"message_realm_id": send_request.realm.id,
"urls": list(send_request.links_for_embed),
}
queue_json_publish("embed_links", event_data)
queue_event_on_commit("embed_links", event_data)
if send_request.message.recipient.type == Recipient.PERSONAL:
welcome_bot_id = get_system_bot(settings.WELCOME_BOT, send_request.realm.id).id
@ -1199,7 +1200,7 @@ def do_send_messages(
assert send_request.service_queue_events is not None
for queue_name, events in send_request.service_queue_events.items():
for event in events:
queue_json_publish(
queue_event_on_commit(
queue_name,
{
"message": wide_message_dict,

View File

@ -470,13 +470,13 @@ def send_subscription_add_events(
# Send a notification to the user who subscribed.
event = dict(type="subscription", op="add", subscriptions=sub_dicts)
send_event(realm, event, [user_id])
send_event_on_commit(realm, event, [user_id])
# This function contains all the database changes as part of
# subscribing users to streams; we use a transaction to ensure that
# the RealmAuditLog entries are created atomically with the
# Subscription object creation (and updates).
# subscribing users to streams; the transaction ensures that the
# RealmAuditLog entries are created atomically with the Subscription
# object creation (and updates).
@transaction.atomic(savepoint=False)
def bulk_add_subs_to_db_with_logging(
realm: Realm,
@ -703,6 +703,7 @@ def send_user_creation_events_on_adding_subscriptions(
SubT: TypeAlias = Tuple[List[SubInfo], List[SubInfo]]
@transaction.atomic(savepoint=False)
def bulk_add_subscriptions(
realm: Realm,
streams: Collection[Stream],
@ -1190,6 +1191,7 @@ def send_change_stream_permission_notification(
)
@transaction.atomic(savepoint=False)
def do_change_stream_permission(
stream: Stream,
*,
@ -1205,78 +1207,76 @@ def do_change_stream_permission(
stream.is_web_public = is_web_public
stream.invite_only = invite_only
stream.history_public_to_subscribers = history_public_to_subscribers
stream.save(update_fields=["invite_only", "history_public_to_subscribers", "is_web_public"])
realm = stream.realm
with transaction.atomic():
stream.save(update_fields=["invite_only", "history_public_to_subscribers", "is_web_public"])
event_time = timezone_now()
if old_invite_only_value != stream.invite_only:
# Reset the Attachment.is_realm_public cache for all
# messages in the stream whose permissions were changed.
assert stream.recipient_id is not None
Attachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_realm_public=None
)
# We need to do the same for ArchivedAttachment to avoid
# bugs if deleted attachments are later restored.
ArchivedAttachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_realm_public=None
)
event_time = timezone_now()
if old_invite_only_value != stream.invite_only:
# Reset the Attachment.is_realm_public cache for all
# messages in the stream whose permissions were changed.
assert stream.recipient_id is not None
Attachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_realm_public=None
)
# We need to do the same for ArchivedAttachment to avoid
# bugs if deleted attachments are later restored.
ArchivedAttachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_realm_public=None
)
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_invite_only_value,
RealmAuditLog.NEW_VALUE: stream.invite_only,
"property": "invite_only",
},
)
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_invite_only_value,
RealmAuditLog.NEW_VALUE: stream.invite_only,
"property": "invite_only",
},
)
if old_history_public_to_subscribers_value != stream.history_public_to_subscribers:
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_history_public_to_subscribers_value,
RealmAuditLog.NEW_VALUE: stream.history_public_to_subscribers,
"property": "history_public_to_subscribers",
},
)
if old_history_public_to_subscribers_value != stream.history_public_to_subscribers:
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_history_public_to_subscribers_value,
RealmAuditLog.NEW_VALUE: stream.history_public_to_subscribers,
"property": "history_public_to_subscribers",
},
)
if old_is_web_public_value != stream.is_web_public:
# Reset the Attachment.is_realm_public cache for all
# messages in the stream whose permissions were changed.
assert stream.recipient_id is not None
Attachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_web_public=None
)
# We need to do the same for ArchivedAttachment to avoid
# bugs if deleted attachments are later restored.
ArchivedAttachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_web_public=None
)
if old_is_web_public_value != stream.is_web_public:
# Reset the Attachment.is_realm_public cache for all
# messages in the stream whose permissions were changed.
assert stream.recipient_id is not None
Attachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_web_public=None
)
# We need to do the same for ArchivedAttachment to avoid
# bugs if deleted attachments are later restored.
ArchivedAttachment.objects.filter(messages__recipient_id=stream.recipient_id).update(
is_web_public=None
)
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_is_web_public_value,
RealmAuditLog.NEW_VALUE: stream.is_web_public,
"property": "is_web_public",
},
)
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_PROPERTY_CHANGED,
event_time=event_time,
extra_data={
RealmAuditLog.OLD_VALUE: old_is_web_public_value,
RealmAuditLog.NEW_VALUE: stream.is_web_public,
"property": "is_web_public",
},
)
notify_stream_creation_ids = set()
if old_invite_only_value and not stream.invite_only:
@ -1309,7 +1309,7 @@ def do_change_stream_permission(
stream_ids=[stream.id],
user_ids=sorted(stream_subscriber_user_ids),
)
send_event(stream.realm, peer_add_event, peer_notify_user_ids)
send_event_on_commit(stream.realm, peer_add_event, peer_notify_user_ids)
event = dict(
op="update",
@ -1324,7 +1324,7 @@ def do_change_stream_permission(
# we do not need to send update events to the users who received creation event
# since they already have the updated stream info.
notify_stream_update_ids = can_access_stream_user_ids(stream) - notify_stream_creation_ids
send_event(stream.realm, event, notify_stream_update_ids)
send_event_on_commit(stream.realm, event, notify_stream_update_ids)
old_policy_name = get_stream_permission_policy_name(
invite_only=old_invite_only_value,

View File

@ -5,7 +5,7 @@ from zerver.lib.attachments import get_old_unclaimed_attachments, validate_attac
from zerver.lib.markdown import MessageRenderingResult
from zerver.lib.upload import claim_attachment, delete_message_attachment
from zerver.models import Attachment, Message, ScheduledMessage, Stream, UserProfile
from zerver.tornado.django_api import send_event
from zerver.tornado.django_api import send_event_on_commit
def notify_attachment_update(
@ -17,7 +17,7 @@ def notify_attachment_update(
"attachment": attachment_dict,
"upload_space_used": user_profile.realm.currently_used_upload_space_bytes(),
}
send_event(user_profile.realm, event, [user_profile.id])
send_event_on_commit(user_profile.realm, event, [user_profile.id])
def do_claim_attachments(

View File

@ -1,6 +1,7 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from django.db import transaction
from django.utils.timezone import now as timezone_now
from zerver.lib.timestamp import datetime_to_timestamp
@ -9,9 +10,10 @@ from zerver.lib.user_topics import (
get_topic_mutes,
)
from zerver.models import Stream, UserProfile
from zerver.tornado.django_api import send_event
from zerver.tornado.django_api import send_event_on_commit
@transaction.atomic(savepoint=False)
def bulk_do_set_user_topic_visibility_policy(
user_profiles: List[UserProfile],
stream: Stream,
@ -47,7 +49,7 @@ def bulk_do_set_user_topic_visibility_policy(
muted_topics_event = dict(
type="muted_topics", muted_topics=get_topic_mutes(user_profile)
)
send_event(user_profile.realm, muted_topics_event, [user_profile.id])
send_event_on_commit(user_profile.realm, muted_topics_event, [user_profile.id])
user_topic_event: Dict[str, Any] = {
"type": "user_topic",
@ -57,7 +59,7 @@ def bulk_do_set_user_topic_visibility_policy(
"visibility_policy": visibility_policy,
}
send_event(user_profile.realm, user_topic_event, [user_profile.id])
send_event_on_commit(user_profile.realm, user_topic_event, [user_profile.id])
def do_set_user_topic_visibility_policy(

View File

@ -41,7 +41,7 @@ from zerver.models.streams import (
get_stream_by_id_in_realm,
)
from zerver.models.users import active_non_guest_user_ids, active_user_ids, is_cross_realm_bot_email
from zerver.tornado.django_api import send_event
from zerver.tornado.django_api import send_event_on_commit
class StreamDict(TypedDict, total=False):
@ -123,9 +123,10 @@ def send_stream_creation_event(
recent_traffic: Optional[Dict[int, int]] = None,
) -> None:
event = dict(type="stream", op="create", streams=[stream_to_dict(stream, recent_traffic)])
send_event(realm, event, user_ids)
send_event_on_commit(realm, event, user_ids)
@transaction.atomic(savepoint=False)
def create_stream_if_needed(
realm: Realm,
stream_name: str,
@ -149,40 +150,39 @@ def create_stream_if_needed(
)
assert can_remove_subscribers_group is not None
with transaction.atomic():
(stream, created) = Stream.objects.get_or_create(
(stream, created) = Stream.objects.get_or_create(
realm=realm,
name__iexact=stream_name,
defaults=dict(
name=stream_name,
creator=acting_user,
description=stream_description,
invite_only=invite_only,
is_web_public=is_web_public,
stream_post_policy=stream_post_policy,
history_public_to_subscribers=history_public_to_subscribers,
is_in_zephyr_realm=realm.is_zephyr_mirror_realm,
message_retention_days=message_retention_days,
can_remove_subscribers_group=can_remove_subscribers_group,
),
)
if created:
recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
stream.recipient = recipient
stream.rendered_description = render_stream_description(stream_description, realm)
stream.save(update_fields=["recipient", "rendered_description"])
event_time = timezone_now()
RealmAuditLog.objects.create(
realm=realm,
name__iexact=stream_name,
defaults=dict(
name=stream_name,
creator=acting_user,
description=stream_description,
invite_only=invite_only,
is_web_public=is_web_public,
stream_post_policy=stream_post_policy,
history_public_to_subscribers=history_public_to_subscribers,
is_in_zephyr_realm=realm.is_zephyr_mirror_realm,
message_retention_days=message_retention_days,
can_remove_subscribers_group=can_remove_subscribers_group,
),
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_CREATED,
event_time=event_time,
)
if created:
recipient = Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
stream.recipient = recipient
stream.rendered_description = render_stream_description(stream_description, realm)
stream.save(update_fields=["recipient", "rendered_description"])
event_time = timezone_now()
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
modified_stream=stream,
event_type=RealmAuditLog.STREAM_CREATED,
event_time=event_time,
)
if created:
if stream.is_public():
if stream.is_web_public:
notify_user_ids = active_user_ids(stream.realm_id)

View File

@ -2001,6 +2001,135 @@ class ZulipTestCase(ZulipTestCaseMixin, TestCase):
self.assert_length(lst, expected_num_events)
@override
def send_personal_message(
self,
from_user: UserProfile,
to_user: UserProfile,
content: str = "test content",
*,
read_by_sender: bool = True,
skip_capture_on_commit_callbacks: bool = False,
) -> int:
"""This function is a wrapper on 'send_personal_message',
defined in 'ZulipTestCaseMixin' with an extra parameter
'skip_capture_on_commit_callbacks'.
It should be set to 'True' when making a call with either
'verify_action' or 'capture_send_event_calls' as context manager
because they already have 'self.captureOnCommitCallbacks'
(See the comment in 'capture_send_event_calls').
For all other cases, we should call 'send_personal_message' with
'self.captureOnCommitCallbacks' for 'send_event_on_commit' or/and
'queue_event_on_commit' to work.
"""
if skip_capture_on_commit_callbacks:
message_id = super().send_personal_message(
from_user,
to_user,
content,
read_by_sender=read_by_sender,
)
else:
with self.captureOnCommitCallbacks(execute=True):
message_id = super().send_personal_message(
from_user,
to_user,
content,
read_by_sender=read_by_sender,
)
return message_id
@override
def send_huddle_message(
self,
from_user: UserProfile,
to_users: List[UserProfile],
content: str = "test content",
*,
read_by_sender: bool = True,
skip_capture_on_commit_callbacks: bool = False,
) -> int:
"""This function is a wrapper on 'send_huddle_message',
defined in 'ZulipTestCaseMixin' with an extra parameter
'skip_capture_on_commit_callbacks'.
It should be set to 'True' when making a call with either
'verify_action' or 'capture_send_event_calls' as context manager
because they already have 'self.captureOnCommitCallbacks'
(See the comment in 'capture_send_event_calls').
For all other cases, we should call 'send_huddle_message' with
'self.captureOnCommitCallbacks' for 'send_event_on_commit' or/and
'queue_event_on_commit' to work.
"""
if skip_capture_on_commit_callbacks:
message_id = super().send_huddle_message(
from_user,
to_users,
content,
read_by_sender=read_by_sender,
)
else:
with self.captureOnCommitCallbacks(execute=True):
message_id = super().send_huddle_message(
from_user,
to_users,
content,
read_by_sender=read_by_sender,
)
return message_id
@override
def send_stream_message(
self,
sender: UserProfile,
stream_name: str,
content: str = "test content",
topic_name: str = "test",
recipient_realm: Optional[Realm] = None,
*,
allow_unsubscribed_sender: bool = False,
read_by_sender: bool = True,
skip_capture_on_commit_callbacks: bool = False,
) -> int:
"""This function is a wrapper on 'send_stream_message',
defined in 'ZulipTestCaseMixin' with an extra parameter
'skip_capture_on_commit_callbacks'.
It should be set to 'True' when making a call with either
'verify_action' or 'capture_send_event_calls' as context manager
because they already have 'self.captureOnCommitCallbacks'
(See the comment in 'capture_send_event_calls').
For all other cases, we should call 'send_stream_message' with
'self.captureOnCommitCallbacks' for 'send_event_on_commit' or/and
'queue_event_on_commit' to work.
"""
if skip_capture_on_commit_callbacks:
message_id = super().send_stream_message(
sender,
stream_name,
content,
topic_name,
recipient_realm,
allow_unsubscribed_sender=allow_unsubscribed_sender,
read_by_sender=read_by_sender,
)
else:
with self.captureOnCommitCallbacks(execute=True):
message_id = super().send_stream_message(
sender,
stream_name,
content,
topic_name,
recipient_realm,
allow_unsubscribed_sender=allow_unsubscribed_sender,
read_by_sender=read_by_sender,
)
return message_id
def get_row_ids_in_all_tables() -> Iterator[Tuple[str, Set[int]]]:
all_models = apps.get_models(include_auto_created=True)

View File

@ -78,6 +78,7 @@ def get_topic_mutes(
]
@transaction.atomic(savepoint=False)
def set_topic_visibility_policy(
user_profile: UserProfile,
topics: List[List[str]],
@ -125,7 +126,6 @@ def get_topic_visibility_policy(
return visibility_policy
@transaction.atomic(savepoint=False)
def bulk_set_user_topic_visibility_policy_in_database(
user_profiles: List[UserProfile],
stream_id: int,

View File

@ -1154,9 +1154,10 @@ class MissedMessageHookTest(ZulipTestCase):
def test_disable_external_notifications(self) -> None:
# The disable_external_notifications parameter, used for messages sent by welcome bot,
# should result in no email/push notifications being sent regardless of the message type.
msg_id = internal_send_private_message(
self.iago, self.user_profile, "Test Content", disable_external_notifications=True
)
with self.captureOnCommitCallbacks(execute=True):
msg_id = internal_send_private_message(
self.iago, self.user_profile, "Test Content", disable_external_notifications=True
)
assert msg_id is not None
with mock.patch("zerver.tornado.event_queue.maybe_enqueue_notifications") as mock_enqueue:
missedmessage_hook(self.user_profile.id, self.client_descriptor, True)

View File

@ -341,16 +341,17 @@ class GetEventsTest(ZulipTestCase):
self.assert_length(events, 0)
local_id = "10.01"
check_send_message(
sender=user_profile,
client=get_client("whatever"),
recipient_type_name="private",
message_to=[recipient_email],
topic_name=None,
message_content="hello",
local_id=local_id,
sender_queue_id=queue_id,
)
with self.captureOnCommitCallbacks(execute=True):
check_send_message(
sender=user_profile,
client=get_client("whatever"),
recipient_type_name="private",
message_to=[recipient_email],
topic_name=None,
message_content="hello",
local_id=local_id,
sender_queue_id=queue_id,
)
result = self.tornado_call(
get_events,
@ -374,16 +375,17 @@ class GetEventsTest(ZulipTestCase):
last_event_id = events[0]["id"]
local_id = "10.02"
check_send_message(
sender=user_profile,
client=get_client("whatever"),
recipient_type_name="private",
message_to=[recipient_email],
topic_name=None,
message_content="hello",
local_id=local_id,
sender_queue_id=queue_id,
)
with self.captureOnCommitCallbacks(execute=True):
check_send_message(
sender=user_profile,
client=get_client("whatever"),
recipient_type_name="private",
message_to=[recipient_email],
topic_name=None,
message_content="hello",
local_id=local_id,
sender_queue_id=queue_id,
)
result = self.tornado_call(
get_events,

View File

@ -481,7 +481,12 @@ class NormalActionsTest(BaseAction):
for i in range(3):
content = "mentioning... @**" + user.full_name + "** hello " + str(i)
with self.verify_action():
self.send_stream_message(self.example_user("cordelia"), "Verona", content)
self.send_stream_message(
self.example_user("cordelia"),
"Verona",
content,
skip_capture_on_commit_callbacks=True,
)
def test_automatically_follow_topic_where_mentioned(self) -> None:
user = self.example_user("hamlet")
@ -509,24 +514,42 @@ class NormalActionsTest(BaseAction):
for i in range(3):
content = "mentioning... @**" + user.full_name + "** hello " + str(i)
with self.verify_action(num_events=get_num_events()):
self.send_stream_message(self.example_user("cordelia"), "Verona", content)
self.send_stream_message(
self.example_user("cordelia"),
"Verona",
content,
skip_capture_on_commit_callbacks=True,
)
def test_topic_wildcard_mentioned_send_message_events(self) -> None:
for i in range(3):
content = "mentioning... @**topic** hello " + str(i)
with self.verify_action():
self.send_stream_message(self.example_user("cordelia"), "Verona", content)
self.send_stream_message(
self.example_user("cordelia"),
"Verona",
content,
skip_capture_on_commit_callbacks=True,
)
def test_stream_wildcard_mentioned_send_message_events(self) -> None:
for i in range(3):
content = "mentioning... @**all** hello " + str(i)
with self.verify_action():
self.send_stream_message(self.example_user("cordelia"), "Verona", content)
self.send_stream_message(
self.example_user("cordelia"),
"Verona",
content,
skip_capture_on_commit_callbacks=True,
)
def test_pm_send_message_events(self) -> None:
with self.verify_action():
self.send_personal_message(
self.example_user("cordelia"), self.example_user("hamlet"), "hola"
self.example_user("cordelia"),
self.example_user("hamlet"),
"hola",
skip_capture_on_commit_callbacks=True,
)
# Verify direct message editing - content only edit
@ -571,7 +594,9 @@ class NormalActionsTest(BaseAction):
self.example_user("othello"),
]
with self.verify_action():
self.send_huddle_message(self.example_user("cordelia"), huddle, "hola")
self.send_huddle_message(
self.example_user("cordelia"), huddle, "hola", skip_capture_on_commit_callbacks=True
)
def test_user_creation_events_on_sending_messages(self) -> None:
self.set_up_db_for_testing_user_access()
@ -584,24 +609,28 @@ class NormalActionsTest(BaseAction):
# for bots as they can access all the bots.
bot = self.create_test_bot("test2", cordelia, full_name="Test bot")
with self.verify_action(num_events=1) as events:
self.send_personal_message(bot, polonius, "hola")
self.send_personal_message(bot, polonius, "hola", skip_capture_on_commit_callbacks=True)
check_direct_message("events[0]", events[0])
with self.verify_action(num_events=2) as events:
self.send_personal_message(cordelia, polonius, "hola")
check_direct_message("events[0]", events[0])
check_realm_user_add("events[1]", events[1])
self.assertEqual(events[1]["person"]["user_id"], cordelia.id)
self.send_personal_message(
cordelia, polonius, "hola", skip_capture_on_commit_callbacks=True
)
check_realm_user_add("events[0]", events[0])
check_direct_message("events[1]", events[1])
self.assertEqual(events[0]["person"]["user_id"], cordelia.id)
othello = self.example_user("othello")
desdemona = self.example_user("desdemona")
with self.verify_action(num_events=3) as events:
self.send_huddle_message(othello, [polonius, desdemona, bot], "hola")
check_direct_message("events[0]", events[0])
self.send_huddle_message(
othello, [polonius, desdemona, bot], "hola", skip_capture_on_commit_callbacks=True
)
check_realm_user_add("events[0]", events[0])
check_realm_user_add("events[1]", events[1])
check_realm_user_add("events[2]", events[2])
user_creation_user_ids = {events[1]["person"]["user_id"], events[2]["person"]["user_id"]}
check_direct_message("events[2]", events[2])
user_creation_user_ids = {events[0]["person"]["user_id"], events[1]["person"]["user_id"]}
self.assertEqual(user_creation_user_ids, {othello.id, desdemona.id})
def test_stream_send_message_events(self) -> None:
@ -687,7 +716,9 @@ class NormalActionsTest(BaseAction):
# Three events are generated:
# 2 for following the topic and 1 for the message sent.
with self.verify_action(client_gravatar=False, num_events=3) as events:
self.send_stream_message(hamlet, "Verona", "hello", "topic")
self.send_stream_message(
hamlet, "Verona", "hello", "topic", skip_capture_on_commit_callbacks=True
)
verify_events_generated_and_reset_visibility_policy(events, "Verona", "topic")
# action: initiation
@ -714,7 +745,13 @@ class NormalActionsTest(BaseAction):
# Three events are generated:
# 2 for following the topic and 1 for the message sent.
with self.verify_action(client_gravatar=False, num_events=3) as events:
self.send_stream_message(hamlet, "Denmark", "hello", f"new topic {index}")
self.send_stream_message(
hamlet,
"Denmark",
"hello",
f"new topic {index}",
skip_capture_on_commit_callbacks=True,
)
verify_events_generated_and_reset_visibility_policy(
events, "Denmark", f"new topic {index}"
)
@ -744,7 +781,9 @@ class NormalActionsTest(BaseAction):
# Three events are generated:
# 2 for unmuting the topic and 1 for the message sent.
with self.verify_action(client_gravatar=False, num_events=3) as events:
self.send_stream_message(hamlet, "core team", "hello", "topic")
self.send_stream_message(
hamlet, "core team", "hello", "topic", skip_capture_on_commit_callbacks=True
)
verify_events_generated_and_reset_visibility_policy(events, "core team", "topic")
# If current_visibility_policy is already set to the value the policies would set.
@ -762,7 +801,9 @@ class NormalActionsTest(BaseAction):
)
# 1 event for the message sent
with self.verify_action(client_gravatar=False, num_events=1) as events:
self.send_stream_message(hamlet, "core team", "hello", "new Topic")
self.send_stream_message(
hamlet, "core team", "hello", "new Topic", skip_capture_on_commit_callbacks=True
)
do_change_user_setting(
user_profile=hamlet,
@ -772,7 +813,9 @@ class NormalActionsTest(BaseAction):
)
# Only one message event is generated
with self.verify_action(client_gravatar=True) as events:
self.send_stream_message(hamlet, "core team", "hello")
self.send_stream_message(
hamlet, "core team", "hello", skip_capture_on_commit_callbacks=True
)
# event-type: message
check_message("events[0]", events[0])
assert isinstance(events[0]["message"]["avatar_url"], str)
@ -785,7 +828,9 @@ class NormalActionsTest(BaseAction):
)
with self.verify_action(client_gravatar=True) as events:
self.send_stream_message(hamlet, "core team", "hello")
self.send_stream_message(
hamlet, "core team", "hello", skip_capture_on_commit_callbacks=True
)
check_message("events[0]", events[0])
assert events[0]["message"]["avatar_url"] is None
@ -800,7 +845,9 @@ class NormalActionsTest(BaseAction):
visibility_policy=UserTopic.VisibilityPolicy.UNMUTED,
)
with self.verify_action(state_change_expected=True):
self.send_stream_message(self.example_user("aaron"), "Verona", "hello")
self.send_stream_message(
self.example_user("aaron"), "Verona", "hello", skip_capture_on_commit_callbacks=True
)
def test_stream_update_message_events(self) -> None:
iago = self.example_user("iago")
@ -1032,7 +1079,9 @@ class NormalActionsTest(BaseAction):
"hello 1",
)
with self.verify_action(state_change_expected=True):
self.send_stream_message(sender, "Verona", "hello 2")
self.send_stream_message(
sender, "Verona", "hello 2", skip_capture_on_commit_callbacks=True
)
def test_events_for_message_from_inaccessible_sender(self) -> None:
reset_email_visibility_to_everyone_in_zulip_realm()
@ -1042,7 +1091,11 @@ class NormalActionsTest(BaseAction):
with self.verify_action() as events:
self.send_stream_message(
othello, "test_stream1", "hello 2", allow_unsubscribed_sender=True
othello,
"test_stream1",
"hello 2",
allow_unsubscribed_sender=True,
skip_capture_on_commit_callbacks=True,
)
check_message("events[0]", events[0])
message_obj = events[0]["message"]
@ -1053,7 +1106,11 @@ class NormalActionsTest(BaseAction):
iago = self.example_user("iago")
with self.verify_action() as events:
self.send_stream_message(
iago, "test_stream1", "hello 2", allow_unsubscribed_sender=True
iago,
"test_stream1",
"hello 2",
allow_unsubscribed_sender=True,
skip_capture_on_commit_callbacks=True,
)
check_message("events[0]", events[0])
message_obj = events[0]["message"]
@ -1483,20 +1540,20 @@ class NormalActionsTest(BaseAction):
self.register("test1@zulip.com", "test1")
self.assert_length(events, 5)
check_realm_user_add("events[1]", events[1])
check_realm_user_add("events[0]", events[0])
new_user_profile = get_user_by_delivery_email("test1@zulip.com", self.user_profile.realm)
self.assertEqual(new_user_profile.delivery_email, "test1@zulip.com")
check_subscription_peer_add("events[4]", events[4])
check_subscription_peer_add("events[3]", events[3])
check_message("events[0]", events[0])
check_message("events[4]", events[4])
self.assertIn(
f'data-user-id="{new_user_profile.id}">test1_zulip.com</span> joined this organization.',
events[0]["message"]["content"],
events[4]["message"]["content"],
)
check_user_group_add_members("events[1]", events[1])
check_user_group_add_members("events[2]", events[2])
check_user_group_add_members("events[3]", events[3])
def test_register_events_email_address_visibility(self) -> None:
realm_user_default = RealmUserDefault.objects.get(realm=self.user_profile.realm)
@ -1513,20 +1570,20 @@ class NormalActionsTest(BaseAction):
with self.verify_action(num_events=5) as events:
self.register("test1@zulip.com", "test1")
self.assert_length(events, 5)
check_realm_user_add("events[1]", events[1])
check_realm_user_add("events[0]", events[0])
new_user_profile = get_user_by_delivery_email("test1@zulip.com", self.user_profile.realm)
self.assertEqual(new_user_profile.email, f"user{new_user_profile.id}@zulip.testserver")
check_subscription_peer_add("events[4]", events[4])
check_subscription_peer_add("events[3]", events[3])
check_message("events[0]", events[0])
check_message("events[4]", events[4])
self.assertIn(
f'data-user-id="{new_user_profile.id}">test1_zulip.com</span> joined this organization',
events[0]["message"]["content"],
events[4]["message"]["content"],
)
check_user_group_add_members("events[1]", events[1])
check_user_group_add_members("events[2]", events[2])
check_user_group_add_members("events[3]", events[3])
def test_register_events_for_restricted_users(self) -> None:
self.set_up_db_for_testing_user_access()
@ -3146,7 +3203,13 @@ class NormalActionsTest(BaseAction):
assert url is not None
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + url + ")"
with self.verify_action(num_events=2) as events:
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
self.send_stream_message(
self.example_user("hamlet"),
"Denmark",
body,
"test",
skip_capture_on_commit_callbacks=True,
)
check_attachment_update("events[0]", events[0])
self.assertEqual(events[0]["upload_space_used"], 6)

View File

@ -382,7 +382,7 @@ class PreviewTestCase(ZulipTestCase):
url = "http://test.org/"
# Ensure the cache for this is empty
cache_delete(preview_url_cache_key(url))
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_personal_message(
sender,
self.example_user("cordelia"),
@ -425,7 +425,7 @@ class PreviewTestCase(ZulipTestCase):
self.login_user(user)
original_url = "http://test.org/"
edited_url = "http://edited.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(
user, "Denmark", topic_name="foo", content=original_url
)
@ -490,7 +490,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -565,7 +565,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -602,7 +602,7 @@ class PreviewTestCase(ZulipTestCase):
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
def test_inline_relative_url_embed_preview(self) -> None:
# Relative URLs should not be sent for URL preview.
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
self.send_personal_message(
self.example_user("prospero"),
self.example_user("cordelia"),
@ -661,7 +661,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/audio.mp3"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -693,7 +693,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/foo.html"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -729,7 +729,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/foo.html"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -770,7 +770,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/foo.html"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -808,7 +808,7 @@ class PreviewTestCase(ZulipTestCase):
user = self.example_user("hamlet")
self.login_user(user)
url = "http://test.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched:
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit") as patched:
msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url)
patched.assert_called_once()
queue = patched.call_args[0][0]
@ -836,7 +836,7 @@ class PreviewTestCase(ZulipTestCase):
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
def test_valid_content_type_error_get_data(self) -> None:
url = "http://test.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish"):
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit"):
msg_id = self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),
@ -881,7 +881,7 @@ class PreviewTestCase(ZulipTestCase):
def test_invalid_url(self) -> None:
url = "http://test.org/"
error_url = "http://test.org/x"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish"):
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit"):
msg_id = self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),
@ -918,7 +918,7 @@ class PreviewTestCase(ZulipTestCase):
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
def test_safe_oembed_html_url(self) -> None:
url = "http://test.org/"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish"):
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit"):
msg_id = self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),
@ -960,7 +960,7 @@ class PreviewTestCase(ZulipTestCase):
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
def test_youtube_url_title_replaces_url(self) -> None:
url = "https://www.youtube.com/watch?v=eSJTXC7Ixgg"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish"):
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit"):
msg_id = self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),
@ -998,7 +998,7 @@ class PreviewTestCase(ZulipTestCase):
@override_settings(INLINE_URL_EMBED_PREVIEW=True)
def test_custom_title_replaces_youtube_url_title(self) -> None:
url = "[YouTube link](https://www.youtube.com/watch?v=eSJTXC7Ixgg)"
with mock_queue_publish("zerver.actions.message_send.queue_json_publish"):
with mock_queue_publish("zerver.actions.message_send.queue_event_on_commit"):
msg_id = self.send_personal_message(
self.example_user("hamlet"),
self.example_user("cordelia"),

View File

@ -192,8 +192,8 @@ class MessageMoveTopicTest(ZulipTestCase):
users_to_be_notified = list(map(notify, [hamlet.id]))
do_update_message_topic_success(hamlet, message, "Change again", users_to_be_notified)
@mock.patch("zerver.actions.user_topics.send_event")
def test_edit_muted_topic(self, mock_send_event: mock.MagicMock) -> None:
@mock.patch("zerver.actions.user_topics.send_event_on_commit")
def test_edit_muted_topic(self, mock_send_event_on_commit: mock.MagicMock) -> None:
stream_name = "Stream 123"
stream = self.make_stream(stream_name)
hamlet = self.example_user("hamlet")
@ -273,7 +273,7 @@ class MessageMoveTopicTest(ZulipTestCase):
# Here we assert that the expected users are notified properly.
users_notified_via_muted_topics_event: List[int] = []
users_notified_via_user_topic_event: List[int] = []
for call_args in mock_send_event.call_args_list:
for call_args in mock_send_event_on_commit.call_args_list:
(arg_realm, arg_event, arg_notified_users) = call_args[0]
if arg_event["type"] == "user_topic":
users_notified_via_user_topic_event.append(*arg_notified_users)
@ -459,8 +459,8 @@ class MessageMoveTopicTest(ZulipTestCase):
assert_is_topic_muted(cordelia, new_public_stream.id, "final topic name", muted=False)
assert_is_topic_muted(aaron, new_public_stream.id, "final topic name", muted=False)
@mock.patch("zerver.actions.user_topics.send_event")
def test_edit_unmuted_topic(self, mock_send_event: mock.MagicMock) -> None:
@mock.patch("zerver.actions.user_topics.send_event_on_commit")
def test_edit_unmuted_topic(self, mock_send_event_on_commit: mock.MagicMock) -> None:
stream_name = "Stream 123"
stream = self.make_stream(stream_name)
@ -535,7 +535,7 @@ class MessageMoveTopicTest(ZulipTestCase):
# Here we assert that the expected users are notified properly.
users_notified_via_muted_topics_event: List[int] = []
users_notified_via_user_topic_event: List[int] = []
for call_args in mock_send_event.call_args_list:
for call_args in mock_send_event_on_commit.call_args_list:
(arg_realm, arg_event, arg_notified_users) = call_args[0]
if arg_event["type"] == "user_topic":
users_notified_via_user_topic_event.append(*arg_notified_users)

View File

@ -1769,6 +1769,7 @@ class StreamMessagesTest(ZulipTestCase):
user,
stream_name,
content=content,
skip_capture_on_commit_callbacks=True,
)
users = events[0]["users"]
user_ids = {u["id"] for u in users}

View File

@ -480,8 +480,10 @@ class TestServiceBotEventTriggers(ZulipTestCase):
)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
def test_trigger_on_stream_mention_from_user(self, mock_queue_json_publish: mock.Mock) -> None:
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_trigger_on_stream_mention_from_user(
self, mock_queue_event_on_commit: mock.Mock
) -> None:
content = "@**FooBot** foo bar!!!"
recipient = "Denmark"
trigger = "mention"
@ -501,31 +503,31 @@ class TestServiceBotEventTriggers(ZulipTestCase):
self.assertEqual(trigger_event["trigger"], trigger)
self.assertEqual(trigger_event["user_profile_id"], self.bot_profile.id)
mock_queue_json_publish.side_effect = check_values_passed
mock_queue_event_on_commit.side_effect = check_values_passed
self.send_stream_message(self.user_profile, "Denmark", content)
self.assertTrue(mock_queue_json_publish.called)
self.assertTrue(mock_queue_event_on_commit.called)
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_no_trigger_on_stream_message_without_mention(
self, mock_queue_json_publish: mock.Mock
self, mock_queue_event_on_commit: mock.Mock
) -> None:
sender = self.user_profile
self.send_stream_message(sender, "Denmark")
self.assertFalse(mock_queue_json_publish.called)
self.assertFalse(mock_queue_event_on_commit.called)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_no_trigger_on_stream_mention_from_bot(
self, mock_queue_json_publish: mock.Mock
self, mock_queue_event_on_commit: mock.Mock
) -> None:
self.send_stream_message(self.second_bot_profile, "Denmark", "@**FooBot** foo bar!!!")
self.assertFalse(mock_queue_json_publish.called)
self.assertFalse(mock_queue_event_on_commit.called)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_trigger_on_personal_message_from_user(
self, mock_queue_json_publish: mock.Mock
self, mock_queue_event_on_commit: mock.Mock
) -> None:
sender = self.user_profile
recipient = self.bot_profile
@ -547,24 +549,26 @@ class TestServiceBotEventTriggers(ZulipTestCase):
self.assertTrue(sender.email in display_recipients)
self.assertTrue(recipient.email in display_recipients)
mock_queue_json_publish.side_effect = check_values_passed
mock_queue_event_on_commit.side_effect = check_values_passed
self.send_personal_message(sender, recipient, "test")
self.assertTrue(mock_queue_json_publish.called)
self.assertTrue(mock_queue_event_on_commit.called)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_no_trigger_on_personal_message_from_bot(
self, mock_queue_json_publish: mock.Mock
self, mock_queue_event_on_commit: mock.Mock
) -> None:
sender = self.second_bot_profile
recipient = self.bot_profile
self.send_personal_message(sender, recipient)
self.assertFalse(mock_queue_json_publish.called)
self.assertFalse(mock_queue_event_on_commit.called)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
def test_trigger_on_huddle_message_from_user(self, mock_queue_json_publish: mock.Mock) -> None:
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_trigger_on_huddle_message_from_user(
self, mock_queue_event_on_commit: mock.Mock
) -> None:
self.second_bot_profile.bot_type = self.bot_profile.bot_type
self.second_bot_profile.save()
@ -585,17 +589,17 @@ class TestServiceBotEventTriggers(ZulipTestCase):
self.assertEqual(trigger_event["message"]["sender_email"], sender.email)
self.assertEqual(trigger_event["message"]["type"], "private")
mock_queue_json_publish.side_effect = check_values_passed
mock_queue_event_on_commit.side_effect = check_values_passed
self.send_huddle_message(sender, recipients, "test")
self.assertEqual(mock_queue_json_publish.call_count, 2)
self.assertEqual(mock_queue_event_on_commit.call_count, 2)
@for_all_bot_types
@patch_queue_publish("zerver.actions.message_send.queue_json_publish")
@patch_queue_publish("zerver.actions.message_send.queue_event_on_commit")
def test_no_trigger_on_huddle_message_from_bot(
self, mock_queue_json_publish: mock.Mock
self, mock_queue_event_on_commit: mock.Mock
) -> None:
sender = self.second_bot_profile
recipients = [self.user_profile, self.bot_profile]
self.send_huddle_message(sender, recipients)
self.assertFalse(mock_queue_json_publish.called)
self.assertFalse(mock_queue_event_on_commit.called)

View File

@ -1512,12 +1512,9 @@ class StreamAdminTest(ZulipTestCase):
self.assertEqual(events[1]["event"]["subscriptions"][0]["stream_id"], stream.id)
self.assertEqual(events[1]["users"], [desdemona.id])
# Send a message there logging the reactivation
self.assertEqual(events[2]["event"]["type"], "message")
# iago (as an admin) gets to know that desdemona (the owner) is now subscribed.
self.assertEqual(
events[3],
events[2],
{
"event": {
"op": "peer_add",
@ -1529,6 +1526,9 @@ class StreamAdminTest(ZulipTestCase):
},
)
# Send a message there logging the reactivation
self.assertEqual(events[3]["event"]["type"], "message")
stream = Stream.objects.get(id=stream.id)
self.assertFalse(stream.deactivated)
self.assertTrue(stream.invite_only)
@ -5056,8 +5056,8 @@ class SubscriptionAPITest(ZulipTestCase):
# Verify that peer_event events are never sent in Zephyr
# realm. This does generate stream creation events from
# send_stream_creation_events_for_previously_inaccessible_streams.
with self.capture_send_event_calls(expected_num_events=num_streams + 1) as events:
with self.assert_database_query_count(num_streams + 11):
with self.assert_database_query_count(num_streams + 11):
with self.capture_send_event_calls(expected_num_events=num_streams + 1) as events:
self.common_subscribe_to_streams(
mit_user,
stream_names,