zulip/zerver/management/commands/merge_streams.py

87 lines
3.9 KiB
Python

from __future__ import absolute_import
from __future__ import print_function

import sys
from argparse import ArgumentParser
from typing import Any, List

from django.core.management.base import BaseCommand, CommandError

from zerver.lib.actions import bulk_remove_subscriptions, bulk_add_subscriptions, \
    do_deactivate_stream
from zerver.lib.cache import cache_delete_many, to_dict_cache_key_id
from zerver.lib.str_utils import force_text
from zerver.models import Realm, get_realm, get_stream, Subscription, \
    Recipient, get_recipient, Message
def bulk_delete_cache_keys(message_ids_to_clear):
    # type: (List[int]) -> None
    """Drop the cache entries for the given message ids, in batches.

    The ids are processed in chunks of at most 5000.  For each chunk, cache
    keys are built with ``to_dict_cache_key_id`` for both values of its
    boolean second argument (``True`` first, then ``False``), and each list
    of keys is passed to ``cache_delete_many``.
    """
    chunk_size = 5000
    for offset in range(0, len(message_ids_to_clear), chunk_size):
        chunk = message_ids_to_clear[offset:offset + chunk_size]
        # Two separate deletes per chunk: one per key variant.
        for flag in (True, False):
            cache_delete_many([to_dict_cache_key_id(msg_id, flag) for msg_id in chunk])
class Command(BaseCommand):
    """Management command that merges one stream into another in a realm."""
    help = """Merge two streams."""

    def add_arguments(self, parser):
        # type: (ArgumentParser) -> None
        """Register the three positional arguments: realm and the two stream names."""
        parser.add_argument('realm', metavar='<realm>', type=str,
                            help='realm in which to merge the streams')
        parser.add_argument('stream_to_keep', type=str,
                            help='name of stream to keep')
        parser.add_argument('stream_to_destroy', type=str,
                            help='name of stream to merge into the stream being kept')

    def handle(self, *args, **options):
        # type: (*Any, **str) -> None
        """Merge ``stream_to_destroy`` into ``stream_to_keep``.

        Reads ``realm``, ``stream_to_keep`` and ``stream_to_destroy`` from
        ``options``.  Raises ``CommandError`` when both names resolve to the
        same stream, since continuing would deactivate the stream that is
        supposed to survive.
        """
        string_id = options['realm']
        encoding = sys.getfilesystemencoding()
        realm = get_realm(force_text(string_id, encoding))
        stream_to_keep = get_stream(options["stream_to_keep"], realm)
        stream_to_destroy = get_stream(options["stream_to_destroy"], realm)

        # Refuse a self-merge before touching any data: otherwise every
        # subscription would be removed and the stream itself deactivated.
        if stream_to_keep.id == stream_to_destroy.id:
            raise CommandError("Cannot merge a stream into itself: both names "
                               "refer to the same stream.")

        recipient_to_destroy = get_recipient(Recipient.STREAM, stream_to_destroy.id)
        recipient_to_keep = get_recipient(Recipient.STREAM, stream_to_keep.id)

        # The high-level approach here is to move all the messages to
        # the surviving stream, deactivate all the subscriptions on
        # the stream to be removed and deactivate the stream, and add
        # new subscriptions to the stream to keep for any users who
        # were only on the now-deactivated stream.

        # Move the messages, and delete the old copies from caches.
        # The ids must be collected before the update, because afterwards
        # the messages no longer match the old recipient.
        messages_to_move = Message.objects.filter(recipient=recipient_to_destroy)
        message_ids_to_clear = list(messages_to_move.values_list("id", flat=True))
        count = messages_to_move.update(recipient=recipient_to_keep)
        print("Moved %s messages" % (count,))
        bulk_delete_cache_keys(message_ids_to_clear)

        # Move the Subscription objects.  This algorithm doesn't
        # preserve any stream settings/colors/etc. from the stream
        # being destroyed, but it's convenient.
        existing_subs = Subscription.objects.filter(recipient=recipient_to_keep)
        users_already_subscribed = dict((sub.user_profile_id, sub.active) for sub in existing_subs)

        # Evaluate the queryset exactly once, and fetch the related user
        # rows in the same query instead of one query per subscription.
        subs_to_deactivate = list(
            Subscription.objects.filter(recipient=recipient_to_destroy, active=True)
            .select_related("user_profile"))
        users_to_deactivate = [sub.user_profile for sub in subs_to_deactivate]
        # Users with no subscription, or only an inactive one, on the
        # surviving stream need a new subscription there.
        users_to_activate = [
            sub.user_profile for sub in subs_to_deactivate
            if not users_already_subscribed.get(sub.user_profile_id, False)
        ]

        if users_to_deactivate:
            print("Deactivating %s subscriptions" % (len(users_to_deactivate),))
            bulk_remove_subscriptions(users_to_deactivate, [stream_to_destroy])
        do_deactivate_stream(stream_to_destroy)
        if users_to_activate:
            print("Adding %s subscriptions" % (len(users_to_activate),))
            bulk_add_subscriptions([stream_to_keep], users_to_activate)