2017-11-16 19:54:24 +01:00
|
|
|
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
|
2020-06-05 23:35:52 +02:00
|
|
|
import base64
|
2018-01-30 20:06:23 +01:00
|
|
|
import copy
|
2020-06-11 00:54:34 +02:00
|
|
|
import datetime
|
|
|
|
import email
|
2020-06-05 23:26:35 +02:00
|
|
|
import email.policy
|
2020-09-29 01:16:54 +02:00
|
|
|
import functools
|
2020-06-11 00:54:34 +02:00
|
|
|
import logging
|
|
|
|
import os
|
2017-07-03 12:52:55 +02:00
|
|
|
import signal
|
2020-06-11 00:54:34 +02:00
|
|
|
import socket
|
2019-03-27 00:57:33 +01:00
|
|
|
import tempfile
|
2020-06-11 00:54:34 +02:00
|
|
|
import time
|
|
|
|
import urllib
|
|
|
|
from abc import ABC, abstractmethod
|
2021-07-07 16:59:03 +02:00
|
|
|
from collections import deque
|
2020-06-05 23:26:35 +02:00
|
|
|
from email.message import EmailMessage
|
2017-09-15 09:38:12 +02:00
|
|
|
from functools import wraps
|
queue_processors: Shut down background missedmessage_emails thread.
Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit. In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds. As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled. This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.
Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.
When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock. As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.
With the lock in hand, cancel any outstanding timers. A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock. Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
2021-11-22 23:52:05 +01:00
|
|
|
from threading import RLock, Timer
|
2020-09-29 01:16:54 +02:00
|
|
|
from types import FrameType
|
2020-06-11 00:54:34 +02:00
|
|
|
from typing import (
|
|
|
|
Any,
|
|
|
|
Callable,
|
|
|
|
Dict,
|
|
|
|
List,
|
|
|
|
Mapping,
|
|
|
|
MutableSequence,
|
|
|
|
Optional,
|
2021-04-12 23:52:10 +02:00
|
|
|
Sequence,
|
2020-10-03 01:29:49 +02:00
|
|
|
Set,
|
2020-06-11 00:54:34 +02:00
|
|
|
Tuple,
|
|
|
|
Type,
|
|
|
|
TypeVar,
|
|
|
|
)
|
2017-09-15 09:38:12 +02:00
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2020-10-27 23:18:20 +01:00
|
|
|
import sentry_sdk
|
2013-09-03 22:33:20 +02:00
|
|
|
from django.conf import settings
|
2022-07-19 17:32:52 +02:00
|
|
|
from django.core.mail.backends.base import BaseEmailBackend
|
2021-07-07 16:59:03 +02:00
|
|
|
from django.db import connection, transaction
|
2020-08-06 18:21:42 +02:00
|
|
|
from django.db.models import F
|
2021-07-22 06:52:58 +02:00
|
|
|
from django.db.utils import IntegrityError
|
2020-04-16 23:00:24 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2021-04-16 00:57:30 +02:00
|
|
|
from django.utils.translation import gettext as _
|
2020-06-26 15:24:37 +02:00
|
|
|
from django.utils.translation import override as override_language
|
2020-09-18 23:13:13 +02:00
|
|
|
from sentry_sdk import add_breadcrumb, configure_scope
|
2021-03-26 02:27:19 +01:00
|
|
|
from zulip_bots.lib import extract_query_without_mention
|
2020-06-11 00:54:34 +02:00
|
|
|
|
2022-04-14 23:36:07 +02:00
|
|
|
from zerver.actions.invites import do_send_confirmation_email
|
2022-04-14 23:55:52 +02:00
|
|
|
from zerver.actions.message_edit import do_update_embedded_data
|
2022-04-14 23:54:53 +02:00
|
|
|
from zerver.actions.message_flags import do_mark_stream_messages_as_read
|
2022-04-14 23:50:10 +02:00
|
|
|
from zerver.actions.message_send import internal_send_private_message, render_incoming_message
|
2022-04-14 23:44:33 +02:00
|
|
|
from zerver.actions.presence import do_update_user_presence
|
2022-04-14 23:40:10 +02:00
|
|
|
from zerver.actions.realm_export import notify_realm_export
|
2022-04-14 23:41:59 +02:00
|
|
|
from zerver.actions.user_activity import do_update_user_activity, do_update_user_activity_interval
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.context_processors import common_context
|
|
|
|
from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler
|
2013-10-29 20:03:42 +01:00
|
|
|
from zerver.lib.context_managers import lockfile
|
2014-01-07 22:20:29 +01:00
|
|
|
from zerver.lib.db import reset_queries
|
2020-11-13 18:13:13 +01:00
|
|
|
from zerver.lib.digest import bulk_handle_digest_email
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.email_mirror import decode_stream_email_address, is_missed_message_address
|
|
|
|
from zerver.lib.email_mirror import process_message as mirror_email
|
|
|
|
from zerver.lib.email_mirror import rate_limit_mirror_by_realm
|
|
|
|
from zerver.lib.email_notifications import handle_missedmessage_emails
|
|
|
|
from zerver.lib.error_notify import do_report_error
|
2019-03-16 11:39:09 +01:00
|
|
|
from zerver.lib.exceptions import RateLimited
|
2019-03-27 00:57:33 +01:00
|
|
|
from zerver.lib.export import export_realm_wrapper
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler
|
|
|
|
from zerver.lib.push_notifications import (
|
|
|
|
clear_push_device_tokens,
|
|
|
|
handle_push_notification,
|
|
|
|
handle_remove_push_notification,
|
|
|
|
initialize_push_notifications,
|
|
|
|
)
|
2019-12-20 00:00:45 +01:00
|
|
|
from zerver.lib.pysa import mark_sanitized
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.queue import SimpleQueueClient, retry_event
|
|
|
|
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
|
|
|
|
from zerver.lib.send_email import (
|
|
|
|
EmailNotDeliveredException,
|
|
|
|
FromAddress,
|
|
|
|
handle_send_email_format_changes,
|
2021-03-20 14:07:02 +01:00
|
|
|
initialize_connection,
|
2021-04-27 01:51:50 +02:00
|
|
|
send_email,
|
2020-06-11 00:54:34 +02:00
|
|
|
send_future_email,
|
|
|
|
)
|
2022-04-15 04:51:41 +02:00
|
|
|
from zerver.lib.soft_deactivation import reactivate_user_if_soft_deactivated
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.timestamp import timestamp_to_datetime
|
2022-02-10 22:06:11 +01:00
|
|
|
from zerver.lib.upload import handle_reupload_emojis_event
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.url_preview import preview as url_preview
|
2022-04-14 21:57:20 +02:00
|
|
|
from zerver.lib.url_preview.types import UrlEmbedData
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.models import (
|
|
|
|
Message,
|
|
|
|
PreregistrationUser,
|
|
|
|
Realm,
|
|
|
|
RealmAuditLog,
|
2021-07-07 16:59:03 +02:00
|
|
|
ScheduledMessageNotificationEmail,
|
2020-06-11 00:54:34 +02:00
|
|
|
UserMessage,
|
|
|
|
UserProfile,
|
2020-06-12 16:19:17 +02:00
|
|
|
filter_to_valid_prereg_users,
|
2020-06-11 00:54:34 +02:00
|
|
|
flush_per_request_caches,
|
|
|
|
get_bot_services,
|
|
|
|
get_client,
|
|
|
|
get_system_bot,
|
|
|
|
get_user_profile_by_id,
|
|
|
|
)
|
2017-05-25 20:41:29 +02:00
|
|
|
|
2017-12-20 18:08:35 +01:00
|
|
|
logger = logging.getLogger(__name__)
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-09-29 01:16:54 +02:00
|
|
|
class WorkerTimeoutException(Exception):
    """Raised when processing a batch of events exceeds its time budget.

    ``limit`` is the per-event budget in seconds and ``event_count`` is the
    number of events in the batch, so the total budget reported by
    ``__str__`` is ``limit * event_count`` seconds.
    """

    def __init__(self, queue_name: str, limit: int, event_count: int) -> None:
        # Forward the arguments to Exception so that ``args``, ``repr()``
        # and pickling/copying of the exception behave normally.
        super().__init__(queue_name, limit, event_count)
        self.queue_name = queue_name
        self.limit = limit
        self.event_count = event_count

    def __str__(self) -> str:
        return f"Timed out in {self.queue_name} after {self.limit * self.event_count} seconds processing {self.event_count} events"
