2020-06-05 23:35:52 +02:00
|
|
|
import base64
|
2021-07-07 16:59:03 +02:00
|
|
|
import datetime
|
2023-05-16 16:56:00 +02:00
|
|
|
import itertools
|
2017-03-08 12:18:27 +01:00
|
|
|
import os
|
2021-07-06 15:37:18 +02:00
|
|
|
import signal
|
2020-06-11 00:54:34 +02:00
|
|
|
import time
|
2020-10-10 04:54:09 +02:00
|
|
|
from collections import defaultdict
|
2023-01-18 05:25:49 +01:00
|
|
|
from contextlib import contextmanager, suppress
|
2020-11-10 22:32:01 +01:00
|
|
|
from inspect import isabstract
|
2021-11-15 20:53:35 +01:00
|
|
|
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional
|
2020-06-11 00:54:34 +02:00
|
|
|
from unittest.mock import MagicMock, patch
|
2017-03-08 12:18:27 +01:00
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2023-03-30 22:28:34 +02:00
|
|
|
import time_machine
|
2017-03-08 12:18:27 +01:00
|
|
|
from django.conf import settings
|
2021-07-22 06:52:58 +02:00
|
|
|
from django.db.utils import IntegrityError
|
2019-02-02 23:53:44 +01:00
|
|
|
from django.test import override_settings
|
2017-03-08 12:18:27 +01:00
|
|
|
|
2019-03-16 11:39:09 +01:00
|
|
|
from zerver.lib.email_mirror import RateLimitedRealmMirror
|
2019-03-21 10:24:56 +01:00
|
|
|
from zerver.lib.email_mirror_helpers import encode_email_address
|
2019-12-02 19:13:49 +01:00
|
|
|
from zerver.lib.queue import MAX_REQUEST_RETRIES
|
2022-11-17 09:30:48 +01:00
|
|
|
from zerver.lib.rate_limiter import RateLimiterLockingError
|
2019-12-02 19:46:11 +01:00
|
|
|
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
|
2022-11-17 09:30:48 +01:00
|
|
|
from zerver.lib.send_email import EmailNotDeliveredError, FromAddress
|
2017-03-08 12:18:27 +01:00
|
|
|
from zerver.lib.test_classes import ZulipTestCase
|
2021-11-15 20:53:35 +01:00
|
|
|
from zerver.lib.test_helpers import mock_queue_publish
|
2021-07-07 16:59:03 +02:00
|
|
|
from zerver.models import (
|
|
|
|
NotificationTriggers,
|
|
|
|
PreregistrationUser,
|
|
|
|
ScheduledMessageNotificationEmail,
|
|
|
|
UserActivity,
|
2021-08-11 08:22:19 +02:00
|
|
|
UserProfile,
|
2021-07-07 16:59:03 +02:00
|
|
|
get_client,
|
|
|
|
get_realm,
|
|
|
|
get_stream,
|
|
|
|
)
|
2019-12-02 19:46:11 +01:00
|
|
|
from zerver.tornado.event_queue import build_offline_notification
|
2017-03-08 12:18:27 +01:00
|
|
|
from zerver.worker import queue_processors
|
2017-11-10 15:00:45 +01:00
|
|
|
from zerver.worker.queue_processors import (
|
2021-07-06 15:37:18 +02:00
|
|
|
FetchLinksEmbedData,
|
2017-11-15 15:27:41 +01:00
|
|
|
MissedMessageWorker,
|
2020-06-11 00:54:34 +02:00
|
|
|
QueueProcessingWorker,
|
|
|
|
get_active_worker_queues,
|
2017-11-10 15:00:45 +01:00
|
|
|
)
|
2017-03-08 12:18:27 +01:00
|
|
|
|
2017-11-15 15:27:41 +01:00
|
|
|
Event = Dict[str, Any]
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-11-15 20:53:35 +01:00
|
|
|
class FakeClient:
    """In-memory substitute for the queue client used by the worker tests.

    Events are kept in one plain list per queue name, so tests can
    enqueue events and then drain them synchronously through a
    worker's consumer callback.
    """

    def __init__(self, prefetch: int = 0) -> None:
        # `prefetch` is accepted but not used by this fake.
        self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
        """Append `data` to the end of the named queue."""
        self.queues[queue_name].append(data)

    def start_json_consumer(
        self,
        queue_name: str,
        callback: Callable[[List[Dict[str, Any]]], None],
        batch_size: int = 1,
        timeout: Optional[int] = None,
    ) -> None:
        """Drain the named queue, handing events to `callback` in batches.

        A batch is delivered once it holds `batch_size` events or the
        queue has run dry.  `timeout` is accepted but not used.
        """
        # Work on the live list (not a copy): events that the callback
        # enqueues while we are draining are picked up by this same loop.
        pending = self.queues[queue_name]
        batch: List[Dict[str, Any]] = []
        while len(pending) > 0:
            batch.append(pending.pop(0))
            if len(batch) < batch_size and pending:
                # Batch not full yet and more events are waiting.
                continue
            callback(batch)
            # Start a fresh list; the callback may have kept the old one.
            batch = []

    def local_queue_size(self) -> int:
        """Return the total number of events waiting across all queues."""
        total = 0
        for events in self.queues.values():
            total += len(events)
        return total
