zulip/zerver/worker/queue_processors.py

397 lines
15 KiB
Python

from __future__ import absolute_import
from typing import Any, Callable, Mapping
from django.conf import settings
from django.core.handlers.wsgi import WSGIRequest
from django.core.handlers.base import BaseHandler
from zerver.models import get_user_profile_by_email, \
get_user_profile_by_id, get_prereg_user_by_email, get_client
from zerver.lib.context_managers import lockfile
from zerver.lib.queue import SimpleQueueClient, queue_json_publish
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails, \
clear_followup_emails_queue, send_local_email_template_with_delay
from zerver.lib.actions import do_send_confirmation_email, \
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
internal_send_message, check_send_message, extract_recipients, \
handle_push_notification
from zerver.lib.digest import handle_digest_email
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.decorator import JsonableError
from zerver.tornado.socket import req_redis_key
from confirmation.models import Confirmation
from zerver.lib.db import reset_queries
from django.core.mail import EmailMessage
from zerver.lib.redis_utils import get_redis_client
from zerver.context_processors import common_context
import os
import sys
import ujson
from collections import defaultdict
import email
import time
import datetime
import logging
import simplejson
from six.moves import cStringIO as StringIO
class WorkerDeclarationException(Exception):
    """Signals that a queue worker class was declared or used incorrectly.

    In this module it is raised for a worker that has no ``queue_name`` and
    by the base ``consume`` method, which subclasses are expected to override.
    """
def assign_queue(queue_name, enabled=True):
    # type: (str, bool) -> Callable[[QueueProcessingWorker], QueueProcessingWorker]
    """Build a class decorator that ties a worker class to ``queue_name``.

    The decorator always stores ``queue_name`` on the class.  The class is
    passed to ``register_worker`` only when ``enabled`` is true.  In both
    cases the same class object is returned.
    """
    def bind_queue(worker_class):
        # type: (QueueProcessingWorker) -> QueueProcessingWorker
        worker_class.queue_name = queue_name
        if not enabled:
            # Disabled workers keep their queue name but are not registered.
            return worker_class
        register_worker(queue_name, worker_class)
        return worker_class

    return bind_queue
# Registry filled by register_worker(): queue name -> worker class.
# (Any in the type below should really be the QueueProcessingWorker type.)
worker_classes = dict()  # type: Dict[str, Any]
def register_worker(queue_name, clazz):
    # type: (str, QueueProcessingWorker) -> None
    """Store ``clazz`` in ``worker_classes`` under ``queue_name``.

    Any class previously stored under the same queue name is replaced.
    """
    worker_classes.update({queue_name: clazz})
def get_worker(queue_name):
    # type: (str) -> QueueProcessingWorker
    """Return a new instance of the worker class registered for ``queue_name``.

    Raises KeyError when nothing is registered under that name.
    """
    worker_class = worker_classes[queue_name]
    return worker_class()
def get_active_worker_queues():
    # type: () -> List[str]
    """Return the names of all queues present in ``worker_classes``."""
    return [queue_name for queue_name in worker_classes]