|
2020-09-29 01:16:54 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-07-07 13:25:42 +02:00
|
|
|
class InterruptConsumeException(Exception):
    """
    This exception is to be thrown inside an event consume function
    if the intention is to simply interrupt the processing
    of the current event and normally continue the work of the queue.
    """
|
|
|
|
|
|
|
|
|
2016-01-26 02:06:26 +01:00
|
|
|
class WorkerDeclarationException(Exception):
    """Raised when a queue worker is instantiated without a ``queue_name``."""
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
ConcreteQueueWorker = TypeVar("ConcreteQueueWorker", bound="QueueProcessingWorker")


def assign_queue(
    queue_name: str,
    enabled: bool = True,
    is_test_queue: bool = False,
) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]:
    """Class decorator that binds a worker class to a queue name.

    The returned decorator sets ``queue_name`` on the class and, when
    ``enabled`` is true, registers the class via ``register_worker``
    (passing ``is_test_queue`` through).  The class itself is returned
    unchanged otherwise, so the decorator can be stacked freely.
    """

    def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]:
        clazz.queue_name = queue_name
        if enabled:
            register_worker(queue_name, clazz, is_test_queue)
        return clazz

    return decorate
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
|
|
|
# Registry of worker classes, keyed by the name of the queue they consume.
worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
# Names of the registered queues that exist only for tests.
test_queues: Set[str] = set()


def register_worker(
    queue_name: str, clazz: Type["QueueProcessingWorker"], is_test_queue: bool = False
) -> None:
    """Record ``clazz`` as the worker for ``queue_name``.

    A later registration for the same queue name replaces the earlier one.
    When ``is_test_queue`` is true the queue name is also added to
    ``test_queues``.
    """
    worker_classes[queue_name] = clazz
    if is_test_queue:
        test_queues.add(queue_name)
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
def get_worker(queue_name: str) -> "QueueProcessingWorker":
    """Instantiate the worker class registered for ``queue_name``.

    Raises ``KeyError`` if no worker has been registered under that name.
    """
    return worker_classes[queue_name]()
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def get_active_worker_queues(only_test_queues: bool = False) -> List[str]:
    """Returns all (either test, or real) worker queues."""
    # A membership test already yields a bool, and iterating a dict yields
    # its keys, so neither bool() nor .keys() is needed.
    return [
        queue_name
        for queue_name in worker_classes
        if (queue_name in test_queues) == only_test_queues
    ]
|
|
|
|
|
2013-10-23 20:50:21 +02:00
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def check_and_send_restart_signal() -> None:
    """Send this process SIGUSR1 if the database connection is unusable.

    This is a best-effort check: any error raised while probing the
    connection or sending the signal is deliberately ignored.
    """
    try:
        if not connection.is_usable():
            logging.warning("*** Sending self SIGUSR1 to trigger a restart.")
            os.kill(os.getpid(), signal.SIGUSR1)
    except Exception:
        # Intentionally swallowed; failing to request a restart must not
        # itself take the caller down.
        pass
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-03-20 14:07:02 +01:00
|
|
|
# If you change the function on which this decorator is used be careful that the new
# function doesn't delete the "failed_tries" attribute of "data" which is needed for
# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy.
def retry_send_email_failures(
    func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None],
) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]:
    """Decorator that hands an event to ``retry_event`` on email-delivery errors.

    The wrapped consume function is called as-is.  If it raises
    ``socket.gaierror``, ``socket.timeout`` or ``EmailNotDeliveredException``,
    the event is passed to ``retry_event`` for the worker's queue, with a
    failure callback that logs the event and the exception's class name.
    Any other exception propagates unchanged.
    """

    @wraps(func)
    def wrapper(worker: ConcreteQueueWorker, data: Dict[str, Any]) -> None:
        try:
            func(worker, data)
        except (
            socket.gaierror,
            socket.timeout,
            EmailNotDeliveredException,
        ) as e:
            # Capture the name now: ``e`` is unbound once the except block
            # exits, but on_failure may outlive it as a closure.
            error_class_name = e.__class__.__name__

            def on_failure(event: Dict[str, Any]) -> None:
                logging.exception(
                    "Event %r failed due to exception %s", event, error_class_name, stack_info=True
                )

            retry_event(worker.queue_name, data, on_failure)

    return wrapper
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-12-27 15:20:01 +01:00
|
|
|
class QueueProcessingWorker(ABC):
    # Name of the queue this worker consumes; set by subclasses
    # (for example through the assign_queue decorator).
    queue_name: str
    # Per-event time budget, in seconds; a falsy value disables the timeout.
    MAX_CONSUME_SECONDS: Optional[int] = 30
    # The MAX_CONSUME_SECONDS timeout is only enabled when handling a
    # single queue at once, with no threads.
    ENABLE_TIMEOUTS = False
    # The stats file is rewritten after this many consume iterations...
    CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50
    # ...or once this many seconds have passed since the last rewrite.
    MAX_SECONDS_BEFORE_UPDATE_STATS = 30

    # How many un-acknowledged events the worker should have on hand,
    # fetched from the rabbitmq server.  Larger values may be more
    # performant, but if queues are large, cause more network IO at
    # startup and steady-state memory.
    #
    # A prefetch of 1 was measured at roughly 30% of the throughput of an
    # unbounded prefetch for a no-op worker, while 100 reaches about 90%;
    # batch workers cannot function with a prefetch below their batch size.
    PREFETCH = 100
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def __init__(self) -> None:
    """Set up an unconnected worker and its statistics.

    Raises ``WorkerDeclarationException`` if the class does not define
    ``queue_name``.
    """
    # No queue client is attached until one is assigned to ``self.q``.
    self.q: Optional[SimpleQueueClient] = None
    if not hasattr(self, "queue_name"):
        raise WorkerDeclarationException("Queue worker declared without queue_name")

    self.initialize_statistics()
|
|
|
|
|
|
|
|
def initialize_statistics(self) -> None:
    """Reset all statistics counters and write an initial stats file."""
    self.queue_last_emptied_timestamp = time.time()
    self.consumed_since_last_emptied = 0
    # Sliding window of (number of events, seconds taken) for recent batches.
    self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50)
    self.consume_iteration_counter = 0
    self.idle = True
    self.last_statistics_update_time = 0.0

    # Must come last: update_statistics reads the fields set above.
    self.update_statistics()
|
2020-03-18 20:48:49 +01:00
|
|
|
|
2021-11-15 22:32:01 +01:00
|
|
|
def update_statistics(self) -> None:
    """Write the current statistics to ``<QUEUE_STATS_DIR>/<queue_name>.stats``.

    The file is written to a ``.tmp`` sibling and renamed into place while
    holding a ``.lock`` lockfile, so readers do not see a partial file.
    """
    # Accumulate both totals in one pass over the recent-batches window.
    total_events = 0
    total_seconds = 0.0
    for events_number, seconds in self.recent_consume_times:
        total_events += events_number
        total_seconds += seconds

    if total_events == 0:
        recent_average_consume_time = None
    else:
        recent_average_consume_time = total_seconds / total_events
    stats_dict = dict(
        update_time=time.time(),
        recent_average_consume_time=recent_average_consume_time,
        queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
        consumed_since_last_emptied=self.consumed_since_last_emptied,
    )

    os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True)

    fname = f"{self.queue_name}.stats"
    fn = os.path.join(settings.QUEUE_STATS_DIR, fname)
    with lockfile(fn + ".lock"):
        tmp_fn = fn + ".tmp"
        with open(tmp_fn, "wb") as f:
            f.write(
                orjson.dumps(stats_dict, option=orjson.OPT_APPEND_NEWLINE | orjson.OPT_INDENT_2)
            )
        os.rename(tmp_fn, fn)
    self.last_statistics_update_time = time.time()
