merge_streams: Move logic to zerver/actions/streams.py.

This commit is contained in:
Adam Sah 2022-07-11 02:47:11 -04:00 committed by Tim Abbott
parent 6b4474d4b5
commit cb6382369e
3 changed files with 92 additions and 66 deletions

View File

@ -147,6 +147,65 @@ def do_deactivate_stream(
)
def bulk_delete_cache_keys(message_ids_to_clear: List[int]) -> None:
while len(message_ids_to_clear) > 0:
batch = message_ids_to_clear[0:5000]
keys_to_delete = [to_dict_cache_key_id(message_id) for message_id in batch]
cache_delete_many(keys_to_delete)
message_ids_to_clear = message_ids_to_clear[5000:]
def merge_streams(
realm: Realm, stream_to_keep: Stream, stream_to_destroy: Stream
) -> Tuple[int, int, int]:
recipient_to_destroy = stream_to_destroy.recipient
recipient_to_keep = stream_to_keep.recipient
if recipient_to_destroy.id == recipient_to_keep.id:
return (0, 0, 0)
# The high-level approach here is to move all the messages to
# the surviving stream, deactivate all the subscriptions on
# the stream to be removed and deactivate the stream, and add
# new subscriptions to the stream to keep for any users who
# were only on the now-deactivated stream.
# Move the messages, and delete the old copies from caches.
message_ids_to_clear = list(
Message.objects.filter(recipient=recipient_to_destroy).values_list("id", flat=True)
)
count = Message.objects.filter(recipient=recipient_to_destroy).update(
recipient=recipient_to_keep
)
bulk_delete_cache_keys(message_ids_to_clear)
# Move the Subscription objects. This algorithm doesn't
# preserve any stream settings/colors/etc. from the stream
# being destroyed, but it's convenient.
existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
users_already_subscribed = {sub.user_profile_id: sub.active for sub in existing_subs}
subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True)
users_to_activate = [
sub.user_profile
for sub in subs_to_deactivate
if not users_already_subscribed.get(sub.user_profile_id, False)
]
if len(subs_to_deactivate) > 0:
bulk_remove_subscriptions(
realm,
[sub.user_profile for sub in subs_to_deactivate],
[stream_to_destroy],
acting_user=None,
)
do_deactivate_stream(stream_to_destroy, acting_user=None)
if len(users_to_activate) > 0:
bulk_add_subscriptions(realm, [stream_to_keep], users_to_activate, acting_user=None)
return (len(users_to_activate), count, len(subs_to_deactivate))
def get_subscriber_ids(
stream: Stream, requesting_user: Optional[UserProfile] = None
) -> "ValuesQuerySet[Subscription, int]":

View File

@ -1,24 +1,9 @@
from argparse import ArgumentParser
from typing import Any, List
from typing import Any
from zerver.actions.streams import (
bulk_add_subscriptions,
bulk_remove_subscriptions,
do_deactivate_stream,
)
from zerver.lib.cache import cache_delete_many, to_dict_cache_key_id
from zerver.actions.streams import merge_streams
from zerver.lib.management import ZulipBaseCommand
from zerver.models import Message, Subscription, get_stream
def bulk_delete_cache_keys(message_ids_to_clear: List[int]) -> None:
while len(message_ids_to_clear) > 0:
batch = message_ids_to_clear[0:5000]
keys_to_delete = [to_dict_cache_key_id(message_id) for message_id in batch]
cache_delete_many(keys_to_delete)
message_ids_to_clear = message_ids_to_clear[5000:]
from zerver.models import get_stream
class Command(ZulipBaseCommand):
@ -36,50 +21,7 @@ class Command(ZulipBaseCommand):
assert realm is not None # Should be ensured by parser
stream_to_keep = get_stream(options["stream_to_keep"], realm)
stream_to_destroy = get_stream(options["stream_to_destroy"], realm)
recipient_to_destroy = stream_to_destroy.recipient
recipient_to_keep = stream_to_keep.recipient
# The high-level approach here is to move all the messages to
# the surviving stream, deactivate all the subscriptions on
# the stream to be removed and deactivate the stream, and add
# new subscriptions to the stream to keep for any users who
# were only on the now-deactivated stream.
# Move the messages, and delete the old copies from caches.
message_ids_to_clear = list(
Message.objects.filter(recipient=recipient_to_destroy).values_list("id", flat=True)
)
count = Message.objects.filter(recipient=recipient_to_destroy).update(
recipient=recipient_to_keep
)
print(f"Moved {count} messages")
bulk_delete_cache_keys(message_ids_to_clear)
# Move the Subscription objects. This algorithm doesn't
# preserve any stream settings/colors/etc. from the stream
# being destroyed, but it's convenient.
existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
users_already_subscribed = {sub.user_profile_id: sub.active for sub in existing_subs}
subs_to_deactivate = Subscription.objects.filter(
recipient=recipient_to_destroy, active=True
)
users_to_activate = [
sub.user_profile
for sub in subs_to_deactivate
if not users_already_subscribed.get(sub.user_profile_id, False)
]
if len(subs_to_deactivate) > 0:
print(f"Deactivating {len(subs_to_deactivate)} subscriptions")
bulk_remove_subscriptions(
realm,
[sub.user_profile for sub in subs_to_deactivate],
[stream_to_destroy],
acting_user=None,
)
do_deactivate_stream(stream_to_destroy, acting_user=None)
if len(users_to_activate) > 0:
print(f"Adding {len(users_to_activate)} subscriptions")
bulk_add_subscriptions(realm, [stream_to_keep], users_to_activate, acting_user=None)
stats = merge_streams(realm, stream_to_keep, stream_to_destroy)
print(f"Added { stats[0] } subscriptions")
print(f"Moved { stats[1] } messages")
print(f"Deactivated { stats[2] } subscriptions")

View File

@ -20,7 +20,7 @@ from zerver.actions.realm_settings import (
do_set_realm_property,
do_set_realm_user_default_setting,
)
from zerver.actions.streams import do_deactivate_stream
from zerver.actions.streams import do_deactivate_stream, merge_streams
from zerver.lib.realm_description import get_realm_rendered_description, get_realm_text_description
from zerver.lib.send_email import send_future_email
from zerver.lib.streams import create_stream_if_needed
@ -445,6 +445,31 @@ class RealmTest(ZulipTestCase):
do_deactivate_stream(notifications_stream, acting_user=None)
self.assertIsNone(realm.get_notifications_stream())
def test_merge_streams(self) -> None:
realm = get_realm("zulip")
denmark = get_stream("Denmark", realm)
cordelia = self.example_user("cordelia")
notifications_stream = realm.get_notifications_stream()
assert notifications_stream is not None
create_stream_if_needed(realm, "Atlantis")
self.subscribe(cordelia, "Atlantis")
self.send_stream_message(cordelia, "Atlantis")
atlantis = get_stream("Atlantis", realm)
stats = merge_streams(realm, denmark, denmark)
self.assertEqual(stats, (0, 0, 0))
stats = merge_streams(realm, denmark, atlantis)
self.assertEqual(stats, (1, 1, 1))
with self.assertRaises(Stream.DoesNotExist):
get_stream("Atlantis", realm)
stats = merge_streams(realm, denmark, notifications_stream)
self.assertEqual(stats, (2, 1, 10))
self.assertIsNone(realm.get_notifications_stream())
def test_change_signup_notifications_stream(self) -> None:
# We need an admin user.
self.login("iago")