From 70c1c31b3a601893e22c6d5cbc8b07e429899f4c Mon Sep 17 00:00:00 2001
From: Steve Howell
Date: Wed, 30 Oct 2013 17:03:50 -0400
Subject: [PATCH] Simplify callback protocol for register_json_consumer().

The register_json_consumer() function now expects its callback
function to accept a single argument, the payload, since none of
the callbacks cared about channel, method, and properties.

This change breaks down as follows:

 * A couple of test stubs and subclasses were simplified.
 * All the consume() and consume_wrapper() functions in
   queue_processors.py were simplified.
 * Two callbacks registered via runtornado.py were simplified.  One of
   them, socket.respond_send_message, also has a direct caller outside
   of register_json_consumer(); that caller was updated so it no longer
   passes None for the three removed arguments.

(imported from commit 792316e20be619458dd5036745233f37e6ffcf43)
---
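A minimal sketch of the new consumer protocol, for reference (the queue name
'demo_events' and the handler below are hypothetical; SimpleQueueClient and
register_json_consumer() are the real names from zerver/lib/queue.py in this
patch):

    from zerver.lib.queue import SimpleQueueClient

    def handle_event(event):
        # 'event' is the already-decoded JSON payload (ujson.loads(body) in
        # wrapped_callback); channel, method, and properties are no longer
        # passed through to the consumer.
        print("got event: %r" % (event,))

    queue_client = SimpleQueueClient()
    # 'demo_events' is a hypothetical queue name, for illustration only.
    queue_client.register_json_consumer('demo_events', handle_event)

    # Before this commit, the same handler had to be written as:
    #     def handle_event(ch, method, properties, event): ...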
 zerver/lib/queue.py                      |  2 +-
 zerver/lib/socket.py                     |  4 ++--
 zerver/management/commands/runtornado.py |  4 +---
 zerver/tests.py                          |  4 ++--
 zerver/worker/queue_processors.py        | 22 +++++++++++-----------
 5 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py
index ebf2123acd..51bff26170 100644
--- a/zerver/lib/queue.py
+++ b/zerver/lib/queue.py
@@ -110,7 +110,7 @@ class SimpleQueueClient(object):
 
     def register_json_consumer(self, queue_name, callback):
         def wrapped_callback(ch, method, properties, body):
-            return callback(ch, method, properties, ujson.loads(body))
+            return callback(ujson.loads(body))
         return self.register_consumer(queue_name, wrapped_callback)
 
     def drain_queue(self, queue_name, json=False):
diff --git a/zerver/lib/socket.py b/zerver/lib/socket.py
index 1389286e52..1165500a4c 100644
--- a/zerver/lib/socket.py
+++ b/zerver/lib/socket.py
@@ -156,9 +156,9 @@ def fake_message_sender(event):
     result = {'response': resp,
               'client_meta': event['client_meta'],
               'server_meta': event['server_meta']}
-    respond_send_message(None, None, None, result)
+    respond_send_message(result)
 
-def respond_send_message(chan, method, props, data):
+def respond_send_message(data):
     connection = get_connection(data['server_meta']['connection_id'])
     if connection is not None:
         connection.session.send_message({'client_meta': data['client_meta'], 'response': data['response']})
diff --git a/zerver/management/commands/runtornado.py b/zerver/management/commands/runtornado.py
index 64f5692d45..f1adcca717 100644
--- a/zerver/management/commands/runtornado.py
+++ b/zerver/management/commands/runtornado.py
@@ -82,9 +82,7 @@ class Command(BaseCommand):
         if settings.USING_RABBITMQ:
             queue_client = get_queue_client()
             # Process notifications received via RabbitMQ
-            def process_notification(chan, method, props, data):
-                tornado_callbacks.process_notification(data)
-            queue_client.register_json_consumer('notify_tornado', process_notification)
+            queue_client.register_json_consumer('notify_tornado', tornado_callbacks.process_notification)
             queue_client.register_json_consumer('tornado_return', respond_send_message)
 
         try:
diff --git a/zerver/tests.py b/zerver/tests.py
index 9881aeddab..13cc35c282 100644
--- a/zerver/tests.py
+++ b/zerver/tests.py
@@ -356,7 +356,7 @@ class WorkerTest(TestCase):
         def start_consuming(self):
             for queue_name, data in self.queue:
                 callback = self.consumers[queue_name]
-                callback(None, None, None, data)
+                callback(data)
 
     def test_UserActivityWorker(self):
 
@@ -391,7 +391,7 @@ class WorkerTest(TestCase):
 
         @queue_processors.assign_queue('flake')
         class FlakyWorker(queue_processors.QueueProcessingWorker):
-            def consume(self, ch, method, properties, data):
+            def consume(self, data):
                 if data == 'freak out':
                     raise Exception('Freaking out!')
                 processed.append(data)
diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py
index 944b41cf2a..09079400b8 100644
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -48,10 +48,10 @@ class QueueProcessingWorker(object):
     def __init__(self):
        self.q = SimpleQueueClient()
 
-    def consume_wrapper(self, ch, method, properties, data):
+    def consume_wrapper(self, data):
         try:
             with commit_on_success():
-                self.consume(ch, method, properties, data)
+                self.consume(data)
         except Exception:
             self._log_problem()
             if not os.path.exists(settings.QUEUE_ERROR_DIR):
@@ -83,7 +83,7 @@ class SignupWorker(QueueProcessingWorker):
 
     # Changes to this should also be reflected in
     # zerver/management/commands/queue_followup_emails.py:queue()
-    def consume(self, ch, method, properties, data):
+    def consume(self, data):
         if settings.MAILCHIMP_API_KEY == '':
             return
 
@@ -128,7 +128,7 @@ class SignupWorker(QueueProcessingWorker):
 
 @assign_queue('invites', enabled=settings.DEPLOYED)
 class ConfirmationEmailWorker(QueueProcessingWorker):
-    def consume(self, ch, method, properties, data):
+    def consume(self, data):
         invitee = get_prereg_user_by_email(data["email"])
         referrer = get_user_profile_by_email(data["referrer_email"])
         do_send_confirmation_email(invitee, referrer)
@@ -144,7 +144,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker):
 
 @assign_queue('user_activity')
 class UserActivityWorker(QueueProcessingWorker):
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         user_profile = get_user_profile_by_id(event["user_profile_id"])
         client = get_client(event["client"])
         log_time = timestamp_to_datetime(event["time"])
@@ -153,14 +153,14 @@ class UserActivityWorker(QueueProcessingWorker):
 
 @assign_queue('user_activity_interval')
 class UserActivityIntervalWorker(QueueProcessingWorker):
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         user_profile = get_user_profile_by_id(event["user_profile_id"])
         log_time = timestamp_to_datetime(event["time"])
         do_update_user_activity_interval(user_profile, log_time)
 
 @assign_queue('user_presence')
 class UserPresenceWorker(QueueProcessingWorker):
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         logging.info("Received event: %s" % (event),)
         user_profile = get_user_profile_by_id(event["user_profile_id"])
         client = get_client(event["client"])
@@ -205,7 +205,7 @@ class FeedbackBot(QueueProcessingWorker):
         )
         QueueProcessingWorker.start(self)
 
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         self.staging_client.forward_feedback(event)
 
 @assign_queue('slow_queries')
@@ -229,7 +229,7 @@ class SlowQueryWorker(QueueProcessingWorker):
 
 @assign_queue("message_sender")
 class MessageSenderWorker(QueueProcessingWorker):
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         req = event['request']
         try:
             sender = get_user_profile_by_id(req['sender_id'])
@@ -250,7 +250,7 @@ class MessageSenderWorker(QueueProcessingWorker):
 class DigestWorker(QueueProcessingWorker):
     # Who gets a digest is entirely determined by the queue_digest_emails
     # management command, not here.
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         logging.info("Received digest event: %s" % (event,))
         handle_digest_email(event["user_profile_id"], event["cutoff"])
 
@@ -260,7 +260,7 @@ class TestWorker(QueueProcessingWorker):
     # creating significant side effects. It can be useful in development or
     # for troubleshooting prod/staging. It pulls a message off the test queue
     # and appends it to a file in /tmp.
-    def consume(self, ch, method, properties, event):
+    def consume(self, event):
         fn = settings.ZULIP_WORKER_TEST_FILE
         message = ujson.dumps(event)
         logging.info("TestWorker should append this message to %s: %s" % (fn, message))