class QueueProcessingWorker(object):
    """Base class for workers that process events from a single queue.

    Subclasses are expected to provide ``queue_name`` (the ``assign_queue``
    decorator sets it) and to override ``consume``.
    """
    queue_name = None  # type: str

    def __init__(self):
        # type: () -> None
        """Initialise the worker; raises if the class has no ``queue_name``."""
        # The queue client is only created once setup() is called.
        self.q = None  # type: SimpleQueueClient
        if self.queue_name is None:
            raise WorkerDeclarationException("Queue worker declared without queue_name")

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Process one event; the base implementation always raises."""
        raise WorkerDeclarationException("No consumer defined!")

    def consume_wrapper(self, data):
        # type: (Mapping[str, Any]) -> None
        """Call ``consume`` and record the event on disk if it raises.

        A failing event is logged and appended, as a timestamped JSON line,
        to ``<QUEUE_ERROR_DIR>/<queue_name>.errors`` while holding a ``.lock``
        file next to it.  ``reset_queries()`` is called once the event has
        been handled, whether or not ``consume`` succeeded.
        """
        try:
            self.consume(data)
        except Exception:
            self._log_problem()
            # Create the directory unconditionally rather than checking for it
            # first: several worker processes may hit their first error at the
            # same moment, and a check-then-create sequence would let the
            # loser of that race fail because the directory already exists.
            try:
                os.mkdir(settings.QUEUE_ERROR_DIR)
            except OSError:
                if not os.path.isdir(settings.QUEUE_ERROR_DIR):
                    raise
            fname = '%s.errors' % (self.queue_name,)
            fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
            line = u'%s\t%s\n' % (time.asctime(), ujson.dumps(data))
            lock_fn = fn + '.lock'
            with lockfile(lock_fn):
                with open(fn, 'ab') as f:
                    f.write(line.encode('utf-8'))
        reset_queries()

    def _log_problem(self):
        # type: () -> None
        """Log the exception currently being handled, with its traceback."""
        logging.exception("Problem handling data on queue %s", self.queue_name)

    def setup(self):
        # type: () -> None
        """Create the queue client used by ``start`` and ``stop``."""
        self.q = SimpleQueueClient()

    def start(self):
        # type: () -> None
        """Register ``consume_wrapper`` for this queue and start consuming."""
        self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
        self.q.start_consuming()

    def stop(self):
        # type: () -> None
        """Ask the queue client to stop consuming."""
        self.q.stop_consuming()
if settings.MAILCHIMP_API_KEY:
from postmonkey import PostMonkey, MailChimpException
@assign_queue('signups')
class SignupWorker(QueueProcessingWorker):
    """Worker for the 'signups' queue.

    For each event it clears queued follow-up emails for the address,
    optionally subscribes the address to the MailChimp list, and enqueues
    the welcome emails.
    """
    def __init__(self):
        # type: () -> None
        super(SignupWorker, self).__init__()
        # PostMonkey is only imported by this module when an API key is set,
        # so the client is created under the same condition.
        if settings.MAILCHIMP_API_KEY:
            self.pm = PostMonkey(settings.MAILCHIMP_API_KEY, timeout=10)

    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Process one signup event.

        ``data`` must contain 'merge_vars' and 'EMAIL'; a missing key raises
        KeyError.  MailChimp errors other than code 214 propagate to the
        caller.
        """
        merge_vars = data['merge_vars']
        # Read the address once; the name avoids shadowing the `email`
        # module imported at the top of this file.
        email_address = data["EMAIL"]
        # This should clear out any invitation reminder emails
        clear_followup_emails_queue(email_address)
        if settings.MAILCHIMP_API_KEY and settings.PRODUCTION:
            try:
                self.pm.listSubscribe(
                    id=settings.ZULIP_FRIENDS_LIST_ID,
                    email_address=email_address,
                    merge_vars=merge_vars,
                    double_optin=False,
                    send_welcome=False)
            except MailChimpException as e:
                if e.code != 214:
                    # Bare raise keeps the original traceback intact
                    # (on Python 2, `raise e` would discard it).
                    raise
                # Code 214 is treated as "address already on the list".
                logging.warning("Attempted to sign up already existing email to list: %s" % (email_address,))

        name = merge_vars.get("NAME")
        enqueue_welcome_emails(email_address, name)
