2017-11-16 19:54:24 +01:00
|
|
|
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
|
2020-06-05 23:35:52 +02:00
|
|
|
import base64
|
2018-01-30 20:06:23 +01:00
|
|
|
import copy
|
2020-06-11 00:54:34 +02:00
|
|
|
import datetime
|
|
|
|
import email
|
2020-06-05 23:26:35 +02:00
|
|
|
import email.policy
|
2020-06-11 00:54:34 +02:00
|
|
|
import logging
|
|
|
|
import os
|
2017-07-03 12:52:55 +02:00
|
|
|
import signal
|
2020-06-11 00:54:34 +02:00
|
|
|
import smtplib
|
|
|
|
import socket
|
2019-03-27 00:57:33 +01:00
|
|
|
import tempfile
|
2020-06-11 00:54:34 +02:00
|
|
|
import time
|
|
|
|
import urllib
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
from collections import defaultdict, deque
|
2020-06-05 23:26:35 +02:00
|
|
|
from email.message import EmailMessage
|
2017-09-15 09:38:12 +02:00
|
|
|
from functools import wraps
|
queue_processors: Rewrite MissedMessageWorker to always wait.
Previously, MissedMessageWorker used a batching strategy of just
grabbing all the events from the last 2 minutes, and then sending them
off as emails. This suffered from the problem that you had a random
time, between 0s and 120s, to edit your message before it would be
sent out via an email.
Additionally, this made the queue hard to monitor, because it was
expected to pile up large numbers of events, even if everything was
fine.
We fix this by batching together the events using a timer; the queue
processor itself just tracks the items, and then a timer-handler
process takes care of ensuring that the emails get sent at least 120s
(and at most 130s) after the first triggering message was sent in Zulip.
This introduces a new unpleasant bug, namely that when we restart a
Zulip server, we can now lose some missed_message email events;
further work is required on this point.
Fixes #6839.
2018-10-24 21:08:38 +02:00
|
|
|
from threading import Timer
|
2020-06-11 00:54:34 +02:00
|
|
|
from typing import (
|
|
|
|
Any,
|
|
|
|
Callable,
|
|
|
|
Dict,
|
|
|
|
List,
|
|
|
|
Mapping,
|
|
|
|
MutableSequence,
|
|
|
|
Optional,
|
|
|
|
Tuple,
|
|
|
|
Type,
|
|
|
|
TypeVar,
|
|
|
|
cast,
|
|
|
|
)
|
2017-09-15 09:38:12 +02:00
|
|
|
|
2020-06-11 00:54:34 +02:00
|
|
|
import requests
|
|
|
|
import ujson
|
2013-09-03 22:33:20 +02:00
|
|
|
from django.conf import settings
|
2017-07-03 12:52:55 +02:00
|
|
|
from django.db import connection
|
2020-04-16 23:00:24 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2020-06-11 00:54:34 +02:00
|
|
|
from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention
|
|
|
|
|
|
|
|
from zerver.context_processors import common_context
|
|
|
|
from zerver.lib.actions import (
|
|
|
|
do_mark_stream_messages_as_read,
|
|
|
|
do_send_confirmation_email,
|
|
|
|
do_update_embedded_data,
|
|
|
|
do_update_user_activity,
|
|
|
|
do_update_user_activity_interval,
|
|
|
|
do_update_user_presence,
|
|
|
|
internal_send_private_message,
|
|
|
|
notify_realm_export,
|
|
|
|
render_incoming_message,
|
|
|
|
)
|
|
|
|
from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler
|
2013-10-29 20:03:42 +01:00
|
|
|
from zerver.lib.context_managers import lockfile
|
2014-01-07 22:20:29 +01:00
|
|
|
from zerver.lib.db import reset_queries
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.digest import handle_digest_email
|
|
|
|
from zerver.lib.email_mirror import decode_stream_email_address, is_missed_message_address
|
|
|
|
from zerver.lib.email_mirror import process_message as mirror_email
|
|
|
|
from zerver.lib.email_mirror import rate_limit_mirror_by_realm
|
|
|
|
from zerver.lib.email_notifications import handle_missedmessage_emails
|
|
|
|
from zerver.lib.error_notify import do_report_error
|
2019-03-16 11:39:09 +01:00
|
|
|
from zerver.lib.exceptions import RateLimited
|
2019-03-27 00:57:33 +01:00
|
|
|
from zerver.lib.export import export_realm_wrapper
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler
|
|
|
|
from zerver.lib.push_notifications import (
|
|
|
|
clear_push_device_tokens,
|
|
|
|
handle_push_notification,
|
|
|
|
handle_remove_push_notification,
|
|
|
|
initialize_push_notifications,
|
|
|
|
)
|
2019-12-20 00:00:45 +01:00
|
|
|
from zerver.lib.pysa import mark_sanitized
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.queue import SimpleQueueClient, retry_event
|
|
|
|
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
|
|
|
|
from zerver.lib.send_email import (
|
|
|
|
EmailNotDeliveredException,
|
|
|
|
FromAddress,
|
|
|
|
handle_send_email_format_changes,
|
|
|
|
send_email_from_dict,
|
|
|
|
send_future_email,
|
|
|
|
)
|
|
|
|
from zerver.lib.streams import access_stream_by_id
|
|
|
|
from zerver.lib.timestamp import timestamp_to_datetime
|
|
|
|
from zerver.lib.url_preview import preview as url_preview
|
|
|
|
from zerver.models import (
|
|
|
|
Client,
|
|
|
|
Message,
|
|
|
|
PreregistrationUser,
|
|
|
|
Realm,
|
|
|
|
RealmAuditLog,
|
|
|
|
UserMessage,
|
|
|
|
UserProfile,
|
|
|
|
flush_per_request_caches,
|
|
|
|
get_bot_services,
|
|
|
|
get_client,
|
|
|
|
get_system_bot,
|
|
|
|
get_user_profile_by_id,
|
|
|
|
)
|
2017-05-25 20:41:29 +02:00
|
|
|
|
2017-12-20 18:08:35 +01:00
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2016-01-26 02:06:26 +01:00
|
|
|
class WorkerDeclarationException(Exception):
    """Raised when a queue worker class is declared incorrectly.

    In this file it is raised by QueueProcessingWorker.__init__ when the
    worker has no queue_name.
    """