|
2020-03-18 20:48:49 +01:00
|
|
|
|
2021-11-15 20:53:35 +01:00
|
|
|
|
|
|
|
@contextmanager
def simulated_queue_client(client: FakeClient) -> Iterator[None]:
    """Make queue_processors.SimpleQueueClient(...) return `client`.

    While the context is active, any call to the patched
    `SimpleQueueClient` name on the `queue_processors` module ignores
    its arguments and hands back the given fake client.
    """

    def client_factory(*args: Any, **kwargs: Any) -> FakeClient:
        return client

    replacement = patch.object(queue_processors, "SimpleQueueClient", client_factory)
    with replacement:
        yield
|
|
|
|
|
|
|
|
|
|
|
|
class WorkerTest(ZulipTestCase):
|
2019-09-18 02:06:20 +02:00
|
|
|
def test_UserActivityWorker(self) -> None:
    """Processing the same activity event twice gives one row with count 2."""
    user = self.example_user("hamlet")
    ios_client = get_client("ios")
    fake_client = FakeClient()

    # Remove any existing activity rows for this user/client pair so
    # the assertions below only see what the worker writes.
    UserActivity.objects.filter(user_profile=user.id, client=ios_client).delete()

    event = {
        "user_profile_id": user.id,
        "client_id": ios_client.id,
        "time": time.time(),
        "query": "send_message",
    }

    # The event is enqueued twice so that we can confirm the count
    # goes up on the second processing.  Ideally the second copy
    # would carry a slightly newer time, but it's not really important.
    for _ in range(2):
        fake_client.enqueue("user_activity", event)

    with simulated_queue_client(fake_client):
        worker = queue_processors.UserActivityWorker()
        worker.setup()
        worker.start()

        recorded = UserActivity.objects.filter(user_profile=user.id, client=ios_client)
        self.assert_length(recorded, 1)
        self.assertEqual(recorded[0].count, 2)
