From 88550edd9e7930393fb24c280141af4e009b7c8b Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Fri, 29 Jul 2016 20:40:10 -0700 Subject: [PATCH] Annotate queue_processors.py. --- zerver/worker/queue_processors.py | 36 +++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 7d445c8b9b..ff4036420f 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from typing import Any +from typing import Any, Callable, Mapping from django.conf import settings from django.core.handlers.wsgi import WSGIRequest @@ -39,35 +39,43 @@ class WorkerDeclarationException(Exception): pass def assign_queue(queue_name, enabled=True): + # type: (str, bool) -> Callable[[QueueProcessingWorker], QueueProcessingWorker] def decorate(clazz): + # type: (QueueProcessingWorker) -> QueueProcessingWorker clazz.queue_name = queue_name if enabled: register_worker(queue_name, clazz) return clazz return decorate -worker_classes = {} +worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type def register_worker(queue_name, clazz): + # type: (str, QueueProcessingWorker) -> None worker_classes[queue_name] = clazz def get_worker(queue_name): + # type: (str) -> QueueProcessingWorker return worker_classes[queue_name]() def get_active_worker_queues(): + # type: () -> List[str] return list(worker_classes.keys()) class QueueProcessingWorker(object): queue_name = None # type: str def __init__(self): + # type: () -> None self.q = None # type: SimpleQueueClient if self.queue_name is None: raise WorkerDeclarationException("Queue worker declared without queue_name") def consume(self, data): + # type: (Mapping[str, Any]) -> None raise WorkerDeclarationException("No consumer defined!") def consume_wrapper(self, data): + # type: (Mapping[str, Any]) -> None try: self.consume(data) except Exception: @@ -84,16 +92,20 @@ class QueueProcessingWorker(object): reset_queries() def _log_problem(self): + # type: () -> None logging.exception("Problem handling data on queue %s" % (self.queue_name,)) def setup(self): + # type: () -> None self.q = SimpleQueueClient() def start(self): + # type: () -> None self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.start_consuming() def stop(self): + # type: () -> None self.q.stop_consuming() if settings.MAILCHIMP_API_KEY: @@ -102,11 +114,13 @@ if settings.MAILCHIMP_API_KEY: @assign_queue('signups') class SignupWorker(QueueProcessingWorker): def __init__(self): + # type: () -> None super(SignupWorker, self).__init__() if settings.MAILCHIMP_API_KEY: self.pm = PostMonkey(settings.MAILCHIMP_API_KEY, timeout=10) def consume(self, data): + # type: (Mapping[str, Any]) -> None merge_vars=data['merge_vars'] # This should clear out any invitation reminder emails clear_followup_emails_queue(data["EMAIL"]) @@ -131,6 +145,7 @@ class SignupWorker(QueueProcessingWorker): @assign_queue('invites') class ConfirmationEmailWorker(QueueProcessingWorker): def consume(self, data): + # type: (Mapping[str, Any]) -> None invitee = get_prereg_user_by_email(data["email"]) referrer = get_user_profile_by_email(data["referrer_email"]) do_send_confirmation_email(invitee, referrer) @@ -151,6 +166,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker): @assign_queue('user_activity') class UserActivityWorker(QueueProcessingWorker): def consume(self, event): + # type: (Mapping[str, Any]) -> None user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) log_time = timestamp_to_datetime(event["time"]) @@ -160,6 +176,7 @@ class UserActivityWorker(QueueProcessingWorker): @assign_queue('user_activity_interval') class UserActivityIntervalWorker(QueueProcessingWorker): def consume(self, event): + # type: (Mapping[str, Any]) -> None user_profile = get_user_profile_by_id(event["user_profile_id"]) log_time = timestamp_to_datetime(event["time"]) do_update_user_activity_interval(user_profile, log_time) @@ -167,6 +184,7 @@ class UserActivityIntervalWorker(QueueProcessingWorker): @assign_queue('user_presence') class UserPresenceWorker(QueueProcessingWorker): def consume(self, event): + # type: (Mapping[str, Any]) -> None logging.info("Received event: %s" % (event),) user_profile = get_user_profile_by_id(event["user_profile_id"]) client = get_client(event["client"]) @@ -177,6 +195,7 @@ class UserPresenceWorker(QueueProcessingWorker): @assign_queue('missedmessage_emails') class MissedMessageWorker(QueueProcessingWorker): def start(self): + # type: () -> None while True: missed_events = self.q.drain_queue("missedmessage_emails", json=True) by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]] @@ -196,9 +215,11 @@ class MissedMessageWorker(QueueProcessingWorker): @assign_queue('missedmessage_mobile_notifications') class PushNotificationsWorker(QueueProcessingWorker): def consume(self, data): + # type: (Mapping[str, Any]) -> None handle_push_notification(data['user_profile_id'], data) def make_feedback_client(): + # type: () -> Any # Should be zulip.Client, but not necessarily importable sys.path.append(os.path.join(os.path.dirname(__file__), '../../api')) import zulip return zulip.Client( @@ -212,6 +233,7 @@ def make_feedback_client(): @assign_queue('feedback_messages') class FeedbackBot(QueueProcessingWorker): def start(self): + # type: () -> None if settings.ENABLE_FEEDBACK and settings.FEEDBACK_EMAIL is None: self.staging_client = make_feedback_client() self.staging_client._register( @@ -223,6 +245,7 @@ class FeedbackBot(QueueProcessingWorker): QueueProcessingWorker.start(self) def consume(self, event): + # type: (Mapping[str, Any]) -> None if not settings.ENABLE_FEEDBACK: return if settings.FEEDBACK_EMAIL is not None: @@ -239,6 +262,7 @@ class FeedbackBot(QueueProcessingWorker): @assign_queue('error_reports') class ErrorReporter(QueueProcessingWorker): def start(self): + # type: () -> None if settings.DEPLOYMENT_ROLE_KEY: self.staging_client = make_feedback_client() self.staging_client._register( @@ -250,6 +274,7 @@ class ErrorReporter(QueueProcessingWorker): QueueProcessingWorker.start(self) def consume(self, event): + # type: (Mapping[str, Any]) -> None if not settings.DEPLOYMENT_ROLE_KEY: return self.staging_client.forward_error(event['type'], event['report']) @@ -257,12 +282,14 @@ class ErrorReporter(QueueProcessingWorker): @assign_queue('slow_queries') class SlowQueryWorker(QueueProcessingWorker): def start(self): + # type: () -> None while True: self.process_one_batch() # Aggregate all slow query messages in 1-minute chunks to avoid message spam time.sleep(1 * 60) def process_one_batch(self): + # type: () -> None slow_queries = self.q.drain_queue("slow_queries", json=True) if settings.ERROR_BOT is None: @@ -282,12 +309,14 @@ class SlowQueryWorker(QueueProcessingWorker): @assign_queue("message_sender") class MessageSenderWorker(QueueProcessingWorker): def __init__(self): + # type: () -> None super(MessageSenderWorker, self).__init__() self.redis_client = get_redis_client() self.handler = BaseHandler() self.handler.load_middleware() def consume(self, event): + # type: (Mapping[str, Any]) -> None server_meta = event['server_meta'] environ = {'REQUEST_METHOD': 'SOCKET', @@ -331,6 +360,7 @@ class DigestWorker(QueueProcessingWorker): # Who gets a digest is entirely determined by the enqueue_digest_emails # management command, not here. def consume(self, event): + # type: (Mapping[str, Any]) -> None logging.info("Received digest event: %s" % (event,)) handle_digest_email(event["user_profile_id"], event["cutoff"]) @@ -339,6 +369,7 @@ class MirrorWorker(QueueProcessingWorker): # who gets a digest is entirely determined by the enqueue_digest_emails # management command, not here. def consume(self, event): + # type: (Mapping[str, Any]) -> None mirror_email(email.message_from_string(event["message"]), rcpt_to=event["rcpt_to"], pre_checked=True) @@ -349,6 +380,7 @@ class TestWorker(QueueProcessingWorker): # for troubleshooting prod/staging. It pulls a message off the test queue # and appends it to a file in /tmp. def consume(self, event): + # type: (Mapping[str, Any]) -> None fn = settings.ZULIP_WORKER_TEST_FILE message = ujson.dumps(event) logging.info("TestWorker should append this message to %s: %s" % (fn, message))