|
|
|
|
|
2018-03-09 19:29:20 +01:00
|
|
|
# Type variable standing for "some concrete subclass of QueueProcessingWorker";
# used so decorators in this file return the same class type they receive.
ConcreteQueueWorker = TypeVar(
    "ConcreteQueueWorker",
    bound="QueueProcessingWorker",
)
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def assign_queue(
    queue_name: str, enabled: bool=True, queue_type: str="consumer",
) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]:
    """Build a class decorator that binds a worker class to a queue.

    The returned decorator stores ``queue_name`` on the class and, when
    ``enabled`` is true, registers the class via register_worker() under
    ``queue_type``.  The decorated class itself is returned unchanged
    otherwise, so the decorator can be stacked on a class definition.
    """
    def bind_queue(worker_class: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]:
        worker_class.queue_name = queue_name
        if not enabled:
            # Disabled workers still get their queue_name, but are not registered.
            return worker_class
        register_worker(queue_name, worker_class, queue_type)
        return worker_class

    return bind_queue
|
|
|
|
|
python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
|
|
|
# Registry filled in by register_worker(): queue name -> worker class.
worker_classes: Dict[str, Type["QueueProcessingWorker"]] = dict()
# Registry filled in by register_worker(): queue type -> {queue name -> worker class}.
queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = dict()
|
2018-03-10 08:29:46 +01:00
|
|
|
def register_worker(queue_name: str, clazz: Type['QueueProcessingWorker'], queue_type: str) -> None:
    """Record ``clazz`` as the worker for ``queue_name``.

    The class is stored both in the per-type ``queues`` registry (creating
    the inner dict for ``queue_type`` on first use) and in the flat
    ``worker_classes`` registry.  A later registration for the same queue
    name overwrites the earlier one.
    """
    queues.setdefault(queue_type, {})[queue_name] = clazz
    worker_classes[queue_name] = clazz
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def get_worker(queue_name: str) -> 'QueueProcessingWorker':
    """Instantiate the worker class registered for ``queue_name``.

    A KeyError propagates if no worker was registered under that name.
    """
    worker_class = worker_classes[queue_name]
    return worker_class()
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def get_active_worker_queues(queue_type: Optional[str]=None) -> List[str]:
    """Returns all the non-test worker queues.

    With no ``queue_type`` every registered queue name is returned;
    otherwise only the names registered under that queue type (a KeyError
    propagates if nothing was registered under it).
    """
    registry = worker_classes if queue_type is None else queues[queue_type]
    return list(registry.keys())
|
2013-10-23 20:50:21 +02:00
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def check_and_send_restart_signal() -> None:
    """Send SIGUSR1 to this process if the database connection is unusable.

    This is best effort: any exception raised while checking the
    connection or sending the signal is swallowed.
    """
    try:
        if connection.is_usable():
            return
        logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
        own_pid = os.getpid()
        os.kill(own_pid, signal.SIGUSR1)
    except Exception:
        # Deliberately ignored; failing here must not mask the caller's error path.
        pass
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def retry_send_email_failures(
    func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None],
) -> Callable[['QueueProcessingWorker', Dict[str, Any]], None]:
    """Decorate a worker's consume method so email delivery failures are retried.

    If ``func`` raises SMTPServerDisconnected, socket.gaierror or
    EmailNotDeliveredException, the event is handed to retry_event() for
    the worker's queue; the failure callback passed to retry_event() logs
    the event with the active traceback.  Any other exception propagates.
    """

    @wraps(func)
    def retrying_consume(worker: ConcreteQueueWorker, data: Dict[str, Any]) -> None:
        def log_failed_event(event: Dict[str, Any]) -> None:
            logging.exception("Event %r failed", event)

        try:
            func(worker, data)
        except (smtplib.SMTPServerDisconnected, socket.gaierror, EmailNotDeliveredException):
            # retry_event() is called while the exception is still being
            # handled, so logging.exception() above can report it.
            retry_event(worker.queue_name, data, log_failed_event)

    return retrying_consume