|
2019-09-18 01:53:49 +02:00
|
|
|
|
2017-11-15 15:27:41 +01:00
|
|
|
def test_missed_message_worker(self) -> None:
    """Exercise MissedMessageWorker's batching of missed-message emails.

    The test drives the worker one loop iteration at a time (see
    `advance` below) under frozen time and checks that:

    * consumed events are stored as ScheduledMessageNotificationEmail
      rows that all share the timestamp of the first event plus the
      user's batching period;
    * nothing is sent before that timestamp, and everything is sent
      (grouped per user) after it;
    * an IntegrityError while creating a row is logged and skipped;
    * a failure while sending for one user does not block the others.
    """
    cordelia = self.example_user("cordelia")
    hamlet = self.example_user("hamlet")
    othello = self.example_user("othello")

    hamlet1_msg_id = self.send_personal_message(
        from_user=cordelia,
        to_user=hamlet,
        content="hi hamlet",
    )

    hamlet2_msg_id = self.send_personal_message(
        from_user=cordelia,
        to_user=hamlet,
        content="goodbye hamlet",
    )

    hamlet3_msg_id = self.send_personal_message(
        from_user=cordelia,
        to_user=hamlet,
        content="hello again hamlet",
    )

    othello_msg_id = self.send_personal_message(
        from_user=cordelia,
        to_user=othello,
        content="where art thou, othello?",
    )

    hamlet_event1 = dict(
        user_profile_id=hamlet.id,
        message_id=hamlet1_msg_id,
        trigger=NotificationTriggers.PRIVATE_MESSAGE,
    )
    # This is the only event carrying a mentioned_user_group_id; the
    # stored row is later checked to hold the same value.
    hamlet_event2 = dict(
        user_profile_id=hamlet.id,
        message_id=hamlet2_msg_id,
        trigger=NotificationTriggers.PRIVATE_MESSAGE,
        mentioned_user_group_id=4,
    )
    othello_event = dict(
        user_profile_id=othello.id,
        message_id=othello_msg_id,
        trigger=NotificationTriggers.PRIVATE_MESSAGE,
    )

    events = [hamlet_event1, hamlet_event2, othello_event]

    mmw = MissedMessageWorker()
    batch_duration = datetime.timedelta(
        seconds=hamlet.email_notifications_batching_period_seconds
    )
    # The expected timestamps below are computed from hamlet's period
    # and reused for othello, so the two must agree.
    assert (
        hamlet.email_notifications_batching_period_seconds
        == othello.email_notifications_batching_period_seconds
    )

    # Created once, entered twice below (`with send_mock as sm`).
    send_mock = patch(
        "zerver.lib.email_notifications.do_send_missedmessage_events_reply_in_zulip",
    )

    bonus_event_hamlet = dict(
        user_profile_id=hamlet.id,
        message_id=hamlet3_msg_id,
        trigger=NotificationTriggers.PRIVATE_MESSAGE,
    )

    def check_row(
        row: ScheduledMessageNotificationEmail,
        scheduled_timestamp: datetime.datetime,
        mentioned_user_group_id: Optional[int],
    ) -> None:
        """Assert the stored row has the expected trigger, time and group."""
        self.assertEqual(row.trigger, NotificationTriggers.PRIVATE_MESSAGE)
        self.assertEqual(row.scheduled_timestamp, scheduled_timestamp)
        self.assertEqual(row.mentioned_user_group_id, mentioned_user_group_id)

    def advance() -> Optional[float]:
        """Run mmw.work() until its first wait, then return mmw.has_timeout.

        `mmw.cv.wait_for` is replaced by `inner`, which flags the
        worker to stop instead of blocking.
        """
        mmw.stopping = False

        def inner(check: Callable[[], bool], timeout: Optional[float]) -> bool:
            # The check should never pass, since we've just (with
            # the lock) ascertained above the cv.wait that its
            # conditions are not met.
            self.assertFalse(check())

            # Set ourself to stop at the top of the next loop, but
            # pretend we didn't get an event
            mmw.stopping = True
            return False

        with patch.object(mmw.cv, "wait_for", side_effect=inner):
            mmw.work()
        return mmw.has_timeout

    # With nothing enqueued, the condition variable is pending
    # forever. We double-check that the condition is false in
    # steady-state.
    has_timeout = advance()
    self.assertFalse(has_timeout)

    # Enqueues the events to the internal queue, as if from RabbitMQ
    time_zero = datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc)
    with time_machine.travel(time_zero, tick=False), patch.object(
        mmw.cv, "notify"
    ) as notify_mock:
        for event in events:
            mmw.consume_single_event(event)
        # All of these notify, because has_timeout is still false in
        # each case. This represents multiple consume() calls getting
        # the lock before the worker escapes the wait_for, and is
        # unlikely in real life but does not lead to incorrect
        # behaviour.
        self.assertEqual(notify_mock.call_count, 3)

    # This leaves a timeout set, since there are objects pending
    with time_machine.travel(time_zero, tick=False):
        has_timeout = advance()
        self.assertTrue(has_timeout)

    expected_scheduled_timestamp = time_zero + batch_duration

    # The events should be saved in the database
    hamlet_row1 = ScheduledMessageNotificationEmail.objects.get(
        user_profile_id=hamlet.id, message_id=hamlet1_msg_id
    )
    check_row(hamlet_row1, expected_scheduled_timestamp, None)

    hamlet_row2 = ScheduledMessageNotificationEmail.objects.get(
        user_profile_id=hamlet.id, message_id=hamlet2_msg_id
    )
    check_row(hamlet_row2, expected_scheduled_timestamp, 4)

    othello_row1 = ScheduledMessageNotificationEmail.objects.get(
        user_profile_id=othello.id, message_id=othello_msg_id
    )
    check_row(othello_row1, expected_scheduled_timestamp, None)

    # If another event is received, test that it gets saved with the same
    # `expected_scheduled_timestamp` as the earlier events.

    few_moments_later = time_zero + datetime.timedelta(seconds=3)
    with time_machine.travel(few_moments_later, tick=False), patch.object(
        mmw.cv, "notify"
    ) as notify_mock:
        mmw.consume_single_event(bonus_event_hamlet)
        # Unlike the first three events, this one is expected not to
        # call notify.
        self.assertEqual(notify_mock.call_count, 0)

    with time_machine.travel(few_moments_later, tick=False):
        has_timeout = advance()
        self.assertTrue(has_timeout)
    hamlet_row3 = ScheduledMessageNotificationEmail.objects.get(
        user_profile_id=hamlet.id, message_id=hamlet3_msg_id
    )
    check_row(hamlet_row3, expected_scheduled_timestamp, None)

    # Now let us test `maybe_send_batched_emails`
    # If called too early, it shouldn't process the emails.
    one_minute_premature = expected_scheduled_timestamp - datetime.timedelta(seconds=60)
    with time_machine.travel(one_minute_premature, tick=False):
        has_timeout = advance()
        self.assertTrue(has_timeout)
        self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 4)

    # If called after `expected_scheduled_timestamp`, it should process all emails.
    one_minute_overdue = expected_scheduled_timestamp + datetime.timedelta(seconds=60)
    with time_machine.travel(one_minute_overdue, tick=True):
        with send_mock as sm, self.assertLogs(level="INFO") as info_logs:
            has_timeout = advance()
            self.assertTrue(has_timeout)
            self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 0)
            # With no rows left, the next pass should clear the timeout.
            has_timeout = advance()
            self.assertFalse(has_timeout)

    self.assertEqual(
        [
            f"INFO:root:Batch-processing 3 missedmessage_emails events for user {hamlet.id}",
            f"INFO:root:Batch-processing 1 missedmessage_emails events for user {othello.id}",
        ],
        info_logs.output,
    )

    # Verify the payloads now
    # Each recorded call is read positionally as
    # (user, missed_messages, count).
    args = [c[0] for c in sm.call_args_list]
    arg_dict = {
        arg[0].id: dict(
            missed_messages=arg[1],
            count=arg[2],
        )
        for arg in args
    }

    hamlet_info = arg_dict[hamlet.id]
    self.assertEqual(hamlet_info["count"], 3)
    self.assertEqual(
        {m["message"].content for m in hamlet_info["missed_messages"]},
        {"hi hamlet", "goodbye hamlet", "hello again hamlet"},
    )

    othello_info = arg_dict[othello.id]
    self.assertEqual(othello_info["count"], 1)
    self.assertEqual(
        {m["message"].content for m in othello_info["missed_messages"]},
        {"where art thou, othello?"},
    )

    # Hacky test coming up! We want to test the try-except block in the consumer which handles
    # IntegrityErrors raised when the message was deleted before it processed the notification
    # event.
    # However, Postgres defers checking ForeignKey constraints to when the current transaction
    # commits. This poses some difficulties in testing because of Django running tests inside a
    # transaction which never commits. See https://code.djangoproject.com/ticket/22431 for more
    # details, but the summary is that IntegrityErrors due to database constraints are raised at
    # the end of the test, not inside the `try` block. So, we have the code inside the `try` block
    # raise `IntegrityError` by mocking.
    with patch(
        "zerver.models.ScheduledMessageNotificationEmail.objects.create",
        side_effect=IntegrityError,
    ), self.assertLogs(level="DEBUG") as debug_logs, patch.object(
        mmw.cv, "notify"
    ) as notify_mock:
        mmw.consume_single_event(hamlet_event1)
        self.assertEqual(notify_mock.call_count, 0)
        self.assertIn(
            "DEBUG:root:ScheduledMessageNotificationEmail row could not be created. The message may have been deleted. Skipping event.",
            debug_logs.output,
        )

    # Verify that we make forward progress if one of the messages
    # throws an exception. First, enqueue the messages, and get
    # them to create database rows:
    time_zero = datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc)
    with time_machine.travel(time_zero, tick=False), patch.object(
        mmw.cv, "notify"
    ) as notify_mock:
        mmw.consume_single_event(hamlet_event1)
        mmw.consume_single_event(hamlet_event2)
        mmw.consume_single_event(othello_event)
        # See above note about multiple notifies
        self.assertEqual(notify_mock.call_count, 3)
        has_timeout = advance()
        self.assertTrue(has_timeout)

    # Next, set up a fail-y consumer:
    def fail_some(user: UserProfile, *args: Any) -> None:
        """Raise for hamlet only, so othello's batch can still go through."""
        if user.id == hamlet.id:
            raise RuntimeError

    one_minute_overdue = expected_scheduled_timestamp + datetime.timedelta(seconds=60)
    with time_machine.travel(one_minute_overdue, tick=False), self.assertLogs(
        level="ERROR"
    ) as error_logs, send_mock as sm:
        sm.side_effect = fail_some
        has_timeout = advance()
        self.assertTrue(has_timeout)
        # All rows are gone, including those of the user whose send failed.
        self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 0)
        has_timeout = advance()
        self.assertFalse(has_timeout)
    # Interpolate hamlet's id rather than hard-coding it, matching the
    # INFO-log assertions above and keeping the test independent of
    # the ids assigned by the test fixtures.
    self.assertIn(
        f"ERROR:root:Failed to process 2 missedmessage_emails for user {hamlet.id}",
        error_logs.output[0],
    )
