2017-11-16 19:54:24 +01:00
|
|
|
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
|
2016-07-23 07:51:30 +02:00
|
|
|
from typing import Any, Callable, Dict, List, Mapping, Optional, cast
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2017-07-03 12:52:55 +02:00
|
|
|
import signal
|
2017-05-30 08:10:19 +02:00
|
|
|
import sys
|
|
|
|
import os
|
2017-09-15 09:38:12 +02:00
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
import smtplib
|
|
|
|
import socket
|
2017-05-30 08:10:19 +02:00
|
|
|
|
2013-09-03 22:33:20 +02:00
|
|
|
from django.conf import settings
|
2017-07-03 12:52:55 +02:00
|
|
|
from django.db import connection
|
2013-11-08 02:02:48 +01:00
|
|
|
from django.core.handlers.wsgi import WSGIRequest
|
|
|
|
from django.core.handlers.base import BaseHandler
|
2017-05-22 23:37:15 +02:00
|
|
|
from zerver.models import \
|
2017-12-05 09:01:41 +01:00
|
|
|
get_client, get_system_bot, ScheduledEmail, PreregistrationUser, \
|
2017-05-25 20:41:29 +02:00
|
|
|
get_user_profile_by_id, Message, Realm, Service, UserMessage, UserProfile
|
2013-10-29 20:03:42 +01:00
|
|
|
from zerver.lib.context_managers import lockfile
|
2017-01-24 07:37:46 +01:00
|
|
|
from zerver.lib.error_notify import do_report_error
|
2017-03-06 08:43:20 +01:00
|
|
|
from zerver.lib.feedback import handle_feedback
|
2017-09-15 09:38:12 +02:00
|
|
|
from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event
|
2013-09-04 00:00:44 +02:00
|
|
|
from zerver.lib.timestamp import timestamp_to_datetime
|
2017-09-21 14:29:25 +02:00
|
|
|
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails
|
2017-03-06 03:05:04 +01:00
|
|
|
from zerver.lib.push_notifications import handle_push_notification
|
2014-01-24 22:29:17 +01:00
|
|
|
from zerver.lib.actions import do_send_confirmation_email, \
|
2013-09-30 17:53:46 +02:00
|
|
|
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
|
2014-01-24 22:29:17 +01:00
|
|
|
internal_send_message, check_send_message, extract_recipients, \
|
2017-11-13 21:24:51 +01:00
|
|
|
render_incoming_message, do_update_embedded_data, do_mark_stream_messages_as_read
|
2016-10-27 12:06:44 +02:00
|
|
|
from zerver.lib.url_preview import preview as url_preview
|
2013-10-21 23:26:41 +02:00
|
|
|
from zerver.lib.digest import handle_digest_email
|
2017-07-12 01:05:59 +02:00
|
|
|
from zerver.lib.send_email import send_future_email, send_email_from_dict, \
|
|
|
|
FromAddress, EmailNotDeliveredException
|
2013-12-17 22:37:51 +01:00
|
|
|
from zerver.lib.email_mirror import process_message as mirror_email
|
2017-11-13 21:24:51 +01:00
|
|
|
from zerver.lib.streams import access_stream_by_id
|
2013-09-04 00:00:44 +02:00
|
|
|
from zerver.decorator import JsonableError
|
2017-11-03 22:07:19 +01:00
|
|
|
from zerver.tornado.socket import req_redis_key, respond_send_message
|
2017-07-08 04:38:13 +02:00
|
|
|
from confirmation.models import Confirmation, create_confirmation_link
|
2014-01-07 22:20:29 +01:00
|
|
|
from zerver.lib.db import reset_queries
|
2014-02-05 00:35:32 +01:00
|
|
|
from zerver.lib.redis_utils import get_redis_client
|
2017-04-05 11:46:14 +02:00
|
|
|
from zerver.lib.str_utils import force_str
|
2016-11-08 10:07:47 +01:00
|
|
|
from zerver.context_processors import common_context
|
2017-07-24 07:51:18 +02:00
|
|
|
from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler
|
2016-07-23 07:51:30 +02:00
|
|
|
from zerver.models import get_bot_services
|
2017-05-25 20:41:29 +02:00
|
|
|
from zulip import Client
|
2017-07-21 17:54:34 +02:00
|
|
|
from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler
|
2013-10-10 20:39:43 +02:00
|
|
|
|
2013-09-03 22:33:20 +02:00
|
|
|
import os
|
2013-10-17 22:55:09 +02:00
|
|
|
import sys
|
2013-09-03 22:33:20 +02:00
|
|
|
import ujson
|
|
|
|
from collections import defaultdict
|
2013-12-17 22:37:51 +01:00
|
|
|
import email
|
2013-09-03 22:33:20 +02:00
|
|
|
import time
|
|
|
|
import datetime
|
|
|
|
import logging
|
2016-12-28 22:24:56 +01:00
|
|
|
import requests
|
2017-10-12 07:54:25 +02:00
|
|
|
import ujson
|
2017-11-06 02:56:09 +01:00
|
|
|
from io import StringIO
|
2016-07-23 07:51:30 +02:00
|
|
|
import re
|
2017-05-25 20:41:29 +02:00
|
|
|
import importlib
|
|
|
|
|
2017-12-20 18:08:35 +01:00
|
|
|
logger = logging.getLogger(__name__)
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2016-01-26 02:06:26 +01:00
|
|
|
class WorkerDeclarationException(Exception):
|
|
|
|
pass
|
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
def assign_queue(queue_name, enabled=True, queue_type="consumer"):
|
2017-08-06 21:41:21 +02:00
|
|
|
# type: (str, bool, str) -> Callable[[QueueProcessingWorker], QueueProcessingWorker]
|
2013-08-29 23:41:03 +02:00
|
|
|
def decorate(clazz):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (QueueProcessingWorker) -> QueueProcessingWorker
|
2013-08-29 23:41:03 +02:00
|
|
|
clazz.queue_name = queue_name
|
2013-10-23 20:17:33 +02:00
|
|
|
if enabled:
|
2017-02-17 07:16:43 +01:00
|
|
|
register_worker(queue_name, clazz, queue_type)
|
2013-08-29 23:41:03 +02:00
|
|
|
return clazz
|
|
|
|
return decorate
|
|
|
|
|
2017-07-09 02:09:29 +02:00
|
|
|
worker_classes = {} # type: Dict[str, Any] # Any here should be QueueProcessingWorker type
|
2017-02-17 07:16:43 +01:00
|
|
|
queues = {} # type: Dict[str, Dict[str, QueueProcessingWorker]]
|
|
|
|
def register_worker(queue_name, clazz, queue_type):
|
|
|
|
# type: (str, QueueProcessingWorker, str) -> None
|
|
|
|
if queue_type not in queues:
|
|
|
|
queues[queue_type] = {}
|
|
|
|
queues[queue_type][queue_name] = clazz
|
2013-08-29 23:41:03 +02:00
|
|
|
worker_classes[queue_name] = clazz
|
|
|
|
|
|
|
|
def get_worker(queue_name):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (str) -> QueueProcessingWorker
|
2013-08-29 23:41:03 +02:00
|
|
|
return worker_classes[queue_name]()
|
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
def get_active_worker_queues(queue_type=None):
|
|
|
|
# type: (Optional[str]) -> List[str]
|
|
|
|
"""Returns all the non-test worker queues."""
|
|
|
|
if queue_type is None:
|
|
|
|
return list(worker_classes.keys())
|
|
|
|
return list(queues[queue_type].keys())
|
2013-10-23 20:50:21 +02:00
|
|
|
|
2017-07-03 12:52:55 +02:00
|
|
|
def check_and_send_restart_signal():
|
|
|
|
# type: () -> None
|
|
|
|
try:
|
|
|
|
if not connection.is_usable():
|
|
|
|
logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
|
|
|
|
os.kill(os.getpid(), signal.SIGUSR1)
|
|
|
|
except Exception:
|
|
|
|
pass
|
|
|
|
|
2017-09-15 09:38:12 +02:00
|
|
|
def retry_send_email_failures(func):
|
|
|
|
# type: (Callable[[Any, Dict[str, Any]], None]) -> Callable[[QueueProcessingWorker, Dict[str, Any]], None]
|
|
|
|
# If we don't use cast() and use QueueProcessingWorker instead of Any in
|
|
|
|
# function type annotation then mypy complains.
|
|
|
|
func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func)
|
|
|
|
|
|
|
|
@wraps(func)
|
|
|
|
def wrapper(worker, data):
|
|
|
|
# type: (QueueProcessingWorker, Dict[str, Any]) -> None
|
|
|
|
try:
|
|
|
|
func(worker, data)
|
|
|
|
except (smtplib.SMTPServerDisconnected, socket.gaierror):
|
|
|
|
|
|
|
|
def on_failure(event):
|
|
|
|
# type: (Dict[str, Any]) -> None
|
|
|
|
logging.exception("Event {} failed".format(event['id']))
|
|
|
|
|
|
|
|
retry_event(worker.queue_name, data, on_failure)
|
|
|
|
|
|
|
|
return wrapper
|
|
|
|
|
2017-11-05 11:53:59 +01:00
|
|
|
class QueueProcessingWorker:
|
2017-07-09 02:09:29 +02:00
|
|
|
queue_name = None # type: str
|
2016-01-26 02:06:26 +01:00
|
|
|
|
2013-08-29 23:41:03 +02:00
|
|
|
def __init__(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2017-07-09 02:09:29 +02:00
|
|
|
self.q = None # type: SimpleQueueClient
|
2016-01-26 02:06:26 +01:00
|
|
|
if self.queue_name is None:
|
|
|
|
raise WorkerDeclarationException("Queue worker declared without queue_name")
|
|
|
|
|
|
|
|
def consume(self, data):
|
2017-09-21 11:11:50 +02:00
|
|
|
# type: (Dict[str, Any]) -> None
|
2016-01-26 02:06:26 +01:00
|
|
|
raise WorkerDeclarationException("No consumer defined!")
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume_wrapper(self, data):
|
2017-09-21 11:11:50 +02:00
|
|
|
# type: (Dict[str, Any]) -> None
|
2013-10-29 20:03:42 +01:00
|
|
|
try:
|
2013-11-01 19:02:11 +01:00
|
|
|
self.consume(data)
|
2013-10-29 20:03:42 +01:00
|
|
|
except Exception:
|
|
|
|
self._log_problem()
|
|
|
|
if not os.path.exists(settings.QUEUE_ERROR_DIR):
|
2017-11-10 12:43:53 +01:00
|
|
|
os.mkdir(settings.QUEUE_ERROR_DIR) # nocoverage
|
2013-10-29 20:03:42 +01:00
|
|
|
fname = '%s.errors' % (self.queue_name,)
|
|
|
|
fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
|
2016-01-24 05:17:25 +01:00
|
|
|
line = u'%s\t%s\n' % (time.asctime(), ujson.dumps(data))
|
2013-10-29 20:03:42 +01:00
|
|
|
lock_fn = fn + '.lock'
|
|
|
|
with lockfile(lock_fn):
|
2016-07-10 00:32:15 +02:00
|
|
|
with open(fn, 'ab') as f:
|
2016-01-27 02:25:31 +01:00
|
|
|
f.write(line.encode('utf-8'))
|
2017-07-03 12:52:55 +02:00
|
|
|
check_and_send_restart_signal()
|
|
|
|
finally:
|
|
|
|
reset_queries()
|
2013-10-29 20:03:42 +01:00
|
|
|
|
|
|
|
def _log_problem(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2013-10-29 20:03:42 +01:00
|
|
|
logging.exception("Problem handling data on queue %s" % (self.queue_name,))
|
2013-10-17 18:55:23 +02:00
|
|
|
|
2015-11-24 07:01:35 +01:00
|
|
|
def setup(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2015-11-24 07:01:35 +01:00
|
|
|
self.q = SimpleQueueClient()
|
|
|
|
|
2013-08-29 23:41:03 +02:00
|
|
|
def start(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2013-10-29 20:03:42 +01:00
|
|
|
self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
|
2013-08-29 23:41:03 +02:00
|
|
|
self.q.start_consuming()
|
|
|
|
|
|
|
|
def stop(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2013-08-29 23:41:03 +02:00
|
|
|
self.q.stop_consuming()
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
class LoopQueueProcessingWorker(QueueProcessingWorker):
|
|
|
|
sleep_delay = 0
|
|
|
|
|
2017-11-10 12:43:53 +01:00
|
|
|
def start(self) -> None: # nocoverage
|
2017-11-03 22:34:12 +01:00
|
|
|
while True:
|
|
|
|
# TODO: Probably it'd be better to share code with consume_wrapper()
|
|
|
|
events = self.q.drain_queue(self.queue_name, json=True)
|
|
|
|
try:
|
|
|
|
self.consume_batch(events)
|
|
|
|
finally:
|
|
|
|
reset_queries()
|
|
|
|
time.sleep(self.sleep_delay)
|
|
|
|
|
|
|
|
def consume_batch(self, event: List[Dict[str, Any]]) -> None:
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
def consume(self, event: Dict[str, Any]) -> None:
|
|
|
|
"""In LoopQueueProcessingWorker, consume is used just for automated tests"""
|
|
|
|
self.consume_batch([event])
|
|
|
|
|
2013-11-01 19:31:00 +01:00
|
|
|
@assign_queue('signups')
|
2013-09-03 22:33:20 +02:00
|
|
|
class SignupWorker(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, data):
|
2017-10-06 07:28:59 +02:00
|
|
|
# type: (Dict[str, Any]) -> None
|
2017-09-22 22:57:35 +02:00
|
|
|
user_profile = get_user_profile_by_id(data['user_id'])
|
|
|
|
logging.info("Processing signup for user %s in realm %s" % (
|
|
|
|
user_profile.email, user_profile.realm.string_id))
|
2015-09-25 08:19:47 +02:00
|
|
|
if settings.MAILCHIMP_API_KEY and settings.PRODUCTION:
|
2016-12-28 22:24:56 +01:00
|
|
|
endpoint = "https://%s.api.mailchimp.com/3.0/lists/%s/members" % \
|
|
|
|
(settings.MAILCHIMP_API_KEY.split('-')[1], settings.ZULIP_FRIENDS_LIST_ID)
|
|
|
|
params = dict(data)
|
2017-07-11 05:48:09 +02:00
|
|
|
del params['user_id']
|
2016-12-28 22:24:56 +01:00
|
|
|
params['list_id'] = settings.ZULIP_FRIENDS_LIST_ID
|
|
|
|
params['status'] = 'subscribed'
|
|
|
|
r = requests.post(endpoint, auth=('apikey', settings.MAILCHIMP_API_KEY), json=params, timeout=10)
|
|
|
|
if r.status_code == 400 and ujson.loads(r.text)['title'] == 'Member Exists':
|
|
|
|
logging.warning("Attempted to sign up already existing email to list: %s" %
|
|
|
|
(data['email_address'],))
|
2017-10-06 06:41:18 +02:00
|
|
|
elif r.status_code == 400:
|
|
|
|
retry_event('signups', data, lambda e: r.raise_for_status())
|
2016-12-28 22:24:56 +01:00
|
|
|
else:
|
|
|
|
r.raise_for_status()
|
|
|
|
|
2013-11-01 19:31:00 +01:00
|
|
|
@assign_queue('invites')
|
2013-09-03 22:33:20 +02:00
|
|
|
class ConfirmationEmailWorker(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, data):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2017-12-05 09:01:41 +01:00
|
|
|
|
|
|
|
if "email" in data:
|
|
|
|
# When upgrading from a version up through 1.7.1, there may be
|
|
|
|
# existing items in the queue with `email` instead of `prereg_id`.
|
|
|
|
invitee = PreregistrationUser.objects.filter(
|
|
|
|
email__iexact=data["email"].strip()).latest("invited_at")
|
|
|
|
else:
|
|
|
|
invitee = PreregistrationUser.objects.filter(id=data["prereg_id"]).first()
|
|
|
|
if invitee is None:
|
|
|
|
# The invitation could have been revoked
|
|
|
|
return
|
|
|
|
|
2017-05-10 23:23:59 +02:00
|
|
|
referrer = get_user_profile_by_id(data["referrer_id"])
|
2017-12-20 18:08:35 +01:00
|
|
|
logger.info("Sending invitation for realm %s to %s" % (referrer.realm.string_id, invitee.email))
|
2017-12-06 22:31:11 +01:00
|
|
|
do_send_confirmation_email(invitee, referrer)
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2013-10-10 20:39:43 +02:00
|
|
|
# queue invitation reminder for two days from now.
|
2017-07-08 04:38:13 +02:00
|
|
|
link = create_confirmation_link(invitee, referrer.realm.host, Confirmation.INVITATION)
|
2016-11-08 10:07:47 +01:00
|
|
|
context = common_context(referrer)
|
|
|
|
context.update({
|
|
|
|
'activate_url': link,
|
2017-05-10 23:29:51 +02:00
|
|
|
'referrer_name': referrer.full_name,
|
2017-05-04 06:34:30 +02:00
|
|
|
'referrer_email': referrer.email,
|
|
|
|
'referrer_realm_name': referrer.realm.name,
|
2016-11-08 10:07:47 +01:00
|
|
|
})
|
2017-05-03 18:20:16 +02:00
|
|
|
send_future_email(
|
2017-05-01 23:43:00 +02:00
|
|
|
"zerver/emails/invitation_reminder",
|
2017-12-05 03:19:48 +01:00
|
|
|
referrer.realm,
|
2017-12-05 09:01:41 +01:00
|
|
|
to_email=invitee.email,
|
2017-07-05 19:02:02 +02:00
|
|
|
from_address=FromAddress.NOREPLY,
|
2017-05-03 18:20:16 +02:00
|
|
|
context=context,
|
2017-05-04 06:51:05 +02:00
|
|
|
delay=datetime.timedelta(days=2))
|
2013-10-10 20:39:43 +02:00
|
|
|
|
2013-09-03 22:33:20 +02:00
|
|
|
@assign_queue('user_activity')
|
|
|
|
class UserActivityWorker(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-09-04 00:00:44 +02:00
|
|
|
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
|
|
|
client = get_client(event["client"])
|
|
|
|
log_time = timestamp_to_datetime(event["time"])
|
|
|
|
query = event["query"]
|
|
|
|
do_update_user_activity(user_profile, client, query, log_time)
|
|
|
|
|
|
|
|
@assign_queue('user_activity_interval')
|
|
|
|
class UserActivityIntervalWorker(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-09-04 00:00:44 +02:00
|
|
|
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
|
|
|
log_time = timestamp_to_datetime(event["time"])
|
|
|
|
do_update_user_activity_interval(user_profile, log_time)
|
|
|
|
|
|
|
|
@assign_queue('user_presence')
|
|
|
|
class UserPresenceWorker(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2017-09-28 23:34:10 +02:00
|
|
|
logging.debug("Received presence event: %s" % (event),)
|
2013-09-04 00:00:44 +02:00
|
|
|
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
|
|
|
client = get_client(event["client"])
|
|
|
|
log_time = timestamp_to_datetime(event["time"])
|
|
|
|
status = event["status"]
|
|
|
|
do_update_user_presence(user_profile, client, log_time, status)
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
@assign_queue('missedmessage_emails', queue_type="loop")
|
2017-11-03 22:34:12 +01:00
|
|
|
class MissedMessageWorker(LoopQueueProcessingWorker):
|
|
|
|
# Aggregate all messages received every 2 minutes to let someone finish sending a batch
|
|
|
|
# of messages
|
|
|
|
sleep_delay = 2 * 60
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
def consume_batch(self, missed_events: List[Dict[str, Any]]) -> None:
|
|
|
|
by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]]
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
for event in missed_events:
|
|
|
|
logging.debug("Received missedmessage_emails event: %s" % (event,))
|
|
|
|
by_recipient[event['user_profile_id']].append(event)
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
for user_profile_id, events in by_recipient.items():
|
|
|
|
handle_missedmessage_emails(user_profile_id, events)
|
2013-09-30 17:53:46 +02:00
|
|
|
|
2017-11-29 08:25:57 +01:00
|
|
|
@assign_queue('email_senders')
|
|
|
|
class EmailSendingWorker(QueueProcessingWorker):
|
2017-09-15 09:38:12 +02:00
|
|
|
@retry_send_email_failures
|
2017-03-06 08:45:59 +01:00
|
|
|
def consume(self, data):
|
2017-09-21 11:11:50 +02:00
|
|
|
# type: (Dict[str, Any]) -> None
|
2017-07-12 01:05:59 +02:00
|
|
|
try:
|
|
|
|
send_email_from_dict(data)
|
|
|
|
except EmailNotDeliveredException:
|
|
|
|
# TODO: Do something smarter here ..
|
|
|
|
pass
|
2017-03-06 08:45:59 +01:00
|
|
|
|
2017-11-29 08:25:57 +01:00
|
|
|
@assign_queue('missedmessage_email_senders')
|
|
|
|
class MissedMessageSendingWorker(EmailSendingWorker):
|
|
|
|
"""
|
|
|
|
Note: Class decorators are not inherited.
|
|
|
|
|
|
|
|
The `missedmessage_email_senders` queue was used up through 1.7.1, so we
|
|
|
|
keep consuming from it in case we've just upgraded from an old version.
|
|
|
|
After the 1.8 release, we can delete it and tell admins to upgrade to 1.8
|
|
|
|
first.
|
|
|
|
"""
|
|
|
|
# TODO: zulip-1.8: Delete code related to missedmessage_email_senders queue.
|
|
|
|
pass
|
|
|
|
|
2013-11-19 00:55:24 +01:00
|
|
|
@assign_queue('missedmessage_mobile_notifications')
|
|
|
|
class PushNotificationsWorker(QueueProcessingWorker):
|
|
|
|
def consume(self, data):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-11-19 00:55:24 +01:00
|
|
|
handle_push_notification(data['user_profile_id'], data)
|
|
|
|
|
2015-08-21 08:02:44 +02:00
|
|
|
# We probably could stop running this queue worker at all if ENABLE_FEEDBACK is False
|
2013-10-17 22:55:09 +02:00
|
|
|
@assign_queue('feedback_messages')
|
|
|
|
class FeedbackBot(QueueProcessingWorker):
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2016-10-29 09:00:44 +02:00
|
|
|
logging.info("Received feedback from %s" % (event["sender_email"],))
|
2017-03-06 08:43:20 +01:00
|
|
|
handle_feedback(event)
|
2013-10-17 22:55:09 +02:00
|
|
|
|
2013-11-13 19:12:22 +01:00
|
|
|
@assign_queue('error_reports')
|
|
|
|
class ErrorReporter(QueueProcessingWorker):
|
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2017-01-24 07:56:37 +01:00
|
|
|
logging.info("Processing traceback with type %s for %s" % (event['type'], event.get('user_email')))
|
2017-10-24 06:14:22 +02:00
|
|
|
if settings.ERROR_REPORTING:
|
2017-01-28 21:04:35 +01:00
|
|
|
do_report_error(event['report']['host'], event['type'], event['report'])
|
2013-11-13 19:12:22 +01:00
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
@assign_queue('slow_queries', queue_type="loop")
|
2017-11-03 22:34:12 +01:00
|
|
|
class SlowQueryWorker(LoopQueueProcessingWorker):
|
|
|
|
# Sleep 1 minute between checking the queue
|
|
|
|
sleep_delay = 60 * 1
|
2013-11-13 02:14:15 +01:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
def consume_batch(self, slow_queries):
|
|
|
|
# type: (List[Dict[str, Any]]) -> None
|
2017-09-22 23:07:57 +02:00
|
|
|
for query in slow_queries:
|
|
|
|
logging.info("Slow query: %s" % (query))
|
|
|
|
|
2013-11-13 01:55:06 +01:00
|
|
|
if settings.ERROR_BOT is None:
|
|
|
|
return
|
2013-09-30 17:53:46 +02:00
|
|
|
|
2013-11-13 01:55:06 +01:00
|
|
|
if len(slow_queries) > 0:
|
2015-11-17 05:16:53 +01:00
|
|
|
topic = "%s: slow queries" % (settings.EXTERNAL_HOST,)
|
2013-09-30 17:53:46 +02:00
|
|
|
|
2013-11-13 01:55:06 +01:00
|
|
|
content = ""
|
|
|
|
for query in slow_queries:
|
|
|
|
content += " %s\n" % (query,)
|
2013-09-30 17:53:46 +02:00
|
|
|
|
2017-05-22 23:37:15 +02:00
|
|
|
error_bot_realm = get_system_bot(settings.ERROR_BOT).realm
|
2017-01-22 05:23:36 +01:00
|
|
|
internal_send_message(error_bot_realm, settings.ERROR_BOT,
|
|
|
|
"stream", "logs", topic, content)
|
2013-09-07 00:27:10 +02:00
|
|
|
|
|
|
|
@assign_queue("message_sender")
|
|
|
|
class MessageSenderWorker(QueueProcessingWorker):
|
2013-11-05 23:05:03 +01:00
|
|
|
def __init__(self):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: () -> None
|
2017-10-27 08:28:23 +02:00
|
|
|
super().__init__()
|
2014-02-05 00:35:32 +01:00
|
|
|
self.redis_client = get_redis_client()
|
2013-11-08 02:02:48 +01:00
|
|
|
self.handler = BaseHandler()
|
|
|
|
self.handler.load_middleware()
|
2013-11-05 23:05:03 +01:00
|
|
|
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-11-08 02:02:48 +01:00
|
|
|
server_meta = event['server_meta']
|
|
|
|
|
2017-02-17 23:56:42 +01:00
|
|
|
environ = {
|
|
|
|
'REQUEST_METHOD': 'SOCKET',
|
|
|
|
'SCRIPT_NAME': '',
|
|
|
|
'PATH_INFO': '/json/messages',
|
|
|
|
'SERVER_NAME': '127.0.0.1',
|
|
|
|
'SERVER_PORT': 9993,
|
|
|
|
'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
|
|
|
|
'wsgi.version': (1, 0),
|
|
|
|
'wsgi.input': StringIO(),
|
|
|
|
'wsgi.errors': sys.stderr,
|
|
|
|
'wsgi.multithread': False,
|
|
|
|
'wsgi.multiprocess': True,
|
|
|
|
'wsgi.run_once': False,
|
|
|
|
'zulip.emulated_method': 'POST'
|
|
|
|
}
|
2017-02-17 23:59:25 +01:00
|
|
|
|
|
|
|
if 'socket_user_agent' in event['request']:
|
|
|
|
environ['HTTP_USER_AGENT'] = event['request']['socket_user_agent']
|
|
|
|
del event['request']['socket_user_agent']
|
|
|
|
|
2013-11-08 02:02:48 +01:00
|
|
|
# We're mostly using a WSGIRequest for convenience
|
|
|
|
environ.update(server_meta['request_environ'])
|
|
|
|
request = WSGIRequest(environ)
|
2016-11-05 19:30:59 +01:00
|
|
|
# Note: If we ever support non-POST methods, we'll need to change this.
|
|
|
|
request._post = event['request']
|
2013-11-08 02:02:48 +01:00
|
|
|
request.csrf_processing_done = True
|
|
|
|
|
|
|
|
user_profile = get_user_profile_by_id(server_meta['user_id'])
|
|
|
|
request._cached_user = user_profile
|
|
|
|
|
|
|
|
resp = self.handler.get_response(request)
|
2013-11-08 23:11:37 +01:00
|
|
|
server_meta['time_request_finished'] = time.time()
|
|
|
|
server_meta['worker_log_data'] = request._log_data
|
2013-11-08 02:02:48 +01:00
|
|
|
|
2016-07-12 15:10:01 +02:00
|
|
|
resp_content = resp.content.decode('utf-8')
|
2017-07-03 12:52:55 +02:00
|
|
|
response_data = ujson.loads(resp_content)
|
|
|
|
if response_data['result'] == 'error':
|
|
|
|
check_and_send_restart_signal()
|
|
|
|
|
2017-07-07 09:12:58 +02:00
|
|
|
result = {'response': response_data, 'req_id': event['req_id'],
|
2013-11-08 02:02:48 +01:00
|
|
|
'server_meta': server_meta}
|
|
|
|
|
2014-01-17 21:35:25 +01:00
|
|
|
redis_key = req_redis_key(event['req_id'])
|
2013-11-05 23:05:03 +01:00
|
|
|
self.redis_client.hmset(redis_key, {'status': 'complete',
|
2016-11-09 13:44:29 +01:00
|
|
|
'response': resp_content})
|
2013-11-05 23:05:03 +01:00
|
|
|
|
2017-11-03 22:07:19 +01:00
|
|
|
queue_json_publish(server_meta['return_queue'], result,
|
|
|
|
respond_send_message)
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2013-10-21 23:26:41 +02:00
|
|
|
@assign_queue('digest_emails')
|
|
|
|
class DigestWorker(QueueProcessingWorker):
|
2013-10-30 20:48:04 +01:00
|
|
|
# Who gets a digest is entirely determined by the enqueue_digest_emails
|
2013-10-21 23:26:41 +02:00
|
|
|
# management command, not here.
|
2013-10-30 22:03:50 +01:00
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-10-28 20:56:43 +01:00
|
|
|
logging.info("Received digest event: %s" % (event,))
|
|
|
|
handle_digest_email(event["user_profile_id"], event["cutoff"])
|
2013-10-28 20:45:35 +01:00
|
|
|
|
2013-12-17 22:37:51 +01:00
|
|
|
@assign_queue('email_mirror')
|
|
|
|
class MirrorWorker(QueueProcessingWorker):
|
|
|
|
# who gets a digest is entirely determined by the enqueue_digest_emails
|
|
|
|
# management command, not here.
|
|
|
|
def consume(self, event):
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2017-04-05 11:46:14 +02:00
|
|
|
message = force_str(event["message"])
|
|
|
|
mirror_email(email.message_from_string(message),
|
2014-07-25 10:40:40 +02:00
|
|
|
rcpt_to=event["rcpt_to"], pre_checked=True)
|
2013-12-17 22:37:51 +01:00
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
@assign_queue('test', queue_type="test")
|
2013-10-30 16:01:18 +01:00
|
|
|
class TestWorker(QueueProcessingWorker):
|
|
|
|
# This worker allows you to test the queue worker infrastructure without
|
|
|
|
# creating significant side effects. It can be useful in development or
|
|
|
|
# for troubleshooting prod/staging. It pulls a message off the test queue
|
|
|
|
# and appends it to a file in /tmp.
|
2017-11-06 20:49:27 +01:00
|
|
|
def consume(self, event): # nocoverage
|
2016-07-30 05:40:10 +02:00
|
|
|
# type: (Mapping[str, Any]) -> None
|
2013-10-30 16:01:18 +01:00
|
|
|
fn = settings.ZULIP_WORKER_TEST_FILE
|
|
|
|
message = ujson.dumps(event)
|
|
|
|
logging.info("TestWorker should append this message to %s: %s" % (fn, message))
|
|
|
|
with open(fn, 'a') as f:
|
|
|
|
f.write(message + '\n')
|
2016-10-27 12:06:44 +02:00
|
|
|
|
|
|
|
@assign_queue('embed_links')
|
|
|
|
class FetchLinksEmbedData(QueueProcessingWorker):
|
|
|
|
def consume(self, event):
|
|
|
|
# type: (Mapping[str, Any]) -> None
|
|
|
|
for url in event['urls']:
|
|
|
|
url_preview.get_link_embed_data(url)
|
|
|
|
|
|
|
|
message = Message.objects.get(id=event['message_id'])
|
|
|
|
# If the message changed, we will run this task after updating the message
|
|
|
|
# in zerver.views.messages.update_message_backend
|
|
|
|
if message.content != event['message_content']:
|
|
|
|
return
|
|
|
|
if message.content is not None:
|
2017-09-09 02:50:57 +02:00
|
|
|
query = UserMessage.objects.filter(
|
|
|
|
message=message.id
|
|
|
|
)
|
|
|
|
message_user_ids = set(query.values_list('user_profile_id', flat=True))
|
2017-01-22 05:55:30 +01:00
|
|
|
|
|
|
|
# Fetch the realm whose settings we're using for rendering
|
|
|
|
realm = Realm.objects.get(id=event['message_realm_id'])
|
|
|
|
|
2016-10-27 12:06:44 +02:00
|
|
|
# If rendering fails, the called code will raise a JsonableError.
|
|
|
|
rendered_content = render_incoming_message(
|
|
|
|
message,
|
2017-01-22 05:55:30 +01:00
|
|
|
message.content,
|
2017-09-09 02:50:57 +02:00
|
|
|
message_user_ids,
|
2017-01-22 05:55:30 +01:00
|
|
|
realm)
|
2016-10-27 12:06:44 +02:00
|
|
|
do_update_embedded_data(
|
|
|
|
message.sender, message, message.content, rendered_content)
|
2017-04-20 22:04:08 +02:00
|
|
|
|
|
|
|
@assign_queue('outgoing_webhooks')
|
|
|
|
class OutgoingWebhookWorker(QueueProcessingWorker):
|
|
|
|
def consume(self, event):
|
|
|
|
# type: (Mapping[str, Any]) -> None
|
2016-07-23 07:51:30 +02:00
|
|
|
message = event['message']
|
|
|
|
dup_event = cast(Dict[str, Any], event)
|
|
|
|
dup_event['command'] = message['content']
|
|
|
|
|
2017-05-26 16:37:45 +02:00
|
|
|
services = get_bot_services(event['user_profile_id'])
|
2016-07-23 07:51:30 +02:00
|
|
|
for service in services:
|
|
|
|
dup_event['service_name'] = str(service.name)
|
2017-05-26 16:37:45 +02:00
|
|
|
service_handler = get_outgoing_webhook_service_handler(service)
|
|
|
|
rest_operation, request_data = service_handler.process_event(dup_event)
|
|
|
|
do_rest_call(rest_operation, request_data, dup_event, service_handler)
|
2017-05-25 20:41:29 +02:00
|
|
|
|
|
|
|
@assign_queue('embedded_bots')
|
|
|
|
class EmbeddedBotWorker(QueueProcessingWorker):
|
|
|
|
|
|
|
|
def get_bot_api_client(self, user_profile):
|
2017-06-20 12:22:55 +02:00
|
|
|
# type: (UserProfile) -> EmbeddedBotHandler
|
|
|
|
return EmbeddedBotHandler(user_profile)
|
2017-05-25 20:41:29 +02:00
|
|
|
|
|
|
|
def consume(self, event):
|
|
|
|
# type: (Mapping[str, Any]) -> None
|
|
|
|
user_profile_id = event['user_profile_id']
|
|
|
|
user_profile = get_user_profile_by_id(user_profile_id)
|
|
|
|
|
|
|
|
message = cast(Dict[str, Any], event['message'])
|
|
|
|
|
|
|
|
# TODO: Do we actually want to allow multiple Services per bot user?
|
|
|
|
services = get_bot_services(user_profile_id)
|
|
|
|
for service in services:
|
2017-07-25 19:03:09 +02:00
|
|
|
bot_handler = get_bot_handler(str(service.name))
|
|
|
|
if bot_handler is None:
|
2017-10-27 02:36:54 +02:00
|
|
|
logging.error("Error: User %s has bot with invalid embedded bot service %s" % (
|
|
|
|
user_profile_id, service.name))
|
2017-07-25 19:03:09 +02:00
|
|
|
continue
|
|
|
|
bot_handler.handle_message(
|
2017-05-25 20:41:29 +02:00
|
|
|
message=message,
|
2017-11-28 04:12:54 +01:00
|
|
|
bot_handler=self.get_bot_api_client(user_profile)
|
|
|
|
)
|
2017-11-13 21:24:51 +01:00
|
|
|
|
|
|
|
@assign_queue('deferred_work')
|
|
|
|
class DeferredWorker(QueueProcessingWorker):
|
|
|
|
def consume(self, event: Mapping[str, Any]) -> None:
|
|
|
|
if event['type'] == 'mark_stream_messages_as_read':
|
|
|
|
user_profile = get_user_profile_by_id(event['user_profile_id'])
|
|
|
|
|
|
|
|
for stream_id in event['stream_ids']:
|
2017-11-29 23:35:33 +01:00
|
|
|
# Since the user just unsubscribed, we don't require
|
|
|
|
# an active Subscription object (otherwise, private
|
|
|
|
# streams would never be accessible)
|
|
|
|
(stream, recipient, sub) = access_stream_by_id(user_profile, stream_id,
|
|
|
|
require_active=False)
|
2017-11-13 21:24:51 +01:00
|
|
|
do_mark_stream_messages_as_read(user_profile, stream)
|