|
|
|
|
|
2019-12-27 15:20:01 +01:00
|
|
|
class QueueProcessingWorker(ABC):
    """Abstract base class for workers that consume events from one queue.

    Subclasses implement consume() and must have ``queue_name`` set (the
    assign_queue decorator in this file does that).  The base class wraps
    every consume call with error handling, per-request cache/query
    cleanup and throughput statistics that are periodically written to a
    ``<queue_name>.stats`` file in settings.QUEUE_STATS_DIR.
    """

    # Name of the queue this worker consumes; None means "not declared",
    # which __init__ rejects.
    queue_name: str = None
    # Number of do_consume() calls between two writes of the stats file.
    CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50

    def __init__(self) -> None:
        """Validate the declaration and reset statistics.

        Raises:
            WorkerDeclarationException: if ``queue_name`` was never set.
        """
        # The queue client is only created in setup().
        self.q: SimpleQueueClient = None
        if self.queue_name is None:
            raise WorkerDeclarationException("Queue worker declared without queue_name")

        self.initialize_statistics()

    def initialize_statistics(self) -> None:
        """Reset all in-memory counters and write an initial stats file."""
        self.queue_last_emptied_timestamp = time.time()
        self.consumed_since_last_emptied = 0
        # (number of events, seconds taken) for the most recent consume calls.
        self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50)
        # NOTE: the attribute name is misspelt ("interation"); it is kept
        # as-is because other code may read it.
        self.consume_interation_counter = 0

        self.update_statistics(0)

    def update_statistics(self, remaining_queue_size: int) -> None:
        """Write the current statistics to ``<queue_name>.stats``.

        The file is written to a temporary name and renamed into place
        while holding a ``.lock`` lockfile next to it.
        """
        total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
        total_events = sum(events_number for events_number, _ in self.recent_consume_times)
        if total_events == 0:
            # Nothing consumed recently: no meaningful average.
            recent_average_consume_time = None
        else:
            recent_average_consume_time = total_seconds / total_events
        stats_dict = dict(
            update_time=time.time(),
            recent_average_consume_time=recent_average_consume_time,
            current_queue_size=remaining_queue_size,
            queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
            consumed_since_last_emptied=self.consumed_since_last_emptied,
        )

        os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True)

        fname = f'{self.queue_name}.stats'
        fn = os.path.join(settings.QUEUE_STATS_DIR, fname)
        with lockfile(fn + '.lock'):
            tmp_fn = fn + '.tmp'
            with open(tmp_fn, 'w') as f:
                serialized_dict = ujson.dumps(stats_dict, indent=2)
                serialized_dict += '\n'
                f.write(serialized_dict)
            # Rename after the temp file is closed so readers never see a
            # partially written stats file.
            os.rename(tmp_fn, fn)

    @abstractmethod
    def consume(self, data: Dict[str, Any]) -> None:
        """Process a single event; implemented by each concrete worker."""
        pass

    def do_consume(self, consume_func: Callable[[List[Dict[str, Any]]], None],
                   events: List[Dict[str, Any]]) -> None:
        """Run ``consume_func(events)`` with error handling and bookkeeping.

        Exceptions (subclasses of Exception) raised by ``consume_func`` are
        passed to _handle_consume_exception() and not re-raised.  Per-request
        caches and recorded queries are always reset afterwards, and the
        statistics are updated every
        CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM calls.
        """
        # Initialised up front so the cleanup below never sees an unbound
        # name if the error handler itself raises or a non-Exception
        # (e.g. KeyboardInterrupt) propagates out of consume_func.
        consume_time_seconds: Optional[float] = None
        try:
            time_start = time.time()
            consume_func(events)
            consume_time_seconds = time.time() - time_start
            self.consumed_since_last_emptied += len(events)
        except Exception:
            self._handle_consume_exception(events)
        finally:
            flush_per_request_caches()
            reset_queries()

            # Only successful runs contribute to the timing average.
            if consume_time_seconds is not None:
                self.recent_consume_times.append((len(events), consume_time_seconds))

            # self.q is None until setup() has been called.
            if self.q is not None:
                remaining_queue_size = self.q.queue_size()
            else:
                remaining_queue_size = 0

            if remaining_queue_size == 0:
                self.queue_last_emptied_timestamp = time.time()
                self.consumed_since_last_emptied = 0

            self.consume_interation_counter += 1
            if self.consume_interation_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM:
                self.consume_interation_counter = 0
                self.update_statistics(remaining_queue_size)

    def consume_wrapper(self, data: Dict[str, Any]) -> None:
        """Adapt the single-event consume() to the batch-shaped do_consume()."""
        def consume_func(events: List[Dict[str, Any]]) -> None:
            self.consume(events[0])

        self.do_consume(consume_func, [data])

    def _handle_consume_exception(self, events: List[Dict[str, Any]]) -> None:
        """Log the failure and append the failed events to ``<queue_name>.errors``.

        Must be called from inside an ``except`` block, since _log_problem()
        logs the exception currently being handled.
        """
        self._log_problem()
        # exist_ok avoids the check-then-create race between worker
        # processes and matches how update_statistics() creates its directory.
        os.makedirs(settings.QUEUE_ERROR_DIR, exist_ok=True)
        # Use 'mark_sanitized' to prevent Pysa from detecting this false positive
        # flow. 'queue_name' is always a constant string.
        fname = mark_sanitized(f'{self.queue_name}.errors')
        fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
        line = f'{time.asctime()}\t{ujson.dumps(events)}\n'
        lock_fn = fn + '.lock'
        with lockfile(lock_fn):
            with open(fn, 'ab') as f:
                f.write(line.encode('utf-8'))
        check_and_send_restart_signal()

    def _log_problem(self) -> None:
        """Log the exception currently being handled, tagged with the queue name."""
        logging.exception("Problem handling data on queue %s", self.queue_name)

    def setup(self) -> None:
        """Create the queue client used by start() and stop()."""
        self.q = SimpleQueueClient()

    def start(self) -> None:
        """Reset statistics, register consume_wrapper for the queue and start consuming."""
        self.initialize_statistics()
        self.q.register_json_consumer(self.queue_name, self.consume_wrapper)
        self.q.start_consuming()

    def stop(self) -> None:  # nocoverage
        """Ask the queue client to stop consuming."""
        self.q.stop_consuming()
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
class LoopQueueProcessingWorker(QueueProcessingWorker):
    """Worker that repeatedly drains its queue and handles events in batches.

    Subclasses implement consume_batch(); ``sleep_delay`` and
    ``sleep_only_if_empty`` control how the polling loop pauses.
    """

    # Seconds to sleep between polling iterations.
    sleep_delay = 0
    # When True, sleep only after an empty drain; when False, after every drain.
    sleep_only_if_empty = True

    def start(self) -> None:  # nocoverage
        """Poll the queue forever, passing each drained batch to consume_batch()."""
        self.initialize_statistics()
        while True:
            batch = self.q.json_drain_queue(self.queue_name)
            self.do_consume(self.consume_batch, batch)
            # To avoid spinning the CPU, we go to sleep if there's
            # nothing in the queue, or for certain queues with
            # sleep_only_if_empty=False, unconditionally.
            should_sleep = (not self.sleep_only_if_empty) or len(batch) == 0
            if should_sleep:
                time.sleep(self.sleep_delay)

    @abstractmethod
    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
        """Process a list of events; implemented by each concrete worker."""
        pass

    def consume(self, event: Dict[str, Any]) -> None:
        """In LoopQueueProcessingWorker, consume is used just for automated tests"""
        single_event_batch = [event]
        self.consume_batch(single_event_batch)