|
2020-03-18 20:48:49 +01:00
|
|
|
|
queue: Rename queue_size, and update for all local queues.
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
2020-10-09 22:12:55 +02:00
|
|
|
def get_remaining_local_queue_size(self) -> int:
    """Return how many events the local consumer holds but has not processed.

    This is not the size of the full RabbitMQ queue: the consumer only
    knows how many messages it has been handed.
    """
    if self.q is None:
        # This is a special case that will happen if we're operating without
        # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of
        # and the only reasonable size to return is 0.
        return 0
    return self.q.local_queue_size()
|
|
|
|
|
2019-12-27 15:20:01 +01:00
|
|
|
@abstractmethod
def consume(self, data: Dict[str, Any]) -> None:
    """Process a single event; concrete workers must override this."""
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
def do_consume(
    self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]]
) -> None:
    """Run ``consume_func`` on ``events`` with timeout, error and stats handling.

    Exceptions raised by ``consume_func`` are passed to
    ``_handle_consume_exception``.  When both ``MAX_CONSUME_SECONDS`` and
    ``ENABLE_TIMEOUTS`` are truthy, a SIGALRM timer of
    ``MAX_CONSUME_SECONDS * len(events)`` seconds guards the call.
    Afterwards the per-request caches and query log are reset and the
    statistics are updated.
    """
    consume_time_seconds: Optional[float] = None
    with configure_scope() as scope:
        scope.clear_breadcrumbs()
        add_breadcrumb(
            type="debug",
            category="queue_processor",
            message=f"Consuming {self.queue_name}",
            data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()},
        )
    try:
        if self.idle:
            # We're reactivating after having gone idle due to emptying the queue.
            # We should update the stats file to keep it fresh and to make it clear
            # that the queue started processing, in case the event we're about to process
            # makes us freeze.
            self.idle = False
            self.update_statistics()

        time_start = time.time()
        if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS:
            try:
                signal.signal(
                    signal.SIGALRM,
                    functools.partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
                )
                try:
                    signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
                    consume_func(events)
                finally:
                    # Always cancel the pending alarm, even on failure.
                    signal.alarm(0)
            finally:
                # Always restore the default SIGALRM disposition.
                signal.signal(signal.SIGALRM, signal.SIG_DFL)
        else:
            consume_func(events)
        consume_time_seconds = time.time() - time_start
        self.consumed_since_last_emptied += len(events)
    except Exception as e:
        self._handle_consume_exception(events, e)
    finally:
        # NOTE(review): indentation was lost in the view this was restored
        # from; the bookkeeping below is assumed to belong to this
        # ``finally`` clause -- confirm against the canonical file.
        flush_per_request_caches()
        reset_queries()

        # Only successful batches contribute to the timing window.
        if consume_time_seconds is not None:
            self.recent_consume_times.append((len(events), consume_time_seconds))

        remaining_local_queue_size = self.get_remaining_local_queue_size()
        if remaining_local_queue_size == 0:
            self.queue_last_emptied_timestamp = time.time()
            self.consumed_since_last_emptied = 0
            # We've cleared all the events from the queue, so we don't
            # need to worry about the small overhead of doing a disk write.
            # We take advantage of this to update the stats file to keep it fresh,
            # especially since the queue might go idle until new events come in.
            self.update_statistics()
            self.idle = True
        else:
            # An if/else is used instead of an early ``return`` because a
            # ``return`` inside ``finally`` would discard any exception
            # still propagating (e.g. SystemExit or KeyboardInterrupt).
            self.consume_iteration_counter += 1
            if (
                self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
                or time.time() - self.last_statistics_update_time
                >= self.MAX_SECONDS_BEFORE_UPDATE_STATS
            ):
                self.consume_iteration_counter = 0
                self.update_statistics()
|
2020-03-18 20:48:49 +01:00
|
|
|
|
2020-10-09 22:44:15 +02:00
|
|
|
def consume_single_event(self, event: Dict[str, Any]) -> None:
    """Process one event through ``do_consume``.

    The event is wrapped in a one-element list, and ``do_consume`` is
    given an adapter that unwraps that list again and forwards its first
    element to ``consume``.
    """
    self.do_consume(lambda batch: self.consume(batch[0]), [event])
|
2020-03-18 20:04:20 +01:00
|
|
|
|
2021-07-06 14:56:23 +02:00
|
|
|
def timer_expired(
    self, limit: int, events: List[Dict[str, Any]], signal: int, frame: FrameType
) -> None:
    """Report an expired consume timer by raising WorkerTimeoutException.

    ``signal`` and ``frame`` are accepted but unused here; presumably they
    exist so this can be installed as a signal handler -- confirm at the
    call site.

    Raises:
        WorkerTimeoutException: always, built from the queue name, the
            ``limit`` and the number of events in ``events``.
    """
    event_count = len(events)
    raise WorkerTimeoutException(self.queue_name, limit, event_count)
|
2021-07-06 14:36:27 +02:00
|
|
|
|
2020-09-29 01:16:54 +02:00
|
|
|
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
    """Log a failed consume call and append its events to the queue's error file.

    Args:
        events: The events that were being processed when the failure occurred.
        exception: The exception raised while consuming ``events``.

    An InterruptConsumeException is ignored entirely.  Any other exception
    is logged (with the events attached as Sentry context), the events are
    appended as one timestamped JSON line to ``<queue_name>.errors`` inside
    ``settings.QUEUE_ERROR_DIR``, and ``check_and_send_restart_signal`` is
    called.
    """
    if isinstance(exception, InterruptConsumeException):
        # The exception signals that no further error handling
        # is needed and the worker can proceed.
        return

    with configure_scope() as scope:
        scope.set_context(
            "events",
            {
                "data": events,
                "queue_name": self.queue_name,
            },
        )
        if isinstance(exception, WorkerTimeoutException):
            # Timeouts get their own fingerprint, set on a temporary scope
            # that is named separately so it does not shadow the outer one.
            with sentry_sdk.push_scope() as timeout_scope:
                timeout_scope.fingerprint = ["worker-timeout", self.queue_name]
                logging.exception(exception, stack_info=True)
        else:
            logging.exception(
                "Problem handling data on queue %s", self.queue_name, stack_info=True
            )

    # Create the directory unconditionally with exist_ok=True rather than
    # checking for it first: with a separate existence check, two workers
    # failing at the same moment could both try to create it, and the loser
    # would raise FileExistsError from inside this error handler.
    os.makedirs(settings.QUEUE_ERROR_DIR, exist_ok=True)

    # Use 'mark_sanitized' to prevent Pysa from detecting this false positive
    # flow. 'queue_name' is always a constant string.
    fname = mark_sanitized(f"{self.queue_name}.errors")
    fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
    line = f"{time.asctime()}\t{orjson.dumps(events).decode()}\n"
    lock_fn = fn + ".lock"
    # Hold the lock file while appending to the error file.
    with lockfile(lock_fn):
        with open(fn, "a") as f:
            f.write(line)
    check_and_send_restart_signal()
|
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def setup(self) -> None:
    """Create the queue client for this worker and store it on ``self.q``."""
    # The prefetch is bounded so that RabbitMQ does not forward the whole
    # backlog to this client on connect, which could use unbounded memory
    # and stall the server's writes to the socket.
    client = SimpleQueueClient(prefetch=self.PREFETCH)
    self.q = client
|
2015-11-24 07:01:35 +01:00
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def start(self) -> None:
    """Initialize statistics, then consume the queue one event at a time."""
    assert self.q is not None
    self.initialize_statistics()

    def handle_events(events: List[Dict[str, Any]]) -> None:
        # Only the first event of each delivery is processed here.
        self.consume_single_event(events[0])

    self.q.start_json_consumer(self.queue_name, handle_events)
