diff --git a/docs/overview/architecture-overview.md b/docs/overview/architecture-overview.md index c8aac56e64..d08e6b784a 100644 --- a/docs/overview/architecture-overview.md +++ b/docs/overview/architecture-overview.md @@ -220,7 +220,7 @@ Two simple wrappers around `pika` (the Python RabbitMQ client) are in Tornado and a more general client for use elsewhere. Most of the processes started by Supervisor are queue processors that continually pull things out of a RabbitMQ queue and handle them; they are defined -in `zerver/worker/queue_processors.py`. +in `zerver/worker/`. Also see [the queuing guide](../subsystems/queuing.md). diff --git a/docs/overview/directory-structure.md b/docs/overview/directory-structure.md index 65f78d40c7..ad1ec79f9a 100644 --- a/docs/overview/directory-structure.md +++ b/docs/overview/directory-structure.md @@ -34,7 +34,7 @@ paths will be familiar to Django developers. - `zerver/tornado/views.py` Tornado views. -- `zerver/worker/queue_processors.py` [Queue workers](../subsystems/queuing.md). +- `zerver/worker/` [Queue workers](../subsystems/queuing.md). - `zerver/lib/markdown/` [Backend Markdown processor](../subsystems/markdown.md). diff --git a/docs/subsystems/queuing.md b/docs/subsystems/queuing.md index 0b219522a6..2b2ca4d409 100644 --- a/docs/subsystems/queuing.md +++ b/docs/subsystems/queuing.md @@ -35,14 +35,12 @@ custom integration defined in `zerver/lib/queue.py`. To add a new queue processor: -- Define the processor in `zerver/worker/queue_processors.py` using - the `@assign_queue` decorator; it's pretty easy to get the template - for an existing similar queue processor. This suffices to test your - queue worker in the Zulip development environment - (`tools/run-dev` will automatically restart the queue processors - and start running your new queue processor code). You can also run - a single queue processor manually using e.g. - `./manage.py process_queue --queue=user_activity`. +- Define the processor in `zerver/worker/` using the `@assign_queue` decorator; + it's pretty easy to get the template for an existing similar queue + processor. This suffices to test your queue worker in the Zulip development + environment (`tools/run-dev` will automatically restart the queue processors + and start running your new queue processor code). You can also run a single + queue processor manually using e.g. `./manage.py process_queue --queue=user_activity`. - So that supervisord will know to run the queue processor in production, you will need to add to the `queues` variable in diff --git a/tools/linter_lib/custom_check.py b/tools/linter_lib/custom_check.py index 1a08da952e..b10f5adc9c 100644 --- a/tools/linter_lib/custom_check.py +++ b/tools/linter_lib/custom_check.py @@ -371,11 +371,8 @@ python_rules = RuleList( "pattern": "[^a-z]Message.objects.get", "exclude": { "zerver/tests", - "zerver/lib/onboarding.py", "zilencer/management/commands/add_mock_conversation.py", - "zerver/worker/queue_processors.py", "zerver/management/commands/export.py", - "zerver/lib/export.py", }, "description": "Please use access_message() to fetch Message objects", }, diff --git a/tools/test-backend b/tools/test-backend index 02227afb10..429b556b30 100755 --- a/tools/test-backend +++ b/tools/test-backend @@ -66,12 +66,20 @@ not_yet_fully_covered = [ "zerver/lib/i18n.py", "zerver/lib/send_email.py", "zerver/lib/url_preview/preview.py", - "zerver/worker/queue_processors.py", # Markdown sub-libs should have full coverage too; a lot are really close "zerver/lib/markdown/api_arguments_table_generator.py", "zerver/lib/markdown/fenced_code.py", "zerver/lib/markdown/help_relative_links.py", "zerver/lib/markdown/nested_code_blocks.py", + # Workers should get full coverage; many have it already + "zerver/worker/deferred_work.py", + "zerver/worker/email_senders.py", + "zerver/worker/invites.py", + "zerver/worker/missedmessage_emails.py", + # Worker-associated files; lower-priority to get testing on + "zerver/worker/base.py", + "zerver/worker/queue_processors.py", + "zerver/worker/test.py", # Other lib files that ideally would coverage, but aren't sorted "zerver/filters.py", "zerver/middleware.py", diff --git a/zerver/lib/digest.py b/zerver/lib/digest.py index a79b7f0ed1..ef4c624786 100644 --- a/zerver/lib/digest.py +++ b/zerver/lib/digest.py @@ -87,7 +87,7 @@ class DigestTopic: # Changes to this should also be reflected in -# zerver/worker/queue_processors.py:DigestWorker.consume() +# zerver/worker/digest_emails.py:DigestWorker.consume() def queue_digest_user_ids(user_ids: List[int], cutoff: datetime) -> None: # Convert cutoff to epoch seconds for transit. event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")} diff --git a/zerver/lib/push_notifications.py b/zerver/lib/push_notifications.py index 24aea636e2..c65df45af0 100644 --- a/zerver/lib/push_notifications.py +++ b/zerver/lib/push_notifications.py @@ -1263,7 +1263,7 @@ def handle_remove_push_notification(user_profile_id: int, message_ids: List[int] def handle_push_notification(user_profile_id: int, missed_message: Dict[str, Any]) -> None: """ missed_message is the event received by the - zerver.worker.queue_processors.PushNotificationWorker.consume function. + zerver.worker.missedmessage_mobile_notifications.PushNotificationWorker.consume function. """ if not push_notifications_configured(): return diff --git a/zerver/tests/test_digest.py b/zerver/tests/test_digest.py index d2f4958ca3..e5099b516e 100644 --- a/zerver/tests/test_digest.py +++ b/zerver/tests/test_digest.py @@ -379,9 +379,7 @@ class TestDigestEmailMessages(ZulipTestCase): realm, "digest_weekday", timezone_now().weekday(), acting_user=None ) cutoff = timezone_now() - timedelta(days=0) - with mock.patch( - "zerver.worker.queue_processors.bulk_handle_digest_email" - ) as queue_mock: + with mock.patch("zerver.worker.digest_emails.bulk_handle_digest_email") as queue_mock: enqueue_emails(cutoff) return 0 if queue_mock.call_args is None else len(queue_mock.call_args[0][0]) @@ -403,7 +401,7 @@ class TestDigestEmailMessages(ZulipTestCase): # Check that all users without a UserActivityInterval entry are considered # inactive users and get enqueued. - with mock.patch("zerver.worker.queue_processors.bulk_handle_digest_email") as queue_mock: + with mock.patch("zerver.worker.digest_emails.bulk_handle_digest_email") as queue_mock: _enqueue_emails_for_realm(realm, cutoff) num_queued_users = len(queue_mock.call_args[0][0]) @@ -418,7 +416,7 @@ class TestDigestEmailMessages(ZulipTestCase): ) # Now we expect no users, due to recent activity. - with mock.patch("zerver.worker.queue_processors.bulk_handle_digest_email") as queue_mock: + with mock.patch("zerver.worker.digest_emails.bulk_handle_digest_email") as queue_mock: _enqueue_emails_for_realm(realm, cutoff) self.assertEqual(queue_mock.call_count, 0) @@ -427,7 +425,7 @@ class TestDigestEmailMessages(ZulipTestCase): last_visit = timezone_now() - timedelta(days=7) UserActivityInterval.objects.all().update(start=last_visit, end=last_visit) - with mock.patch("zerver.worker.queue_processors.bulk_handle_digest_email") as queue_mock: + with mock.patch("zerver.worker.digest_emails.bulk_handle_digest_email") as queue_mock: _enqueue_emails_for_realm(realm, cutoff) num_queued_users = len(queue_mock.call_args[0][0]) diff --git a/zerver/tests/test_email_mirror.py b/zerver/tests/test_email_mirror.py index 11b7cbc062..b02f17bbd3 100644 --- a/zerver/tests/test_email_mirror.py +++ b/zerver/tests/test_email_mirror.py @@ -41,7 +41,7 @@ from zerver.models import Attachment, Recipient, Stream, UserProfile from zerver.models.realms import get_realm from zerver.models.streams import get_stream from zerver.models.users import get_system_bot -from zerver.worker.queue_processors import MirrorWorker +from zerver.worker.email_mirror import MirrorWorker if TYPE_CHECKING: from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse diff --git a/zerver/tests/test_home.py b/zerver/tests/test_home.py index 38a22c6285..34237b1599 100644 --- a/zerver/tests/test_home.py +++ b/zerver/tests/test_home.py @@ -28,7 +28,7 @@ from zerver.models import DefaultStream, Draft, Realm, UserActivity, UserProfile from zerver.models.realms import get_realm from zerver.models.streams import get_stream from zerver.models.users import get_system_bot, get_user -from zerver.worker.queue_processors import UserActivityWorker +from zerver.worker.user_activity import UserActivityWorker if TYPE_CHECKING: from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse diff --git a/zerver/tests/test_link_embed.py b/zerver/tests/test_link_embed.py index a69de2ce49..fe75a86154 100644 --- a/zerver/tests/test_link_embed.py +++ b/zerver/tests/test_link_embed.py @@ -22,7 +22,7 @@ from zerver.lib.url_preview.parsers import GenericParser, OpenGraphParser from zerver.lib.url_preview.preview import get_link_embed_data from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData from zerver.models import Message, Realm, UserProfile -from zerver.worker.queue_processors import FetchLinksEmbedData +from zerver.worker.embed_links import FetchLinksEmbedData def reconstruct_url(url: str, maxwidth: int = 640, maxheight: int = 480) -> str: @@ -981,7 +981,7 @@ class PreviewTestCase(ZulipTestCase): with self.settings(TEST_SUITE=False): with self.assertLogs(level="INFO") as info_logs: with mock.patch( - "zerver.worker.queue_processors.url_preview.get_link_embed_data", + "zerver.worker.embed_links.url_preview.get_link_embed_data", lambda *args, **kwargs: mocked_data, ): FetchLinksEmbedData().consume(event) @@ -1019,7 +1019,7 @@ class PreviewTestCase(ZulipTestCase): with self.settings(TEST_SUITE=False): with self.assertLogs(level="INFO") as info_logs: with mock.patch( - "zerver.worker.queue_processors.url_preview.get_link_embed_data", + "zerver.worker.embed_links.url_preview.get_link_embed_data", lambda *args, **kwargs: mocked_data, ): FetchLinksEmbedData().consume(event) diff --git a/zerver/tests/test_push_notifications.py b/zerver/tests/test_push_notifications.py index f5343669cc..5c47f7cd1e 100644 --- a/zerver/tests/test_push_notifications.py +++ b/zerver/tests/test_push_notifications.py @@ -1297,9 +1297,9 @@ class PushBouncerNotificationTest(BouncerTestCase): # Now we want to remove them using the bouncer after an API key change. # First we test error handling in case of issues with the bouncer: with mock.patch( - "zerver.worker.queue_processors.clear_push_device_tokens", + "zerver.worker.deferred_work.clear_push_device_tokens", side_effect=PushNotificationBouncerRetryLaterError("test"), - ), mock.patch("zerver.worker.queue_processors.retry_event") as mock_retry: + ), mock.patch("zerver.worker.deferred_work.retry_event") as mock_retry: do_regenerate_api_key(user, user) mock_retry.assert_called() diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 18f6e0d1f6..d0c38e84d1 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -1,12 +1,10 @@ import base64 -import itertools import os import signal import time from collections import defaultdict from contextlib import contextmanager, suppress from datetime import datetime, timedelta, timezone -from inspect import isabstract from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional from unittest.mock import MagicMock, patch @@ -36,13 +34,14 @@ from zerver.models.realms import get_realm from zerver.models.scheduled_jobs import NotificationTriggers from zerver.models.streams import get_stream from zerver.tornado.event_queue import build_offline_notification -from zerver.worker import queue_processors -from zerver.worker.queue_processors import ( - FetchLinksEmbedData, - MissedMessageWorker, - QueueProcessingWorker, - get_active_worker_queues, -) +from zerver.worker import base as base_worker +from zerver.worker.email_mirror import MirrorWorker +from zerver.worker.email_senders import EmailSendingWorker +from zerver.worker.embed_links import FetchLinksEmbedData +from zerver.worker.invites import ConfirmationEmailWorker +from zerver.worker.missedmessage_emails import MissedMessageWorker +from zerver.worker.missedmessage_mobile_notifications import PushNotificationsWorker +from zerver.worker.user_activity import UserActivityWorker Event: TypeAlias = Dict[str, Any] @@ -75,7 +74,7 @@ class FakeClient: @contextmanager def simulated_queue_client(client: FakeClient) -> Iterator[None]: - with patch.object(queue_processors, "SimpleQueueClient", lambda *args, **kwargs: client): + with patch.object(base_worker, "SimpleQueueClient", lambda *args, **kwargs: client): yield @@ -135,7 +134,7 @@ class WorkerTest(ZulipTestCase): # Run the worker; this will produce a single upsert statement with simulated_queue_client(fake_client): - worker = queue_processors.UserActivityWorker() + worker = UserActivityWorker() worker.setup() with self.assert_database_query_count(1): worker.start() @@ -197,7 +196,7 @@ class WorkerTest(ZulipTestCase): ) # Run the worker again; this will insert one row and update the other with simulated_queue_client(fake_client): - worker = queue_processors.UserActivityWorker() + worker = UserActivityWorker() worker.setup() with self.assert_database_query_count(1): worker.start() @@ -504,14 +503,14 @@ class WorkerTest(ZulipTestCase): } with simulated_queue_client(fake_client): - worker = queue_processors.PushNotificationsWorker() + worker = PushNotificationsWorker() worker.setup() with patch( - "zerver.worker.queue_processors.handle_push_notification" + "zerver.worker.missedmessage_mobile_notifications.handle_push_notification" ) as mock_handle_new, patch( - "zerver.worker.queue_processors.handle_remove_push_notification" + "zerver.worker.missedmessage_mobile_notifications.handle_remove_push_notification" ) as mock_handle_remove, patch( - "zerver.worker.queue_processors.initialize_push_notifications" + "zerver.worker.missedmessage_mobile_notifications.initialize_push_notifications" ): event_new = generate_new_message_notification() event_remove = generate_remove_notification() @@ -525,13 +524,13 @@ class WorkerTest(ZulipTestCase): ) with patch( - "zerver.worker.queue_processors.handle_push_notification", + "zerver.worker.missedmessage_mobile_notifications.handle_push_notification", side_effect=PushNotificationBouncerRetryLaterError("test"), ) as mock_handle_new, patch( - "zerver.worker.queue_processors.handle_remove_push_notification", + "zerver.worker.missedmessage_mobile_notifications.handle_remove_push_notification", side_effect=PushNotificationBouncerRetryLaterError("test"), ) as mock_handle_remove, patch( - "zerver.worker.queue_processors.initialize_push_notifications" + "zerver.worker.missedmessage_mobile_notifications.initialize_push_notifications" ): event_new = generate_new_message_notification() event_remove = generate_remove_notification() @@ -540,19 +539,21 @@ class WorkerTest(ZulipTestCase): with mock_queue_publish( "zerver.lib.queue.queue_json_publish", side_effect=fake_publish - ), self.assertLogs("zerver.worker.queue_processors", "WARNING") as warn_logs: + ), self.assertLogs( + "zerver.worker.missedmessage_mobile_notifications", "WARNING" + ) as warn_logs: worker.start() self.assertEqual(mock_handle_new.call_count, 1 + MAX_REQUEST_RETRIES) self.assertEqual(mock_handle_remove.call_count, 1 + MAX_REQUEST_RETRIES) self.assertEqual( warn_logs.output, [ - "WARNING:zerver.worker.queue_processors:Maximum retries exceeded for trigger:1 event:push_notification", + "WARNING:zerver.worker.missedmessage_mobile_notifications:Maximum retries exceeded for trigger:1 event:push_notification", ] * 2, ) - @patch("zerver.worker.queue_processors.mirror_email") + @patch("zerver.worker.email_mirror.mirror_email") def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None: fake_client = FakeClient() stream = get_stream("Denmark", get_realm("zulip")) @@ -568,13 +569,13 @@ class WorkerTest(ZulipTestCase): fake_client.enqueue("email_mirror", element) with simulated_queue_client(fake_client): - worker = queue_processors.MirrorWorker() + worker = MirrorWorker() worker.setup() worker.start() self.assertEqual(mock_mirror_email.call_count, 3) - @patch("zerver.worker.queue_processors.mirror_email") + @patch("zerver.worker.email_mirror.mirror_email") @override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)]) def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None: fake_client = FakeClient() @@ -593,11 +594,11 @@ class WorkerTest(ZulipTestCase): fake_client.enqueue("email_mirror", element) with simulated_queue_client(fake_client), self.assertLogs( - "zerver.worker.queue_processors", level="WARNING" + "zerver.worker.email_mirror", level="WARNING" ) as warn_logs: start_time = time.time() with patch("time.time", return_value=start_time): - worker = queue_processors.MirrorWorker() + worker = MirrorWorker() worker.setup() worker.start() # Of the first 5 messages, only 2 should be processed @@ -645,7 +646,7 @@ class WorkerTest(ZulipTestCase): self.assertEqual( warn_logs.output, [ - "WARNING:zerver.worker.queue_processors:MirrorWorker: Rejecting an email from: None to realm: zulip - rate limited." + "WARNING:zerver.worker.email_mirror:MirrorWorker: Rejecting an email from: None to realm: zulip - rate limited." ] * 5, ) @@ -670,7 +671,7 @@ class WorkerTest(ZulipTestCase): fake_client.enqueue(queue_name, event) with simulated_queue_client(fake_client): - worker = queue_processors.EmailSendingWorker() + worker = EmailSendingWorker() worker.setup() with patch( "zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError @@ -715,10 +716,10 @@ class WorkerTest(ZulipTestCase): fake_client.enqueue("invites", element) with simulated_queue_client(fake_client): - worker = queue_processors.ConfirmationEmailWorker() + worker = ConfirmationEmailWorker() worker.setup() with patch("zerver.actions.user_settings.send_email"), patch( - "zerver.worker.queue_processors.send_future_email" + "zerver.worker.invites.send_future_email" ) as send_mock: worker.start() self.assertEqual(send_mock.call_count, 2) @@ -726,8 +727,8 @@ class WorkerTest(ZulipTestCase): def test_error_handling(self) -> None: processed = [] - @queue_processors.assign_queue("unreliable_worker", is_test_queue=True) - class UnreliableWorker(queue_processors.QueueProcessingWorker): + @base_worker.assign_queue("unreliable_worker", is_test_queue=True) + class UnreliableWorker(base_worker.QueueProcessingWorker): @override def consume(self, data: Mapping[str, Any]) -> None: if data["type"] == "unexpected behaviour": @@ -762,8 +763,8 @@ class WorkerTest(ZulipTestCase): processed = [] - @queue_processors.assign_queue("unreliable_loopworker", is_test_queue=True) - class UnreliableLoopWorker(queue_processors.LoopQueueProcessingWorker): + @base_worker.assign_queue("unreliable_loopworker", is_test_queue=True) + class UnreliableLoopWorker(base_worker.LoopQueueProcessingWorker): @override def consume_batch(self, events: List[Dict[str, Any]]) -> None: for event in events: @@ -802,8 +803,8 @@ class WorkerTest(ZulipTestCase): def test_timeouts(self) -> None: processed = [] - @queue_processors.assign_queue("timeout_worker", is_test_queue=True) - class TimeoutWorker(queue_processors.QueueProcessingWorker): + @base_worker.assign_queue("timeout_worker", is_test_queue=True) + class TimeoutWorker(base_worker.QueueProcessingWorker): MAX_CONSUME_SECONDS = 1 @override @@ -856,7 +857,7 @@ class WorkerTest(ZulipTestCase): assert_timeout(should_timeout=False, threaded=False, disable_timeout=True) def test_embed_links_timeout(self) -> None: - @queue_processors.assign_queue("timeout_worker", is_test_queue=True) + @base_worker.assign_queue("timeout_worker", is_test_queue=True) class TimeoutWorker(FetchLinksEmbedData): MAX_CONSUME_SECONDS = 1 @@ -887,7 +888,7 @@ class WorkerTest(ZulipTestCase): ) def test_worker_noname(self) -> None: - class TestWorker(queue_processors.QueueProcessingWorker): + class TestWorker(base_worker.QueueProcessingWorker): def __init__(self) -> None: super().__init__() @@ -895,23 +896,5 @@ class WorkerTest(ZulipTestCase): def consume(self, data: Mapping[str, Any]) -> None: pass # nocoverage # this is intentionally not called - with self.assertRaises(queue_processors.WorkerDeclarationError): + with self.assertRaises(base_worker.WorkerDeclarationError): TestWorker() - - def test_get_active_worker_queues(self) -> None: - # Find all recursive subclasses of QueueProcessingWorker - base_classes = [QueueProcessingWorker] - all_classes = [] - while base_classes: - new_subclasses = (base_class.__subclasses__() for base_class in base_classes) - base_classes = list(itertools.chain(*new_subclasses)) - all_classes += base_classes - worker_queue_names = { - queue_class.queue_name for queue_class in all_classes if not isabstract(queue_class) - } - - test_queue_names = set(get_active_worker_queues(only_test_queues=True)) - - # Verify that the set of active worker queues equals the set - # of subclasses without is_test_queue set. - self.assertEqual(set(get_active_worker_queues()), worker_queue_names - test_queue_names) diff --git a/zerver/tests/test_realm_export.py b/zerver/tests/test_realm_export.py index 3c758a7497..828fb8ac7e 100644 --- a/zerver/tests/test_realm_export.py +++ b/zerver/tests/test_realm_export.py @@ -256,7 +256,7 @@ class RealmExportTest(ZulipTestCase): info_logs.output, [ ( - "ERROR:zerver.worker.queue_processors:Marking export for realm zulip " + "ERROR:zerver.worker.deferred_work:Marking export for realm zulip " "as failed due to retry -- possible OOM during export?" ) ], diff --git a/zerver/worker/base.py b/zerver/worker/base.py new file mode 100644 index 0000000000..8b99f0a695 --- /dev/null +++ b/zerver/worker/base.py @@ -0,0 +1,319 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +import os +import signal +import time +from abc import ABC, abstractmethod +from collections import deque +from types import FrameType +from typing import Any, Callable, Dict, List, MutableSequence, Optional, Set, Tuple, Type, TypeVar + +import orjson +import sentry_sdk +from django.conf import settings +from django.db import connection +from returns.curry import partial +from typing_extensions import override + +from zerver.lib.context_managers import lockfile +from zerver.lib.db import reset_queries +from zerver.lib.per_request_cache import flush_per_request_caches +from zerver.lib.pysa import mark_sanitized +from zerver.lib.queue import SimpleQueueClient + +logger = logging.getLogger(__name__) + + +class WorkerTimeoutError(Exception): + def __init__(self, queue_name: str, limit: int, event_count: int) -> None: + self.queue_name = queue_name + self.limit = limit + self.event_count = event_count + + @override + def __str__(self) -> str: + return f"Timed out in {self.queue_name} after {self.limit * self.event_count} seconds processing {self.event_count} events" + + +class InterruptConsumeError(Exception): + """ + This exception is to be thrown inside event consume function + if the intention is to simply interrupt the processing + of the current event and normally continue the work of the queue. + """ + + +class WorkerDeclarationError(Exception): + pass + + +ConcreteQueueWorker = TypeVar("ConcreteQueueWorker", bound="QueueProcessingWorker") + + +def assign_queue( + queue_name: str, + enabled: bool = True, + is_test_queue: bool = False, +) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]: + def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]: + clazz.queue_name = queue_name + if enabled: + register_worker(queue_name, clazz, is_test_queue) + return clazz + + return decorate + + +worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} +test_queues: Set[str] = set() + + +def register_worker( + queue_name: str, clazz: Type["QueueProcessingWorker"], is_test_queue: bool = False +) -> None: + worker_classes[queue_name] = clazz + if is_test_queue: + test_queues.add(queue_name) + + +def check_and_send_restart_signal() -> None: + try: + if not connection.is_usable(): + logging.warning("*** Sending self SIGUSR1 to trigger a restart.") + os.kill(os.getpid(), signal.SIGUSR1) + except Exception: + pass + + +class QueueProcessingWorker(ABC): + queue_name: str + MAX_CONSUME_SECONDS: Optional[int] = 30 + CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50 + MAX_SECONDS_BEFORE_UPDATE_STATS = 30 + + # How many un-acknowledged events the worker should have on hand, + # fetched from the rabbitmq server. Larger values may be more + # performant, but if queues are large, cause more network IO at + # startup and steady-state memory. + PREFETCH = 100 + + def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: + self.q: Optional[SimpleQueueClient] = None + self.threaded = threaded + self.disable_timeout = disable_timeout + if not hasattr(self, "queue_name"): + raise WorkerDeclarationError("Queue worker declared without queue_name") + + self.initialize_statistics() + + def initialize_statistics(self) -> None: + self.queue_last_emptied_timestamp = time.time() + self.consumed_since_last_emptied = 0 + self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50) + self.consume_iteration_counter = 0 + self.idle = True + self.last_statistics_update_time = 0.0 + + self.update_statistics() + + @sentry_sdk.trace + def update_statistics(self) -> None: + total_seconds = sum(seconds for _, seconds in self.recent_consume_times) + total_events = sum(events_number for events_number, _ in self.recent_consume_times) + if total_events == 0: + recent_average_consume_time = None + else: + recent_average_consume_time = total_seconds / total_events + stats_dict = dict( + update_time=time.time(), + recent_average_consume_time=recent_average_consume_time, + queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, + consumed_since_last_emptied=self.consumed_since_last_emptied, + ) + + os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True) + + fname = f"{self.queue_name}.stats" + fn = os.path.join(settings.QUEUE_STATS_DIR, fname) + with lockfile(fn + ".lock"): + tmp_fn = fn + ".tmp" + with open(tmp_fn, "wb") as f: + f.write( + orjson.dumps(stats_dict, option=orjson.OPT_APPEND_NEWLINE | orjson.OPT_INDENT_2) + ) + os.rename(tmp_fn, fn) + self.last_statistics_update_time = time.time() + + def get_remaining_local_queue_size(self) -> int: + if self.q is not None: + return self.q.local_queue_size() + else: + # This is a special case that will happen if we're operating without + # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of + # and the only reasonable size to return is 0. + return 0 + + @abstractmethod + def consume(self, data: Dict[str, Any]) -> None: + pass + + def do_consume( + self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]] + ) -> None: + consume_time_seconds: Optional[float] = None + with sentry_sdk.start_transaction( + op="task", + name=f"consume {self.queue_name}", + custom_sampling_context={"queue": self.queue_name}, + ): + sentry_sdk.add_breadcrumb( + type="debug", + category="queue_processor", + message=f"Consuming {self.queue_name}", + data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()}, + ) + try: + if self.idle: + # We're reactivating after having gone idle due to emptying the queue. + # We should update the stats file to keep it fresh and to make it clear + # that the queue started processing, in case the event we're about to process + # makes us freeze. + self.idle = False + self.update_statistics() + + time_start = time.time() + if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout: + try: + signal.signal( + signal.SIGALRM, + partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events), + ) + try: + signal.alarm(self.MAX_CONSUME_SECONDS * len(events)) + consume_func(events) + finally: + signal.alarm(0) + finally: + signal.signal(signal.SIGALRM, signal.SIG_DFL) + else: + consume_func(events) + consume_time_seconds = time.time() - time_start + self.consumed_since_last_emptied += len(events) + except Exception as e: + self._handle_consume_exception(events, e) + finally: + flush_per_request_caches() + reset_queries() + + with sentry_sdk.start_span(description="statistics"): + if consume_time_seconds is not None: + self.recent_consume_times.append((len(events), consume_time_seconds)) + + remaining_local_queue_size = self.get_remaining_local_queue_size() + if remaining_local_queue_size == 0: + self.queue_last_emptied_timestamp = time.time() + self.consumed_since_last_emptied = 0 + # We've cleared all the events from the queue, so we don't + # need to worry about the small overhead of doing a disk write. + # We take advantage of this to update the stats file to keep it fresh, + # especially since the queue might go idle until new events come in. + self.update_statistics() + self.idle = True + else: + self.consume_iteration_counter += 1 + if ( + self.consume_iteration_counter + >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM + or time.time() - self.last_statistics_update_time + >= self.MAX_SECONDS_BEFORE_UPDATE_STATS + ): + self.consume_iteration_counter = 0 + self.update_statistics() + + def consume_single_event(self, event: Dict[str, Any]) -> None: + consume_func = lambda events: self.consume(events[0]) + self.do_consume(consume_func, [event]) + + def timer_expired( + self, limit: int, events: List[Dict[str, Any]], signal: int, frame: Optional[FrameType] + ) -> None: + raise WorkerTimeoutError(self.queue_name, limit, len(events)) + + def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None: + if isinstance(exception, InterruptConsumeError): + # The exception signals that no further error handling + # is needed and the worker can proceed. + return + + with sentry_sdk.configure_scope() as scope: + scope.set_context( + "events", + { + "data": events, + "queue_name": self.queue_name, + }, + ) + if isinstance(exception, WorkerTimeoutError): + with sentry_sdk.push_scope() as scope: + scope.fingerprint = ["worker-timeout", self.queue_name] + logging.exception(exception, stack_info=True) + else: + logging.exception( + "Problem handling data on queue %s", self.queue_name, stack_info=True + ) + if not os.path.exists(settings.QUEUE_ERROR_DIR): + os.mkdir(settings.QUEUE_ERROR_DIR) # nocoverage + # Use 'mark_sanitized' to prevent Pysa from detecting this false positive + # flow. 'queue_name' is always a constant string. + fname = mark_sanitized(f"{self.queue_name}.errors") + fn = os.path.join(settings.QUEUE_ERROR_DIR, fname) + line = f"{time.asctime()}\t{orjson.dumps(events).decode()}\n" + lock_fn = fn + ".lock" + with lockfile(lock_fn): + with open(fn, "a") as f: + f.write(line) + check_and_send_restart_signal() + + def setup(self) -> None: + self.q = SimpleQueueClient(prefetch=self.PREFETCH) + + def start(self) -> None: + assert self.q is not None + self.initialize_statistics() + self.q.start_json_consumer( + self.queue_name, + lambda events: self.consume_single_event(events[0]), + ) + + def stop(self) -> None: # nocoverage + assert self.q is not None + self.q.stop_consuming() + + +class LoopQueueProcessingWorker(QueueProcessingWorker): + sleep_delay = 1 + batch_size = 100 + + @override + def setup(self) -> None: + self.q = SimpleQueueClient(prefetch=max(self.PREFETCH, self.batch_size)) + + @override + def start(self) -> None: # nocoverage + assert self.q is not None + self.initialize_statistics() + self.q.start_json_consumer( + self.queue_name, + lambda events: self.do_consume(self.consume_batch, events), + batch_size=self.batch_size, + timeout=self.sleep_delay, + ) + + @abstractmethod + def consume_batch(self, events: List[Dict[str, Any]]) -> None: + pass + + @override + def consume(self, event: Dict[str, Any]) -> None: + """In LoopQueueProcessingWorker, consume is used just for automated tests""" + self.consume_batch([event]) diff --git a/zerver/worker/deferred_work.py b/zerver/worker/deferred_work.py new file mode 100644 index 0000000000..752f5ee829 --- /dev/null +++ b/zerver/worker/deferred_work.py @@ -0,0 +1,228 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +import tempfile +import time +from typing import Any, Dict +from urllib.parse import urlsplit + +from django.conf import settings +from django.db import transaction +from django.db.models import F +from django.utils.timezone import now as timezone_now +from django.utils.translation import gettext as _ +from django.utils.translation import override as override_language +from typing_extensions import override + +from zerver.actions.message_flags import do_mark_stream_messages_as_read +from zerver.actions.message_send import internal_send_private_message +from zerver.actions.realm_export import notify_realm_export +from zerver.lib.export import export_realm_wrapper +from zerver.lib.push_notifications import clear_push_device_tokens +from zerver.lib.queue import queue_json_publish, retry_event +from zerver.lib.remote_server import ( + PushNotificationBouncerRetryLaterError, + send_server_data_to_push_bouncer, +) +from zerver.lib.soft_deactivation import reactivate_user_if_soft_deactivated +from zerver.lib.upload import handle_reupload_emojis_event +from zerver.models import Message, Realm, RealmAuditLog, Stream, UserMessage +from zerver.models.users import get_system_bot, get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("deferred_work") +class DeferredWorker(QueueProcessingWorker): + """This queue processor is intended for cases where we want to trigger a + potentially expensive, not urgent, job to be run on a separate + thread from the Django worker that initiated it (E.g. so we that + can provide a low-latency HTTP response or avoid risk of request + timeouts for an operation that could in rare cases take minutes). + """ + + # Because these operations have no SLO, and can take minutes, + # remove any processing timeouts + MAX_CONSUME_SECONDS = None + + @override + def consume(self, event: Dict[str, Any]) -> None: + start = time.time() + if event["type"] == "mark_stream_messages_as_read": + user_profile = get_user_profile_by_id(event["user_profile_id"]) + logger.info( + "Marking messages as read for user %s, stream_recipient_ids %s", + user_profile.id, + event["stream_recipient_ids"], + ) + + for recipient_id in event["stream_recipient_ids"]: + count = do_mark_stream_messages_as_read(user_profile, recipient_id) + logger.info( + "Marked %s messages as read for user %s, stream_recipient_id %s", + count, + user_profile.id, + recipient_id, + ) + elif event["type"] == "mark_stream_messages_as_read_for_everyone": + logger.info( + "Marking messages as read for all users, stream_recipient_id %s", + event["stream_recipient_id"], + ) + stream = Stream.objects.get(recipient_id=event["stream_recipient_id"]) + # This event is generated by the stream deactivation code path. + batch_size = 50 + start_time = time.perf_counter() + min_id = event.get("min_id", 0) + total_messages = 0 + while True: + with transaction.atomic(savepoint=False): + messages = list( + Message.objects.filter( + # Uses index: zerver_message_realm_recipient_id + realm_id=stream.realm_id, + recipient_id=event["stream_recipient_id"], + id__gt=min_id, + ) + .order_by("id")[:batch_size] + .values_list("id", flat=True) + ) + UserMessage.select_for_update_query().filter(message__in=messages).extra( + where=[UserMessage.where_unread()] + ).update(flags=F("flags").bitor(UserMessage.flags.read)) + total_messages += len(messages) + if len(messages) < batch_size: + break + min_id = messages[-1] + if time.perf_counter() - start_time > 30: + # This task may take a _very_ long time to + # complete, if we have a large number of messages + # to mark as read. If we have taken more than + # 30s, we re-push the task onto the tail of the + # queue, to allow other deferred work to complete; + # this task is extremely low priority. + queue_json_publish("deferred_work", {**event, "min_id": min_id}) + break + logger.info( + "Marked %s messages as read for all users, stream_recipient_id %s", + total_messages, + event["stream_recipient_id"], + ) + elif event["type"] == "clear_push_device_tokens": + logger.info( + "Clearing push device tokens for user_profile_id %s", + event["user_profile_id"], + ) + try: + clear_push_device_tokens(event["user_profile_id"]) + except PushNotificationBouncerRetryLaterError: + + def failure_processor(event: Dict[str, Any]) -> None: + logger.warning( + "Maximum retries exceeded for trigger:%s event:clear_push_device_tokens", + event["user_profile_id"], + ) + + retry_event(self.queue_name, event, failure_processor) + elif event["type"] == "realm_export": + realm = Realm.objects.get(id=event["realm_id"]) + output_dir = tempfile.mkdtemp(prefix="zulip-export-") + export_event = RealmAuditLog.objects.get(id=event["id"]) + user_profile = get_user_profile_by_id(event["user_profile_id"]) + extra_data = export_event.extra_data + if extra_data.get("started_timestamp") is not None: + logger.error( + "Marking export for realm %s as failed due to retry -- possible OOM during export?", + realm.string_id, + ) + extra_data["failed_timestamp"] = timezone_now().timestamp() + export_event.extra_data = extra_data + export_event.save(update_fields=["extra_data"]) + notify_realm_export(user_profile) + return + + extra_data["started_timestamp"] = timezone_now().timestamp() + export_event.extra_data = extra_data + export_event.save(update_fields=["extra_data"]) + + logger.info( + "Starting realm export for realm %s into %s, initiated by user_profile_id %s", + realm.string_id, + output_dir, + event["user_profile_id"], + ) + + try: + public_url = export_realm_wrapper( + realm=realm, + output_dir=output_dir, + threads=1 if self.threaded else 6, + upload=True, + public_only=True, + ) + except Exception: + extra_data["failed_timestamp"] = timezone_now().timestamp() + export_event.extra_data = extra_data + export_event.save(update_fields=["extra_data"]) + logging.exception( + "Data export for %s failed after %s", + user_profile.realm.string_id, + time.time() - start, + stack_info=True, + ) + notify_realm_export(user_profile) + return + + assert public_url is not None + + # Update the extra_data field now that the export is complete. + extra_data["export_path"] = urlsplit(public_url).path + export_event.extra_data = extra_data + export_event.save(update_fields=["extra_data"]) + + # Send a direct message notification letting the user who + # triggered the export know the export finished. + with override_language(user_profile.default_language): + content = _( + "Your data export is complete. [View and download exports]({export_settings_link})." + ).format(export_settings_link="/#organization/data-exports-admin") + internal_send_private_message( + sender=get_system_bot(settings.NOTIFICATION_BOT, realm.id), + recipient_user=user_profile, + content=content, + ) + + # For future frontend use, also notify administrator + # clients that the export happened. + notify_realm_export(user_profile) + logging.info( + "Completed data export for %s in %s", + user_profile.realm.string_id, + time.time() - start, + ) + elif event["type"] == "reupload_realm_emoji": + # This is a special event queued by the migration for reuploading emojis. + # We don't want to run the necessary code in the actual migration, so it simply + # queues the necessary event, and the actual work is done here in the queue worker. + realm = Realm.objects.get(id=event["realm_id"]) + logger.info("Processing reupload_realm_emoji event for realm %s", realm.id) + handle_reupload_emojis_event(realm, logger) + elif event["type"] == "soft_reactivate": + logger.info( + "Starting soft reactivation for user_profile_id %s", + event["user_profile_id"], + ) + user_profile = get_user_profile_by_id(event["user_profile_id"]) + reactivate_user_if_soft_deactivated(user_profile) + elif event["type"] == "push_bouncer_update_for_realm": + # In the future we may use the realm_id to send only that single realm's info. + realm_id = event["realm_id"] + logger.info("Updating push bouncer with metadata on behalf of realm %s", realm_id) + send_server_data_to_push_bouncer(consider_usage_statistics=False) + + end = time.time() + logger.info( + "deferred_work processed %s event (%dms)", + event["type"], + (end - start) * 1000, + ) diff --git a/zerver/worker/digest_emails.py b/zerver/worker/digest_emails.py new file mode 100644 index 0000000000..8d3c2749b0 --- /dev/null +++ b/zerver/worker/digest_emails.py @@ -0,0 +1,24 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Mapping + +from typing_extensions import override + +from zerver.lib.digest import bulk_handle_digest_email +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("digest_emails") +class DigestWorker(QueueProcessingWorker): # nocoverage + # Who gets a digest is entirely determined by the enqueue_digest_emails + # management command, not here. + @override + def consume(self, event: Mapping[str, Any]) -> None: + if "user_ids" in event: + user_ids = event["user_ids"] + else: + # legacy code may have enqueued a single id + user_ids = [event["user_profile_id"]] + bulk_handle_digest_email(user_ids, event["cutoff"]) diff --git a/zerver/worker/email_mirror.py b/zerver/worker/email_mirror.py new file mode 100644 index 0000000000..94da59e5bc --- /dev/null +++ b/zerver/worker/email_mirror.py @@ -0,0 +1,47 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import base64 +import email +import email.policy +import logging +from email.message import EmailMessage +from typing import Any, Mapping + +from typing_extensions import override + +from zerver.lib.email_mirror import ( + decode_stream_email_address, + is_missed_message_address, + rate_limit_mirror_by_realm, +) +from zerver.lib.email_mirror import process_message as mirror_email +from zerver.lib.exceptions import RateLimitedError +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("email_mirror") +class MirrorWorker(QueueProcessingWorker): + @override + def consume(self, event: Mapping[str, Any]) -> None: + rcpt_to = event["rcpt_to"] + msg = email.message_from_bytes( + base64.b64decode(event["msg_base64"]), + policy=email.policy.default, + ) + assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417 + if not is_missed_message_address(rcpt_to): + # Missed message addresses are one-time use, so we don't need + # to worry about emails to them resulting in message spam. + recipient_realm = decode_stream_email_address(rcpt_to)[0].realm + try: + rate_limit_mirror_by_realm(recipient_realm) + except RateLimitedError: + logger.warning( + "MirrorWorker: Rejecting an email from: %s to realm: %s - rate limited.", + msg["From"], + recipient_realm.subdomain, + ) + return + + mirror_email(msg, rcpt_to=rcpt_to) diff --git a/zerver/worker/email_senders.py b/zerver/worker/email_senders.py new file mode 100644 index 0000000000..37b740c0bf --- /dev/null +++ b/zerver/worker/email_senders.py @@ -0,0 +1,75 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import copy +import logging +import socket +from functools import wraps +from typing import Any, Callable, Dict, List, Optional + +from django.core.mail.backends.base import BaseEmailBackend +from typing_extensions import override + +from zerver.lib.queue import retry_event +from zerver.lib.send_email import ( + EmailNotDeliveredError, + handle_send_email_format_changes, + initialize_connection, + send_email, +) +from zerver.worker.base import ConcreteQueueWorker, LoopQueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +# If you change the function on which this decorator is used be careful that the new +# function doesn't delete the "failed_tries" attribute of "data" which is needed for +# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy. +def retry_send_email_failures( + func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None], +) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]: + @wraps(func) + def wrapper(worker: ConcreteQueueWorker, data: Dict[str, Any]) -> None: + try: + func(worker, data) + except (socket.gaierror, TimeoutError, EmailNotDeliveredError) as e: + error_class_name = type(e).__name__ + + def on_failure(event: Dict[str, Any]) -> None: + logging.exception( + "Event %r failed due to exception %s", event, error_class_name, stack_info=True + ) + + retry_event(worker.queue_name, data, on_failure) + + return wrapper + + +@assign_queue("email_senders") +class EmailSendingWorker(LoopQueueProcessingWorker): + def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: + super().__init__(threaded, disable_timeout) + self.connection: Optional[BaseEmailBackend] = None + + @retry_send_email_failures + def send_email(self, event: Dict[str, Any]) -> None: + # Copy the event, so that we don't pass the `failed_tries' + # data to send_email (which neither takes that + # argument nor needs that data). + copied_event = copy.deepcopy(event) + if "failed_tries" in copied_event: + del copied_event["failed_tries"] + handle_send_email_format_changes(copied_event) + self.connection = initialize_connection(self.connection) + send_email(**copied_event, connection=self.connection) + + @override + def consume_batch(self, events: List[Dict[str, Any]]) -> None: + for event in events: + self.send_email(event) + + @override + def stop(self) -> None: + try: + if self.connection is not None: + self.connection.close() + finally: + super().stop() diff --git a/zerver/worker/embed_links.py b/zerver/worker/embed_links.py new file mode 100644 index 0000000000..c4f9321feb --- /dev/null +++ b/zerver/worker/embed_links.py @@ -0,0 +1,74 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +import time +from types import FrameType +from typing import Any, Dict, List, Mapping, Optional + +from django.db import transaction +from typing_extensions import override + +from zerver.actions.message_edit import do_update_embedded_data +from zerver.actions.message_send import render_incoming_message +from zerver.lib.url_preview import preview as url_preview +from zerver.lib.url_preview.types import UrlEmbedData +from zerver.models import Message, Realm +from zerver.worker.base import InterruptConsumeError, QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("embed_links") +class FetchLinksEmbedData(QueueProcessingWorker): + # This is a slow queue with network requests, so a disk write is negligible. + # Update stats file after every consume call. + CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 1 + + @override + def consume(self, event: Mapping[str, Any]) -> None: + url_embed_data: Dict[str, Optional[UrlEmbedData]] = {} + for url in event["urls"]: + start_time = time.time() + url_embed_data[url] = url_preview.get_link_embed_data(url) + logging.info( + "Time spent on get_link_embed_data for %s: %s", url, time.time() - start_time + ) + + with transaction.atomic(): + try: + message = Message.objects.select_for_update().get(id=event["message_id"]) + except Message.DoesNotExist: + # Message may have been deleted + return + + # If the message changed, we will run this task after updating the message + # in zerver.actions.message_edit.check_update_message + if message.content != event["message_content"]: + return + + # Fetch the realm whose settings we're using for rendering + realm = Realm.objects.get(id=event["message_realm_id"]) + + # If rendering fails, the called code will raise a JsonableError. + rendering_result = render_incoming_message( + message, + message.content, + realm, + url_embed_data=url_embed_data, + ) + do_update_embedded_data(message.sender, message, message.content, rendering_result) + + @override + def timer_expired( + self, limit: int, events: List[Dict[str, Any]], signal: int, frame: Optional[FrameType] + ) -> None: + assert len(events) == 1 + event = events[0] + + logging.warning( + "Timed out in %s after %s seconds while fetching URLs for message %s: %s", + self.queue_name, + limit, + event["message_id"], + event["urls"], + ) + raise InterruptConsumeError diff --git a/zerver/worker/embedded_bots.py b/zerver/worker/embedded_bots.py new file mode 100644 index 0000000000..e2a1a37ff7 --- /dev/null +++ b/zerver/worker/embedded_bots.py @@ -0,0 +1,54 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Dict, Mapping + +from typing_extensions import override +from zulip_bots.lib import extract_query_without_mention + +from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitError, get_bot_handler +from zerver.models import UserProfile +from zerver.models.bots import get_bot_services +from zerver.models.users import get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("embedded_bots") +class EmbeddedBotWorker(QueueProcessingWorker): + def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler: + return EmbeddedBotHandler(user_profile) + + @override + def consume(self, event: Mapping[str, Any]) -> None: + user_profile_id = event["user_profile_id"] + user_profile = get_user_profile_by_id(user_profile_id) + + message: Dict[str, Any] = event["message"] + + # TODO: Do we actually want to allow multiple Services per bot user? + services = get_bot_services(user_profile_id) + for service in services: + bot_handler = get_bot_handler(str(service.name)) + if bot_handler is None: + logging.error( + "Error: User %s has bot with invalid embedded bot service %s", + user_profile_id, + service.name, + ) + continue + try: + if hasattr(bot_handler, "initialize"): + bot_handler.initialize(self.get_bot_api_client(user_profile)) + if event["trigger"] == "mention": + message["content"] = extract_query_without_mention( + message=message, + client=self.get_bot_api_client(user_profile), + ) + assert message["content"] is not None + bot_handler.handle_message( + message=message, + bot_handler=self.get_bot_api_client(user_profile), + ) + except EmbeddedBotQuitError as e: + logging.warning("%s", e) diff --git a/zerver/worker/invites.py b/zerver/worker/invites.py new file mode 100644 index 0000000000..c26de9ff31 --- /dev/null +++ b/zerver/worker/invites.py @@ -0,0 +1,70 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from datetime import timedelta +from typing import Any, Mapping + +from typing_extensions import override + +from zerver.actions.invites import do_send_confirmation_email +from zerver.context_processors import common_context +from zerver.lib.send_email import FromAddress, send_future_email +from zerver.models import PreregistrationUser +from zerver.models.prereg_users import filter_to_valid_prereg_users +from zerver.models.users import get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("invites") +class ConfirmationEmailWorker(QueueProcessingWorker): + @override + def consume(self, data: Mapping[str, Any]) -> None: + if "invite_expires_in_days" in data: + invite_expires_in_minutes = data["invite_expires_in_days"] * 24 * 60 + elif "invite_expires_in_minutes" in data: + invite_expires_in_minutes = data["invite_expires_in_minutes"] + invitee = filter_to_valid_prereg_users( + PreregistrationUser.objects.filter(id=data["prereg_id"]), invite_expires_in_minutes + ).first() + if invitee is None: + # The invitation could have been revoked + return + + referrer = get_user_profile_by_id(data["referrer_id"]) + logger.info( + "Sending invitation for realm %s to %s", referrer.realm.string_id, invitee.email + ) + if "email_language" in data: + email_language = data["email_language"] + else: + email_language = referrer.realm.default_language + + activate_url = do_send_confirmation_email( + invitee, referrer, email_language, invite_expires_in_minutes + ) + if invite_expires_in_minutes is None: + # We do not queue reminder email for never expiring + # invitations. This is probably a low importance bug; it + # would likely be more natural to send a reminder after 7 + # days. + return + + # queue invitation reminder + if invite_expires_in_minutes >= 4 * 24 * 60: + context = common_context(referrer) + context.update( + activate_url=activate_url, + referrer_name=referrer.full_name, + referrer_email=referrer.delivery_email, + referrer_realm_name=referrer.realm.name, + ) + send_future_email( + "zerver/emails/invitation_reminder", + referrer.realm, + to_emails=[invitee.email], + from_address=FromAddress.tokenized_no_reply_placeholder, + language=email_language, + context=context, + delay=timedelta(minutes=invite_expires_in_minutes - (2 * 24 * 60)), + ) diff --git a/zerver/worker/missedmessage_emails.py b/zerver/worker/missedmessage_emails.py new file mode 100644 index 0000000000..9ad011d6a7 --- /dev/null +++ b/zerver/worker/missedmessage_emails.py @@ -0,0 +1,247 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +import threading +from collections import defaultdict +from datetime import timedelta +from typing import Any, Dict, Optional + +import sentry_sdk +from django.db import transaction +from django.db.utils import IntegrityError +from django.utils.timezone import now as timezone_now +from typing_extensions import override + +from zerver.lib.db import reset_queries +from zerver.lib.email_notifications import MissedMessageData, handle_missedmessage_emails +from zerver.lib.per_request_cache import flush_per_request_caches +from zerver.models import ScheduledMessageNotificationEmail +from zerver.models.users import get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("missedmessage_emails") +class MissedMessageWorker(QueueProcessingWorker): + # Aggregate all messages received over the last several seconds + # (configurable by each recipient) to let someone finish sending a + # batch of messages and/or editing them before they are sent out + # as emails to recipients. + # + # The batch interval is best-effort -- we poll at most every + # CHECK_FREQUENCY_SECONDS, to avoid excessive activity. + CHECK_FREQUENCY_SECONDS = 5 + + worker_thread: Optional[threading.Thread] = None + + # This condition variable mediates the stopping and has_timeout + # pieces of state, below it. + cv = threading.Condition() + stopping = False + has_timeout = False + + # The main thread, which handles the RabbitMQ connection and creates + # database rows from them. + @override + @sentry_sdk.trace + def consume(self, event: Dict[str, Any]) -> None: + logging.debug("Processing missedmessage_emails event: %s", event) + # When we consume an event, check if there are existing pending emails + # for that user, and if so use the same scheduled timestamp. + + user_profile_id: int = event["user_profile_id"] + user_profile = get_user_profile_by_id(user_profile_id) + batch_duration_seconds = user_profile.email_notifications_batching_period_seconds + batch_duration = timedelta(seconds=batch_duration_seconds) + + try: + pending_email = ScheduledMessageNotificationEmail.objects.filter( + user_profile_id=user_profile_id + )[0] + scheduled_timestamp = pending_email.scheduled_timestamp + except IndexError: + scheduled_timestamp = timezone_now() + batch_duration + + with self.cv: + # We now hold the lock, so there are three places the + # worker thread can be: + # + # 1. In maybe_send_batched_emails, and will have to take + # the lock (and thus block insertions of new rows + # here) to decide if there are any rows and if it thus + # needs a timeout. + # + # 2. In the cv.wait_for with a timeout because there were + # rows already. There's nothing for us to do, since + # the newly-inserted row will get checked upon that + # timeout. + # + # 3. In the cv.wait_for without a timeout, because there + # weren't any rows (which we're about to change). + # + # Notifying in (1) is irrelevant, since the thread is not + # waiting. If we over-notify by doing so for both (2) and + # (3), the behaviour is correct but slightly inefficient, + # as the thread will be needlessly awoken and will just + # re-wait. However, if we fail to awake case (3), the + # worker thread will never wake up, and the + # ScheduledMessageNotificationEmail internal queue will + # back up. + # + # Use the self.has_timeout property (which is protected by + # the lock) to determine which of cases (2) or (3) we are + # in, and as such if we need to notify after making the + # row. + try: + ScheduledMessageNotificationEmail.objects.create( + user_profile_id=user_profile_id, + message_id=event["message_id"], + trigger=event["trigger"], + scheduled_timestamp=scheduled_timestamp, + mentioned_user_group_id=event.get("mentioned_user_group_id"), + ) + if not self.has_timeout: + self.cv.notify() + except IntegrityError: + logging.debug( + "ScheduledMessageNotificationEmail row could not be created. The message may have been deleted. Skipping event." + ) + + @override + def start(self) -> None: + with self.cv: + self.stopping = False + self.worker_thread = threading.Thread(target=self.work) + self.worker_thread.start() + super().start() + + def work(self) -> None: + while True: + with sentry_sdk.start_transaction( + op="task", + name=f"{self.queue_name} worker thread", + custom_sampling_context={"queue": self.queue_name}, + ): + flush_per_request_caches() + reset_queries() + try: + finished = self.background_loop() + if finished: + break + except Exception: + logging.exception( + "Exception in MissedMessage background worker; restarting the loop", + stack_info=True, + ) + + def background_loop(self) -> bool: + with self.cv: + if self.stopping: + return True + # There are three conditions which we wait for: + # + # 1. We are being explicitly asked to stop; see the + # notify() call in stop() + # + # 2. We have no ScheduledMessageNotificationEmail + # objects currently (has_timeout = False) and the + # first one was just enqueued; see the notify() + # call in consume(). We break out so that we can + # come back around the loop and re-wait with a + # timeout (see next condition). + # + # 3. One or more ScheduledMessageNotificationEmail + # exist in the database, so we need to re-check + # them regularly; this happens by hitting the + # timeout and calling maybe_send_batched_emails(). + # There is no explicit notify() for this. + timeout: Optional[int] = None + if ScheduledMessageNotificationEmail.objects.exists(): + timeout = self.CHECK_FREQUENCY_SECONDS + self.has_timeout = timeout is not None + + def wait_condition() -> bool: + if self.stopping: + # Condition (1) + return True + if timeout is None: + # Condition (2). We went to sleep with no + # ScheduledMessageNotificationEmail existing, + # and one has just been made. We re-check + # that is still true now that we have the + # lock, and if we see it, we stop waiting. + return ScheduledMessageNotificationEmail.objects.exists() + # This should only happen at the start or end of + # the wait, when we haven't been notified, but are + # re-checking the condition. + return False + + with sentry_sdk.start_span(description="condvar wait") as span: + span.set_data("timeout", timeout) + was_notified = self.cv.wait_for(wait_condition, timeout=timeout) + span.set_data("was_notified", was_notified) + + # Being notified means that we are in conditions (1) or + # (2), above. In neither case do we need to look at if + # there are batches to send -- (2) means that the + # ScheduledMessageNotificationEmail was _just_ created, so + # there is no need to check it now. + if not was_notified: + self.maybe_send_batched_emails() + + return False + + @sentry_sdk.trace + def maybe_send_batched_emails(self) -> None: + current_time = timezone_now() + + with transaction.atomic(): + events_to_process = ScheduledMessageNotificationEmail.objects.filter( + scheduled_timestamp__lte=current_time + ).select_for_update() + + # Batch the entries by user + events_by_recipient: Dict[int, Dict[int, MissedMessageData]] = defaultdict(dict) + for event in events_to_process: + events_by_recipient[event.user_profile_id][event.message_id] = MissedMessageData( + trigger=event.trigger, mentioned_user_group_id=event.mentioned_user_group_id + ) + + for user_profile_id in events_by_recipient: + events = events_by_recipient[user_profile_id] + + logging.info( + "Batch-processing %s missedmessage_emails events for user %s", + len(events), + user_profile_id, + ) + with sentry_sdk.start_span( + description="sending missedmessage_emails to user" + ) as span: + span.set_data("user_profile_id", user_profile_id) + span.set_data("event_count", len(events)) + try: + # Because we process events in batches, an + # escaped exception here would lead to + # duplicate messages being sent for other + # users in the same events_to_process batch, + # and no guarantee of forward progress. + handle_missedmessage_emails(user_profile_id, events) + except Exception: + logging.exception( + "Failed to process %d missedmessage_emails for user %s", + len(events), + user_profile_id, + stack_info=True, + ) + + events_to_process.delete() + + @override + def stop(self) -> None: + with self.cv: + self.stopping = True + self.cv.notify() + if self.worker_thread is not None: + self.worker_thread.join() + super().stop() diff --git a/zerver/worker/missedmessage_mobile_notifications.py b/zerver/worker/missedmessage_mobile_notifications.py new file mode 100644 index 0000000000..5d04fad740 --- /dev/null +++ b/zerver/worker/missedmessage_mobile_notifications.py @@ -0,0 +1,50 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Dict + +from typing_extensions import override + +from zerver.lib.push_notifications import ( + handle_push_notification, + handle_remove_push_notification, + initialize_push_notifications, +) +from zerver.lib.queue import retry_event +from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("missedmessage_mobile_notifications") +class PushNotificationsWorker(QueueProcessingWorker): + # The use of aioapns in the backend means that we cannot use + # SIGALRM to limit how long a consume takes, as SIGALRM does not + # play well with asyncio. + MAX_CONSUME_SECONDS = None + + @override + def start(self) -> None: + # initialize_push_notifications doesn't strictly do anything + # beyond printing some logging warnings if push notifications + # are not available in the current configuration. + initialize_push_notifications() + super().start() + + @override + def consume(self, event: Dict[str, Any]) -> None: + try: + if event.get("type", "add") == "remove": + message_ids = event["message_ids"] + handle_remove_push_notification(event["user_profile_id"], message_ids) + else: + handle_push_notification(event["user_profile_id"], event) + except PushNotificationBouncerRetryLaterError: + + def failure_processor(event: Dict[str, Any]) -> None: + logger.warning( + "Maximum retries exceeded for trigger:%s event:push_notification", + event["user_profile_id"], + ) + + retry_event(self.queue_name, event, failure_processor) diff --git a/zerver/worker/outgoing_webhooks.py b/zerver/worker/outgoing_webhooks.py new file mode 100644 index 0000000000..4100123c42 --- /dev/null +++ b/zerver/worker/outgoing_webhooks.py @@ -0,0 +1,25 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Dict + +from typing_extensions import override + +from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler +from zerver.models.bots import get_bot_services +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("outgoing_webhooks") +class OutgoingWebhookWorker(QueueProcessingWorker): + @override + def consume(self, event: Dict[str, Any]) -> None: + message = event["message"] + event["command"] = message["content"] + + services = get_bot_services(event["user_profile_id"]) + for service in services: + event["service_name"] = str(service.name) + service_handler = get_outgoing_webhook_service_handler(service) + do_rest_call(service.base_url, event, service_handler) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 3c0ba26469..28b177697e 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -1,1307 +1,31 @@ # Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html -import base64 -import copy -import email -import email.policy -import logging -import os -import signal -import socket -import tempfile -import threading -import time -from abc import ABC, abstractmethod -from collections import defaultdict, deque -from datetime import timedelta -from email.message import EmailMessage -from functools import wraps -from types import FrameType -from typing import ( - Any, - Callable, - Dict, - List, - Mapping, - MutableSequence, - Optional, - Sequence, - Set, - Tuple, - Type, - TypeVar, -) -from urllib.parse import urlsplit +import importlib +import pkgutil +from typing import List -import orjson -import sentry_sdk -from django.conf import settings -from django.core.mail.backends.base import BaseEmailBackend -from django.db import connection, transaction -from django.db.models import F -from django.db.utils import IntegrityError -from django.utils.timezone import now as timezone_now -from django.utils.translation import gettext as _ -from django.utils.translation import override as override_language -from psycopg2.sql import SQL, Literal -from returns.curry import partial -from typing_extensions import override -from zulip_bots.lib import extract_query_without_mention - -from zerver.actions.invites import do_send_confirmation_email -from zerver.actions.message_edit import do_update_embedded_data -from zerver.actions.message_flags import do_mark_stream_messages_as_read -from zerver.actions.message_send import internal_send_private_message, render_incoming_message -from zerver.actions.presence import do_update_user_presence -from zerver.actions.realm_export import notify_realm_export -from zerver.actions.user_activity import do_update_user_activity_interval -from zerver.context_processors import common_context -from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitError, get_bot_handler -from zerver.lib.context_managers import lockfile -from zerver.lib.db import reset_queries -from zerver.lib.digest import bulk_handle_digest_email -from zerver.lib.email_mirror import ( - decode_stream_email_address, - is_missed_message_address, - rate_limit_mirror_by_realm, -) -from zerver.lib.email_mirror import process_message as mirror_email -from zerver.lib.email_notifications import MissedMessageData, handle_missedmessage_emails -from zerver.lib.exceptions import RateLimitedError -from zerver.lib.export import export_realm_wrapper -from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler -from zerver.lib.per_request_cache import flush_per_request_caches -from zerver.lib.push_notifications import ( - clear_push_device_tokens, - handle_push_notification, - handle_remove_push_notification, - initialize_push_notifications, -) -from zerver.lib.pysa import mark_sanitized -from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event -from zerver.lib.remote_server import ( - PushNotificationBouncerRetryLaterError, - send_server_data_to_push_bouncer, -) -from zerver.lib.send_email import ( - EmailNotDeliveredError, - FromAddress, - handle_send_email_format_changes, - initialize_connection, - send_email, - send_future_email, -) -from zerver.lib.soft_deactivation import reactivate_user_if_soft_deactivated -from zerver.lib.timestamp import timestamp_to_datetime -from zerver.lib.upload import handle_reupload_emojis_event -from zerver.lib.url_preview import preview as url_preview -from zerver.lib.url_preview.types import UrlEmbedData -from zerver.models import ( - Message, - PreregistrationUser, - Realm, - RealmAuditLog, - ScheduledMessageNotificationEmail, - Stream, - UserMessage, - UserProfile, -) -from zerver.models.bots import get_bot_services -from zerver.models.clients import get_client -from zerver.models.prereg_users import filter_to_valid_prereg_users -from zerver.models.users import get_system_bot, get_user_profile_by_id - -logger = logging.getLogger(__name__) - - -class WorkerTimeoutError(Exception): - def __init__(self, queue_name: str, limit: int, event_count: int) -> None: - self.queue_name = queue_name - self.limit = limit - self.event_count = event_count - - @override - def __str__(self) -> str: - return f"Timed out in {self.queue_name} after {self.limit * self.event_count} seconds processing {self.event_count} events" - - -class InterruptConsumeError(Exception): - """ - This exception is to be thrown inside event consume function - if the intention is to simply interrupt the processing - of the current event and normally continue the work of the queue. - """ - - -class WorkerDeclarationError(Exception): - pass - - -ConcreteQueueWorker = TypeVar("ConcreteQueueWorker", bound="QueueProcessingWorker") - - -def assign_queue( - queue_name: str, - enabled: bool = True, - is_test_queue: bool = False, -) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]: - def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]: - clazz.queue_name = queue_name - if enabled: - register_worker(queue_name, clazz, is_test_queue) - return clazz - - return decorate - - -worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} -test_queues: Set[str] = set() - - -def register_worker( - queue_name: str, clazz: Type["QueueProcessingWorker"], is_test_queue: bool = False -) -> None: - worker_classes[queue_name] = clazz - if is_test_queue: - test_queues.add(queue_name) +import zerver.worker +from zerver.worker.base import QueueProcessingWorker, test_queues, worker_classes def get_worker( queue_name: str, threaded: bool = False, disable_timeout: bool = False -) -> "QueueProcessingWorker": +) -> QueueProcessingWorker: + if queue_name in {"test", "noop", "noop_batch"}: + import_module = "zerver.worker.test" + else: + import_module = f"zerver.worker.{queue_name}" + + importlib.import_module(import_module) return worker_classes[queue_name](threaded=threaded, disable_timeout=disable_timeout) def get_active_worker_queues(only_test_queues: bool = False) -> List[str]: """Returns all (either test, or real) worker queues.""" + for module_info in pkgutil.iter_modules(zerver.worker.__path__, "zerver.worker."): + importlib.import_module(module_info.name) + return [ queue_name for queue_name in worker_classes if bool(queue_name in test_queues) == only_test_queues ] - - -def check_and_send_restart_signal() -> None: - try: - if not connection.is_usable(): - logging.warning("*** Sending self SIGUSR1 to trigger a restart.") - os.kill(os.getpid(), signal.SIGUSR1) - except Exception: - pass - - -# If you change the function on which this decorator is used be careful that the new -# function doesn't delete the "failed_tries" attribute of "data" which is needed for -# "retry_event" to work correctly; see EmailSendingWorker for an example with deepcopy. -def retry_send_email_failures( - func: Callable[[ConcreteQueueWorker, Dict[str, Any]], None], -) -> Callable[[ConcreteQueueWorker, Dict[str, Any]], None]: - @wraps(func) - def wrapper(worker: ConcreteQueueWorker, data: Dict[str, Any]) -> None: - try: - func(worker, data) - except (socket.gaierror, TimeoutError, EmailNotDeliveredError) as e: - error_class_name = type(e).__name__ - - def on_failure(event: Dict[str, Any]) -> None: - logging.exception( - "Event %r failed due to exception %s", event, error_class_name, stack_info=True - ) - - retry_event(worker.queue_name, data, on_failure) - - return wrapper - - -class QueueProcessingWorker(ABC): - queue_name: str - MAX_CONSUME_SECONDS: Optional[int] = 30 - CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50 - MAX_SECONDS_BEFORE_UPDATE_STATS = 30 - - # How many un-acknowledged events the worker should have on hand, - # fetched from the rabbitmq server. Larger values may be more - # performant, but if queues are large, cause more network IO at - # startup and steady-state memory. - PREFETCH = 100 - - def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: - self.q: Optional[SimpleQueueClient] = None - self.threaded = threaded - self.disable_timeout = disable_timeout - if not hasattr(self, "queue_name"): - raise WorkerDeclarationError("Queue worker declared without queue_name") - - self.initialize_statistics() - - def initialize_statistics(self) -> None: - self.queue_last_emptied_timestamp = time.time() - self.consumed_since_last_emptied = 0 - self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50) - self.consume_iteration_counter = 0 - self.idle = True - self.last_statistics_update_time = 0.0 - - self.update_statistics() - - @sentry_sdk.trace - def update_statistics(self) -> None: - total_seconds = sum(seconds for _, seconds in self.recent_consume_times) - total_events = sum(events_number for events_number, _ in self.recent_consume_times) - if total_events == 0: - recent_average_consume_time = None - else: - recent_average_consume_time = total_seconds / total_events - stats_dict = dict( - update_time=time.time(), - recent_average_consume_time=recent_average_consume_time, - queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, - consumed_since_last_emptied=self.consumed_since_last_emptied, - ) - - os.makedirs(settings.QUEUE_STATS_DIR, exist_ok=True) - - fname = f"{self.queue_name}.stats" - fn = os.path.join(settings.QUEUE_STATS_DIR, fname) - with lockfile(fn + ".lock"): - tmp_fn = fn + ".tmp" - with open(tmp_fn, "wb") as f: - f.write( - orjson.dumps(stats_dict, option=orjson.OPT_APPEND_NEWLINE | orjson.OPT_INDENT_2) - ) - os.rename(tmp_fn, fn) - self.last_statistics_update_time = time.time() - - def get_remaining_local_queue_size(self) -> int: - if self.q is not None: - return self.q.local_queue_size() - else: - # This is a special case that will happen if we're operating without - # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of - # and the only reasonable size to return is 0. - return 0 - - @abstractmethod - def consume(self, data: Dict[str, Any]) -> None: - pass - - def do_consume( - self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]] - ) -> None: - consume_time_seconds: Optional[float] = None - with sentry_sdk.start_transaction( - op="task", - name=f"consume {self.queue_name}", - custom_sampling_context={"queue": self.queue_name}, - ): - sentry_sdk.add_breadcrumb( - type="debug", - category="queue_processor", - message=f"Consuming {self.queue_name}", - data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()}, - ) - try: - if self.idle: - # We're reactivating after having gone idle due to emptying the queue. - # We should update the stats file to keep it fresh and to make it clear - # that the queue started processing, in case the event we're about to process - # makes us freeze. - self.idle = False - self.update_statistics() - - time_start = time.time() - if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout: - try: - signal.signal( - signal.SIGALRM, - partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events), - ) - try: - signal.alarm(self.MAX_CONSUME_SECONDS * len(events)) - consume_func(events) - finally: - signal.alarm(0) - finally: - signal.signal(signal.SIGALRM, signal.SIG_DFL) - else: - consume_func(events) - consume_time_seconds = time.time() - time_start - self.consumed_since_last_emptied += len(events) - except Exception as e: - self._handle_consume_exception(events, e) - finally: - flush_per_request_caches() - reset_queries() - - with sentry_sdk.start_span(description="statistics"): - if consume_time_seconds is not None: - self.recent_consume_times.append((len(events), consume_time_seconds)) - - remaining_local_queue_size = self.get_remaining_local_queue_size() - if remaining_local_queue_size == 0: - self.queue_last_emptied_timestamp = time.time() - self.consumed_since_last_emptied = 0 - # We've cleared all the events from the queue, so we don't - # need to worry about the small overhead of doing a disk write. - # We take advantage of this to update the stats file to keep it fresh, - # especially since the queue might go idle until new events come in. - self.update_statistics() - self.idle = True - else: - self.consume_iteration_counter += 1 - if ( - self.consume_iteration_counter - >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM - or time.time() - self.last_statistics_update_time - >= self.MAX_SECONDS_BEFORE_UPDATE_STATS - ): - self.consume_iteration_counter = 0 - self.update_statistics() - - def consume_single_event(self, event: Dict[str, Any]) -> None: - consume_func = lambda events: self.consume(events[0]) - self.do_consume(consume_func, [event]) - - def timer_expired( - self, limit: int, events: List[Dict[str, Any]], signal: int, frame: Optional[FrameType] - ) -> None: - raise WorkerTimeoutError(self.queue_name, limit, len(events)) - - def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None: - if isinstance(exception, InterruptConsumeError): - # The exception signals that no further error handling - # is needed and the worker can proceed. - return - - with sentry_sdk.configure_scope() as scope: - scope.set_context( - "events", - { - "data": events, - "queue_name": self.queue_name, - }, - ) - if isinstance(exception, WorkerTimeoutError): - with sentry_sdk.push_scope() as scope: - scope.fingerprint = ["worker-timeout", self.queue_name] - logging.exception(exception, stack_info=True) - else: - logging.exception( - "Problem handling data on queue %s", self.queue_name, stack_info=True - ) - if not os.path.exists(settings.QUEUE_ERROR_DIR): - os.mkdir(settings.QUEUE_ERROR_DIR) # nocoverage - # Use 'mark_sanitized' to prevent Pysa from detecting this false positive - # flow. 'queue_name' is always a constant string. - fname = mark_sanitized(f"{self.queue_name}.errors") - fn = os.path.join(settings.QUEUE_ERROR_DIR, fname) - line = f"{time.asctime()}\t{orjson.dumps(events).decode()}\n" - lock_fn = fn + ".lock" - with lockfile(lock_fn): - with open(fn, "a") as f: - f.write(line) - check_and_send_restart_signal() - - def setup(self) -> None: - self.q = SimpleQueueClient(prefetch=self.PREFETCH) - - def start(self) -> None: - assert self.q is not None - self.initialize_statistics() - self.q.start_json_consumer( - self.queue_name, - lambda events: self.consume_single_event(events[0]), - ) - - def stop(self) -> None: # nocoverage - assert self.q is not None - self.q.stop_consuming() - - -class LoopQueueProcessingWorker(QueueProcessingWorker): - sleep_delay = 1 - batch_size = 100 - - @override - def setup(self) -> None: - self.q = SimpleQueueClient(prefetch=max(self.PREFETCH, self.batch_size)) - - @override - def start(self) -> None: # nocoverage - assert self.q is not None - self.initialize_statistics() - self.q.start_json_consumer( - self.queue_name, - lambda events: self.do_consume(self.consume_batch, events), - batch_size=self.batch_size, - timeout=self.sleep_delay, - ) - - @abstractmethod - def consume_batch(self, events: List[Dict[str, Any]]) -> None: - pass - - @override - def consume(self, event: Dict[str, Any]) -> None: - """In LoopQueueProcessingWorker, consume is used just for automated tests""" - self.consume_batch([event]) - - -@assign_queue("invites") -class ConfirmationEmailWorker(QueueProcessingWorker): - @override - def consume(self, data: Mapping[str, Any]) -> None: - if "invite_expires_in_days" in data: - invite_expires_in_minutes = data["invite_expires_in_days"] * 24 * 60 - elif "invite_expires_in_minutes" in data: - invite_expires_in_minutes = data["invite_expires_in_minutes"] - invitee = filter_to_valid_prereg_users( - PreregistrationUser.objects.filter(id=data["prereg_id"]), invite_expires_in_minutes - ).first() - if invitee is None: - # The invitation could have been revoked - return - - referrer = get_user_profile_by_id(data["referrer_id"]) - logger.info( - "Sending invitation for realm %s to %s", referrer.realm.string_id, invitee.email - ) - if "email_language" in data: - email_language = data["email_language"] - else: - email_language = referrer.realm.default_language - - activate_url = do_send_confirmation_email( - invitee, referrer, email_language, invite_expires_in_minutes - ) - if invite_expires_in_minutes is None: - # We do not queue reminder email for never expiring - # invitations. This is probably a low importance bug; it - # would likely be more natural to send a reminder after 7 - # days. - return - - # queue invitation reminder - if invite_expires_in_minutes >= 4 * 24 * 60: - context = common_context(referrer) - context.update( - activate_url=activate_url, - referrer_name=referrer.full_name, - referrer_email=referrer.delivery_email, - referrer_realm_name=referrer.realm.name, - ) - send_future_email( - "zerver/emails/invitation_reminder", - referrer.realm, - to_emails=[invitee.email], - from_address=FromAddress.tokenized_no_reply_placeholder, - language=email_language, - context=context, - delay=timedelta(minutes=invite_expires_in_minutes - (2 * 24 * 60)), - ) - - -@assign_queue("user_activity") -class UserActivityWorker(LoopQueueProcessingWorker): - """The UserActivity queue is perhaps our highest-traffic queue, and - requires some care to ensure it performs adequately. - - We use a LoopQueueProcessingWorker as a performance optimization - for managing the queue. The structure of UserActivity records is - such that they are easily deduplicated before being sent to the - database; we take advantage of that to make this queue highly - effective at dealing with a backlog containing many similar - events. Such a backlog happen in a few ways: - - * In abuse/DoS situations, if a client is sending huge numbers of - similar requests to the server. - * If the queue ends up with several minutes of backlog e.g. due to - downtime of the queue processor, many clients will have several - common events from doing an action multiple times. - - """ - - client_id_map: Dict[str, int] = {} - - @override - def start(self) -> None: - # For our unit tests to make sense, we need to clear this on startup. - self.client_id_map = {} - super().start() - - @override - def consume_batch(self, user_activity_events: List[Dict[str, Any]]) -> None: - uncommitted_events: Dict[Tuple[int, int, str], Tuple[int, float]] = {} - - # First, we drain the queue of all user_activity events and - # deduplicate them for insertion into the database. - for event in user_activity_events: - user_profile_id = event["user_profile_id"] - client_id = event["client_id"] - - key_tuple = (user_profile_id, client_id, event["query"]) - if key_tuple not in uncommitted_events: - uncommitted_events[key_tuple] = (1, event["time"]) - else: - count, event_time = uncommitted_events[key_tuple] - uncommitted_events[key_tuple] = (count + 1, max(event_time, event["time"])) - - rows = [] - for key_tuple, value_tuple in uncommitted_events.items(): - user_profile_id, client_id, query = key_tuple - count, event_time = value_tuple - rows.append( - SQL("({},{},{},{},to_timestamp({}))").format( - Literal(user_profile_id), - Literal(client_id), - Literal(query), - Literal(count), - Literal(event_time), - ) - ) - - # Perform a single bulk UPSERT for all of the rows - sql_query = SQL( - """ - INSERT INTO zerver_useractivity(user_profile_id, client_id, query, count, last_visit) - VALUES {rows} - ON CONFLICT (user_profile_id, client_id, query) DO UPDATE SET - count = zerver_useractivity.count + excluded.count, - last_visit = greatest(zerver_useractivity.last_visit, excluded.last_visit) - """ - ).format(rows=SQL(", ").join(rows)) - with connection.cursor() as cursor: - cursor.execute(sql_query) - - -@assign_queue("user_activity_interval") -class UserActivityIntervalWorker(QueueProcessingWorker): - @override - def consume(self, event: Mapping[str, Any]) -> None: - user_profile = get_user_profile_by_id(event["user_profile_id"]) - log_time = timestamp_to_datetime(event["time"]) - do_update_user_activity_interval(user_profile, log_time) - - -@assign_queue("user_presence") -class UserPresenceWorker(QueueProcessingWorker): - @override - def consume(self, event: Mapping[str, Any]) -> None: - logging.debug("Received presence event: %s", event) - user_profile = get_user_profile_by_id(event["user_profile_id"]) - client = get_client(event["client"]) - log_time = timestamp_to_datetime(event["time"]) - status = event["status"] - do_update_user_presence(user_profile, client, log_time, status) - - -@assign_queue("missedmessage_emails") -class MissedMessageWorker(QueueProcessingWorker): - # Aggregate all messages received over the last several seconds - # (configurable by each recipient) to let someone finish sending a - # batch of messages and/or editing them before they are sent out - # as emails to recipients. - # - # The batch interval is best-effort -- we poll at most every - # CHECK_FREQUENCY_SECONDS, to avoid excessive activity. - CHECK_FREQUENCY_SECONDS = 5 - - worker_thread: Optional[threading.Thread] = None - - # This condition variable mediates the stopping and has_timeout - # pieces of state, below it. - cv = threading.Condition() - stopping = False - has_timeout = False - - # The main thread, which handles the RabbitMQ connection and creates - # database rows from them. - @override - @sentry_sdk.trace - def consume(self, event: Dict[str, Any]) -> None: - logging.debug("Processing missedmessage_emails event: %s", event) - # When we consume an event, check if there are existing pending emails - # for that user, and if so use the same scheduled timestamp. - - user_profile_id: int = event["user_profile_id"] - user_profile = get_user_profile_by_id(user_profile_id) - batch_duration_seconds = user_profile.email_notifications_batching_period_seconds - batch_duration = timedelta(seconds=batch_duration_seconds) - - try: - pending_email = ScheduledMessageNotificationEmail.objects.filter( - user_profile_id=user_profile_id - )[0] - scheduled_timestamp = pending_email.scheduled_timestamp - except IndexError: - scheduled_timestamp = timezone_now() + batch_duration - - with self.cv: - # We now hold the lock, so there are three places the - # worker thread can be: - # - # 1. In maybe_send_batched_emails, and will have to take - # the lock (and thus block insertions of new rows - # here) to decide if there are any rows and if it thus - # needs a timeout. - # - # 2. In the cv.wait_for with a timeout because there were - # rows already. There's nothing for us to do, since - # the newly-inserted row will get checked upon that - # timeout. - # - # 3. In the cv.wait_for without a timeout, because there - # weren't any rows (which we're about to change). - # - # Notifying in (1) is irrelevant, since the thread is not - # waiting. If we over-notify by doing so for both (2) and - # (3), the behaviour is correct but slightly inefficient, - # as the thread will be needlessly awoken and will just - # re-wait. However, if we fail to awake case (3), the - # worker thread will never wake up, and the - # ScheduledMessageNotificationEmail internal queue will - # back up. - # - # Use the self.has_timeout property (which is protected by - # the lock) to determine which of cases (2) or (3) we are - # in, and as such if we need to notify after making the - # row. - try: - ScheduledMessageNotificationEmail.objects.create( - user_profile_id=user_profile_id, - message_id=event["message_id"], - trigger=event["trigger"], - scheduled_timestamp=scheduled_timestamp, - mentioned_user_group_id=event.get("mentioned_user_group_id"), - ) - if not self.has_timeout: - self.cv.notify() - except IntegrityError: - logging.debug( - "ScheduledMessageNotificationEmail row could not be created. The message may have been deleted. Skipping event." - ) - - @override - def start(self) -> None: - with self.cv: - self.stopping = False - self.worker_thread = threading.Thread(target=self.work) - self.worker_thread.start() - super().start() - - def work(self) -> None: - while True: - with sentry_sdk.start_transaction( - op="task", - name=f"{self.queue_name} worker thread", - custom_sampling_context={"queue": self.queue_name}, - ): - flush_per_request_caches() - reset_queries() - try: - finished = self.background_loop() - if finished: - break - except Exception: - logging.exception( - "Exception in MissedMessage background worker; restarting the loop", - stack_info=True, - ) - - def background_loop(self) -> bool: - with self.cv: - if self.stopping: - return True - # There are three conditions which we wait for: - # - # 1. We are being explicitly asked to stop; see the - # notify() call in stop() - # - # 2. We have no ScheduledMessageNotificationEmail - # objects currently (has_timeout = False) and the - # first one was just enqueued; see the notify() - # call in consume(). We break out so that we can - # come back around the loop and re-wait with a - # timeout (see next condition). - # - # 3. One or more ScheduledMessageNotificationEmail - # exist in the database, so we need to re-check - # them regularly; this happens by hitting the - # timeout and calling maybe_send_batched_emails(). - # There is no explicit notify() for this. - timeout: Optional[int] = None - if ScheduledMessageNotificationEmail.objects.exists(): - timeout = self.CHECK_FREQUENCY_SECONDS - self.has_timeout = timeout is not None - - def wait_condition() -> bool: - if self.stopping: - # Condition (1) - return True - if timeout is None: - # Condition (2). We went to sleep with no - # ScheduledMessageNotificationEmail existing, - # and one has just been made. We re-check - # that is still true now that we have the - # lock, and if we see it, we stop waiting. - return ScheduledMessageNotificationEmail.objects.exists() - # This should only happen at the start or end of - # the wait, when we haven't been notified, but are - # re-checking the condition. - return False - - with sentry_sdk.start_span(description="condvar wait") as span: - span.set_data("timeout", timeout) - was_notified = self.cv.wait_for(wait_condition, timeout=timeout) - span.set_data("was_notified", was_notified) - - # Being notified means that we are in conditions (1) or - # (2), above. In neither case do we need to look at if - # there are batches to send -- (2) means that the - # ScheduledMessageNotificationEmail was _just_ created, so - # there is no need to check it now. - if not was_notified: - self.maybe_send_batched_emails() - - return False - - @sentry_sdk.trace - def maybe_send_batched_emails(self) -> None: - current_time = timezone_now() - - with transaction.atomic(): - events_to_process = ScheduledMessageNotificationEmail.objects.filter( - scheduled_timestamp__lte=current_time - ).select_for_update() - - # Batch the entries by user - events_by_recipient: Dict[int, Dict[int, MissedMessageData]] = defaultdict(dict) - for event in events_to_process: - events_by_recipient[event.user_profile_id][event.message_id] = MissedMessageData( - trigger=event.trigger, mentioned_user_group_id=event.mentioned_user_group_id - ) - - for user_profile_id in events_by_recipient: - events = events_by_recipient[user_profile_id] - - logging.info( - "Batch-processing %s missedmessage_emails events for user %s", - len(events), - user_profile_id, - ) - with sentry_sdk.start_span( - description="sending missedmessage_emails to user" - ) as span: - span.set_data("user_profile_id", user_profile_id) - span.set_data("event_count", len(events)) - try: - # Because we process events in batches, an - # escaped exception here would lead to - # duplicate messages being sent for other - # users in the same events_to_process batch, - # and no guarantee of forward progress. - handle_missedmessage_emails(user_profile_id, events) - except Exception: - logging.exception( - "Failed to process %d missedmessage_emails for user %s", - len(events), - user_profile_id, - stack_info=True, - ) - - events_to_process.delete() - - @override - def stop(self) -> None: - with self.cv: - self.stopping = True - self.cv.notify() - if self.worker_thread is not None: - self.worker_thread.join() - super().stop() - - -@assign_queue("email_senders") -class EmailSendingWorker(LoopQueueProcessingWorker): - def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: - super().__init__(threaded, disable_timeout) - self.connection: Optional[BaseEmailBackend] = None - - @retry_send_email_failures - def send_email(self, event: Dict[str, Any]) -> None: - # Copy the event, so that we don't pass the `failed_tries' - # data to send_email (which neither takes that - # argument nor needs that data). - copied_event = copy.deepcopy(event) - if "failed_tries" in copied_event: - del copied_event["failed_tries"] - handle_send_email_format_changes(copied_event) - self.connection = initialize_connection(self.connection) - send_email(**copied_event, connection=self.connection) - - @override - def consume_batch(self, events: List[Dict[str, Any]]) -> None: - for event in events: - self.send_email(event) - - @override - def stop(self) -> None: - try: - if self.connection is not None: - self.connection.close() - finally: - super().stop() - - -@assign_queue("missedmessage_mobile_notifications") -class PushNotificationsWorker(QueueProcessingWorker): - # The use of aioapns in the backend means that we cannot use - # SIGALRM to limit how long a consume takes, as SIGALRM does not - # play well with asyncio. - MAX_CONSUME_SECONDS = None - - @override - def start(self) -> None: - # initialize_push_notifications doesn't strictly do anything - # beyond printing some logging warnings if push notifications - # are not available in the current configuration. - initialize_push_notifications() - super().start() - - @override - def consume(self, event: Dict[str, Any]) -> None: - try: - if event.get("type", "add") == "remove": - message_ids = event["message_ids"] - handle_remove_push_notification(event["user_profile_id"], message_ids) - else: - handle_push_notification(event["user_profile_id"], event) - except PushNotificationBouncerRetryLaterError: - - def failure_processor(event: Dict[str, Any]) -> None: - logger.warning( - "Maximum retries exceeded for trigger:%s event:push_notification", - event["user_profile_id"], - ) - - retry_event(self.queue_name, event, failure_processor) - - -@assign_queue("digest_emails") -class DigestWorker(QueueProcessingWorker): # nocoverage - # Who gets a digest is entirely determined by the enqueue_digest_emails - # management command, not here. - @override - def consume(self, event: Mapping[str, Any]) -> None: - if "user_ids" in event: - user_ids = event["user_ids"] - else: - # legacy code may have enqueued a single id - user_ids = [event["user_profile_id"]] - bulk_handle_digest_email(user_ids, event["cutoff"]) - - -@assign_queue("email_mirror") -class MirrorWorker(QueueProcessingWorker): - @override - def consume(self, event: Mapping[str, Any]) -> None: - rcpt_to = event["rcpt_to"] - msg = email.message_from_bytes( - base64.b64decode(event["msg_base64"]), - policy=email.policy.default, - ) - assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417 - if not is_missed_message_address(rcpt_to): - # Missed message addresses are one-time use, so we don't need - # to worry about emails to them resulting in message spam. - recipient_realm = decode_stream_email_address(rcpt_to)[0].realm - try: - rate_limit_mirror_by_realm(recipient_realm) - except RateLimitedError: - logger.warning( - "MirrorWorker: Rejecting an email from: %s to realm: %s - rate limited.", - msg["From"], - recipient_realm.subdomain, - ) - return - - mirror_email(msg, rcpt_to=rcpt_to) - - -@assign_queue("embed_links") -class FetchLinksEmbedData(QueueProcessingWorker): - # This is a slow queue with network requests, so a disk write is negligible. - # Update stats file after every consume call. - CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 1 - - @override - def consume(self, event: Mapping[str, Any]) -> None: - url_embed_data: Dict[str, Optional[UrlEmbedData]] = {} - for url in event["urls"]: - start_time = time.time() - url_embed_data[url] = url_preview.get_link_embed_data(url) - logging.info( - "Time spent on get_link_embed_data for %s: %s", url, time.time() - start_time - ) - - with transaction.atomic(): - try: - message = Message.objects.select_for_update().get(id=event["message_id"]) - except Message.DoesNotExist: - # Message may have been deleted - return - - # If the message changed, we will run this task after updating the message - # in zerver.actions.message_edit.check_update_message - if message.content != event["message_content"]: - return - - # Fetch the realm whose settings we're using for rendering - realm = Realm.objects.get(id=event["message_realm_id"]) - - # If rendering fails, the called code will raise a JsonableError. - rendering_result = render_incoming_message( - message, - message.content, - realm, - url_embed_data=url_embed_data, - ) - do_update_embedded_data(message.sender, message, message.content, rendering_result) - - @override - def timer_expired( - self, limit: int, events: List[Dict[str, Any]], signal: int, frame: Optional[FrameType] - ) -> None: - assert len(events) == 1 - event = events[0] - - logging.warning( - "Timed out in %s after %s seconds while fetching URLs for message %s: %s", - self.queue_name, - limit, - event["message_id"], - event["urls"], - ) - raise InterruptConsumeError - - -@assign_queue("outgoing_webhooks") -class OutgoingWebhookWorker(QueueProcessingWorker): - @override - def consume(self, event: Dict[str, Any]) -> None: - message = event["message"] - event["command"] = message["content"] - - services = get_bot_services(event["user_profile_id"]) - for service in services: - event["service_name"] = str(service.name) - service_handler = get_outgoing_webhook_service_handler(service) - do_rest_call(service.base_url, event, service_handler) - - -@assign_queue("embedded_bots") -class EmbeddedBotWorker(QueueProcessingWorker): - def get_bot_api_client(self, user_profile: UserProfile) -> EmbeddedBotHandler: - return EmbeddedBotHandler(user_profile) - - @override - def consume(self, event: Mapping[str, Any]) -> None: - user_profile_id = event["user_profile_id"] - user_profile = get_user_profile_by_id(user_profile_id) - - message: Dict[str, Any] = event["message"] - - # TODO: Do we actually want to allow multiple Services per bot user? - services = get_bot_services(user_profile_id) - for service in services: - bot_handler = get_bot_handler(str(service.name)) - if bot_handler is None: - logging.error( - "Error: User %s has bot with invalid embedded bot service %s", - user_profile_id, - service.name, - ) - continue - try: - if hasattr(bot_handler, "initialize"): - bot_handler.initialize(self.get_bot_api_client(user_profile)) - if event["trigger"] == "mention": - message["content"] = extract_query_without_mention( - message=message, - client=self.get_bot_api_client(user_profile), - ) - assert message["content"] is not None - bot_handler.handle_message( - message=message, - bot_handler=self.get_bot_api_client(user_profile), - ) - except EmbeddedBotQuitError as e: - logging.warning("%s", e) - - -@assign_queue("deferred_work") -class DeferredWorker(QueueProcessingWorker): - """This queue processor is intended for cases where we want to trigger a - potentially expensive, not urgent, job to be run on a separate - thread from the Django worker that initiated it (E.g. so we that - can provide a low-latency HTTP response or avoid risk of request - timeouts for an operation that could in rare cases take minutes). - """ - - # Because these operations have no SLO, and can take minutes, - # remove any processing timeouts - MAX_CONSUME_SECONDS = None - - @override - def consume(self, event: Dict[str, Any]) -> None: - start = time.time() - if event["type"] == "mark_stream_messages_as_read": - user_profile = get_user_profile_by_id(event["user_profile_id"]) - logger.info( - "Marking messages as read for user %s, stream_recipient_ids %s", - user_profile.id, - event["stream_recipient_ids"], - ) - - for recipient_id in event["stream_recipient_ids"]: - count = do_mark_stream_messages_as_read(user_profile, recipient_id) - logger.info( - "Marked %s messages as read for user %s, stream_recipient_id %s", - count, - user_profile.id, - recipient_id, - ) - elif event["type"] == "mark_stream_messages_as_read_for_everyone": - logger.info( - "Marking messages as read for all users, stream_recipient_id %s", - event["stream_recipient_id"], - ) - stream = Stream.objects.get(recipient_id=event["stream_recipient_id"]) - # This event is generated by the stream deactivation code path. - batch_size = 50 - start_time = time.perf_counter() - min_id = event.get("min_id", 0) - total_messages = 0 - while True: - with transaction.atomic(savepoint=False): - messages = list( - Message.objects.filter( - # Uses index: zerver_message_realm_recipient_id - realm_id=stream.realm_id, - recipient_id=event["stream_recipient_id"], - id__gt=min_id, - ) - .order_by("id")[:batch_size] - .values_list("id", flat=True) - ) - UserMessage.select_for_update_query().filter(message__in=messages).extra( - where=[UserMessage.where_unread()] - ).update(flags=F("flags").bitor(UserMessage.flags.read)) - total_messages += len(messages) - if len(messages) < batch_size: - break - min_id = messages[-1] - if time.perf_counter() - start_time > 30: - # This task may take a _very_ long time to - # complete, if we have a large number of messages - # to mark as read. If we have taken more than - # 30s, we re-push the task onto the tail of the - # queue, to allow other deferred work to complete; - # this task is extremely low priority. - queue_json_publish("deferred_work", {**event, "min_id": min_id}) - break - logger.info( - "Marked %s messages as read for all users, stream_recipient_id %s", - total_messages, - event["stream_recipient_id"], - ) - elif event["type"] == "clear_push_device_tokens": - logger.info( - "Clearing push device tokens for user_profile_id %s", - event["user_profile_id"], - ) - try: - clear_push_device_tokens(event["user_profile_id"]) - except PushNotificationBouncerRetryLaterError: - - def failure_processor(event: Dict[str, Any]) -> None: - logger.warning( - "Maximum retries exceeded for trigger:%s event:clear_push_device_tokens", - event["user_profile_id"], - ) - - retry_event(self.queue_name, event, failure_processor) - elif event["type"] == "realm_export": - realm = Realm.objects.get(id=event["realm_id"]) - output_dir = tempfile.mkdtemp(prefix="zulip-export-") - export_event = RealmAuditLog.objects.get(id=event["id"]) - user_profile = get_user_profile_by_id(event["user_profile_id"]) - extra_data = export_event.extra_data - if extra_data.get("started_timestamp") is not None: - logger.error( - "Marking export for realm %s as failed due to retry -- possible OOM during export?", - realm.string_id, - ) - extra_data["failed_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) - notify_realm_export(user_profile) - return - - extra_data["started_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) - - logger.info( - "Starting realm export for realm %s into %s, initiated by user_profile_id %s", - realm.string_id, - output_dir, - event["user_profile_id"], - ) - - try: - public_url = export_realm_wrapper( - realm=realm, - output_dir=output_dir, - threads=1 if self.threaded else 6, - upload=True, - public_only=True, - ) - except Exception: - extra_data["failed_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) - logging.exception( - "Data export for %s failed after %s", - user_profile.realm.string_id, - time.time() - start, - stack_info=True, - ) - notify_realm_export(user_profile) - return - - assert public_url is not None - - # Update the extra_data field now that the export is complete. - extra_data["export_path"] = urlsplit(public_url).path - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) - - # Send a direct message notification letting the user who - # triggered the export know the export finished. - with override_language(user_profile.default_language): - content = _( - "Your data export is complete. [View and download exports]({export_settings_link})." - ).format(export_settings_link="/#organization/data-exports-admin") - internal_send_private_message( - sender=get_system_bot(settings.NOTIFICATION_BOT, realm.id), - recipient_user=user_profile, - content=content, - ) - - # For future frontend use, also notify administrator - # clients that the export happened. - notify_realm_export(user_profile) - logging.info( - "Completed data export for %s in %s", - user_profile.realm.string_id, - time.time() - start, - ) - elif event["type"] == "reupload_realm_emoji": - # This is a special event queued by the migration for reuploading emojis. - # We don't want to run the necessary code in the actual migration, so it simply - # queues the necessary event, and the actual work is done here in the queue worker. - realm = Realm.objects.get(id=event["realm_id"]) - logger.info("Processing reupload_realm_emoji event for realm %s", realm.id) - handle_reupload_emojis_event(realm, logger) - elif event["type"] == "soft_reactivate": - logger.info( - "Starting soft reactivation for user_profile_id %s", - event["user_profile_id"], - ) - user_profile = get_user_profile_by_id(event["user_profile_id"]) - reactivate_user_if_soft_deactivated(user_profile) - elif event["type"] == "push_bouncer_update_for_realm": - # In the future we may use the realm_id to send only that single realm's info. - realm_id = event["realm_id"] - logger.info("Updating push bouncer with metadata on behalf of realm %s", realm_id) - send_server_data_to_push_bouncer(consider_usage_statistics=False) - - end = time.time() - logger.info( - "deferred_work processed %s event (%dms)", - event["type"], - (end - start) * 1000, - ) - - -@assign_queue("test", is_test_queue=True) -class TestWorker(QueueProcessingWorker): - # This worker allows you to test the queue worker infrastructure without - # creating significant side effects. It can be useful in development or - # for troubleshooting prod/staging. It pulls a message off the test queue - # and appends it to a file in /var/log/zulip. - @override - def consume(self, event: Mapping[str, Any]) -> None: # nocoverage - fn = settings.ZULIP_WORKER_TEST_FILE - message = orjson.dumps(event) - logging.info("TestWorker should append this message to %s: %s", fn, message.decode()) - with open(fn, "ab") as f: - f.write(message + b"\n") - - -@assign_queue("noop", is_test_queue=True) -class NoopWorker(QueueProcessingWorker): - """Used to profile the queue processing framework, in zilencer's queue_rate.""" - - def __init__( - self, - threaded: bool = False, - disable_timeout: bool = False, - max_consume: int = 1000, - slow_queries: Sequence[int] = [], - ) -> None: - super().__init__(threaded, disable_timeout) - self.consumed = 0 - self.max_consume = max_consume - self.slow_queries: Set[int] = set(slow_queries) - - @override - def consume(self, event: Mapping[str, Any]) -> None: - self.consumed += 1 - if self.consumed in self.slow_queries: - logging.info("Slow request...") - time.sleep(60) - logging.info("Done!") - if self.consumed >= self.max_consume: - self.stop() - - -@assign_queue("noop_batch", is_test_queue=True) -class BatchNoopWorker(LoopQueueProcessingWorker): - """Used to profile the queue processing framework, in zilencer's queue_rate.""" - - batch_size = 100 - - def __init__( - self, - threaded: bool = False, - disable_timeout: bool = False, - max_consume: int = 1000, - slow_queries: Sequence[int] = [], - ) -> None: - super().__init__(threaded, disable_timeout) - self.consumed = 0 - self.max_consume = max_consume - self.slow_queries: Set[int] = set(slow_queries) - - @override - def consume_batch(self, events: List[Dict[str, Any]]) -> None: - event_numbers = set(range(self.consumed + 1, self.consumed + 1 + len(events))) - found_slow = self.slow_queries & event_numbers - if found_slow: - logging.info("%d slow requests...", len(found_slow)) - time.sleep(60 * len(found_slow)) - logging.info("Done!") - self.consumed += len(events) - if self.consumed >= self.max_consume: - self.stop() diff --git a/zerver/worker/test.py b/zerver/worker/test.py new file mode 100644 index 0000000000..4f8cfa37b9 --- /dev/null +++ b/zerver/worker/test.py @@ -0,0 +1,85 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +import time +from typing import Any, Dict, List, Mapping, Sequence, Set + +import orjson +from django.conf import settings +from typing_extensions import override + +from zerver.worker.base import LoopQueueProcessingWorker, QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("test", is_test_queue=True) +class TestWorker(QueueProcessingWorker): + # This worker allows you to test the queue worker infrastructure without + # creating significant side effects. It can be useful in development or + # for troubleshooting prod/staging. It pulls a message off the test queue + # and appends it to a file in /var/log/zulip. + @override + def consume(self, event: Mapping[str, Any]) -> None: # nocoverage + fn = settings.ZULIP_WORKER_TEST_FILE + message = orjson.dumps(event) + logging.info("TestWorker should append this message to %s: %s", fn, message.decode()) + with open(fn, "ab") as f: + f.write(message + b"\n") + + +@assign_queue("noop", is_test_queue=True) +class NoopWorker(QueueProcessingWorker): + """Used to profile the queue processing framework, in zilencer's queue_rate.""" + + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + max_consume: int = 1000, + slow_queries: Sequence[int] = [], + ) -> None: + super().__init__(threaded, disable_timeout) + self.consumed = 0 + self.max_consume = max_consume + self.slow_queries: Set[int] = set(slow_queries) + + @override + def consume(self, event: Mapping[str, Any]) -> None: + self.consumed += 1 + if self.consumed in self.slow_queries: + logging.info("Slow request...") + time.sleep(60) + logging.info("Done!") + if self.consumed >= self.max_consume: + self.stop() + + +@assign_queue("noop_batch", is_test_queue=True) +class BatchNoopWorker(LoopQueueProcessingWorker): + """Used to profile the queue processing framework, in zilencer's queue_rate.""" + + batch_size = 100 + + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + max_consume: int = 1000, + slow_queries: Sequence[int] = [], + ) -> None: + super().__init__(threaded, disable_timeout) + self.consumed = 0 + self.max_consume = max_consume + self.slow_queries: Set[int] = set(slow_queries) + + @override + def consume_batch(self, events: List[Dict[str, Any]]) -> None: + event_numbers = set(range(self.consumed + 1, self.consumed + 1 + len(events))) + found_slow = self.slow_queries & event_numbers + if found_slow: + logging.info("%d slow requests...", len(found_slow)) + time.sleep(60 * len(found_slow)) + logging.info("Done!") + self.consumed += len(events) + if self.consumed >= self.max_consume: + self.stop() diff --git a/zerver/worker/user_activity.py b/zerver/worker/user_activity.py new file mode 100644 index 0000000000..5547a50477 --- /dev/null +++ b/zerver/worker/user_activity.py @@ -0,0 +1,84 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Dict, List, Tuple + +from django.db import connection +from psycopg2.sql import SQL, Literal +from typing_extensions import override + +from zerver.worker.base import LoopQueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("user_activity") +class UserActivityWorker(LoopQueueProcessingWorker): + """The UserActivity queue is perhaps our highest-traffic queue, and + requires some care to ensure it performs adequately. + + We use a LoopQueueProcessingWorker as a performance optimization + for managing the queue. The structure of UserActivity records is + such that they are easily deduplicated before being sent to the + database; we take advantage of that to make this queue highly + effective at dealing with a backlog containing many similar + events. Such a backlog happen in a few ways: + + * In abuse/DoS situations, if a client is sending huge numbers of + similar requests to the server. + * If the queue ends up with several minutes of backlog e.g. due to + downtime of the queue processor, many clients will have several + common events from doing an action multiple times. + + """ + + client_id_map: Dict[str, int] = {} + + @override + def start(self) -> None: + # For our unit tests to make sense, we need to clear this on startup. + self.client_id_map = {} + super().start() + + @override + def consume_batch(self, user_activity_events: List[Dict[str, Any]]) -> None: + uncommitted_events: Dict[Tuple[int, int, str], Tuple[int, float]] = {} + + # First, we drain the queue of all user_activity events and + # deduplicate them for insertion into the database. + for event in user_activity_events: + user_profile_id = event["user_profile_id"] + client_id = event["client_id"] + + key_tuple = (user_profile_id, client_id, event["query"]) + if key_tuple not in uncommitted_events: + uncommitted_events[key_tuple] = (1, event["time"]) + else: + count, event_time = uncommitted_events[key_tuple] + uncommitted_events[key_tuple] = (count + 1, max(event_time, event["time"])) + + rows = [] + for key_tuple, value_tuple in uncommitted_events.items(): + user_profile_id, client_id, query = key_tuple + count, event_time = value_tuple + rows.append( + SQL("({},{},{},{},to_timestamp({}))").format( + Literal(user_profile_id), + Literal(client_id), + Literal(query), + Literal(count), + Literal(event_time), + ) + ) + + # Perform a single bulk UPSERT for all of the rows + sql_query = SQL( + """ + INSERT INTO zerver_useractivity(user_profile_id, client_id, query, count, last_visit) + VALUES {rows} + ON CONFLICT (user_profile_id, client_id, query) DO UPDATE SET + count = zerver_useractivity.count + excluded.count, + last_visit = greatest(zerver_useractivity.last_visit, excluded.last_visit) + """ + ).format(rows=SQL(", ").join(rows)) + with connection.cursor() as cursor: + cursor.execute(sql_query) diff --git a/zerver/worker/user_activity_interval.py b/zerver/worker/user_activity_interval.py new file mode 100644 index 0000000000..2e3a8b1b8d --- /dev/null +++ b/zerver/worker/user_activity_interval.py @@ -0,0 +1,21 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Mapping + +from typing_extensions import override + +from zerver.actions.user_activity import do_update_user_activity_interval +from zerver.lib.timestamp import timestamp_to_datetime +from zerver.models.users import get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("user_activity_interval") +class UserActivityIntervalWorker(QueueProcessingWorker): + @override + def consume(self, event: Mapping[str, Any]) -> None: + user_profile = get_user_profile_by_id(event["user_profile_id"]) + log_time = timestamp_to_datetime(event["time"]) + do_update_user_activity_interval(user_profile, log_time) diff --git a/zerver/worker/user_presence.py b/zerver/worker/user_presence.py new file mode 100644 index 0000000000..86a49428db --- /dev/null +++ b/zerver/worker/user_presence.py @@ -0,0 +1,25 @@ +# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html +import logging +from typing import Any, Mapping + +from typing_extensions import override + +from zerver.actions.presence import do_update_user_presence +from zerver.lib.timestamp import timestamp_to_datetime +from zerver.models.clients import get_client +from zerver.models.users import get_user_profile_by_id +from zerver.worker.base import QueueProcessingWorker, assign_queue + +logger = logging.getLogger(__name__) + + +@assign_queue("user_presence") +class UserPresenceWorker(QueueProcessingWorker): + @override + def consume(self, event: Mapping[str, Any]) -> None: + logging.debug("Received presence event: %s", event) + user_profile = get_user_profile_by_id(event["user_profile_id"]) + client = get_client(event["client"]) + log_time = timestamp_to_datetime(event["time"]) + status = event["status"] + do_update_user_presence(user_profile, client, log_time, status) diff --git a/zilencer/management/commands/queue_rate.py b/zilencer/management/commands/queue_rate.py index bcc04694f0..c8c429d4b2 100644 --- a/zilencer/management/commands/queue_rate.py +++ b/zilencer/management/commands/queue_rate.py @@ -6,7 +6,7 @@ from django.core.management.base import BaseCommand, CommandParser from typing_extensions import override from zerver.lib.queue import SimpleQueueClient, queue_json_publish -from zerver.worker.queue_processors import BatchNoopWorker, NoopWorker +from zerver.worker.test import BatchNoopWorker, NoopWorker class Command(BaseCommand): diff --git a/zproject/test_extra_settings.py b/zproject/test_extra_settings.py index 8348d5dc1e..c60ac3fa76 100644 --- a/zproject/test_extra_settings.py +++ b/zproject/test_extra_settings.py @@ -128,7 +128,7 @@ if not PUPPETEER_TESTS: set_loglevel("zerver.lib.push_notifications", "WARNING") set_loglevel("zerver.lib.digest", "ERROR") set_loglevel("zerver.lib.email_mirror", "ERROR") - set_loglevel("zerver.worker.queue_processors", "WARNING") + set_loglevel("zerver.worker", "WARNING") set_loglevel("stripe", "WARNING") # Enable file:/// hyperlink support by default in tests