Cleanup rabbitmq queues when doing dev db rebuild.

Modified by tabbott to cleanup the argument handling in the modified
purge_queue management command.

Fixes #1335.
This commit is contained in:
rht 2017-06-05 07:47:03 +02:00 committed by Tim Abbott
parent bcb9c76457
commit f22daeba1a
2 changed files with 26 additions and 6 deletions

View File

@ -15,6 +15,7 @@ EOF
sh "$(dirname "$0")/../scripts/setup/flush-memcached" sh "$(dirname "$0")/../scripts/setup/flush-memcached"
./manage.py purge_queue --all
./manage.py migrate --noinput ./manage.py migrate --noinput
migration_status "var/migration_status_dev" migration_status "var/migration_status_dev"
./manage.py createcachetable third_party_api_results ./manage.py createcachetable third_party_api_results

View File

@ -5,19 +5,38 @@ from argparse import ArgumentParser
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.core.management import CommandError from django.core.management import CommandError
from zerver.lib.queue import SimpleQueueClient from zerver.lib.queue import SimpleQueueClient
from zerver.worker.queue_processors import get_active_worker_queues
class Command(BaseCommand): class Command(BaseCommand):
def add_arguments(self, parser): def add_arguments(self, parser):
# type: (ArgumentParser) -> None # type: (ArgumentParser) -> None
parser.add_argument('queue_name', metavar='<queue name>', type=str, parser.add_argument(dest="queue_name", type=str, nargs='?',
help="queue to purge") help="queue to purge", default=None)
parser.add_argument('--all', dest="all", action="store_true",
default=False, help="purge all queues")
help = "Discards all messages from the given queue" help = "Discards all messages from the given queue"
def handle(self, *args, **options): def handle(self, *args, **options):
# type: (*Any, **str) -> None # type: (*Any, **str) -> None
queue_name = options['queue_name'] def purge_queue(queue_name):
# type: (str) -> None
queue = SimpleQueueClient() queue = SimpleQueueClient()
queue.ensure_queue(queue_name, lambda: None) queue.ensure_queue(queue_name, lambda: None)
queue.channel.queue_purge(queue_name) queue.channel.queue_purge(queue_name)
if options['all']:
for queue_name in get_active_worker_queues():
purge_queue(queue_name)
print("All queues purged")
elif not options['queue_name']:
raise CommandError("Missing queue_name argument!")
else:
queue_name = options['queue_name']
if queue_name not in get_active_worker_queues():
raise CommandError("Unknown queue %s" % (queue_name,))
print("Purging queue %s" % (queue_name,))
purge_queue(queue_name)
print("Done") print("Done")