diff --git a/zerver/actions/streams.py b/zerver/actions/streams.py index 205fb02972..3c687d69f1 100644 --- a/zerver/actions/streams.py +++ b/zerver/actions/streams.py @@ -147,6 +147,65 @@ def do_deactivate_stream( ) +def bulk_delete_cache_keys(message_ids_to_clear: List[int]) -> None: + while len(message_ids_to_clear) > 0: + batch = message_ids_to_clear[0:5000] + + keys_to_delete = [to_dict_cache_key_id(message_id) for message_id in batch] + cache_delete_many(keys_to_delete) + + message_ids_to_clear = message_ids_to_clear[5000:] + + +def merge_streams( + realm: Realm, stream_to_keep: Stream, stream_to_destroy: Stream +) -> Tuple[int, int, int]: + recipient_to_destroy = stream_to_destroy.recipient + recipient_to_keep = stream_to_keep.recipient + if recipient_to_destroy.id == recipient_to_keep.id: + return (0, 0, 0) + + # The high-level approach here is to move all the messages to + # the surviving stream, deactivate all the subscriptions on + # the stream to be removed and deactivate the stream, and add + # new subscriptions to the stream to keep for any users who + # were only on the now-deactivated stream. + + # Move the messages, and delete the old copies from caches. + message_ids_to_clear = list( + Message.objects.filter(recipient=recipient_to_destroy).values_list("id", flat=True) + ) + count = Message.objects.filter(recipient=recipient_to_destroy).update( + recipient=recipient_to_keep + ) + bulk_delete_cache_keys(message_ids_to_clear) + + # Move the Subscription objects. This algorithm doesn't + # preserve any stream settings/colors/etc. from the stream + # being destroyed, but it's convenient. + existing_subs = Subscription.objects.filter(recipient=recipient_to_keep) + users_already_subscribed = {sub.user_profile_id: sub.active for sub in existing_subs} + + subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True) + users_to_activate = [ + sub.user_profile + for sub in subs_to_deactivate + if not users_already_subscribed.get(sub.user_profile_id, False) + ] + + if len(subs_to_deactivate) > 0: + bulk_remove_subscriptions( + realm, + [sub.user_profile for sub in subs_to_deactivate], + [stream_to_destroy], + acting_user=None, + ) + do_deactivate_stream(stream_to_destroy, acting_user=None) + if len(users_to_activate) > 0: + bulk_add_subscriptions(realm, [stream_to_keep], users_to_activate, acting_user=None) + return (len(users_to_activate), count, len(subs_to_deactivate)) + + def get_subscriber_ids( stream: Stream, requesting_user: Optional[UserProfile] = None ) -> "ValuesQuerySet[Subscription, int]": diff --git a/zerver/management/commands/merge_streams.py b/zerver/management/commands/merge_streams.py index 05949d609f..0005a7801e 100644 --- a/zerver/management/commands/merge_streams.py +++ b/zerver/management/commands/merge_streams.py @@ -1,24 +1,9 @@ from argparse import ArgumentParser -from typing import Any, List +from typing import Any -from zerver.actions.streams import ( - bulk_add_subscriptions, - bulk_remove_subscriptions, - do_deactivate_stream, -) -from zerver.lib.cache import cache_delete_many, to_dict_cache_key_id +from zerver.actions.streams import merge_streams from zerver.lib.management import ZulipBaseCommand -from zerver.models import Message, Subscription, get_stream - - -def bulk_delete_cache_keys(message_ids_to_clear: List[int]) -> None: - while len(message_ids_to_clear) > 0: - batch = message_ids_to_clear[0:5000] - - keys_to_delete = [to_dict_cache_key_id(message_id) for message_id in batch] - cache_delete_many(keys_to_delete) - - message_ids_to_clear = message_ids_to_clear[5000:] +from zerver.models import get_stream class Command(ZulipBaseCommand): @@ -36,50 +21,7 @@ class Command(ZulipBaseCommand): assert realm is not None # Should be ensured by parser stream_to_keep = get_stream(options["stream_to_keep"], realm) stream_to_destroy = get_stream(options["stream_to_destroy"], realm) - - recipient_to_destroy = stream_to_destroy.recipient - recipient_to_keep = stream_to_keep.recipient - - # The high-level approach here is to move all the messages to - # the surviving stream, deactivate all the subscriptions on - # the stream to be removed and deactivate the stream, and add - # new subscriptions to the stream to keep for any users who - # were only on the now-deactivated stream. - - # Move the messages, and delete the old copies from caches. - message_ids_to_clear = list( - Message.objects.filter(recipient=recipient_to_destroy).values_list("id", flat=True) - ) - count = Message.objects.filter(recipient=recipient_to_destroy).update( - recipient=recipient_to_keep - ) - print(f"Moved {count} messages") - bulk_delete_cache_keys(message_ids_to_clear) - - # Move the Subscription objects. This algorithm doesn't - # preserve any stream settings/colors/etc. from the stream - # being destroyed, but it's convenient. - existing_subs = Subscription.objects.filter(recipient=recipient_to_keep) - users_already_subscribed = {sub.user_profile_id: sub.active for sub in existing_subs} - - subs_to_deactivate = Subscription.objects.filter( - recipient=recipient_to_destroy, active=True - ) - users_to_activate = [ - sub.user_profile - for sub in subs_to_deactivate - if not users_already_subscribed.get(sub.user_profile_id, False) - ] - - if len(subs_to_deactivate) > 0: - print(f"Deactivating {len(subs_to_deactivate)} subscriptions") - bulk_remove_subscriptions( - realm, - [sub.user_profile for sub in subs_to_deactivate], - [stream_to_destroy], - acting_user=None, - ) - do_deactivate_stream(stream_to_destroy, acting_user=None) - if len(users_to_activate) > 0: - print(f"Adding {len(users_to_activate)} subscriptions") - bulk_add_subscriptions(realm, [stream_to_keep], users_to_activate, acting_user=None) + stats = merge_streams(realm, stream_to_keep, stream_to_destroy) + print(f"Added { stats[0] } subscriptions") + print(f"Moved { stats[1] } messages") + print(f"Deactivated { stats[2] } subscriptions") diff --git a/zerver/tests/test_realm.py b/zerver/tests/test_realm.py index 0b5aa7bad3..f0c12723e2 100644 --- a/zerver/tests/test_realm.py +++ b/zerver/tests/test_realm.py @@ -20,7 +20,7 @@ from zerver.actions.realm_settings import ( do_set_realm_property, do_set_realm_user_default_setting, ) -from zerver.actions.streams import do_deactivate_stream +from zerver.actions.streams import do_deactivate_stream, merge_streams from zerver.lib.realm_description import get_realm_rendered_description, get_realm_text_description from zerver.lib.send_email import send_future_email from zerver.lib.streams import create_stream_if_needed @@ -445,6 +445,31 @@ class RealmTest(ZulipTestCase): do_deactivate_stream(notifications_stream, acting_user=None) self.assertIsNone(realm.get_notifications_stream()) + def test_merge_streams(self) -> None: + realm = get_realm("zulip") + denmark = get_stream("Denmark", realm) + cordelia = self.example_user("cordelia") + notifications_stream = realm.get_notifications_stream() + assert notifications_stream is not None + + create_stream_if_needed(realm, "Atlantis") + self.subscribe(cordelia, "Atlantis") + self.send_stream_message(cordelia, "Atlantis") + atlantis = get_stream("Atlantis", realm) + + stats = merge_streams(realm, denmark, denmark) + self.assertEqual(stats, (0, 0, 0)) + + stats = merge_streams(realm, denmark, atlantis) + self.assertEqual(stats, (1, 1, 1)) + + with self.assertRaises(Stream.DoesNotExist): + get_stream("Atlantis", realm) + + stats = merge_streams(realm, denmark, notifications_stream) + self.assertEqual(stats, (2, 1, 10)) + self.assertIsNone(realm.get_notifications_stream()) + def test_change_signup_notifications_stream(self) -> None: # We need an admin user. self.login("iago")