|
2013-08-29 23:41:03 +02:00
|
|
|
|
2018-03-10 08:29:46 +01:00
|
|
|
def stop(self) -> None:  # nocoverage
    """Tell the queue client to stop consuming."""
    client = self.q
    assert client is not None
    client.stop_consuming()
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2017-11-03 22:34:12 +01:00
|
|
|
class LoopQueueProcessingWorker(QueueProcessingWorker):
    """Queue worker that handles events in batches via ``consume_batch``."""

    # Passed to start_json_consumer as ``timeout``.
    sleep_delay = 1
    # Passed to start_json_consumer as ``batch_size``.
    batch_size = 100

    def setup(self) -> None:
        """Create the queue client, with a prefetch no smaller than the batch size."""
        # Batch workers cannot function with a prefetch below their batch
        # size, so take whichever of the two limits is larger.
        prefetch = max(self.PREFETCH, self.batch_size)
        self.q = SimpleQueueClient(prefetch=prefetch)

    def start(self) -> None:  # nocoverage
        """Initialize statistics, then consume the queue in batches."""
        assert self.q is not None
        self.initialize_statistics()
        self.q.start_json_consumer(
            self.queue_name,
            lambda batch: self.do_consume(self.consume_batch, batch),
            batch_size=self.batch_size,
            timeout=self.sleep_delay,
        )

    @abstractmethod
    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
        """Process a batch of events; subclasses must implement this."""
        pass

    def consume(self, event: Dict[str, Any]) -> None:
        """In LoopQueueProcessingWorker, consume is used just for automated tests"""
        single_event_batch = [event]
        self.consume_batch(single_event_batch)
|
2016-12-28 22:24:56 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("invites")
class ConfirmationEmailWorker(QueueProcessingWorker):
    """Send invitation emails and, where applicable, queue a reminder email."""

    def consume(self, data: Mapping[str, Any]) -> None:
        """Send the invitation described by ``data``.

        Reads ``prereg_id`` and ``referrer_id`` from ``data``, optionally
        ``email_language``, and the validity period from either
        ``invite_expires_in_days`` or ``invite_expires_in_minutes``.  A
        validity of ``None`` means the invitation never expires.
        """
        if "invite_expires_in_days" in data:
            # Events in the day-based format may carry None for a
            # never-expiring invitation; multiplying None would raise
            # TypeError, so map it straight to None minutes instead.
            invite_expires_in_days = data["invite_expires_in_days"]
            if invite_expires_in_days is None:
                invite_expires_in_minutes = None
            else:
                invite_expires_in_minutes = invite_expires_in_days * 24 * 60
        elif "invite_expires_in_minutes" in data:
            invite_expires_in_minutes = data["invite_expires_in_minutes"]

        invitee = filter_to_valid_prereg_users(
            PreregistrationUser.objects.filter(id=data["prereg_id"]), invite_expires_in_minutes
        ).first()
        if invitee is None:
            # The invitation could have been revoked
            return

        referrer = get_user_profile_by_id(data["referrer_id"])
        logger.info(
            "Sending invitation for realm %s to %s", referrer.realm.string_id, invitee.email
        )
        if "email_language" in data:
            email_language = data["email_language"]
        else:
            email_language = referrer.realm.default_language

        activate_url = do_send_confirmation_email(
            invitee, referrer, email_language, invite_expires_in_minutes
        )
        if invite_expires_in_minutes is None:
            # We do not queue reminder email for never expiring
            # invitations. This is probably a low importance bug; it
            # would likely be more natural to send a reminder after 7
            # days.
            return

        # queue invitation reminder
        #
        # Only invitations valid for at least four days get one; it is
        # scheduled for two days before the invitation expires.
        if invite_expires_in_minutes >= 4 * 24 * 60:
            context = common_context(referrer)
            context.update(
                activate_url=activate_url,
                referrer_name=referrer.full_name,
                referrer_email=referrer.delivery_email,
                referrer_realm_name=referrer.realm.name,
            )
            send_future_email(
                "zerver/emails/invitation_reminder",
                referrer.realm,
                to_emails=[invitee.email],
                from_address=FromAddress.tokenized_no_reply_placeholder,
                language=email_language,
                context=context,
                delay=datetime.timedelta(minutes=invite_expires_in_minutes - (2 * 24 * 60)),
            )
|
|
|
|
|
2013-10-10 20:39:43 +02:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("user_activity")
class UserActivityWorker(LoopQueueProcessingWorker):
    """The UserActivity queue is perhaps our highest-traffic queue, and
    requires some care to ensure it performs adequately.

    We use a LoopQueueProcessingWorker as a performance optimization
    for managing the queue.  UserActivity records are structured so
    that they are easy to deduplicate before being sent to the
    database; we take advantage of that to make this queue highly
    effective at dealing with a backlog containing many similar
    events.  Such a backlog can arise in a few ways:

    * In abuse/DoS situations, if a client is sending huge numbers of
      similar requests to the server.
    * If the queue ends up with several minutes of backlog e.g. due to
      downtime of the queue processor, many clients will have several
      common events from doing an action multiple times.

    """

    # Cache of client name -> client id, used for legacy events.
    client_id_map: Dict[str, int] = {}

    def start(self) -> None:
        # For our unit tests to make sense, we need to clear this on startup.
        self.client_id_map = {}
        super().start()

    def consume_batch(self, user_activity_events: List[Dict[str, Any]]) -> None:
        """Deduplicate a batch of activity events and write them to the database."""
        # Maps (user_profile_id, client_id, query) to
        # (number of events seen, latest event time).
        pending: Dict[Tuple[int, int, str], Tuple[int, float]] = {}

        # First, we drain the queue of all user_activity events and
        # deduplicate them for insertion into the database.
        for event in user_activity_events:
            user_profile_id = event["user_profile_id"]

            if "client_id" in event:
                client_id = event["client_id"]
            else:
                # This is for compatibility with older events still stuck in the queue,
                # that used the client name in event["client"] instead of having
                # event["client_id"] directly.
                #
                # TODO/compatibility: We can delete this once it is no
                # longer possible to directly upgrade from 2.1 to main.
                client_name = event["client"]
                if client_name not in self.client_id_map:
                    self.client_id_map[client_name] = get_client(client_name).id
                client_id = self.client_id_map[client_name]

            key = (user_profile_id, client_id, event["query"])
            if key in pending:
                seen, latest = pending[key]
                pending[key] = (seen + 1, max(latest, event["time"]))
            else:
                pending[key] = (1, event["time"])

        # Then we insert the updates into the database.
        #
        # TODO: Doing these updates in sequence individually is likely
        # inefficient; the idealized version would do some sort of
        # bulk insert_or_update query.
        for (user_profile_id, client_id, query), (seen, latest) in pending.items():
            log_time = timestamp_to_datetime(latest)
            do_update_user_activity(user_profile_id, client_id, query, seen, log_time)
|
2013-09-04 00:00:44 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("user_activity_interval")
class UserActivityIntervalWorker(QueueProcessingWorker):
    """Record user activity intervals from queued events."""

    def consume(self, event: Mapping[str, Any]) -> None:
        """Update the activity interval for the user named in ``event``.

        Reads ``user_profile_id`` and ``time`` (a timestamp) from ``event``.
        """
        acting_user = get_user_profile_by_id(event["user_profile_id"])
        event_time = timestamp_to_datetime(event["time"])
        do_update_user_activity_interval(acting_user, event_time)
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("user_presence")
class UserPresenceWorker(QueueProcessingWorker):
    """Record user presence updates from queued events."""

    def consume(self, event: Mapping[str, Any]) -> None:
        """Apply the presence update described by ``event``.

        Reads ``user_profile_id``, ``client`` (a client name), ``time``
        (a timestamp) and ``status`` from ``event``.
        """
        logging.debug("Received presence event: %s", event)
        presence_user = get_user_profile_by_id(event["user_profile_id"])
        presence_client = get_client(event["client"])
        presence_time = timestamp_to_datetime(event["time"])
        do_update_user_presence(presence_user, presence_client, presence_time, event["status"])
