Run queue processors multithreaded in development.

This change drops the memory used for Python processes run by Zulip in
development from about 1GB to 300MB on my laptop.

On the safety front, http://pika.readthedocs.org/en/latest/faq.html
explains: "Pika does not have any notion of threading in the code. If
you want to use Pika with threading, make sure you have a Pika
connection per thread, created in that thread. It is not safe to share
one Pika connection across threads."  Since this code only connects
to RabbitMQ inside the individual threads, I believe this should be
safe.
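
A minimal sketch of that per-thread-connection rule, using plain pika
rather than Zulip's SimpleQueueClient wrapper (the localhost broker and
the queue names are illustrative):

import threading

import pika

def consume(queue_name):
    # Each thread creates, uses, and closes its own connection; no pika
    # object is ever shared across threads.
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = conn.channel()
    channel.queue_declare(queue=queue_name)
    try:
        for method, properties, body in channel.consume(queue_name):
            channel.basic_ack(method.delivery_tag)
    finally:
        conn.close()

for name in ('user_activity', 'user_presence'):
    threading.Thread(target=consume, args=(name,)).start()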

Progress towards #32.
Tim Abbott 2015-11-23 22:01:35 -08:00
parent 49b55af9cd
commit cd2348e9ae
5 changed files with 43 additions and 20 deletions


@@ -16,6 +16,5 @@ from zerver.worker.queue_processors import get_active_worker_queues
 queues = get_active_worker_queues()
 args = sys.argv[1:]
-for queue in queues:
-    subprocess.Popen(['python', 'manage.py', 'process_queue'] + args + [queue],
-                     stderr=subprocess.STDOUT)
+subprocess.Popen(['python', 'manage.py', 'process_queue', '--all'] + args,
+                 stderr=subprocess.STDOUT)
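
For context on the memory numbers in the commit message: the old loop
paid for a full Python/Django process per queue, while --all moves the
fan-out into threads inside a single process. A rough sketch of the
difference (queue names made up; the real list comes from
get_active_worker_queues()):

import subprocess

queues = ['user_activity', 'user_presence', 'missedmessage_emails']

# Before: N full Django processes, roughly N times the interpreter and
# Django memory footprint.
for queue in queues:
    subprocess.Popen(['python', 'manage.py', 'process_queue', queue])

# After: one Django process; --all starts a consumer thread per active
# queue inside it.
subprocess.Popen(['python', 'manage.py', 'process_queue', '--all'])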


@@ -41,4 +41,5 @@ class TimeTrackingConnection(connection):
 def reset_queries():
     from django.db import connections
     for conn in connections.all():
-        conn.connection.queries = []
+        if conn.connection is not None:
+            conn.connection.queries = []


@@ -3,30 +3,39 @@ from __future__ import absolute_import
 from django.core.management.base import BaseCommand
 from django.core.management import CommandError
 from django.conf import settings
-from zerver.worker.queue_processors import get_worker
+from zerver.worker.queue_processors import get_worker, get_active_worker_queues
 
 import sys
 import signal
 import logging
+import threading
 
 class Command(BaseCommand):
     def add_arguments(self, parser):
-        parser.add_argument('queue_name', metavar='<queue name>', type=str,
+        parser.add_argument('--queue_name', metavar='<queue name>', type=str,
                             help="queue to process")
-        parser.add_argument('worker_num', metavar='<worker number>', type=int, nargs='?', default=0,
+        parser.add_argument('--worker_num', metavar='<worker number>', type=int, nargs='?', default=0,
                             help="worker label")
+        parser.add_argument('--all', dest="all", action="store_true", default=False,
+                            help="run all queues")
 
     help = "Runs a queue processing worker"
 
     def handle(self, *args, **options):
         logging.basicConfig()
         logger = logging.getLogger('process_queue')
-        queue_name = options['queue_name']
-        worker_num = options['worker_num']
 
         if not settings.USING_RABBITMQ:
             logger.error("Cannot run a queue processor when USING_RABBITMQ is False!")
             sys.exit(1)
 
+        if options['all']:
+            for queue_name in get_active_worker_queues():
+                logger.info('launching queue worker thread ' + queue_name)
+                td = Threaded_worker(queue_name)
+                td.start()
+        else:
+            queue_name = options['queue_name']
+            worker_num = options['worker_num']
         logger.info("Worker %d connecting to queue %s" % (worker_num, queue_name))
         worker = get_worker(queue_name)
@@ -39,3 +48,12 @@ class Command(BaseCommand):
         worker.start()
+
+class Threaded_worker(threading.Thread):
+    def __init__(self, queue_name):
+        threading.Thread.__init__(self)
+        self.worker = get_worker(queue_name)
+
+    def run(self):
+        self.worker.setup()
+        logging.debug('starting consuming ' + self.worker.queue_name)
+        self.worker.start()
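
Note that queue_name and worker_num changed from positional arguments to
optional flags, so invocations like "manage.py process_queue user_activity"
no longer parse. A standalone sketch of the same argparse behavior
(arguments copied from the diff above; the example invocations are
illustrative):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--queue_name', metavar='<queue name>', type=str,
                    help="queue to process")
parser.add_argument('--worker_num', metavar='<worker number>', type=int,
                    nargs='?', default=0, help="worker label")
parser.add_argument('--all', dest="all", action="store_true", default=False,
                    help="run all queues")

print(parser.parse_args(['--all']))
# Namespace(all=True, queue_name=None, worker_num=0)
print(parser.parse_args(['--queue_name', 'user_activity']))
# Namespace(all=False, queue_name='user_activity', worker_num=0)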


@@ -232,6 +232,7 @@ class WorkerTest(TestCase):
         with simulated_queue_client(lambda: fake_client):
             worker = queue_processors.UserActivityWorker()
+            worker.setup()
             worker.start()
 
         activity_records = UserActivity.objects.filter(
             user_profile = user.id,
@@ -266,6 +267,7 @@ class WorkerTest(TestCase):
         with simulated_queue_client(lambda: fake_client):
             worker = UnreliableWorker()
+            worker.setup()
             worker.start()
 
         self.assertEqual(processed, ['good', 'fine', 'back to normal'])


@@ -59,7 +59,7 @@ class QueueProcessingWorker(object):
     queue_name = None
 
     def __init__(self):
-        self.q = SimpleQueueClient()
+        self.q = None
         if self.queue_name is None:
             raise WorkerDeclarationException("Queue worker declared without queue_name")
@@ -85,6 +85,9 @@ class QueueProcessingWorker(object):
     def _log_problem(self):
         logging.exception("Problem handling data on queue %s" % (self.queue_name,))
 
+    def setup(self):
+        self.q = SimpleQueueClient()
+
     def start(self):
         self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
         self.q.start_consuming()
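
This __init__/setup() split is what ties the change back to the Pika FAQ
quoted in the commit message: Threaded_worker.run() calls setup() on the
worker thread itself, so each SimpleQueueClient connection is created in
the thread that consumes from it. A condensed view of the lifecycle
(queue name illustrative):

# Main thread: constructing a worker no longer opens a connection.
td = Threaded_worker('user_activity')  # get_worker(...) runs; its .q is still None

td.start()
# Worker thread, inside run():
#   self.worker.setup()  # self.q = SimpleQueueClient(); connection made here
#   self.worker.start()  # register_json_consumer(...) then start_consuming()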