|
|
|
|
|
2013-11-01 19:31:00 +01:00
|
|
|
@assign_queue('signups')
class SignupWorker(QueueProcessingWorker):
    """Consumes 'signups' events and subscribes the new user to a MailChimp list."""

    def consume(self, data: Dict[str, Any]) -> None:
        """Handle one signup event.

        Logs the signup, then (only when MAILCHIMP_API_KEY is set and
        PRODUCTION is true) posts the event data, minus ``user_id``, to the
        MailChimp list-members endpoint.  A 400 "Member Exists" response is
        logged as a warning, any other 400 is retried via retry_event(),
        and every other response goes through raise_for_status().
        """
        # TODO: This is the only implementation with Dict cf Mapping; should we simplify?
        user_profile = get_user_profile_by_id(data['user_id'])
        logging.info(
            "Processing signup for user %s in realm %s",
            user_profile.id, user_profile.realm.string_id,
        )
        if not (settings.MAILCHIMP_API_KEY and settings.PRODUCTION):
            return

        # The text after the '-' in the API key selects the API host.
        api_host_prefix = settings.MAILCHIMP_API_KEY.split('-')[1]
        endpoint = (
            f"https://{api_host_prefix}.api.mailchimp.com/3.0/lists/"
            f"{settings.ZULIP_FRIENDS_LIST_ID}/members"
        )
        # Work on a copy so the original event can still be retried intact.
        params = dict(data)
        del params['user_id']
        params['list_id'] = settings.ZULIP_FRIENDS_LIST_ID
        params['status'] = 'subscribed'
        r = requests.post(endpoint, auth=('apikey', settings.MAILCHIMP_API_KEY), json=params, timeout=10)

        if r.status_code != 400:
            r.raise_for_status()
            return

        if ujson.loads(r.text)['title'] == 'Member Exists':
            logging.warning("Attempted to sign up already existing email to list: %s",
                            data['email_address'])
        else:
            retry_event(self.queue_name, data, lambda e: r.raise_for_status())
|
|
|
|
|
2013-11-01 19:31:00 +01:00
|
|
|
@assign_queue('invites')
class ConfirmationEmailWorker(QueueProcessingWorker):
    """Send the confirmation email for an invitation and, when the
    invitation link stays valid long enough, schedule a reminder email."""

    def consume(self, data: Mapping[str, Any]) -> None:
        """Handle one ``invites`` event.

        ``data`` carries ``referrer_id`` plus either ``prereg_id`` or, for
        events enqueued by old versions, ``email``.  If the matching
        PreregistrationUser no longer exists, the event is dropped.
        """
        if "email" in data:
            # When upgrading from a version up through 1.7.1, there may be
            # existing items in the queue with `email` instead of `prereg_id`.
            try:
                invitee = PreregistrationUser.objects.filter(
                    email__iexact=data["email"].strip()).latest("invited_at")
            except PreregistrationUser.DoesNotExist:
                # .latest() raises rather than returning None on an empty
                # queryset; treat it like the prereg_id case below, since
                # the invitation could have been revoked.
                return
        else:
            invitee = PreregistrationUser.objects.filter(id=data["prereg_id"]).first()
            if invitee is None:
                # The invitation could have been revoked
                return

        referrer = get_user_profile_by_id(data["referrer_id"])
        logger.info("Sending invitation for realm %s to %s", referrer.realm.string_id, invitee.email)
        activate_url = do_send_confirmation_email(invitee, referrer)

        # queue invitation reminder
        if settings.INVITATION_LINK_VALIDITY_DAYS >= 4:
            context = common_context(referrer)
            context.update({
                'activate_url': activate_url,
                'referrer_name': referrer.full_name,
                'referrer_email': referrer.delivery_email,
                'referrer_realm_name': referrer.realm.name,
            })
            send_future_email(
                "zerver/emails/invitation_reminder",
                referrer.realm,
                to_emails=[invitee.email],
                from_address=FromAddress.tokenized_no_reply_placeholder,
                language=referrer.realm.default_language,
                context=context,
                # The reminder goes out two days before the link expires.
                delay=datetime.timedelta(days=settings.INVITATION_LINK_VALIDITY_DAYS - 2))