|
2013-09-03 22:33:20 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("missedmessage_emails")
|
queue_processors: Rewrite MissedMessageWorker to always wait.
Previously, MissedMessageWorker used a batching strategy of just
grabbing all the events from the last 2 minutes, and then sending them
off as emails. This suffered from the problem that you had a random
time, between 0s and 120s, to edit your message before it would be
sent out via an email.
Additionally, this made the queue hard to monitor, because it was
expected to pile up large numbers of events, even if everything was
fine.
We fix this by batching together the events using a timer; the queue
processor itself just tracks the items, and then a timer-handler
process takes care of ensuring that the emails get sent at least 120s
(and at most 130s) after the first triggering message was sent in Zulip.
This introduces a new unpleasant bug, namely that when we restart a
Zulip server, we can now lose some missed_message email events;
further work is required on this point.
Fixes #6839.
2018-10-24 21:08:38 +02:00
|
|
|
class MissedMessageWorker(QueueProcessingWorker):
|
|
|
|
# Aggregate all messages received over the last BATCH_DURATION
|
|
|
|
# seconds to let someone finish sending a batch of messages and/or
|
|
|
|
# editing them before they are sent out as emails to recipients.
|
|
|
|
#
|
|
|
|
# The timer is running whenever; we poll at most every TIMER_FREQUENCY
|
|
|
|
# seconds, to avoid excessive activity.
|
|
|
|
TIMER_FREQUENCY = 5
|
python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
|
|
|
timer_event: Optional[Timer] = None
|
2013-09-03 22:33:20 +02:00
|
|
|
|
queue: Use locking to avoid race conditions in missedmessage_emails.
This queue had a race condition with creation of another Timer while
maybe_send_batched_emails is still doing its work, which may cause
two or more threads to be running maybe_send_batched_emails
at the same time, mutating the shared data simultaneously.
Another less likely potential race condition was that
maybe_send_batched_emails after sending out its email, can call
ensure_timer(). If the consume function is run simultaneously
in the main thread, it will call ensure_timer() too, which,
given unfortunate timings, might lead to both calls setting a new Timer.
We add locking to the queue to avoid such race conditions.
Tested manually, by print debugging with the following setup:
1. Making handle_missedmessage_emails sleep 2 seconds for each email,
and changed BATCH_DURATION to 1s to make the queue start working
right after launching.
2. Putting a bunch of events in the queue.
3. ./manage.py process_queue --queue_name missedmessage_emails
4. Once maybe_send_batched_emails is called and while it's processing
the events, I pushed more events to the queue. That triggers the
consume() function and ensure_timer().
Before implementing the locking mechanism, this causes two threads
to run maybe_send_batched_emails at the same time, mutating each other's
shared data, causing a traceback such as
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 1182, in run
self.function(*self.args, **self.kwargs)
File "/srv/zulip/zerver/worker/queue_processors.py", line 507, in maybe_send_batched_emails
del self.events_by_recipient[user_profile_id]
KeyError: '5'
With the locking mechanism, things get handled as expected, and
ensure_timer() exits if it can't obtain the lock due to
maybe_send_batched_emails still working.
Co-authored-by: Tim Abbott <tabbott@zulip.com>
2020-08-26 21:40:59 +02:00
|
|
|
# This lock protects access to all of the data structures declared
|
|
|
|
# above. A lock is required because maybe_send_batched_emails, as
|
|
|
|
# the argument to Timer, runs in a separate thread from the rest
|
queue_processors: Shut down background missedmessage_emails thread.
Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit. In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds. As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled. This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.
Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.
When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock. As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.
With the lock in hand, cancel any outstanding timers. A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock. Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
2021-11-22 23:52:05 +01:00
|
|
|
# of the consumer. This is a _re-entrant_ lock because we may
|
|
|
|
# need to take the lock when we already have it during shutdown
|
|
|
|
# (see the stop method).
|
|
|
|
lock = RLock()
|
queue: Use locking to avoid race conditions in missedmessage_emails.
This queue had a race condition with creation of another Timer while
maybe_send_batched_emails is still doing its work, which may cause
two or more threads to be running maybe_send_batched_emails
at the same time, mutating the shared data simultaneously.
Another less likely potential race condition was that
maybe_send_batched_emails after sending out its email, can call
ensure_timer(). If the consume function is run simultaneously
in the main thread, it will call ensure_timer() too, which,
given unfortunate timings, might lead to both calls setting a new Timer.
We add locking to the queue to avoid such race conditions.
Tested manually, by print debugging with the following setup:
1. Making handle_missedmessage_emails sleep 2 seconds for each email,
and changed BATCH_DURATION to 1s to make the queue start working
right after launching.
2. Putting a bunch of events in the queue.
3. ./manage.py process_queue --queue_name missedmessage_emails
4. Once maybe_send_batched_emails is called and while it's processing
the events, I pushed more events to the queue. That triggers the
consume() function and ensure_timer().
Before implementing the locking mechanism, this causes two threads
to run maybe_send_batched_emails at the same time, mutating each other's
shared data, causing a traceback such as
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 1182, in run
self.function(*self.args, **self.kwargs)
File "/srv/zulip/zerver/worker/queue_processors.py", line 507, in maybe_send_batched_emails
del self.events_by_recipient[user_profile_id]
KeyError: '5'
With the locking mechanism, things get handled as expected, and
ensure_timer() exits if it can't obtain the lock due to
maybe_send_batched_emails still working.
Co-authored-by: Tim Abbott <tabbott@zulip.com>
2020-08-26 21:40:59 +02:00
|
|
|
|
2021-05-04 03:19:27 +02:00
|
|
|
# Because the background `maybe_send_batched_email` thread can
|
|
|
|
# hold the lock for an indeterminate amount of time, the `consume`
|
|
|
|
# can block on that for longer than 30s, the default worker
|
|
|
|
# timeout. Allow arbitrarily-long worker `consume` calls.
|
|
|
|
MAX_CONSUME_SECONDS = None
|
|
|
|
|
queue_processors: Rewrite MissedMessageWorker to always wait.
Previously, MissedMessageWorker used a batching strategy of just
grabbing all the events from the last 2 minutes, and then sending them
off as emails. This suffered from the problem that you had a random
time, between 0s and 120s, to edit your message before it would be
sent out via an email.
Additionally, this made the queue hard to monitor, because it was
expected to pile up large numbers of events, even if everything was
fine.
We fix this by batching together the events using a timer; the queue
processor itself just tracks the items, and then a timer-handler
process takes care of ensuring that the emails get sent at least 120s
(and at most 130s) after the first triggering message was sent in Zulip.
This introduces a new unpleasant bug, namely that when we restart a
Zulip server, we can now lose some missed_message email events;
further work is required on this point.
Fixes #6839.
2018-10-24 21:08:38 +02:00
|
|
|
def consume(self, event: Dict[str, Any]) -> None:
    """Store one missedmessage_emails event as a scheduled notification row.

    The whole operation runs while holding ``self.lock``. The event must
    carry ``user_profile_id``, ``message_id`` and ``trigger``;
    ``mentioned_user_group_id`` is optional and defaults to ``None``.
    """
    with self.lock:
        logging.debug("Received missedmessage_emails event: %s", event)

        recipient_id: int = event["user_profile_id"]
        recipient = get_user_profile_by_id(recipient_id)
        batching_window = datetime.timedelta(
            seconds=recipient.email_notifications_batching_period_seconds
        )

        # If this user already has a pending email, piggyback on its
        # scheduled timestamp so the new event is batched with it;
        # otherwise schedule one batching period from now.
        pending_for_recipient = ScheduledMessageNotificationEmail.objects.filter(
            user_profile_id=recipient_id
        )
        try:
            send_at = pending_for_recipient[0].scheduled_timestamp
        except IndexError:
            send_at = timezone_now() + batching_window

        try:
            ScheduledMessageNotificationEmail.objects.create(
                user_profile_id=recipient_id,
                message_id=event["message_id"],
                trigger=event["trigger"],
                scheduled_timestamp=send_at,
                mentioned_user_group_id=event.get("mentioned_user_group_id"),
            )

            # Only arm the timer once a row has actually been stored.
            self.ensure_timer()
        except IntegrityError:
            logging.debug(
                "ScheduledMessageNotificationEmail row could not be created. The message may have been deleted. Skipping event."
            )