@assign_queue('invites')
class ConfirmationEmailWorker(QueueProcessingWorker):
    """Worker for the 'invites' queue.

    It sends the confirmation email for an invitation and schedules a
    reminder email.
    """
    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Send the confirmation email and queue the reminder.

        Reads 'email' (the invitee) and 'referrer_email' from ``data``.
        """
        invitee_email = data["email"]
        invitee = get_prereg_user_by_email(invitee_email)
        referrer = get_user_profile_by_email(data["referrer_email"])
        do_send_confirmation_email(invitee, referrer)

        # Queue the invitation reminder for two days from now.
        activation_link = Confirmation.objects.get_link_for_object(
            invitee, host=referrer.realm.host)
        context = common_context(referrer)
        reminder_fields = {
            'activate_url': activation_link,
            'referrer': referrer,
            'verbose_support_offers': settings.VERBOSE_SUPPORT_OFFERS,
            'support_email': settings.ZULIP_ADMINISTRATOR
        }
        context.update(reminder_fields)

        recipients = [{'email': data["email"], 'name': ""}]
        reminder_delay = datetime.timedelta(days=2)
        send_local_email_template_with_delay(
            recipients,
            "zerver/emails/invitation/invitation_reminder_email",
            context,
            reminder_delay,
            tags=["invitation-reminders"],
            sender={'email': settings.ZULIP_ADMINISTRATOR, 'name': 'Zulip'})
@assign_queue('user_activity')
class UserActivityWorker(QueueProcessingWorker):
    """Worker for the 'user_activity' queue."""
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Record one user-activity event.

        Reads 'user_profile_id', 'client', 'time' and 'query' from ``event``
        and passes the resolved values to ``do_update_user_activity``.
        """
        profile = get_user_profile_by_id(event["user_profile_id"])
        client_obj = get_client(event["client"])
        logged_at = timestamp_to_datetime(event["time"])
        query_name = event["query"]
        do_update_user_activity(profile, client_obj, query_name, logged_at)
@assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker):
    """Worker for the 'user_activity_interval' queue."""
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Record one activity-interval event.

        Reads 'user_profile_id' and 'time' from ``event`` and passes the
        resolved values to ``do_update_user_activity_interval``.
        """
        profile = get_user_profile_by_id(event["user_profile_id"])
        logged_at = timestamp_to_datetime(event["time"])
        do_update_user_activity_interval(profile, logged_at)
@assign_queue('user_presence')
class UserPresenceWorker(QueueProcessingWorker):
    """Worker for the 'user_presence' queue."""
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Log the event and record the user's presence.

        Reads 'user_profile_id', 'client', 'time' and 'status' from ``event``
        and passes the resolved values to ``do_update_user_presence``.
        """
        # The argument must be a real one-element tuple so the event is
        # always formatted as a single value, whatever its type.
        logging.info("Received event: %s" % (event,))
        user_profile = get_user_profile_by_id(event["user_profile_id"])
        client = get_client(event["client"])
        log_time = timestamp_to_datetime(event["time"])
        status = event["status"]
        do_update_user_presence(user_profile, client, log_time, status)
@assign_queue('missedmessage_emails')
class MissedMessageWorker(QueueProcessingWorker):
    """Worker for the 'missedmessage_emails' queue.

    It drains the queue in batches instead of consuming events one by one.
    """
    def start(self):
        # type: () -> None
        """Loop forever: drain the queue, group events per user, then sleep."""
        while True:
            events_by_user = defaultdict(list)  # type: Dict[int, List[Dict[str, Any]]]

            for missed in self.q.drain_queue("missedmessage_emails", json=True):
                logging.info("Received event: %s" % (missed,))
                events_by_user[missed['user_profile_id']].append(missed)

            for user_profile_id, events in events_by_user.items():
                handle_missedmessage_emails(user_profile_id, events)

            reset_queries()
            # Wait two minutes between batches so that someone sending a
            # burst of messages can finish before the emails are aggregated.
            time.sleep(2 * 60)
@assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker):
    """Worker for the 'missedmessage_mobile_notifications' queue."""
    def consume(self, data):
        # type: (Mapping[str, Any]) -> None
        """Pass the event and its 'user_profile_id' to ``handle_push_notification``."""
        recipient_id = data['user_profile_id']
        handle_push_notification(recipient_id, data)
