diff --git a/zerver/actions/streams.py b/zerver/actions/streams.py index 82f735faec..ac5b88c82b 100644 --- a/zerver/actions/streams.py +++ b/zerver/actions/streams.py @@ -630,6 +630,39 @@ def notify_subscriptions_removed( SubAndRemovedT = Tuple[List[Tuple[UserProfile, Stream]], List[Tuple[UserProfile, Stream]]] +def send_subscription_remove_events( + realm: Realm, + users: List[UserProfile], + streams: List[Stream], + removed_subs: List[Tuple[UserProfile, Stream]], +) -> None: + altered_user_dict: Dict[int, Set[int]] = defaultdict(set) + streams_by_user: Dict[int, List[Stream]] = defaultdict(list) + for (user, stream) in removed_subs: + streams_by_user[user.id].append(stream) + altered_user_dict[stream.id].add(user.id) + + for user_profile in users: + if len(streams_by_user[user_profile.id]) == 0: + continue + notify_subscriptions_removed(realm, user_profile, streams_by_user[user_profile.id]) + + event = { + "type": "mark_stream_messages_as_read", + "user_profile_id": user_profile.id, + "stream_recipient_ids": [ + stream.recipient_id for stream in streams_by_user[user_profile.id] + ], + } + queue_json_publish("deferred_work", event) + + send_peer_remove_events( + realm=realm, + streams=streams, + altered_user_dict=altered_user_dict, + ) + + def bulk_remove_subscriptions( realm: Realm, users: Iterable[UserProfile], @@ -709,32 +742,8 @@ def bulk_remove_subscriptions( # Now since we have all log objects generated we can do a bulk insert RealmAuditLog.objects.bulk_create(all_subscription_logs) - altered_user_dict: Dict[int, Set[int]] = defaultdict(set) - streams_by_user: Dict[int, List[Stream]] = defaultdict(list) - for sub_info in subs_to_deactivate: - stream = sub_info.stream - streams_by_user[sub_info.user.id].append(stream) - altered_user_dict[stream.id].add(sub_info.user.id) - - for user_profile in users: - if len(streams_by_user[user_profile.id]) == 0: - continue - notify_subscriptions_removed(realm, user_profile, streams_by_user[user_profile.id]) - - event = { - "type": "mark_stream_messages_as_read", - "user_profile_id": user_profile.id, - "stream_recipient_ids": [ - stream.recipient_id for stream in streams_by_user[user_profile.id] - ], - } - queue_json_publish("deferred_work", event) - - send_peer_remove_events( - realm=realm, - streams=streams, - altered_user_dict=altered_user_dict, - ) + removed_sub_tuples = [(sub_info.user, sub_info.stream) for sub_info in subs_to_deactivate] + send_subscription_remove_events(realm, users, streams, removed_sub_tuples) new_vacant_streams = set(streams_to_unsubscribe) - set(occupied_streams_after) new_vacant_private_streams = [stream for stream in new_vacant_streams if stream.invite_only] @@ -745,7 +754,7 @@ def bulk_remove_subscriptions( do_deactivate_stream(stream, acting_user=acting_user) return ( - [(sub_info.user, sub_info.stream) for sub_info in subs_to_deactivate], + removed_sub_tuples, not_subscribed, )