|
2013-09-03 22:33:20 +02:00
|
|
|
|
queue_processors: Rewrite MissedMessageWorker to always wait.
Previously, MissedMessageWorker used a batching strategy of just
grabbing all the events from the last 2 minutes, and then sending them
off as emails. This suffered from the problem that you had a random
time, between 0s and 120s, to edit your message before it would be
sent out via an email.
Additionally, this made the queue hard to monitor, because it was
expected to pile up large numbers of events, even if everything was
fine.
We fix this by batching together the events using a timer; the queue
processor itself just tracks the items, and then a timer-handler
process takes care of ensuring that the emails get sent at least 120s
(and at most 130s) after the first triggering message was sent in Zulip.
This introduces a new unpleasant bug, namely that when we restart a
Zulip server, we can now lose some missed_message email events;
further work is required on this point.
Fixes #6839.
2018-10-24 21:08:38 +02:00
|
|
|
def ensure_timer(self) -> None:
    """Arm the batching Timer unless one is already recorded.

    The caller is responsible for ensuring self.lock is held when it
    calls this.
    """
    if self.timer_event is None:
        # Record the Timer on the instance first, then start it; when it
        # fires it runs maybe_send_batched_emails with this worker.
        self.timer_event = Timer(
            self.TIMER_FREQUENCY, MissedMessageWorker.maybe_send_batched_emails, [self]
        )
        self.timer_event.start()
|
|
|
|
|
queue: Use locking to avoid race conditions in missedmessage_emails.
This queue had a race condition with creation of another Timer while
maybe_send_batched_emails is still doing its work, which may cause
two or more threads to be running maybe_send_batched_emails
at the same time, mutating the shared data simultaneously.
Another less likely potential race condition was that
maybe_send_batched_emails after sending out its email, can call
ensure_timer(). If the consume function is run simultaneously
in the main thread, it will call ensure_timer() too, which,
given unfortunate timings, might lead to both calls setting a new Timer.
We add locking to the queue to avoid such race conditions.
Tested manually, by print debugging with the following setup:
1. Making handle_missedmessage_emails sleep 2 seconds for each email,
and changed BATCH_DURATION to 1s to make the queue start working
right after launching.
2. Putting a bunch of events in the queue.
3. ./manage.py process_queue --queue_name missedmessage_emails
4. Once maybe_send_batched_emails is called and while it's processing
the events, I pushed more events to the queue. That triggers the
consume() function and ensure_timer().
Before implementing the locking mechanism, this causes two threads
to run maybe_send_batched_emails at the same time, mutating each other's
shared data, causing a traceback such as
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 1182, in run
self.function(*self.args, **self.kwargs)
File "/srv/zulip/zerver/worker/queue_processors.py", line 507, in maybe_send_batched_emails
del self.events_by_recipient[user_profile_id]
KeyError: '5'
With the locking mechanism, things get handled as expected, and
ensure_timer() exits if it can't obtain the lock due to
maybe_send_batched_emails still working.
Co-authored-by: Tim Abbott <tabbott@zulip.com>
2020-08-26 21:40:59 +02:00
|
|
|
def maybe_send_batched_emails(self) -> None:
    """Send every notification email whose scheduled time has passed.

    Runs under ``self.lock``. Due rows are grouped per user, handed to
    ``handle_missedmessage_emails`` and then deleted, all inside one
    database transaction. The timer is re-armed only if rows remain.
    """
    with self.lock:
        # self.timer_event just triggered execution of this function
        # in a thread, so now that we hold the lock, we clear the
        # timer_event attribute to record that no Timer is active.
        # If it is already None, stop() is shutting us down.
        if self.timer_event is None:
            return
        self.timer_event = None

        now = timezone_now()

        with transaction.atomic():
            due_rows = ScheduledMessageNotificationEmail.objects.filter(
                scheduled_timestamp__lte=now
            ).select_related()

            # Batch the entries by user
            per_user: Dict[int, List[Dict[str, Any]]] = {}
            for row in due_rows:
                per_user.setdefault(row.user_profile_id, []).append(
                    dict(
                        user_profile_id=row.user_profile_id,
                        message_id=row.message_id,
                        trigger=row.trigger,
                        mentioned_user_group_id=row.mentioned_user_group_id,
                    )
                )

            for user_profile_id, user_events in per_user.items():
                logging.info(
                    "Batch-processing %s missedmessage_emails events for user %s",
                    len(user_events),
                    user_profile_id,
                )
                try:
                    # Because we process events in batches, an
                    # escaped exception here would lead to
                    # duplicate messages being sent for other
                    # users in the same batch, and no guarantee
                    # of forward progress.
                    handle_missedmessage_emails(user_profile_id, user_events)
                except Exception:
                    logging.exception(
                        "Failed to process %d missedmessage_emails for user %s",
                        len(user_events),
                        user_profile_id,
                        stack_info=True,
                    )

            due_rows.delete()

        # By only restarting the timer if there are actually events in
        # the queue, we ensure this queue processor is idle when there
        # are no missed-message emails to process. This avoids
        # constant CPU usage when there is no work to do.
        if ScheduledMessageNotificationEmail.objects.exists():
            self.ensure_timer()
|
2013-09-30 17:53:46 +02:00
|
|
|
|
queue_processors: Shut down background missedmessage_emails thread.
Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit. In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds. As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled. This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.
Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.
When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock. As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.
With the lock in hand, cancel any outstanding timers. A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock. Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
2021-11-22 23:52:05 +01:00
|
|
|
def stop(self) -> None:
    """Cancel any pending batching timer, then run the parent's stop()."""
    # This may be called from a signal handler when we _already_ have
    # the lock. Python doesn't give us a way to check if our thread
    # has the lock, so we instead use a re-entrant lock to always
    # take it.
    with self.lock:
        # With the lock, we can safely inspect the timer_event and
        # cancel it if it is still pending.
        pending_timer = self.timer_event
        if pending_timer is not None:
            # We cancel and then join the timer with a timeout to
            # prevent deadlock, where we took the lock, the timer
            # then ran out and started maybe_send_batched_emails,
            # and then it started waiting for the lock. The timer
            # isn't running anymore so can't be canceled, and the
            # thread is blocked on the lock, so will never join().
            pending_timer.cancel()
            pending_timer.join(timeout=1)
            # In case we did hit this deadlock, we signal to
            # maybe_send_batched_emails that it should abort by,
            # before releasing the lock, unsetting the timer.
            self.timer_event = None
    super().stop()
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("email_senders")
class EmailSendingWorker(LoopQueueProcessingWorker):
    """Worker for the "email_senders" queue.

    Keeps one email backend connection on ``self.connection`` and passes
    it through ``initialize_connection`` before every send.
    """

    def __init__(self) -> None:
        super().__init__()
        self.connection: BaseEmailBackend = initialize_connection(None)

    @retry_send_email_failures
    def send_email(self, event: Dict[str, Any]) -> None:
        """Send the email described by ``event``."""
        # Work on a deep copy with `failed_tries` removed, since
        # send_email neither takes that argument nor needs that data,
        # and the caller's event must stay untouched.
        email_kwargs = copy.deepcopy(event)
        email_kwargs.pop("failed_tries", None)
        handle_send_email_format_changes(email_kwargs)
        self.connection = initialize_connection(self.connection)
        send_email(**email_kwargs, connection=self.connection)

    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
        """Send each event in the batch, in order."""
        for queued_event in events:
            self.send_email(queued_event)

    def stop(self) -> None:
        """Close the connection; the parent's stop() runs even if closing fails."""
        try:
            self.connection.close()
        finally:
            super().stop()
|
2017-03-06 08:45:59 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("missedmessage_mobile_notifications")
class PushNotificationsWorker(QueueProcessingWorker):
    """Worker for the "missedmessage_mobile_notifications" queue."""

    # The use of aioapns in the backend means that we cannot use
    # SIGALRM to limit how long a consume takes, as SIGALRM does not
    # play well with asyncio.
    MAX_CONSUME_SECONDS = None

    def start(self) -> None:
        # initialize_push_notifications doesn't strictly do anything
        # beyond printing some logging warnings if push notifications
        # are not available in the current configuration.
        initialize_push_notifications()
        super().start()

    def consume(self, event: Dict[str, Any]) -> None:
        """Handle one push-notification event.

        Events whose ``type`` is "remove" revoke notifications; all
        others (``type`` defaults to "add") send one. If the bouncer asks
        us to retry later, the event is handed to ``retry_event``.
        """
        try:
            if event.get("type", "add") != "remove":
                handle_push_notification(event["user_profile_id"], event)
            else:
                removed_ids = event.get("message_ids")
                if removed_ids is None:
                    # TODO/compatibility: Previously, we sent only one `message_id` in
                    # a payload for notification remove events. This was later changed
                    # to send a list of `message_ids` (with that field name), but we need
                    # compatibility code for events present in the queue during upgrade.
                    # Remove this when one can no longer upgrade from 1.9.2 (or earlier)
                    # to any version after 2.0.0
                    removed_ids = [event["message_id"]]
                handle_remove_push_notification(event["user_profile_id"], removed_ids)
        except PushNotificationBouncerRetryLaterError:

            def log_retries_exhausted(event: Dict[str, Any]) -> None:
                # Passed to retry_event as its failure processor.
                logger.warning(
                    "Maximum retries exceeded for trigger:%s event:push_notification",
                    event["user_profile_id"],
                )

            retry_event(self.queue_name, event, log_retries_exhausted)