|
2013-10-10 20:39:43 +02:00
|
|
|
|
2019-09-18 01:52:37 +02:00
|
|
|
@assign_queue('user_activity', queue_type="loop")
class UserActivityWorker(LoopQueueProcessingWorker):
    """The UserActivity queue is perhaps our highest-traffic queue, and
    requires some care to ensure it performs adequately.

    We use a LoopQueueProcessingWorker as a performance optimization
    for managing the queue.  UserActivity records are structured so
    that they can easily be deduplicated before being written to the
    database; we rely on that to make this queue highly effective at
    working through a backlog containing many similar events.  Such a
    backlog can happen in a few ways:

    * In abuse/DoS situations, if a client is sending huge numbers of
      similar requests to the server.
    * If the queue ends up with several minutes of backlog e.g. due to
      downtime of the queue processor, many clients will have several
      common events from doing an action multiple times.
    """
    sleep_delay = 10
    sleep_only_if_empty = True
    # Cache of client name -> client id, used for legacy events only.
    client_id_map: Dict[str, int] = {}

    def start(self) -> None:
        # For our unit tests to make sense, we need to clear this on startup.
        self.client_id_map = {}
        super().start()

    def consume_batch(self, user_activity_events: List[Dict[str, Any]]) -> None:
        """Collapse the batch into one (count, latest time) entry per
        (user, client, query) and write each entry to the database."""
        # (user_profile_id, client_id, query) -> (event count, latest time)
        pending: Dict[Tuple[int, int, str], Tuple[int, float]] = {}

        # First, deduplicate the drained user_activity events so that
        # each distinct key is written to the database only once.
        for event in user_activity_events:
            user_profile_id = event["user_profile_id"]

            if "client_id" in event:
                client_id = event["client_id"]
            else:
                # This is for compatibility with older events still stuck in the queue,
                # that used the client name in event["client"] instead of having
                # event["client_id"] directly.
                # TODO: This can be deleted for release >= 2.3.
                if event["client"] not in self.client_id_map:
                    client = get_client(event["client"])
                    self.client_id_map[event["client"]] = client.id
                client_id = self.client_id_map[event["client"]]

            key = (user_profile_id, client_id, event["query"])
            if key in pending:
                seen_count, latest_time = pending[key]
                pending[key] = (seen_count + 1, max(latest_time, event['time']))
            else:
                pending[key] = (1, event['time'])

        # Then we insert the updates into the database.
        #
        # TODO: Doing these updates in sequence individually is likely
        # inefficient; the idealized version would do some sort of
        # bulk insert_or_update query.
        for (user_profile_id, client_id, query), (seen_count, latest_time) in pending.items():
            log_time = timestamp_to_datetime(latest_time)
            do_update_user_activity(user_profile_id, client_id, query, seen_count, log_time)
|
2013-09-04 00:00:44 +02:00
|
|
|
|
|
|
|
@assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker):
    """Record user activity intervals from ``user_activity_interval`` events."""

    def consume(self, event: Mapping[str, Any]) -> None:
        # Look up the user first, then convert the event timestamp.
        do_update_user_activity_interval(
            get_user_profile_by_id(event["user_profile_id"]),
            timestamp_to_datetime(event["time"]),
        )