|
2021-08-11 08:22:19 +02:00
|
|
|
|
2019-12-02 19:46:11 +01:00
|
|
|
def test_push_notifications_worker(self) -> None:
    """
    Unit-test only the queue processor for push notifications.

    The push notifications system has its own comprehensive test
    suite, so here the handle_push_notification functions are mocked
    to immediately produce the effect we want, and we check how the
    queue processor handles it.
    """
    notifications_queue = "missedmessage_mobile_notifications"
    handle_new_path = "zerver.worker.queue_processors.handle_push_notification"
    handle_remove_path = "zerver.worker.queue_processors.handle_remove_push_notification"
    initialize_path = "zerver.worker.queue_processors.initialize_push_notifications"

    fake_client = FakeClient()

    def fake_publish(
        queue_name: str, event: Dict[str, Any], processor: Callable[[Any], None]
    ) -> None:
        # Route published events back into the fake client's queues.
        fake_client.enqueue(queue_name, event)

    def enqueue_new_and_remove() -> List[Dict[str, Any]]:
        """Enqueue one new-message and one remove event; return both."""
        new_event = build_offline_notification(1, 1)
        remove_event: Dict[str, Any] = {
            "type": "remove",
            "user_profile_id": 1,
            "message_ids": [1],
        }
        fake_client.enqueue(notifications_queue, new_event)
        fake_client.enqueue(notifications_queue, remove_event)
        return [new_event, remove_event]

    with simulated_queue_client(fake_client):
        worker = queue_processors.PushNotificationsWorker()
        worker.setup()

        # Happy path: each event is passed to its handler exactly once.
        with patch(handle_new_path) as mock_handle_new:
            with patch(handle_remove_path) as mock_handle_remove:
                with patch(initialize_path):
                    event_new, event_remove = enqueue_new_and_remove()

                    worker.start()
                    mock_handle_new.assert_called_once_with(
                        event_new["user_profile_id"], event_new
                    )
                    mock_handle_remove.assert_called_once_with(
                        event_remove["user_profile_id"], event_remove["message_ids"]
                    )

        # Retry path: both handlers always raise the retry-later error.
        with patch(
            handle_new_path,
            side_effect=PushNotificationBouncerRetryLaterError("test"),
        ) as mock_handle_new:
            with patch(
                handle_remove_path,
                side_effect=PushNotificationBouncerRetryLaterError("test"),
            ) as mock_handle_remove:
                with patch(initialize_path):
                    enqueue_new_and_remove()

                    with mock_queue_publish(
                        "zerver.lib.queue.queue_json_publish", side_effect=fake_publish
                    ):
                        with self.assertLogs(
                            "zerver.worker.queue_processors", "WARNING"
                        ) as warn_logs:
                            worker.start()
                            expected_attempts = 1 + MAX_REQUEST_RETRIES
                            self.assertEqual(mock_handle_new.call_count, expected_attempts)
                            self.assertEqual(mock_handle_remove.call_count, expected_attempts)

                    # One "retries exceeded" warning per event.
                    retries_exceeded = "WARNING:zerver.worker.queue_processors:Maximum retries exceeded for trigger:1 event:push_notification"
                    self.assertEqual(
                        warn_logs.output,
                        [retries_exceeded, retries_exceeded],
                    )