|
2013-11-19 00:55:24 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("error_reports")
class ErrorReporter(QueueProcessingWorker):
    """Worker for the "error_reports" queue."""

    def consume(self, event: Mapping[str, Any]) -> None:
        """Log the report and, if ERROR_REPORTING is enabled, forward it.

        ``event["type"]`` must be "browser" or "server".
        """
        report_type = event["type"]
        assert report_type in ["browser", "server"]

        logging.info(
            "Processing traceback with type %s for %s", report_type, event.get("user_email")
        )
        if settings.ERROR_REPORTING:
            do_report_error(report_type, event["report"])
|
2013-11-13 19:12:22 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("digest_emails")
class DigestWorker(QueueProcessingWorker):  # nocoverage
    """Queue worker that sends digest emails for the users named in an event.

    Who gets a digest is entirely determined by the enqueue_digest_emails
    management command, not here.
    """

    def consume(self, event: Mapping[str, Any]) -> None:
        """Send digests to event["user_ids"], using event["cutoff"]."""
        # Legacy code may have enqueued a single id under "user_profile_id"
        # instead of a list under "user_ids".
        user_ids = event["user_ids"] if "user_ids" in event else [event["user_profile_id"]]
        bulk_handle_digest_email(user_ids, event["cutoff"])
|
2013-10-28 20:45:35 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("email_mirror")
class MirrorWorker(QueueProcessingWorker):
    """Queue worker that hands queued inbound emails to mirror_email."""

    def consume(self, event: Mapping[str, Any]) -> None:
        """Decode the base64-encoded email in the event and mirror it.

        The event carries the recipient address in "rcpt_to" and the raw
        message bytes, base64-encoded, in "msg_base64".  Mail to addresses
        that are not missed-message addresses is rate limited per realm; a
        rate-limited message is logged and dropped.
        """
        rcpt_to = event["rcpt_to"]
        raw_message = base64.b64decode(event["msg_base64"])
        msg = email.message_from_bytes(raw_message, policy=email.policy.default)
        assert isinstance(msg, EmailMessage)  # https://github.com/python/typeshed/issues/2417

        if not is_missed_message_address(rcpt_to):
            # Missed message addresses are one-time use, so only the other
            # addresses need protection against message spam.
            recipient_realm = decode_stream_email_address(rcpt_to)[0].realm
            try:
                rate_limit_mirror_by_realm(recipient_realm)
            except RateLimited:
                logger.warning(
                    "MirrorWorker: Rejecting an email from: %s to realm: %s - rate limited.",
                    msg["From"],
                    recipient_realm.subdomain,
                )
                return

        mirror_email(msg, rcpt_to=rcpt_to)
|
2013-12-17 22:37:51 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("embed_links")
class FetchLinksEmbedData(QueueProcessingWorker):
    """Queue worker that fetches URL preview data and re-renders the message."""

    # This is a slow queue with network requests, so a disk write is negligible.
    # Update stats file after every consume call.
    CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 1

    def consume(self, event: Mapping[str, Any]) -> None:
        """Fetch embed data for event["urls"] and update the message with it.

        The fetches happen before the transaction is opened.  The message is
        then locked with select_for_update; nothing is written if the message
        no longer exists or its content differs from event["message_content"].
        """
        url_embed_data: Dict[str, Optional[UrlEmbedData]] = {}
        for url in event["urls"]:
            fetch_started = time.time()
            url_embed_data[url] = url_preview.get_link_embed_data(url)
            elapsed = time.time() - fetch_started
            logging.info("Time spent on get_link_embed_data for %s: %s", url, elapsed)

        with transaction.atomic():
            try:
                message = Message.objects.select_for_update().get(id=event["message_id"])
            except Message.DoesNotExist:
                # Message may have been deleted
                return

            # If the message changed, we will run this task after updating the message
            # in zerver.actions.message_edit.check_update_message
            if message.content != event["message_content"]:
                return

            # Fetch the realm whose settings we're using for rendering
            realm = Realm.objects.get(id=event["message_realm_id"])

            # If rendering fails, the called code will raise a JsonableError.
            rendering_result = render_incoming_message(
                message,
                message.content,
                realm,
                url_embed_data=url_embed_data,
            )
            do_update_embedded_data(message.sender, message, message.content, rendering_result)

    def timer_expired(
        self, limit: int, events: List[Dict[str, Any]], signal: int, frame: FrameType
    ) -> None:
        """Log which message and URLs were being fetched, then abort the consume.

        Expects exactly one event in `events`; always raises
        InterruptConsumeException.
        """
        assert len(events) == 1
        (event,) = events

        logging.warning(
            "Timed out in %s after %s seconds while fetching URLs for message %s: %s",
            self.queue_name,
            limit,
            event["message_id"],
            event["urls"],
        )
        raise InterruptConsumeException
|
2021-07-06 15:37:18 +02:00
|
|
|
|
2017-04-20 22:04:08 +02:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("outgoing_webhooks")
class OutgoingWebhookWorker(QueueProcessingWorker):
    """Queue worker that calls each outgoing-webhook service of a bot."""

    def consume(self, event: Dict[str, Any]) -> None:
        """Issue one REST call per service configured for the bot user.

        The event is mutated in place: "command" is set to the message
        content, and "service_name" is overwritten for each service before
        the call is made.
        """
        event["command"] = event["message"]["content"]

        for service in get_bot_services(event["user_profile_id"]):
            event["service_name"] = str(service.name)
            handler = get_outgoing_webhook_service_handler(service)
            do_rest_call(service.base_url, event, handler)
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2017-05-25 20:41:29 +02:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("embedded_bots")
class EmbeddedBotWorker(QueueProcessingWorker):
    """Queue worker that runs embedded bot handlers against a message."""

    def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler:
        """Build a fresh EmbeddedBotHandler for the given bot user."""
        return EmbeddedBotHandler(user_profile)

    def consume(self, event: Mapping[str, Any]) -> None:
        """Dispatch event["message"] to every embedded bot service of the user.

        Services whose name has no bot handler are logged and skipped.  When
        the trigger is a mention, the message content is replaced by the
        query with the mention removed.  An EmbeddedBotQuitException raised
        by a handler is logged as a warning and does not stop the loop.
        """
        user_profile_id = event["user_profile_id"]
        user_profile = get_user_profile_by_id(user_profile_id)

        message: Dict[str, Any] = event["message"]

        # TODO: Do we actually want to allow multiple Services per bot user?
        for service in get_bot_services(user_profile_id):
            bot_handler = get_bot_handler(str(service.name))
            if bot_handler is None:
                logging.error(
                    "Error: User %s has bot with invalid embedded bot service %s",
                    user_profile_id,
                    service.name,
                )
                continue

            try:
                if hasattr(bot_handler, "initialize"):
                    bot_handler.initialize(self.get_bot_api_client(user_profile))
                if event["trigger"] == "mention":
                    stripped_content = extract_query_without_mention(
                        message=message,
                        client=self.get_bot_api_client(user_profile),
                    )
                    message["content"] = stripped_content
                    assert message["content"] is not None
                bot_handler.handle_message(
                    message=message,
                    bot_handler=self.get_bot_api_client(user_profile),
                )
            except EmbeddedBotQuitException as e:
                logging.warning(str(e))