|
|
|
|
|
|
|
|
@assign_queue('user_presence')
class UserPresenceWorker(QueueProcessingWorker):
    """Apply presence updates from ``user_presence`` events."""

    def consume(self, event: Mapping[str, Any]) -> None:
        logging.debug("Received presence event: %s", event)
        # Arguments are resolved in the same order as they are passed:
        # user, client, timestamp, status.
        do_update_user_presence(
            get_user_profile_by_id(event["user_profile_id"]),
            get_client(event["client"]),
            timestamp_to_datetime(event["time"]),
            event["status"],
        )
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
@assign_queue('missedmessage_emails', queue_type="loop")
class MissedMessageWorker(QueueProcessingWorker):
    # Aggregate all messages received over the last BATCH_DURATION
    # seconds to let someone finish sending a batch of messages and/or
    # editing them before they are sent out as emails to recipients.
    #
    # A timer is kept running only while events are pending; it fires
    # every TIMER_FREQUENCY seconds, to avoid excessive activity.
    #
    # TODO: Since this process keeps events in memory for up to 2
    # minutes, it now will lose approximately BATCH_DURATION worth of
    # missed_message emails whenever it is restarted as part of a
    # server restart.  We should probably add some sort of save/reload
    # mechanism for that case.
    #
    # NOTE(review): threading.Timer invokes its callback on a separate
    # thread, and no lock is taken around the dicts below in this
    # class -- confirm consume() cannot run concurrently with
    # maybe_send_batched_emails().
    TIMER_FREQUENCY = 5
    BATCH_DURATION = 120
    # The pending Timer, or None when no timer is scheduled.
    timer_event: Optional[Timer] = None
    # user_profile_id -> events accumulated for that recipient's batch.
    events_by_recipient: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
    # user_profile_id -> time.time() at which that recipient's batch began.
    batch_start_by_recipient: Dict[int, float] = {}

    def consume(self, event: Dict[str, Any]) -> None:
        """Add ``event`` to its recipient's pending batch and make sure a
        timer is scheduled to flush it."""
        logging.debug("Received missedmessage_emails event: %s", event)

        # When we process an event, just put it into the queue and ensure we have a timer going.
        user_profile_id = event['user_profile_id']
        if user_profile_id not in self.batch_start_by_recipient:
            # First event for this recipient: this starts their batch window.
            self.batch_start_by_recipient[user_profile_id] = time.time()
        self.events_by_recipient[user_profile_id].append(event)

        self.ensure_timer()

    def ensure_timer(self) -> None:
        """Schedule a timer for maybe_send_batched_emails unless one is
        already recorded in ``timer_event``."""
        if self.timer_event is not None:
            return
        self.timer_event = Timer(self.TIMER_FREQUENCY, MissedMessageWorker.maybe_send_batched_emails, [self])
        self.timer_event.start()

    def stop_timer(self) -> None:
        """Cancel and forget the current timer, if it exists and is alive."""
        if self.timer_event and self.timer_event.is_alive():
            self.timer_event.cancel()
            self.timer_event = None

    def maybe_send_batched_emails(self) -> None:
        """Send emails for every recipient whose batch is at least
        BATCH_DURATION seconds old, then re-arm the timer if batches remain."""
        self.stop_timer()

        current_time = time.time()
        # Iterate over a snapshot, since entries are deleted inside the loop.
        for user_profile_id, timestamp in list(self.batch_start_by_recipient.items()):
            if current_time - timestamp < self.BATCH_DURATION:
                # This recipient's batch window has not elapsed yet.
                continue
            events = self.events_by_recipient[user_profile_id]
            logging.info("Batch-processing %s missedmessage_emails events for user %s",
                         len(events), user_profile_id)
            handle_missedmessage_emails(user_profile_id, events)
            del self.events_by_recipient[user_profile_id]
            del self.batch_start_by_recipient[user_profile_id]

        # By only restarting the timer if there are actually events in
        # the queue, we ensure this queue processor is idle when there
        # are no missed-message emails to process.
        if len(self.batch_start_by_recipient) > 0:
            self.ensure_timer()
|
2013-09-30 17:53:46 +02:00
|
|
|
|
2017-11-29 08:25:57 +01:00
|
|
|
@assign_queue('email_senders')
class EmailSendingWorker(QueueProcessingWorker):
    """Send the emails described by ``email_senders`` events."""

    @retry_send_email_failures
    def consume(self, event: Dict[str, Any]) -> None:
        # Work on a deep copy so the caller's event is left untouched,
        # and strip `failed_tries' from it: send_email_from_dict neither
        # accepts that argument nor needs the data.
        email_kwargs = copy.deepcopy(event)
        email_kwargs.pop('failed_tries', None)
        handle_send_email_format_changes(email_kwargs)
        send_email_from_dict(email_kwargs)
|
2017-03-06 08:45:59 +01:00
|
|
|
|
2013-11-19 00:55:24 +01:00
|
|
|
@assign_queue('missedmessage_mobile_notifications')
class PushNotificationsWorker(QueueProcessingWorker):  # nocoverage
    """Send or revoke mobile push notifications."""

    def start(self) -> None:
        # initialize_push_notifications doesn't strictly do anything
        # beyond printing some logging warnings if push notifications
        # are not available in the current configuration.
        initialize_push_notifications()
        super().start()

    def consume(self, event: Dict[str, Any]) -> None:
        """Dispatch an "add" (the default) or "remove" event, requeueing
        it when the bouncer asks us to retry later."""
        try:
            if event.get("type", "add") != "remove":
                handle_push_notification(event['user_profile_id'], event)
            else:
                message_ids = event.get('message_ids')
                if message_ids is None:  # legacy task across an upgrade
                    message_ids = [event['message_id']]
                handle_remove_push_notification(event['user_profile_id'], message_ids)
        except PushNotificationBouncerRetryLaterError:
            def log_retries_exhausted(event: Dict[str, Any]) -> None:
                # Called by retry_event once the retry limit is reached.
                logger.warning(
                    "Maximum retries exceeded for trigger:%s event:push_notification",
                    event['user_profile_id'])
            retry_event(self.queue_name, event, log_retries_exhausted)
|
2013-11-19 00:55:24 +01:00
|
|
|
|
2013-11-13 19:12:22 +01:00
|
|
|
@assign_queue('error_reports')
class ErrorReporter(QueueProcessingWorker):
    """Forward queued error reports when error reporting is enabled."""

    def consume(self, event: Mapping[str, Any]) -> None:
        logging.info("Processing traceback with type %s for %s", event['type'], event.get('user_email'))
        if not settings.ERROR_REPORTING:
            return
        report = event['report']
        do_report_error(report['host'], event['type'], report)
