Annotate queue_processors.py.

This commit is contained in:
Tim Abbott 2016-07-29 20:40:10 -07:00
parent 4e8054b84a
commit 88550edd9e
1 changed files with 34 additions and 2 deletions

View File

@ -1,5 +1,5 @@
from __future__ import absolute_import from __future__ import absolute_import
from typing import Any from typing import Any, Callable, Mapping
from django.conf import settings from django.conf import settings
from django.core.handlers.wsgi import WSGIRequest from django.core.handlers.wsgi import WSGIRequest
@ -39,35 +39,43 @@ class WorkerDeclarationException(Exception):
pass pass
def assign_queue(queue_name, enabled=True): def assign_queue(queue_name, enabled=True):
# type: (str, bool) -> Callable[[QueueProcessingWorker], QueueProcessingWorker]
def decorate(clazz): def decorate(clazz):
# type: (QueueProcessingWorker) -> QueueProcessingWorker
clazz.queue_name = queue_name clazz.queue_name = queue_name
if enabled: if enabled:
register_worker(queue_name, clazz) register_worker(queue_name, clazz)
return clazz return clazz
return decorate return decorate
worker_classes = {} worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type
def register_worker(queue_name, clazz): def register_worker(queue_name, clazz):
# type: (str, QueueProcessingWorker) -> None
worker_classes[queue_name] = clazz worker_classes[queue_name] = clazz
def get_worker(queue_name): def get_worker(queue_name):
# type: (str) -> QueueProcessingWorker
return worker_classes[queue_name]() return worker_classes[queue_name]()
def get_active_worker_queues(): def get_active_worker_queues():
# type: () -> List[str]
return list(worker_classes.keys()) return list(worker_classes.keys())
class QueueProcessingWorker(object): class QueueProcessingWorker(object):
queue_name = None # type: str queue_name = None # type: str
def __init__(self): def __init__(self):
# type: () -> None
self.q = None # type: SimpleQueueClient self.q = None # type: SimpleQueueClient
if self.queue_name is None: if self.queue_name is None:
raise WorkerDeclarationException("Queue worker declared without queue_name") raise WorkerDeclarationException("Queue worker declared without queue_name")
def consume(self, data): def consume(self, data):
# type: (Mapping[str, Any]) -> None
raise WorkerDeclarationException("No consumer defined!") raise WorkerDeclarationException("No consumer defined!")
def consume_wrapper(self, data): def consume_wrapper(self, data):
# type: (Mapping[str, Any]) -> None
try: try:
self.consume(data) self.consume(data)
except Exception: except Exception:
@ -84,16 +92,20 @@ class QueueProcessingWorker(object):
reset_queries() reset_queries()
def _log_problem(self): def _log_problem(self):
# type: () -> None
logging.exception("Problem handling data on queue %s" % (self.queue_name,)) logging.exception("Problem handling data on queue %s" % (self.queue_name,))
def setup(self): def setup(self):
# type: () -> None
self.q = SimpleQueueClient() self.q = SimpleQueueClient()
def start(self): def start(self):
# type: () -> None
self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
self.q.start_consuming() self.q.start_consuming()
def stop(self): def stop(self):
# type: () -> None
self.q.stop_consuming() self.q.stop_consuming()
if settings.MAILCHIMP_API_KEY: if settings.MAILCHIMP_API_KEY:
@ -102,11 +114,13 @@ if settings.MAILCHIMP_API_KEY:
@assign_queue('signups') @assign_queue('signups')
class SignupWorker(QueueProcessingWorker): class SignupWorker(QueueProcessingWorker):
def __init__(self): def __init__(self):
# type: () -> None
super(SignupWorker, self).__init__() super(SignupWorker, self).__init__()
if settings.MAILCHIMP_API_KEY: if settings.MAILCHIMP_API_KEY:
self.pm = PostMonkey(settings.MAILCHIMP_API_KEY, timeout=10) self.pm = PostMonkey(settings.MAILCHIMP_API_KEY, timeout=10)
def consume(self, data): def consume(self, data):
# type: (Mapping[str, Any]) -> None
merge_vars=data['merge_vars'] merge_vars=data['merge_vars']
# This should clear out any invitation reminder emails # This should clear out any invitation reminder emails
clear_followup_emails_queue(data["EMAIL"]) clear_followup_emails_queue(data["EMAIL"])
@ -131,6 +145,7 @@ class SignupWorker(QueueProcessingWorker):
@assign_queue('invites') @assign_queue('invites')
class ConfirmationEmailWorker(QueueProcessingWorker): class ConfirmationEmailWorker(QueueProcessingWorker):
def consume(self, data): def consume(self, data):
# type: (Mapping[str, Any]) -> None
invitee = get_prereg_user_by_email(data["email"]) invitee = get_prereg_user_by_email(data["email"])
referrer = get_user_profile_by_email(data["referrer_email"]) referrer = get_user_profile_by_email(data["referrer_email"])
do_send_confirmation_email(invitee, referrer) do_send_confirmation_email(invitee, referrer)
@ -151,6 +166,7 @@ class ConfirmationEmailWorker(QueueProcessingWorker):
@assign_queue('user_activity') @assign_queue('user_activity')
class UserActivityWorker(QueueProcessingWorker): class UserActivityWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
client = get_client(event["client"]) client = get_client(event["client"])
log_time = timestamp_to_datetime(event["time"]) log_time = timestamp_to_datetime(event["time"])
@ -160,6 +176,7 @@ class UserActivityWorker(QueueProcessingWorker):
@assign_queue('user_activity_interval') @assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker): class UserActivityIntervalWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
log_time = timestamp_to_datetime(event["time"]) log_time = timestamp_to_datetime(event["time"])
do_update_user_activity_interval(user_profile, log_time) do_update_user_activity_interval(user_profile, log_time)
@ -167,6 +184,7 @@ class UserActivityIntervalWorker(QueueProcessingWorker):
@assign_queue('user_presence') @assign_queue('user_presence')
class UserPresenceWorker(QueueProcessingWorker): class UserPresenceWorker(QueueProcessingWorker):
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
logging.info("Received event: %s" % (event),) logging.info("Received event: %s" % (event),)
user_profile = get_user_profile_by_id(event["user_profile_id"]) user_profile = get_user_profile_by_id(event["user_profile_id"])
client = get_client(event["client"]) client = get_client(event["client"])
@ -177,6 +195,7 @@ class UserPresenceWorker(QueueProcessingWorker):
@assign_queue('missedmessage_emails') @assign_queue('missedmessage_emails')
class MissedMessageWorker(QueueProcessingWorker): class MissedMessageWorker(QueueProcessingWorker):
def start(self): def start(self):
# type: () -> None
while True: while True:
missed_events = self.q.drain_queue("missedmessage_emails", json=True) missed_events = self.q.drain_queue("missedmessage_emails", json=True)
by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]] by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]]
@ -196,9 +215,11 @@ class MissedMessageWorker(QueueProcessingWorker):
@assign_queue('missedmessage_mobile_notifications') @assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker): class PushNotificationsWorker(QueueProcessingWorker):
def consume(self, data): def consume(self, data):
# type: (Mapping[str, Any]) -> None
handle_push_notification(data['user_profile_id'], data) handle_push_notification(data['user_profile_id'], data)
def make_feedback_client(): def make_feedback_client():
# type: () -> Any # Should be zulip.Client, but not necessarily importable
sys.path.append(os.path.join(os.path.dirname(__file__), '../../api')) sys.path.append(os.path.join(os.path.dirname(__file__), '../../api'))
import zulip import zulip
return zulip.Client( return zulip.Client(
@ -212,6 +233,7 @@ def make_feedback_client():
@assign_queue('feedback_messages') @assign_queue('feedback_messages')
class FeedbackBot(QueueProcessingWorker): class FeedbackBot(QueueProcessingWorker):
def start(self): def start(self):
# type: () -> None
if settings.ENABLE_FEEDBACK and settings.FEEDBACK_EMAIL is None: if settings.ENABLE_FEEDBACK and settings.FEEDBACK_EMAIL is None:
self.staging_client = make_feedback_client() self.staging_client = make_feedback_client()
self.staging_client._register( self.staging_client._register(
@ -223,6 +245,7 @@ class FeedbackBot(QueueProcessingWorker):
QueueProcessingWorker.start(self) QueueProcessingWorker.start(self)
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
if not settings.ENABLE_FEEDBACK: if not settings.ENABLE_FEEDBACK:
return return
if settings.FEEDBACK_EMAIL is not None: if settings.FEEDBACK_EMAIL is not None:
@ -239,6 +262,7 @@ class FeedbackBot(QueueProcessingWorker):
@assign_queue('error_reports') @assign_queue('error_reports')
class ErrorReporter(QueueProcessingWorker): class ErrorReporter(QueueProcessingWorker):
def start(self): def start(self):
# type: () -> None
if settings.DEPLOYMENT_ROLE_KEY: if settings.DEPLOYMENT_ROLE_KEY:
self.staging_client = make_feedback_client() self.staging_client = make_feedback_client()
self.staging_client._register( self.staging_client._register(
@ -250,6 +274,7 @@ class ErrorReporter(QueueProcessingWorker):
QueueProcessingWorker.start(self) QueueProcessingWorker.start(self)
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
if not settings.DEPLOYMENT_ROLE_KEY: if not settings.DEPLOYMENT_ROLE_KEY:
return return
self.staging_client.forward_error(event['type'], event['report']) self.staging_client.forward_error(event['type'], event['report'])
@ -257,12 +282,14 @@ class ErrorReporter(QueueProcessingWorker):
@assign_queue('slow_queries') @assign_queue('slow_queries')
class SlowQueryWorker(QueueProcessingWorker): class SlowQueryWorker(QueueProcessingWorker):
def start(self): def start(self):
# type: () -> None
while True: while True:
self.process_one_batch() self.process_one_batch()
# Aggregate all slow query messages in 1-minute chunks to avoid message spam # Aggregate all slow query messages in 1-minute chunks to avoid message spam
time.sleep(1 * 60) time.sleep(1 * 60)
def process_one_batch(self): def process_one_batch(self):
# type: () -> None
slow_queries = self.q.drain_queue("slow_queries", json=True) slow_queries = self.q.drain_queue("slow_queries", json=True)
if settings.ERROR_BOT is None: if settings.ERROR_BOT is None:
@ -282,12 +309,14 @@ class SlowQueryWorker(QueueProcessingWorker):
@assign_queue("message_sender") @assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker): class MessageSenderWorker(QueueProcessingWorker):
def __init__(self): def __init__(self):
# type: () -> None
super(MessageSenderWorker, self).__init__() super(MessageSenderWorker, self).__init__()
self.redis_client = get_redis_client() self.redis_client = get_redis_client()
self.handler = BaseHandler() self.handler = BaseHandler()
self.handler.load_middleware() self.handler.load_middleware()
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
server_meta = event['server_meta'] server_meta = event['server_meta']
environ = {'REQUEST_METHOD': 'SOCKET', environ = {'REQUEST_METHOD': 'SOCKET',
@ -331,6 +360,7 @@ class DigestWorker(QueueProcessingWorker):
# Who gets a digest is entirely determined by the enqueue_digest_emails # Who gets a digest is entirely determined by the enqueue_digest_emails
# management command, not here. # management command, not here.
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
logging.info("Received digest event: %s" % (event,)) logging.info("Received digest event: %s" % (event,))
handle_digest_email(event["user_profile_id"], event["cutoff"]) handle_digest_email(event["user_profile_id"], event["cutoff"])
@ -339,6 +369,7 @@ class MirrorWorker(QueueProcessingWorker):
# who gets a digest is entirely determined by the enqueue_digest_emails # who gets a digest is entirely determined by the enqueue_digest_emails
# management command, not here. # management command, not here.
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
mirror_email(email.message_from_string(event["message"]), mirror_email(email.message_from_string(event["message"]),
rcpt_to=event["rcpt_to"], pre_checked=True) rcpt_to=event["rcpt_to"], pre_checked=True)
@ -349,6 +380,7 @@ class TestWorker(QueueProcessingWorker):
# for troubleshooting prod/staging. It pulls a message off the test queue # for troubleshooting prod/staging. It pulls a message off the test queue
# and appends it to a file in /tmp. # and appends it to a file in /tmp.
def consume(self, event): def consume(self, event):
# type: (Mapping[str, Any]) -> None
fn = settings.ZULIP_WORKER_TEST_FILE fn = settings.ZULIP_WORKER_TEST_FILE
message = ujson.dumps(event) message = ujson.dumps(event)
logging.info("TestWorker should append this message to %s: %s" % (fn, message)) logging.info("TestWorker should append this message to %s: %s" % (fn, message))