|
2017-11-13 21:24:51 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("deferred_work")
class DeferredWorker(QueueProcessingWorker):
    """This queue processor is intended for cases where we want to trigger a
    potentially expensive, not urgent, job to be run on a separate
    thread from the Django worker that initiated it (e.g. so that we
    can provide a low-latency HTTP response or avoid risk of request
    timeouts for an operation that could in rare cases take minutes).
    """

    # Because these operations have no SLO, and can take minutes,
    # remove any processing timeouts
    MAX_CONSUME_SECONDS = None

    def consume(self, event: Dict[str, Any]) -> None:
        """Run the deferred job selected by event["type"].

        Unrecognized types do nothing beyond the closing timing log line.
        A failed realm export returns early, so that log line is skipped
        for it.
        """
        start = time.time()
        event_type = event["type"]

        if event_type == "mark_stream_messages_as_read":
            user_profile = get_user_profile_by_id(event["user_profile_id"])

            for recipient_id in event["stream_recipient_ids"]:
                count = do_mark_stream_messages_as_read(user_profile, recipient_id)
                logger.info(
                    "Marked %s messages as read for user %s, stream_recipient_id %s",
                    count,
                    user_profile.id,
                    recipient_id,
                )
        elif event_type == "mark_stream_messages_as_read_for_everyone":
            # This event is generated by the stream deactivation code path.
            batch_size = 100
            offset = 0
            while True:
                # Walk the stream's messages in id order, one batch at a time.
                messages = Message.objects.filter(
                    recipient_id=event["stream_recipient_id"]
                ).order_by("id")[offset : offset + batch_size]
                unread_rows = UserMessage.objects.filter(message__in=messages).extra(
                    where=[UserMessage.where_unread()]
                )
                unread_rows.update(flags=F("flags").bitor(UserMessage.flags.read))
                fetched = len(messages)
                offset += fetched
                if fetched < batch_size:
                    break
            logger.info(
                "Marked %s messages as read for all users, stream_recipient_id %s",
                offset,
                event["stream_recipient_id"],
            )
        elif event_type == "clear_push_device_tokens":

            def failure_processor(event: Dict[str, Any]) -> None:
                logger.warning(
                    "Maximum retries exceeded for trigger:%s event:clear_push_device_tokens",
                    event["user_profile_id"],
                )

            try:
                clear_push_device_tokens(event["user_profile_id"])
            except PushNotificationBouncerRetryLaterError:
                retry_event(self.queue_name, event, failure_processor)
        elif event_type == "realm_export":
            realm = Realm.objects.get(id=event["realm_id"])
            output_dir = tempfile.mkdtemp(prefix="zulip-export-")
            export_event = RealmAuditLog.objects.get(id=event["id"])
            user_profile = get_user_profile_by_id(event["user_profile_id"])

            try:
                public_url = export_realm_wrapper(
                    realm=realm,
                    output_dir=output_dir,
                    threads=6,
                    upload=True,
                    public_only=True,
                )
            except Exception:
                # Record the failure time on the audit log row, log the
                # traceback, tell clients, and give up on this event.
                failure_record = {"failed_timestamp": timezone_now().timestamp()}
                export_event.extra_data = orjson.dumps(failure_record).decode()
                export_event.save(update_fields=["extra_data"])
                logging.exception(
                    "Data export for %s failed after %s",
                    user_profile.realm.string_id,
                    time.time() - start,
                    stack_info=True,
                )
                notify_realm_export(user_profile)
                return

            assert public_url is not None

            # Update the extra_data field now that the export is complete.
            success_record = {"export_path": urllib.parse.urlparse(public_url).path}
            export_event.extra_data = orjson.dumps(success_record).decode()
            export_event.save(update_fields=["extra_data"])

            # Send a private message notification letting the user who
            # triggered the export know the export finished.
            with override_language(user_profile.default_language):
                content = _(
                    "Your data export is complete and has been uploaded here:\n\n{public_url}"
                ).format(public_url=public_url)
            internal_send_private_message(
                sender=get_system_bot(settings.NOTIFICATION_BOT, realm.id),
                recipient_user=user_profile,
                content=content,
            )

            # For future frontend use, also notify administrator
            # clients that the export happened.
            notify_realm_export(user_profile)
            logging.info(
                "Completed data export for %s in %s",
                user_profile.realm.string_id,
                time.time() - start,
            )
        elif event_type == "reupload_realm_emoji":
            # This is a special event queued by the migration for reuploading emojis.
            # We don't want to run the necessary code in the actual migration, so it simply
            # queues the necessary event, and the actual work is done here in the queue worker.
            realm = Realm.objects.get(id=event["realm_id"])
            logger.info("Processing reupload_realm_emoji event for realm %s", realm.id)
            handle_reupload_emojis_event(realm, logger)
        elif event_type == "soft_reactivate":
            user_profile = get_user_profile_by_id(event["user_profile_id"])
            reactivate_user_if_soft_deactivated(user_profile)

        end = time.time()
        logger.info("deferred_work processed %s event (%dms)", event["type"], (end - start) * 1000)
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-10-29 08:00:39 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("test", is_test_queue=True)
class TestWorker(QueueProcessingWorker):
    """Exercise the queue worker infrastructure without significant side effects.

    Useful in development or for troubleshooting prod/staging: each event
    pulled off the test queue is appended, JSON-encoded, to the file named
    by settings.ZULIP_WORKER_TEST_FILE.
    """

    def consume(self, event: Mapping[str, Any]) -> None:  # nocoverage
        """Append the serialized event, followed by a newline, to the test file."""
        fn = settings.ZULIP_WORKER_TEST_FILE
        serialized = orjson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s", fn, serialized.decode())
        with open(fn, "ab") as test_file:
            test_file.write(serialized + b"\n")
|
2020-10-03 01:29:49 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("noop", is_test_queue=True)
class NoopWorker(QueueProcessingWorker):
    """Used to profile the queue processing framework, in zilencer's queue_rate."""

    # The default for slow_queries is an immutable empty tuple rather than a
    # shared mutable list; callers may still pass any sequence of ints.
    def __init__(self, max_consume: int = 1000, slow_queries: Sequence[int] = ()) -> None:
        """Set up the counters.

        max_consume: number of events after which the worker stops itself.
        slow_queries: 1-based ordinals of the events that should be
            artificially slow.
        """
        self.consumed = 0
        self.max_consume = max_consume
        self.slow_queries: Set[int] = set(slow_queries)

    def consume(self, event: Mapping[str, Any]) -> None:
        """Count the event, sleeping 60 seconds if it is marked slow.

        Calls self.stop() once max_consume events have been consumed.
        """
        self.consumed += 1
        if self.consumed in self.slow_queries:
            logging.info("Slow request...")
            time.sleep(60)
            logging.info("Done!")
        if self.consumed >= self.max_consume:
            self.stop()
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@assign_queue("noop_batch", is_test_queue=True)
class BatchNoopWorker(LoopQueueProcessingWorker):
    """Used to profile the queue processing framework, in zilencer's queue_rate."""

    batch_size = 100

    # The default for slow_queries is an immutable empty tuple rather than a
    # shared mutable list; callers may still pass any sequence of ints.
    def __init__(self, max_consume: int = 1000, slow_queries: Sequence[int] = ()) -> None:
        """Set up the counters.

        max_consume: number of events after which the worker stops itself.
        slow_queries: 1-based ordinals of the events that should be
            artificially slow.
        """
        self.consumed = 0
        self.max_consume = max_consume
        self.slow_queries: Set[int] = set(slow_queries)

    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
        """Count a batch of events, sleeping 60 seconds per slow event in it.

        Calls self.stop() once max_consume events have been consumed.
        """
        # Ordinals (1-based) that the events in this batch occupy.
        first_number = self.consumed + 1
        event_numbers = set(range(first_number, first_number + len(events)))
        found_slow = self.slow_queries & event_numbers
        if found_slow:
            logging.info("%d slow requests...", len(found_slow))
            time.sleep(60 * len(found_slow))
            logging.info("Done!")
        self.consumed += len(events)
        if self.consumed >= self.max_consume:
            self.stop()
|