def make_feedback_client():
    # type: () -> Any # Should be zulip.Client, but not necessarily importable
    """Create a ``zulip.Client`` that talks to ``settings.FEEDBACK_TARGET``.

    The ``zulip`` package is imported lazily from the ``../../api`` directory
    relative to this file, which is added to ``sys.path`` if it is not there
    already.  The client authenticates with the deployment role name and key
    from settings.
    """
    api_path = os.path.join(os.path.dirname(__file__), '../../api')
    # Only add the path once so repeated calls do not keep growing sys.path.
    if api_path not in sys.path:
        sys.path.append(api_path)
    import zulip
    return zulip.Client(
        client="ZulipFeedback/0.1",
        email=settings.DEPLOYMENT_ROLE_NAME,
        api_key=settings.DEPLOYMENT_ROLE_KEY,
        verbose=True,
        site=settings.FEEDBACK_TARGET)
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
@assign_queue('feedback_messages')
class FeedbackBot(QueueProcessingWorker):
    """Worker for the 'feedback_messages' queue.

    Feedback is sent by email when ``settings.FEEDBACK_EMAIL`` is set, and
    forwarded through a feedback client otherwise.
    """
    def start(self):
        # type: () -> None
        """Create the forwarding client if it is needed, then start consuming."""
        needs_client = settings.ENABLE_FEEDBACK and settings.FEEDBACK_EMAIL is None
        if needs_client:
            self.staging_client = make_feedback_client()
            self.staging_client._register(
                'forward_feedback',
                method='POST',
                url='deployments/feedback',
                make_request=(lambda request: {'message': simplejson.dumps(request)}),
            )
        super(FeedbackBot, self).start()

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Deliver one feedback event; does nothing when feedback is disabled."""
        if not settings.ENABLE_FEEDBACK:
            return

        if settings.FEEDBACK_EMAIL is None:
            # No email address is configured: forward through the client
            # that start() created.
            self.staging_client.forward_feedback(event)
            return

        recipient = settings.FEEDBACK_EMAIL
        subject = "Zulip feedback from %s" % (event["sender_email"],)
        body = event["content"]
        sender = '"%s" <%s>' % (event["sender_full_name"], settings.ZULIP_ADMINISTRATOR)
        # Replies go to the person who sent the feedback.
        reply_to = '"%s" <%s>' % (event["sender_full_name"], event["sender_email"])
        message = EmailMessage(subject, body, sender, [recipient],
                               headers={'Reply-To': reply_to})
        message.send()
@assign_queue('error_reports')
class ErrorReporter(QueueProcessingWorker):
    """Worker for the 'error_reports' queue.

    Reports are forwarded through a feedback client when
    ``settings.DEPLOYMENT_ROLE_KEY`` is set, and otherwise handled locally
    when ``settings.ZILENCER_ENABLED`` is set.
    """
    def start(self):
        # type: () -> None
        """Create the forwarding client if a role key is set, then start consuming."""
        if settings.DEPLOYMENT_ROLE_KEY:
            client = make_feedback_client()
            self.staging_client = client
            client._register(
                'forward_error',
                method='POST',
                url='deployments/report_error',
                make_request=(lambda type, report: {'type': type, 'report': simplejson.dumps(report)}),
            )
        super(ErrorReporter, self).start()

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Forward or locally report one error event.

        Reads 'type' and 'report' from ``event``.  The event is dropped when
        neither a role key nor zilencer is configured.
        """
        if settings.DEPLOYMENT_ROLE_KEY:
            self.staging_client.forward_error(event['type'], event['report'])
            return
        if settings.ZILENCER_ENABLED:
            # Imported here, only when the setting is on.
            from zilencer.views import do_report_error
            do_report_error(settings.DEPLOYMENT_ROLE_NAME, event['type'], event['report'])
@assign_queue('slow_queries')
class SlowQueryWorker(QueueProcessingWorker):
    """Worker for the 'slow_queries' queue.

    It drains the queue in batches and posts each batch as one message.
    """
    def start(self):
        # type: () -> None
        """Loop forever, processing one batch per minute."""
        while True:
            self.process_one_batch()
            # One-minute batches keep slow-query reports from spamming.
            time.sleep(1 * 60)

    def process_one_batch(self):
        # type: () -> None
        """Drain the queue and send the drained entries as a single message.

        The queue is always drained.  Nothing more happens when
        ``settings.ERROR_BOT`` is None.  Otherwise a message is sent if any
        entries were drained, and ``reset_queries()`` is called.
        """
        drained = self.q.drain_queue("slow_queries", json=True)

        if settings.ERROR_BOT is None:
            return

        if len(drained) > 0:
            topic = "%s: slow queries" % (settings.EXTERNAL_HOST,)
            # One line per drained entry.
            content = "".join([" %s\n" % (entry,) for entry in drained])
            internal_send_message(settings.ERROR_BOT, "stream", "logs", topic, content)

        reset_queries()
@assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker):
    """Worker for the 'message_sender' queue.

    Each event is turned into a synthetic WSGI request for '/json/messages'
    and run through a Django handler.  The response is stored in redis and
    published to the event's return queue.
    """
    def __init__(self):
        # type: () -> None
        super(MessageSenderWorker, self).__init__()
        self.redis_client = get_redis_client()
        handler = BaseHandler()
        handler.load_middleware()
        self.handler = handler

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Process one event and publish its response.

        Reads 'server_meta', 'request' and 'req_id' from ``event``.
        ``server_meta`` is updated in place with timing and log data.
        """
        server_meta = event['server_meta']

        # We're mostly using a WSGIRequest for convenience
        wsgi_environ = {
            'REQUEST_METHOD': 'SOCKET',
            'SCRIPT_NAME': '',
            'PATH_INFO': '/json/messages',
            'SERVER_NAME': '127.0.0.1',
            'SERVER_PORT': 9993,
            'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
            'wsgi.version': (1, 0),
            'wsgi.input': StringIO(),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': True,
            'wsgi.run_once': False,
            'zulip.emulated_method': 'POST',
        }
        # Values supplied with the event override the defaults above.
        wsgi_environ.update(server_meta['request_environ'])
        request = WSGIRequest(wsgi_environ)

        # Note: If we ever support non-POST methods, we'll need to change this.
        request._post = event['request']
        request.csrf_processing_done = True
        request._cached_user = get_user_profile_by_id(server_meta['user_id'])

        response = self.handler.get_response(request)
        server_meta['time_request_finished'] = time.time()
        server_meta['worker_log_data'] = request._log_data

        response_text = response.content.decode('utf-8')
        result = {
            'response': ujson.loads(response_text),
            'req_id': event['req_id'],
            'server_meta': server_meta,
        }

        # Mark the request complete in redis before publishing the result.
        completion = {'status': 'complete', 'response': response_text}
        self.redis_client.hmset(req_redis_key(event['req_id']), completion)

        queue_json_publish(server_meta['return_queue'], result, lambda e: None)
@assign_queue('digest_emails')
class DigestWorker(QueueProcessingWorker):
    """Worker for the 'digest_emails' queue.

    This worker does not decide who gets a digest; that is determined
    entirely by the enqueue_digest_emails management command.
    """
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Log the event and call ``handle_digest_email`` for it.

        Reads 'user_profile_id' and 'cutoff' from ``event``.
        """
        log_line = "Received digest event: %s" % (event,)
        logging.info(log_line)
        recipient_id = event["user_profile_id"]
        handle_digest_email(recipient_id, event["cutoff"])
@assign_queue('email_mirror')
class MirrorWorker(QueueProcessingWorker):
    """Worker for the 'email_mirror' queue.

    Each event's message is handed to the email mirror's ``process_message``,
    which this module imports as ``mirror_email``.
    """
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Parse the event's message text and pass it to the email mirror.

        Reads 'message' and 'rcpt_to' from ``event``.  The mirror is always
        called with ``pre_checked=True``.
        """
        parsed_message = email.message_from_string(event["message"])
        mirror_email(parsed_message, rcpt_to=event["rcpt_to"], pre_checked=True)
@assign_queue('test')
class TestWorker(QueueProcessingWorker):
    """Worker for the 'test' queue.

    It lets you exercise the queue worker infrastructure without creating
    significant side effects, which can be useful in development or when
    troubleshooting prod/staging.  Each event is appended as JSON to the
    file named by ``settings.ZULIP_WORKER_TEST_FILE``.
    """
    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Log the event and append it as one JSON line to the test file."""
        target_path = settings.ZULIP_WORKER_TEST_FILE
        serialized = ujson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s" % (target_path, serialized))
        with open(target_path, 'a') as out_file:
            out_file.write(serialized + '\n')