|
2013-11-13 19:12:22 +01:00
|
|
|
|
2013-10-21 23:26:41 +02:00
|
|
|
@assign_queue('digest_emails')
class DigestWorker(QueueProcessingWorker):  # nocoverage
    # This worker does not decide who receives a digest; that is entirely
    # up to the enqueue_digest_emails management command.
    def consume(self, event: Mapping[str, Any]) -> None:
        logging.info("Received digest event: %s", event)
        user_profile_id = event["user_profile_id"]
        cutoff = event["cutoff"]
        handle_digest_email(user_profile_id, cutoff)
|
2013-10-28 20:45:35 +01:00
|
|
|
|
2013-12-17 22:37:51 +01:00
|
|
|
@assign_queue('email_mirror')
class MirrorWorker(QueueProcessingWorker):
    """Deliver incoming emails from the ``email_mirror`` queue, applying
    a per-realm rate limit to everything except missed-message replies."""

    def consume(self, event: Mapping[str, Any]) -> None:
        rcpt_to = event['rcpt_to']
        raw_message = base64.b64decode(event["msg_base64"])
        msg = email.message_from_bytes(raw_message, policy=email.policy.default)
        assert isinstance(msg, EmailMessage)  # https://github.com/python/typeshed/issues/2417

        # Missed message addresses are one-time use, so we don't need
        # to worry about emails to them resulting in message spam;
        # every other address is rate limited by realm.
        needs_rate_limit = not is_missed_message_address(rcpt_to)
        if needs_rate_limit:
            recipient_realm = decode_stream_email_address(rcpt_to)[0].realm
            try:
                rate_limit_mirror_by_realm(recipient_realm)
            except RateLimited:
                logger.warning("MirrorWorker: Rejecting an email from: %s "
                               "to realm: %s - rate limited.",
                               msg['From'], recipient_realm.name)
                return

        mirror_email(msg, rcpt_to=rcpt_to)
|
2013-12-17 22:37:51 +01:00
|
|
|
|
2017-02-17 07:16:43 +01:00
|
|
|
@assign_queue('test', queue_type="test")
class TestWorker(QueueProcessingWorker):
    # This worker lets you exercise the queue worker infrastructure
    # without creating significant side effects, which can be useful in
    # development or when troubleshooting prod/staging.  Each message
    # pulled off the test queue is appended, as one line of JSON, to the
    # file named by settings.ZULIP_WORKER_TEST_FILE.
    def consume(self, event: Mapping[str, Any]) -> None:  # nocoverage
        output_path = settings.ZULIP_WORKER_TEST_FILE
        serialized = ujson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s", output_path, serialized)
        with open(output_path, 'a') as output_file:
            output_file.write(serialized + '\n')
|
2016-10-27 12:06:44 +02:00
|
|
|
|
|
|
|
@assign_queue('embed_links')
class FetchLinksEmbedData(QueueProcessingWorker):
    def consume(self, event: Mapping[str, Any]) -> None:
        """Fetch embed data for each URL in the event, then re-render the message.

        Reads the 'urls', 'message_id', 'message_content' and
        'message_realm_id' keys of `event`.
        """
        for link in event['urls']:
            fetch_started = time.time()
            url_preview.get_link_embed_data(link)
            elapsed = time.time() - fetch_started
            logging.info("Time spent on get_link_embed_data for %s: %s", link, elapsed)

        message = Message.objects.get(id=event['message_id'])

        # If the message changed, we will run this task after updating the
        # message in zerver.views.messages.update_message_backend, so the
        # stale event is simply dropped here.
        if message.content != event['message_content']:
            return

        # Nothing to render for a message without content.
        if message.content is None:
            return

        # Ids of every user who has a UserMessage row for this message.
        user_message_rows = UserMessage.objects.filter(message=message.id)
        message_user_ids = set(
            user_message_rows.values_list('user_profile_id', flat=True),
        )

        # Fetch the realm whose settings we're using for rendering.
        rendering_realm = Realm.objects.get(id=event['message_realm_id'])

        # If rendering fails, the called code will raise a JsonableError.
        rendered_content = render_incoming_message(
            message,
            message.content,
            message_user_ids,
            rendering_realm,
        )
        do_update_embedded_data(
            message.sender,
            message,
            message.content,
            rendered_content,
        )
|
2017-04-20 22:04:08 +02:00
|
|
|
|
|
|
|
@assign_queue('outgoing_webhooks')
class OutgoingWebhookWorker(QueueProcessingWorker):
    def consume(self, event: Mapping[str, Any]) -> None:
        """Send the event to every outgoing webhook service of the bot.

        For each service returned by get_bot_services() for
        event['user_profile_id'], build the request payload with that
        service's handler and, when the payload is truthy, perform the
        REST call.
        """
        # typing.cast() only changes the static type; it does not copy.
        # The writes below therefore land on the caller's `event` object.
        mutable_event = cast(Dict[str, Any], event)
        mutable_event['command'] = event['message']['content']

        for service in get_bot_services(event['user_profile_id']):
            mutable_event['service_name'] = str(service.name)
            handler = get_outgoing_webhook_service_handler(service)
            payload = handler.build_bot_request(mutable_event)
            if not payload:
                # The handler produced nothing to send for this service.
                continue
            do_rest_call(service.base_url, payload, mutable_event, handler)
