From f22daeba1aaf2c4dfacece5f5e4456500847e7c6 Mon Sep 17 00:00:00 2001 From: rht Date: Mon, 5 Jun 2017 07:47:03 +0200 Subject: [PATCH] Cleanup rabbitmq queues when doing dev db rebuild. Modified by tabbott to cleanup the argument handling in the modified purge_queue management command. Fixes #1335. --- tools/do-destroy-rebuild-database | 1 + zerver/management/commands/purge_queue.py | 31 ++++++++++++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/tools/do-destroy-rebuild-database b/tools/do-destroy-rebuild-database index 44c745914f..562f6bd05d 100755 --- a/tools/do-destroy-rebuild-database +++ b/tools/do-destroy-rebuild-database @@ -15,6 +15,7 @@ EOF sh "$(dirname "$0")/../scripts/setup/flush-memcached" +./manage.py purge_queue --all ./manage.py migrate --noinput migration_status "var/migration_status_dev" ./manage.py createcachetable third_party_api_results diff --git a/zerver/management/commands/purge_queue.py b/zerver/management/commands/purge_queue.py index 2446454a62..e7ccf3ffd4 100644 --- a/zerver/management/commands/purge_queue.py +++ b/zerver/management/commands/purge_queue.py @@ -5,19 +5,38 @@ from argparse import ArgumentParser from django.core.management.base import BaseCommand from django.core.management import CommandError from zerver.lib.queue import SimpleQueueClient +from zerver.worker.queue_processors import get_active_worker_queues class Command(BaseCommand): def add_arguments(self, parser): # type: (ArgumentParser) -> None - parser.add_argument('queue_name', metavar='', type=str, - help="queue to purge") + parser.add_argument(dest="queue_name", type=str, nargs='?', + help="queue to purge", default=None) + parser.add_argument('--all', dest="all", action="store_true", + default=False, help="purge all queues") help = "Discards all messages from the given queue" def handle(self, *args, **options): # type: (*Any, **str) -> None - queue_name = options['queue_name'] - queue = SimpleQueueClient() - queue.ensure_queue(queue_name, lambda: None) - queue.channel.queue_purge(queue_name) + def purge_queue(queue_name): + # type: (str) -> None + queue = SimpleQueueClient() + queue.ensure_queue(queue_name, lambda: None) + queue.channel.queue_purge(queue_name) + + if options['all']: + for queue_name in get_active_worker_queues(): + purge_queue(queue_name) + print("All queues purged") + elif not options['queue_name']: + raise CommandError("Missing queue_name argument!") + else: + queue_name = options['queue_name'] + if queue_name not in get_active_worker_queues(): + raise CommandError("Unknown queue %s" % (queue_name,)) + + print("Purging queue %s" % (queue_name,)) + purge_queue(queue_name) + print("Done")