diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index ebf2123acd..51bff26170 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -110,7 +110,7 @@ class SimpleQueueClient(object): def register_json_consumer(self, queue_name, callback): def wrapped_callback(ch, method, properties, body): - return callback(ch, method, properties, ujson.loads(body)) + return callback(ujson.loads(body)) return self.register_consumer(queue_name, wrapped_callback) def drain_queue(self, queue_name, json=False): diff --git a/zerver/lib/socket.py b/zerver/lib/socket.py index 1389286e52..1165500a4c 100644 --- a/zerver/lib/socket.py +++ b/zerver/lib/socket.py @@ -156,9 +156,9 @@ def fake_message_sender(event): result = {'response': resp, 'client_meta': event['client_meta'], 'server_meta': event['server_meta']} - respond_send_message(None, None, None, result) + respond_send_message(result) -def respond_send_message(chan, method, props, data): +def respond_send_message(data): connection = get_connection(data['server_meta']['connection_id']) if connection is not None: connection.session.send_message({'client_meta': data['client_meta'], 'response': data['response']}) diff --git a/zerver/management/commands/runtornado.py b/zerver/management/commands/runtornado.py index 64f5692d45..f1adcca717 100644 --- a/zerver/management/commands/runtornado.py +++ b/zerver/management/commands/runtornado.py @@ -82,9 +82,7 @@ class Command(BaseCommand): if settings.USING_RABBITMQ: queue_client = get_queue_client() # Process notifications received via RabbitMQ - def process_notification(chan, method, props, data): - tornado_callbacks.process_notification(data) - queue_client.register_json_consumer('notify_tornado', process_notification) + queue_client.register_json_consumer('notify_tornado', tornado_callbacks.process_notification) queue_client.register_json_consumer('tornado_return', respond_send_message) try: diff --git a/zerver/tests.py b/zerver/tests.py index 9881aeddab..13cc35c282 100644 --- a/zerver/tests.py +++ b/zerver/tests.py @@ -356,7 +356,7 @@ class WorkerTest(TestCase): def start_consuming(self): for queue_name, data in self.queue: callback = self.consumers[queue_name] - callback(None, None, None, data) + callback(data) def test_UserActivityWorker(self): @@ -391,7 +391,7 @@ class WorkerTest(TestCase): @queue_processors.assign_queue('flake') class FlakyWorker(queue_processors.QueueProcessingWorker): - def consume(self, ch, method, properties, data): + def consume(self, data): if data == 'freak out': raise Exception('Freaking out!') processed.append(data) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 944b41cf2a..09079400b8 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -48,10 +48,10 @@ class QueueProcessingWorker(object): def __init__(self): self.q = SimpleQueueClient() - def consume_wrapper(self, ch, method, properties, data): + def consume_wrapper(self, data): try: with commit_on_success(): - self.consume(ch, method, properties, data) + self.consume(data) except Exception: self._log_problem() if not os.path.exists(settings.QUEUE_ERROR_DIR): @@ -83,7 +83,7 @@ class SignupWorker(QueueProcessingWorker): # Changes to this should also be reflected in # zerver/management/commands/queue_followup_emails.py:queue() - def consume(self, ch, method, properties, data): + def consume(self, data): if settings.MAILCHIMP_API_KEY == '': return @@ -128,7 +128,7 @@ class SignupWorker(QueueProcessingWorker): @assign_queue('invites', enabled=settings.DEPLOYED) class ConfirmationEmailWorker(QueueProcessingWorker): - def consume(self, ch, method, properties, data): + def consume(self, data): invitee = get_prereg_user_by_email(data["email"]) referrer = get_user_profile_by_email(data["referrer_email"]) do_send_confirmation_email(invitee, referrer) @@ -144,7 +144,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker): @assign_queue('user_activity') class UserActivityWorker(QueueProcessingWorker): - def consume(self, ch, method, properties, event): + def consume(self, event): user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) log_time = timestamp_to_datetime(event["time"]) @@ -153,14 +153,14 @@ class UserActivityWorker(QueueProcessingWorker): @assign_queue('user_activity_interval') class UserActivityIntervalWorker(QueueProcessingWorker): - def consume(self, ch, method, properties, event): + def consume(self, event): user_profile = get_user_profile_by_id(event["user_profile_id"]) log_time = timestamp_to_datetime(event["time"]) do_update_user_activity_interval(user_profile, log_time) @assign_queue('user_presence') class UserPresenceWorker(QueueProcessingWorker): - def consume(self, ch, method, properties, event): + def consume(self, event): logging.info("Received event: %s" % (event),) user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) @@ -205,7 +205,7 @@ class FeedbackBot(QueueProcessingWorker): ) QueueProcessingWorker.start(self) - def consume(self, ch, method, properties, event): + def consume(self, event): self.staging_client.forward_feedback(event) @assign_queue('slow_queries') @@ -229,7 +229,7 @@ class SlowQueryWorker(QueueProcessingWorker): @assign_queue("message_sender") class MessageSenderWorker(QueueProcessingWorker): - def consume(self, ch, method, properties, event): + def consume(self, event): req = event['request'] try: sender = get_user_profile_by_id(req['sender_id']) @@ -250,7 +250,7 @@ class MessageSenderWorker(QueueProcessingWorker): class DigestWorker(QueueProcessingWorker): # Who gets a digest is entirely determined by the queue_digest_emails # management command, not here. - def consume(self, ch, method, properties, event): + def consume(self, event): logging.info("Received digest event: %s" % (event,)) handle_digest_email(event["user_profile_id"], event["cutoff"]) @@ -260,7 +260,7 @@ class TestWorker(QueueProcessingWorker): # creating significant side effects. It can be useful in development or # for troubleshooting prod/staging. It pulls a message off the test queue # and appends it to a file in /tmp. - def consume(self, ch, method, properties, event): + def consume(self, event): fn = settings.ZULIP_WORKER_TEST_FILE message = ujson.dumps(event) logging.info("TestWorker should append this message to %s: %s" % (fn, message))