|
2017-05-25 20:41:29 +02:00
|
|
|
|
|
|
|
@assign_queue('embedded_bots')
class EmbeddedBotWorker(QueueProcessingWorker):

    def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler:
        """Return a new EmbeddedBotHandler for `user_profile`."""
        return EmbeddedBotHandler(user_profile)

    def consume(self, event: Mapping[str, Any]) -> None:
        """Hand the event's message to each embedded bot service of the bot user.

        Reads the 'user_profile_id', 'message' and 'trigger' keys of `event`.
        An EmbeddedBotQuitException raised while running a bot is logged
        as a warning and processing continues with the next service.
        """
        bot_user_id = event['user_profile_id']
        bot_user = get_user_profile_by_id(bot_user_id)

        message = cast(Dict[str, Any], event['message'])

        # TODO: Do we actually want to allow multiple Services per bot user?
        for service in get_bot_services(bot_user_id):
            bot = get_bot_handler(str(service.name))
            if bot is None:
                logging.error(
                    "Error: User %s has bot with invalid embedded bot service %s",
                    bot_user_id, service.name,
                )
                continue

            try:
                # Bots may define an optional initialize() hook.
                if hasattr(bot, 'initialize'):
                    bot.initialize(self.get_bot_api_client(bot_user))

                if event['trigger'] == 'mention':
                    # Replace the content with the query text that
                    # extract_query_without_mention() returns.
                    api_client = cast(ExternalBotHandler, self.get_bot_api_client(bot_user))
                    message['content'] = extract_query_without_mention(
                        message=message,
                        client=api_client,
                    )
                    assert message['content'] is not None

                bot.handle_message(
                    message=message,
                    bot_handler=self.get_bot_api_client(bot_user),
                )
            except EmbeddedBotQuitException as quit_request:
                logging.warning(str(quit_request))
|
2017-11-13 21:24:51 +01:00
|
|
|
|
|
|
|
@assign_queue('deferred_work')
class DeferredWorker(QueueProcessingWorker):
    def consume(self, event: Dict[str, Any]) -> None:
        """Run one deferred task, selected by event['type'].

        Handled types:

        * 'mark_stream_messages_as_read': mark the messages of every
          stream in event['stream_ids'] as read for the given user.
        * 'clear_push_device_tokens': clear the user's push device
          tokens, re-queueing the event if the bouncer asks to retry.
        * 'realm_export': export the realm, record the outcome on the
          RealmAuditLog row, and notify the requesting user.
        """
        if event['type'] == 'mark_stream_messages_as_read':
            user_profile = get_user_profile_by_id(event['user_profile_id'])
            client = Client.objects.get(id=event['client_id'])

            for stream_id in event['stream_ids']:
                # Since the user just unsubscribed, we don't require
                # an active Subscription object (otherwise, private
                # streams would never be accessible)
                (stream, _recipient, _sub) = access_stream_by_id(user_profile, stream_id,
                                                                 require_active=False)
                do_mark_stream_messages_as_read(user_profile, client, stream)
        elif event['type'] == 'clear_push_device_tokens':
            try:
                clear_push_device_tokens(event["user_profile_id"])
            except PushNotificationBouncerRetryLaterError:
                # Passed to retry_event() together with the event being
                # re-queued; its message reports that retries ran out.
                def failure_processor(event: Dict[str, Any]) -> None:
                    logger.warning(
                        "Maximum retries exceeded for trigger:%s event:clear_push_device_tokens",
                        event['user_profile_id'])
                retry_event(self.queue_name, event, failure_processor)
        elif event['type'] == 'realm_export':
            start = time.time()
            realm = Realm.objects.get(id=event['realm_id'])
            output_dir = tempfile.mkdtemp(prefix="zulip-export-")
            export_event = RealmAuditLog.objects.get(id=event['id'])
            user_profile = get_user_profile_by_id(event['user_profile_id'])

            try:
                public_url = export_realm_wrapper(realm=realm, output_dir=output_dir,
                                                  threads=6, upload=True, public_only=True,
                                                  delete_after_upload=True)
            except Exception:
                # Record the failure on the audit log row and tell the
                # user's clients, instead of letting the worker crash.
                # NOTE(review): output_dir is not removed on this path;
                # confirm whether export_realm_wrapper cleans it up.
                export_event.extra_data = ujson.dumps(dict(
                    failed_timestamp=timezone_now().timestamp(),
                ))
                export_event.save(update_fields=['extra_data'])
                # exc_info=True attaches the traceback of the exception
                # being handled; without it the cause of the failed
                # export would be lost, since we return instead of
                # re-raising.
                logging.error(
                    "Data export for %s failed after %s",
                    user_profile.realm.string_id, time.time() - start,
                    exc_info=True,
                )
                notify_realm_export(user_profile)
                return

            assert public_url is not None

            # Update the extra_data field now that the export is complete.
            export_event.extra_data = ujson.dumps(dict(
                export_path=urllib.parse.urlparse(public_url).path,
            ))
            export_event.save(update_fields=['extra_data'])

            # Send a private message notification letting the user who
            # triggered the export know the export finished.
            content = f"Your data export is complete and has been uploaded here:\n\n{public_url}"
            internal_send_private_message(
                realm=user_profile.realm,
                sender=get_system_bot(settings.NOTIFICATION_BOT),
                recipient_user=user_profile,
                content=content,
            )

            # For future frontend use, also notify administrator
            # clients that the export happened.
            notify_realm_export(user_profile)
            logging.info(
                "Completed data export for %s in %s",
                user_profile.realm.string_id, time.time() - start,
            )
|