from __future__ import absolute_import from __future__ import print_function from typing import Any, List from django.core.management.base import BaseCommand from zerver.lib.actions import bulk_remove_subscriptions, bulk_add_subscriptions, \ do_deactivate_stream from zerver.lib.cache import cache_delete_many, to_dict_cache_key_id from zerver.lib.str_utils import force_text from zerver.models import Realm, get_realm, get_stream, Subscription, \ Recipient, get_recipient, Message from argparse import ArgumentParser import sys def bulk_delete_cache_keys(message_ids_to_clear): # type: (List[int]) -> None while len(message_ids_to_clear) > 0: batch = message_ids_to_clear[0:5000] keys_to_delete = [to_dict_cache_key_id(message_id, True) for message_id in batch] cache_delete_many(keys_to_delete) keys_to_delete = [to_dict_cache_key_id(message_id, False) for message_id in batch] cache_delete_many(keys_to_delete) message_ids_to_clear = message_ids_to_clear[5000:] class Command(BaseCommand): help = """Merge two streams.""" def add_arguments(self, parser): # type: (ArgumentParser) -> None parser.add_argument('realm', metavar='', type=str, help='realm in which to merge the streams') parser.add_argument('stream_to_keep', type=str, help='name of stream to keep') parser.add_argument('stream_to_destroy', type=str, help='name of stream to merge into the stream being kept') def handle(self, *args, **options): # type: (*Any, **str) -> None string_id = options['realm'] encoding = sys.getfilesystemencoding() realm = get_realm(force_text(string_id, encoding)) stream_to_keep = get_stream(options["stream_to_keep"], realm) stream_to_destroy = get_stream(options["stream_to_destroy"], realm) recipient_to_destroy = get_recipient(Recipient.STREAM, stream_to_destroy.id) recipient_to_keep = get_recipient(Recipient.STREAM, stream_to_keep.id) # The high-level approach here is to move all the messages to # the surviving stream, deactivate all the subscriptions on # the stream to be removed and deactivate the stream, and add # new subscriptions to the stream to keep for any users who # were only on the now-deactivated stream. # Move the messages, and delete the old copies from caches. message_ids_to_clear = list(Message.objects.filter( recipient=recipient_to_destroy).values_list("id", flat=True)) count = Message.objects.filter(recipient=recipient_to_destroy).update(recipient=recipient_to_keep) print("Moved %s messages" % (count,)) bulk_delete_cache_keys(message_ids_to_clear) # Move the Subscription objects. This algorithm doesn't # preserve any stream settings/colors/etc. from the stream # being destroyed, but it's convenient. existing_subs = Subscription.objects.filter(recipient=recipient_to_keep) users_already_subscribed = dict((sub.user_profile_id, sub.active) for sub in existing_subs) subs_to_deactivate = Subscription.objects.filter(recipient=recipient_to_destroy, active=True) users_to_activate = [ sub.user_profile for sub in subs_to_deactivate if not users_already_subscribed.get(sub.user_profile_id, False) ] if len(subs_to_deactivate) > 0: print("Deactivating %s subscriptions" % (len(subs_to_deactivate),)) bulk_remove_subscriptions([sub.user_profile for sub in subs_to_deactivate], [stream_to_destroy]) do_deactivate_stream(stream_to_destroy) if len(users_to_activate) > 0: print("Adding %s subscriptions" % (len(users_to_activate),)) bulk_add_subscriptions([stream_to_keep], users_to_activate)