streams: Extract code for sending events in bulk_remove_subscriptions.

This is a prep commit such that we can avoid duplicate code when we
unsubscribe bots for inaccessible private streams when changing owner
or reactivating them.
This commit is contained in:
Sahil Batra 2022-05-11 12:12:08 +05:30 committed by Tim Abbott
parent 44266987ec
commit d3468e3f78
1 changed files with 36 additions and 27 deletions

View File

@ -630,6 +630,39 @@ def notify_subscriptions_removed(
SubAndRemovedT = Tuple[List[Tuple[UserProfile, Stream]], List[Tuple[UserProfile, Stream]]]
def send_subscription_remove_events(
realm: Realm,
users: List[UserProfile],
streams: List[Stream],
removed_subs: List[Tuple[UserProfile, Stream]],
) -> None:
altered_user_dict: Dict[int, Set[int]] = defaultdict(set)
streams_by_user: Dict[int, List[Stream]] = defaultdict(list)
for (user, stream) in removed_subs:
streams_by_user[user.id].append(stream)
altered_user_dict[stream.id].add(user.id)
for user_profile in users:
if len(streams_by_user[user_profile.id]) == 0:
continue
notify_subscriptions_removed(realm, user_profile, streams_by_user[user_profile.id])
event = {
"type": "mark_stream_messages_as_read",
"user_profile_id": user_profile.id,
"stream_recipient_ids": [
stream.recipient_id for stream in streams_by_user[user_profile.id]
],
}
queue_json_publish("deferred_work", event)
send_peer_remove_events(
realm=realm,
streams=streams,
altered_user_dict=altered_user_dict,
)
def bulk_remove_subscriptions(
realm: Realm,
users: Iterable[UserProfile],
@ -709,32 +742,8 @@ def bulk_remove_subscriptions(
# Now since we have all log objects generated we can do a bulk insert
RealmAuditLog.objects.bulk_create(all_subscription_logs)
altered_user_dict: Dict[int, Set[int]] = defaultdict(set)
streams_by_user: Dict[int, List[Stream]] = defaultdict(list)
for sub_info in subs_to_deactivate:
stream = sub_info.stream
streams_by_user[sub_info.user.id].append(stream)
altered_user_dict[stream.id].add(sub_info.user.id)
for user_profile in users:
if len(streams_by_user[user_profile.id]) == 0:
continue
notify_subscriptions_removed(realm, user_profile, streams_by_user[user_profile.id])
event = {
"type": "mark_stream_messages_as_read",
"user_profile_id": user_profile.id,
"stream_recipient_ids": [
stream.recipient_id for stream in streams_by_user[user_profile.id]
],
}
queue_json_publish("deferred_work", event)
send_peer_remove_events(
realm=realm,
streams=streams,
altered_user_dict=altered_user_dict,
)
removed_sub_tuples = [(sub_info.user, sub_info.stream) for sub_info in subs_to_deactivate]
send_subscription_remove_events(realm, users, streams, removed_sub_tuples)
new_vacant_streams = set(streams_to_unsubscribe) - set(occupied_streams_after)
new_vacant_private_streams = [stream for stream in new_vacant_streams if stream.invite_only]
@ -745,7 +754,7 @@ def bulk_remove_subscriptions(
do_deactivate_stream(stream, acting_user=acting_user)
return (
[(sub_info.user, sub_info.stream) for sub_info in subs_to_deactivate],
removed_sub_tuples,
not_subscribed,
)