|
2019-12-02 19:46:11 +01:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@patch("zerver.worker.queue_processors.mirror_email")
def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None:
    """Every queued email_mirror event is passed to mirror_email once."""
    expected_deliveries = 3
    fake_client = FakeClient()

    denmark = get_stream("Denmark", get_realm("zulip"))
    stream_to_address = encode_email_address(denmark)

    # A single event dict is enqueued repeatedly.
    event = {
        "msg_base64": base64.b64encode(b"\xf3test").decode(),
        "time": time.time(),
        "rcpt_to": stream_to_address,
    }
    for _ in range(expected_deliveries):
        fake_client.enqueue("email_mirror", event)

    with simulated_queue_client(fake_client):
        worker = queue_processors.MirrorWorker()
        worker.setup()
        worker.start()

    self.assertEqual(mock_mirror_email.call_count, expected_deliveries)
|
2017-04-05 11:46:14 +02:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
@patch("zerver.worker.queue_processors.mirror_email")
|
2019-03-16 11:39:09 +01:00
|
|
|
@override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
|
2020-12-23 21:45:16 +01:00
|
|
|
def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None:
|
2021-11-15 20:53:35 +01:00
|
|
|
fake_client = FakeClient()
|
2021-02-12 08:20:45 +01:00
|
|
|
realm = get_realm("zulip")
|
2020-03-04 14:05:25 +01:00
|
|
|
RateLimitedRealmMirror(realm).clear_history()
|
2021-02-12 08:20:45 +01:00
|
|
|
stream = get_stream("Denmark", realm)
|
2019-03-16 11:39:09 +01:00
|
|
|
stream_to_address = encode_email_address(stream)
|
|
|
|
data = [
|
|
|
|
dict(
|
2021-02-12 08:20:45 +01:00
|
|
|
msg_base64=base64.b64encode(b"\xf3test").decode(),
|
2019-03-16 11:39:09 +01:00
|
|
|
time=time.time(),
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
rcpt_to=stream_to_address,
|
|
|
|
),
|
2019-03-16 11:39:09 +01:00
|
|
|
] * 5
|
|
|
|
for element in data:
|
2021-02-12 08:20:45 +01:00
|
|
|
fake_client.enqueue("email_mirror", element)
|
2019-03-16 11:39:09 +01:00
|
|
|
|
2021-11-15 20:55:47 +01:00
|
|
|
with simulated_queue_client(fake_client), self.assertLogs(
|
2021-02-12 08:20:45 +01:00
|
|
|
"zerver.worker.queue_processors", level="WARNING"
|
2021-02-12 08:19:30 +01:00
|
|
|
) as warn_logs:
|
2019-03-16 11:39:09 +01:00
|
|
|
start_time = time.time()
|
2021-02-12 08:20:45 +01:00
|
|
|
with patch("time.time", return_value=start_time):
|
2019-03-16 11:39:09 +01:00
|
|
|
worker = queue_processors.MirrorWorker()
|
|
|
|
worker.setup()
|
|
|
|
worker.start()
|
|
|
|
# Of the first 5 messages, only 2 should be processed
|
|
|
|
# (the rest being rate-limited):
|
|
|
|
self.assertEqual(mock_mirror_email.call_count, 2)
|
|
|
|
|
|
|
|
# If a new message is sent into the stream mirror, it will get rejected:
|
2021-02-12 08:20:45 +01:00
|
|
|
fake_client.enqueue("email_mirror", data[0])
|
2019-03-16 11:39:09 +01:00
|
|
|
worker.start()
|
|
|
|
self.assertEqual(mock_mirror_email.call_count, 2)
|
|
|
|
|
2021-04-20 23:27:25 +02:00
|
|
|
# However, message notification emails don't get rate limited:
|
2019-03-16 11:39:09 +01:00
|
|
|
with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
|
2021-02-12 08:20:45 +01:00
|
|
|
address = "mm" + ("x" * 32) + "@example.com"
|
2019-03-16 11:39:09 +01:00
|
|
|
event = dict(
|
2021-02-12 08:20:45 +01:00
|
|
|
msg_base64=base64.b64encode(b"\xf3test").decode(),
|
2019-03-16 11:39:09 +01:00
|
|
|
time=time.time(),
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
rcpt_to=address,
|
2019-03-16 11:39:09 +01:00
|
|
|
)
|
2021-02-12 08:20:45 +01:00
|
|
|
fake_client.enqueue("email_mirror", event)
|
2019-03-16 11:39:09 +01:00
|
|
|
worker.start()
|
|
|
|
self.assertEqual(mock_mirror_email.call_count, 3)
|
|
|
|
|
2022-02-08 00:13:33 +01:00
|
|
|
# After some time passes, emails get accepted again:
|
2021-02-12 08:20:45 +01:00
|
|
|
with patch("time.time", return_value=(start_time + 11.0)):
|
|
|
|
fake_client.enqueue("email_mirror", data[0])
|
2019-03-16 11:39:09 +01:00
|
|
|
worker.start()
|
|
|
|
self.assertEqual(mock_mirror_email.call_count, 4)
|
|
|
|
|
2022-11-17 09:30:48 +01:00
|
|
|
# If RateLimiterLockingError is thrown, we rate-limit the new message:
|
2021-02-12 08:19:30 +01:00
|
|
|
with patch(
|
2021-02-12 08:20:45 +01:00
|
|
|
"zerver.lib.rate_limiter.RedisRateLimiterBackend.incr_ratelimit",
|
2022-11-17 09:30:48 +01:00
|
|
|
side_effect=RateLimiterLockingError,
|
2021-02-12 08:19:30 +01:00
|
|
|
):
|
2021-02-12 08:20:45 +01:00
|
|
|
with self.assertLogs("zerver.lib.rate_limiter", "WARNING") as mock_warn:
|
|
|
|
fake_client.enqueue("email_mirror", data[0])
|
2020-12-23 21:45:16 +01:00
|
|
|
worker.start()
|
|
|
|
self.assertEqual(mock_mirror_email.call_count, 4)
|
2021-02-12 08:19:30 +01:00
|
|
|
self.assertEqual(
|
|
|
|
mock_warn.output,
|
|
|
|
[
|
2021-02-12 08:20:45 +01:00
|
|
|
"WARNING:zerver.lib.rate_limiter:Deadlock trying to incr_ratelimit for RateLimitedRealmMirror:zulip"
|
2021-02-12 08:19:30 +01:00
|
|
|
],
|
|
|
|
)
|
|
|
|
self.assertEqual(
|
|
|
|
warn_logs.output,
|
|
|
|
[
|
2021-11-22 13:05:07 +01:00
|
|
|
"WARNING:zerver.worker.queue_processors:MirrorWorker: Rejecting an email from: None to realm: zulip - rate limited."
|
2021-02-12 08:19:30 +01:00
|
|
|
]
|
|
|
|
* 5,
|
|
|
|
)
|
2019-03-16 11:39:09 +01:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_email_sending_worker_retries(self) -> None:
    """Check that a failing email send is put back on the queue until
    the retry budget is used up, at which point an error is logged."""
    queue_client = FakeClient()

    # The same dict object is inspected at the end of the test: the
    # final assertion reads a "failed_tries" key that this test never
    # sets itself, so the event must be enqueued as-is (not copied).
    email_event = {
        "template_prefix": "zerver/emails/confirm_new_email",
        "to_emails": [self.example_email("hamlet")],
        "from_name": "Zulip Account Security",
        "from_address": FromAddress.NOREPLY,
        "context": {},
    }
    queue_client.enqueue("email_senders", email_event)

    def requeue_event(
        queue_name: str, event: Dict[str, Any], processor: Optional[Callable[[Any], None]]
    ) -> None:
        # Replacement for queue_json_publish: feed the event straight
        # back into the fake client; the processor argument is ignored.
        queue_client.enqueue(queue_name, event)

    with simulated_queue_client(queue_client):
        worker = queue_processors.EmailSendingWorker()
        worker.setup()
        # Make every attempt to build the email fail.
        with patch("zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError):
            with mock_queue_publish(
                "zerver.lib.queue.queue_json_publish", side_effect=requeue_event
            ):
                with self.assertLogs(level="ERROR") as error_logs:
                    worker.start()
                    self.assertIn(
                        "failed due to exception EmailNotDeliveredError",
                        error_logs.output[0],
                    )

    # One initial attempt plus MAX_REQUEST_RETRIES retries.
    expected_failures = 1 + MAX_REQUEST_RETRIES
    self.assertEqual(email_event["failed_tries"], expected_failures)
|
2017-09-15 09:38:12 +02:00
|
|
|
|
2017-12-07 00:58:34 +01:00
|
|
|
def test_invites_worker(self) -> None:
    # Feeds three invitation events to ConfirmationEmailWorker (two that
    # reference an existing PreregistrationUser, one that does not) and
    # expects send_future_email to be called exactly twice.
    queue_client = FakeClient()
    inviter = self.example_user("iago")
    realm = inviter.realm

    prereg_alice = PreregistrationUser.objects.create(
        email=self.nonreg_email("alice"),
        referred_by=inviter,
        realm=realm,
    )
    # A second pending invitation exists in the database, but no event
    # below refers to it.
    PreregistrationUser.objects.create(
        email=self.nonreg_email("bob"),
        referred_by=inviter,
        realm=realm,
    )

    # Four days, expressed in minutes.
    invite_expires_in_minutes = 4 * 24 * 60

    invite_events: List[Dict[str, Any]] = [
        {
            "prereg_id": prereg_alice.id,
            "referrer_id": inviter.id,
            "invite_expires_in_minutes": invite_expires_in_minutes,
        },
        {
            "prereg_id": prereg_alice.id,
            "referrer_id": inviter.id,
            "email_language": "en",
            "invite_expires_in_minutes": invite_expires_in_minutes,
        },
        # Nonexistent prereg_id, as if the invitation was deleted
        {
            "prereg_id": -1,
            "referrer_id": inviter.id,
            "invite_expires_in_minutes": invite_expires_in_minutes,
        },
    ]
    for invite_event in invite_events:
        queue_client.enqueue("invites", invite_event)

    with simulated_queue_client(queue_client):
        worker = queue_processors.ConfirmationEmailWorker()
        worker.setup()
        with patch("zerver.actions.user_settings.send_email"):
            with patch("zerver.worker.queue_processors.send_future_email") as send_mock:
                worker.start()
                self.assertEqual(send_mock.call_count, 2)
|
2017-12-07 00:58:34 +01:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_error_handling(self) -> None:
    # Exercises the failure path of both worker base classes: a
    # per-event worker and a batch ("loop") worker each receive four
    # events, one of which makes the consumer raise. The test checks
    # the error log record, which events were handled, and what was
    # written to the queue's ".errors" file.
    event_types = ["good", "fine", "unexpected behaviour", "back to normal"]

    # --- Per-event worker -------------------------------------------
    handled_single = []

    @queue_processors.assign_queue("unreliable_worker", is_test_queue=True)
    class UnreliableWorker(queue_processors.QueueProcessingWorker):
        def consume(self, data: Mapping[str, Any]) -> None:
            event_type = data["type"]
            if event_type == "unexpected behaviour":
                raise Exception("Worker task not performing as expected!")
            handled_single.append(event_type)

    fake_client = FakeClient()
    for event_type in event_types:
        fake_client.enqueue("unreliable_worker", {"type": event_type})

    # Start from a clean slate: drop any error file left by earlier runs.
    single_errors_path = os.path.join(settings.QUEUE_ERROR_DIR, "unreliable_worker.errors")
    with suppress(FileNotFoundError):
        os.remove(single_errors_path)

    with simulated_queue_client(fake_client):
        worker = UnreliableWorker()
        worker.setup()
        with self.assertLogs(level="ERROR") as error_logs:
            worker.start()
            first_record = error_logs.records[0]
            self.assertEqual(
                first_record.message, "Problem handling data on queue unreliable_worker"
            )
            self.assertIn(first_record.stack_info, error_logs.output[0])

    # Only the failing event is skipped; the ones after it still run.
    self.assertEqual(handled_single, ["good", "fine", "back to normal"])
    with open(single_errors_path) as error_file:
        first_line = error_file.readline().strip()
    # The JSON payload is the second tab-separated field of the line.
    failed_events = orjson.loads(first_line.split("\t")[1])
    self.assert_length(failed_events, 1)
    self.assertEqual(failed_events[0]["type"], "unexpected behaviour")

    # --- Batch worker -----------------------------------------------
    handled_batch = []

    @queue_processors.assign_queue("unreliable_loopworker", is_test_queue=True)
    class UnreliableLoopWorker(queue_processors.LoopQueueProcessingWorker):
        def consume_batch(self, events: List[Dict[str, Any]]) -> None:
            for batch_event in events:
                if batch_event["type"] == "unexpected behaviour":
                    raise Exception("Worker task not performing as expected!")
                handled_batch.append(batch_event["type"])

    for event_type in event_types:
        fake_client.enqueue("unreliable_loopworker", {"type": event_type})

    batch_errors_path = os.path.join(settings.QUEUE_ERROR_DIR, "unreliable_loopworker.errors")
    with suppress(FileNotFoundError):
        os.remove(batch_errors_path)

    with simulated_queue_client(fake_client):
        loopworker = UnreliableLoopWorker()
        loopworker.setup()
        with self.assertLogs(level="ERROR") as error_logs:
            loopworker.start()
            first_record = error_logs.records[0]
            self.assertEqual(
                first_record.message, "Problem handling data on queue unreliable_loopworker"
            )
            self.assertIn(first_record.stack_info, error_logs.output[0])

    # The exception aborts the batch, so nothing after the bad event
    # is handled, and the whole batch lands in the error file.
    self.assertEqual(handled_batch, ["good", "fine"])
    with open(batch_errors_path) as error_file:
        first_line = error_file.readline().strip()
    failed_events = orjson.loads(first_line.split("\t")[1])
    self.assert_length(failed_events, 4)

    self.assertEqual(
        [failed_event["type"] for failed_event in failed_events],
        ["good", "fine", "unexpected behaviour", "back to normal"],
    )
|
2019-12-26 21:11:55 +01:00
|
|
|
|
2020-09-29 01:16:54 +02:00
|
|
|
def test_timeouts(self) -> None:
    # A worker whose MAX_CONSUME_SECONDS is 1 gets one event that sleeps
    # for 5 seconds. The test expects an ERROR log about the timeout,
    # the remaining events to be handled, and the slow event to be
    # recorded in the queue's ".errors" file.
    handled = []

    @queue_processors.assign_queue("timeout_worker", is_test_queue=True)
    class TimeoutWorker(queue_processors.QueueProcessingWorker):
        MAX_CONSUME_SECONDS = 1

        def consume(self, data: Mapping[str, Any]) -> None:
            event_type = data["type"]
            if event_type == "timeout":
                # Deliberately longer than MAX_CONSUME_SECONDS.
                time.sleep(5)
            handled.append(event_type)

    queue_client = FakeClient()
    for event_type in ["good", "fine", "timeout", "back to normal"]:
        queue_client.enqueue("timeout_worker", {"type": event_type})

    # Remove any error file left behind by a previous run.
    errors_path = os.path.join(settings.QUEUE_ERROR_DIR, "timeout_worker.errors")
    with suppress(FileNotFoundError):
        os.remove(errors_path)

    with simulated_queue_client(queue_client):
        worker = TimeoutWorker()
        worker.setup()
        with self.assertLogs(level="ERROR") as error_logs:
            worker.start()
            first_record = error_logs.records[0]
            self.assertEqual(
                first_record.message,
                "Timed out in timeout_worker after 1 seconds processing 1 events",
            )
            self.assertIn(first_record.stack_info, error_logs.output[0])

    self.assertEqual(handled, ["good", "fine", "back to normal"])
    with open(errors_path) as error_file:
        first_line = error_file.readline().strip()
    # The JSON payload is the second tab-separated field of the line.
    failed_events = orjson.loads(first_line.split("\t")[1])
    self.assert_length(failed_events, 1)
    self.assertEqual(failed_events[0]["type"], "timeout")
|
2020-09-29 01:16:54 +02:00
|
|
|
|
2021-07-06 15:37:18 +02:00
|
|
|
def test_embed_links_timeout(self) -> None:
    # Runs a FetchLinksEmbedData subclass whose consume() raises SIGALRM
    # against its own process, then checks the message of the first log
    # record captured at WARNING level or above.
    @queue_processors.assign_queue("timeout_worker", is_test_queue=True)
    class TimeoutWorker(FetchLinksEmbedData):
        MAX_CONSUME_SECONDS = 1

        def consume(self, data: Mapping[str, Any]) -> None:
            # Send SIGALRM to ourselves to simulate a timeout.
            os.kill(os.getpid(), signal.SIGALRM)

    embed_event = {
        "type": "timeout",
        "message_id": 15,
        "urls": ["first", "second"],
    }
    queue_client = FakeClient()
    queue_client.enqueue("timeout_worker", embed_event)

    with simulated_queue_client(queue_client):
        worker = TimeoutWorker()
        worker.setup()
        with self.assertLogs(level="WARNING") as warning_logs:
            worker.start()
            first_record = warning_logs.records[0]
            self.assertEqual(
                first_record.message,
                "Timed out in timeout_worker after 1 seconds while fetching URLs for message 15: ['first', 'second']",
            )
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_worker_noname(self) -> None:
    # Instantiating a worker class that was defined without the
    # assign_queue decorator is expected to raise WorkerDeclarationError.
    class TestWorker(queue_processors.QueueProcessingWorker):
        def __init__(self) -> None:
            super().__init__()

        def consume(self, data: Mapping[str, Any]) -> None:
            # Construction fails first, so this body never runs.
            pass  # nocoverage # this is intentionally not called

    declaration_error = queue_processors.WorkerDeclarationError
    with self.assertRaises(declaration_error):
        TestWorker()
|
|
|
|
|
2017-11-10 15:00:45 +01:00
|
|
|
def test_get_active_worker_queues(self) -> None:
    # Walk the QueueProcessingWorker class hierarchy one level at a
    # time, collecting every direct and indirect subclass.
    current_level = [QueueProcessingWorker]
    descendants = []
    while current_level:
        current_level = list(
            itertools.chain.from_iterable(
                parent.__subclasses__() for parent in current_level
            )
        )
        descendants.extend(current_level)

    # Abstract workers are skipped when collecting queue names.
    concrete_workers = [cls for cls in descendants if not isabstract(cls)]
    worker_queue_names = {cls.queue_name for cls in concrete_workers}

    test_queue_names = set(get_active_worker_queues(only_test_queues=True))

    # Verify that the set of active worker queues equals the set
    # of subclasses without is_test_queue set.
    active_queue_names = set(get_active_worker_queues())
    self.assertEqual(active_queue_names, worker_queue_names - test_queue_names)
|