import os
import time
import ujson
import smtplib
import re

from django.conf import settings
from django.test import override_settings
from mock import patch, MagicMock
from typing import Any, Callable, Dict, List, Mapping, Tuple

from zerver.lib.email_mirror import RateLimitedRealmMirror
from zerver.lib.email_mirror_helpers import encode_email_address
from zerver.lib.queue import MAX_REQUEST_RETRIES
from zerver.lib.rate_limiter import RateLimiterLockingException, clear_history
from zerver.lib.send_email import FromAddress
from zerver.lib.test_helpers import simulated_queue_client
from zerver.lib.test_classes import ZulipTestCase
from zerver.models import get_client, UserActivity, PreregistrationUser, \
    get_system_bot, get_stream, get_realm
from zerver.worker import queue_processors
from zerver.worker.queue_processors import (
    get_active_worker_queues,
    QueueProcessingWorker,
    EmailSendingWorker,
    LoopQueueProcessingWorker,
    MissedMessageWorker,
    SlowQueryWorker,
)

from zerver.middleware import write_log_line

Event = Dict[str, Any]

# This is used for testing LoopQueueProcessingWorker, which
# would run forever if we don't mock time.sleep to abort the
# loop.
class AbortLoop(Exception):
    pass

class WorkerTest(ZulipTestCase):
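    # FakeClient is a minimal in-memory stand-in for the queue client
    # that simulated_queue_client() installs for these tests: it just
    # records registered consumers and queued events, and
    # start_consuming()/drain_queue() replay or return them
    # synchronously, so no real queue connection is needed.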
    class FakeClient:
        def __init__(self) -> None:
            self.consumers = {}  # type: Dict[str, Callable[[Dict[str, Any]], None]]
            self.queue = []  # type: List[Tuple[str, Any]]

        def register_json_consumer(self,
                                   queue_name: str,
                                   callback: Callable[[Dict[str, Any]], None]) -> None:
            self.consumers[queue_name] = callback

        def start_consuming(self) -> None:
            for queue_name, data in self.queue:
                callback = self.consumers[queue_name]
                callback(data)
            self.queue = []

        def drain_queue(self, queue_name: str, json: bool) -> List[Event]:
            assert json
            events = [
                dct
                for (queue_name, dct)
                in self.queue
            ]

            # IMPORTANT!
            # This next line prevents us from double draining
            # queues, which was a bug at one point.
            self.queue = []

            return events

    @override_settings(SLOW_QUERY_LOGS_STREAM="errors")
    def test_slow_queries_worker(self) -> None:
        error_bot = get_system_bot(settings.ERROR_BOT)
        fake_client = self.FakeClient()
        worker = SlowQueryWorker()
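
        # SlowQueryWorker loops forever, sleeping between iterations;
        # having the mocked time.sleep raise AbortLoop is how the test
        # breaks out of worker.start() after a single pass.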
        time_mock = patch(
            'zerver.worker.queue_processors.time.sleep',
            side_effect=AbortLoop,
        )

        send_mock = patch(
            'zerver.worker.queue_processors.internal_send_message'
        )

        with send_mock as sm, time_mock as tm:
            with simulated_queue_client(lambda: fake_client):
                try:
                    worker.setup()
                    # `write_log_line` is where we publish slow queries to the queue.
                    with patch('zerver.middleware.is_slow_query', return_value=True):
                        write_log_line(log_data=dict(test='data'), email='test@zulip.com',
                                       remote_ip='127.0.0.1', client_name='website', path='/test/',
                                       method='GET')
                    worker.start()
                except AbortLoop:
                    pass

        self.assertEqual(tm.call_args[0][0], 60)  # should sleep 60 seconds

        sm.assert_called_once()
        args = [c[0] for c in sm.call_args_list][0]
        self.assertEqual(args[0], error_bot.realm)
        self.assertEqual(args[1], error_bot.email)
        self.assertEqual(args[2], "stream")
        self.assertEqual(args[3], "errors")
        self.assertEqual(args[4], "testserver: slow queries")
        # Testing for specific query times can lead to test discrepancies.
        logging_info = re.sub(r'\(db: [0-9]+ms/13q\)', '', args[5])
        self.assertEqual(logging_info, ' 127.0.0.1 GET 200 -1000ms '
                                       ' /test/ (test@zulip.com via website) (test@zulip.com)\n')

    def test_UserActivityWorker(self) -> None:
        fake_client = self.FakeClient()

        user = self.example_user('hamlet')
        UserActivity.objects.filter(
            user_profile = user.id,
            client = get_client('ios')
        ).delete()

        data = dict(
            user_profile_id = user.id,
            client = 'ios',
            time = time.time(),
            query = 'send_message'
        )
        fake_client.queue.append(('user_activity', data))

        time_mock = patch(
            'zerver.worker.queue_processors.time.sleep',
            side_effect=AbortLoop,
        )

        with time_mock:
            with simulated_queue_client(lambda: fake_client):
                worker = queue_processors.UserActivityWorker()
                worker.setup()
                try:
                    worker.start()
                except AbortLoop:
                    pass
                activity_records = UserActivity.objects.filter(
                    user_profile = user.id,
                    client = get_client('ios')
                )
                self.assertEqual(len(activity_records), 1)
                self.assertEqual(activity_records[0].count, 1)

        # Now process the event a second time and confirm count goes
        # up to 2. Ideally, we'd use an event with a slightly newer
        # time, but it's not really important.
        fake_client.queue.append(('user_activity', data))
        with time_mock:
            with simulated_queue_client(lambda: fake_client):
                worker = queue_processors.UserActivityWorker()
                worker.setup()
                try:
                    worker.start()
                except AbortLoop:
                    pass
                activity_records = UserActivity.objects.filter(
                    user_profile = user.id,
                    client = get_client('ios')
                )
                self.assertEqual(len(activity_records), 1)
                self.assertEqual(activity_records[0].count, 2)

    def test_missed_message_worker(self) -> None:
        cordelia = self.example_user('cordelia')
        hamlet = self.example_user('hamlet')
        othello = self.example_user('othello')

        hamlet1_msg_id = self.send_personal_message(
            from_email=cordelia.email,
            to_email=hamlet.email,
            content='hi hamlet',
        )

        hamlet2_msg_id = self.send_personal_message(
            from_email=cordelia.email,
            to_email=hamlet.email,
            content='goodbye hamlet',
        )

        hamlet3_msg_id = self.send_personal_message(
            from_email=cordelia.email,
            to_email=hamlet.email,
            content='hello again hamlet',
        )

        othello_msg_id = self.send_personal_message(
            from_email=cordelia.email,
            to_email=othello.email,
            content='where art thou, othello?',
        )

        events = [
            dict(user_profile_id=hamlet.id, message_id=hamlet1_msg_id),
            dict(user_profile_id=hamlet.id, message_id=hamlet2_msg_id),
            dict(user_profile_id=othello.id, message_id=othello_msg_id),
        ]

        fake_client = self.FakeClient()
        for event in events:
            fake_client.queue.append(('missedmessage_emails', event))

        mmw = MissedMessageWorker()
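
        # MockTimer stands in for the timer object the worker uses to
        # schedule batched email sending (patched in below via
        # 'zerver.worker.queue_processors.Timer'), so the test can
        # inspect and flip its running state without real delays.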
        class MockTimer():
            is_running = False

            def is_alive(self) -> bool:
                return self.is_running

            def start(self) -> None:
                self.is_running = True

            def cancel(self) -> None:
                self.is_running = False

        timer = MockTimer()
        time_mock = patch(
            'zerver.worker.queue_processors.Timer',
            return_value=timer,
        )

        send_mock = patch(
            'zerver.lib.email_notifications.do_send_missedmessage_events_reply_in_zulip'
        )
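
        # Setting BATCH_DURATION to 0 presumably means the queued events
        # are already "due" when maybe_send_batched_emails() runs below,
        # so the test does not have to wait out the normal batching window.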
        mmw.BATCH_DURATION = 0

        bonus_event = dict(user_profile_id=hamlet.id, message_id=hamlet3_msg_id)

        with send_mock as sm, time_mock as tm:
            with simulated_queue_client(lambda: fake_client):
                self.assertFalse(timer.is_alive())
                mmw.setup()
                mmw.start()
                self.assertTrue(timer.is_alive())
                fake_client.queue.append(('missedmessage_emails', bonus_event))

                # Double-calling start is our way to get it to run again
                self.assertTrue(timer.is_alive())
                mmw.start()

                # Now, we actually send the emails.
                mmw.maybe_send_batched_emails()
                self.assertFalse(timer.is_alive())

        self.assertEqual(tm.call_args[0][0], 5)  # Timer should be scheduled with a 5 second delay

        args = [c[0] for c in sm.call_args_list]
        arg_dict = {
            arg[0].id: dict(
                missed_messages=arg[1],
                count=arg[2],
            )
            for arg in args
        }

        hamlet_info = arg_dict[hamlet.id]
        self.assertEqual(hamlet_info['count'], 3)
        self.assertEqual(
            {m['message'].content for m in hamlet_info['missed_messages']},
            {'hi hamlet', 'goodbye hamlet', 'hello again hamlet'},
        )

        othello_info = arg_dict[othello.id]
        self.assertEqual(othello_info['count'], 1)
        self.assertEqual(
            {m['message'].content for m in othello_info['missed_messages']},
            {'where art thou, othello?'}
        )

    @patch('zerver.worker.queue_processors.mirror_email')
    def test_mirror_worker(self, mock_mirror_email: MagicMock) -> None:
        fake_client = self.FakeClient()
        stream = get_stream('Denmark', get_realm('zulip'))
        stream_to_address = encode_email_address(stream)
        data = [
            dict(
                message=u'\xf3test',
                time=time.time(),
                rcpt_to=stream_to_address
            )
        ] * 3
        for element in data:
            fake_client.queue.append(('email_mirror', element))

        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.MirrorWorker()
            worker.setup()
            worker.start()

        self.assertEqual(mock_mirror_email.call_count, 3)

    @patch('zerver.lib.rate_limiter.logger.warning')
    @patch('zerver.worker.queue_processors.mirror_email')
    @override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
    def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock,
                                         mock_warn: MagicMock) -> None:
        fake_client = self.FakeClient()
        realm = get_realm('zulip')
        clear_history(RateLimitedRealmMirror(realm))
        stream = get_stream('Denmark', realm)
        stream_to_address = encode_email_address(stream)
        data = [
            dict(
                message=u'\xf3test',
                time=time.time(),
                rcpt_to=stream_to_address
            )
        ] * 5
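        # With RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)] above -- i.e.
        # at most 2 mirrored messages per realm in a 10-second window --
        # only the first 2 of these 5 queued events should get through.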
        for element in data:
            fake_client.queue.append(('email_mirror', element))

        with simulated_queue_client(lambda: fake_client):
            start_time = time.time()
            with patch('time.time', return_value=start_time):
                worker = queue_processors.MirrorWorker()
                worker.setup()
                worker.start()
                # Of the first 5 messages, only 2 should be processed
                # (the rest being rate-limited):
                self.assertEqual(mock_mirror_email.call_count, 2)

                # If a new message is sent into the stream mirror, it will get rejected:
                fake_client.queue.append(('email_mirror', data[0]))
                worker.start()
                self.assertEqual(mock_mirror_email.call_count, 2)

                # However, missed message emails don't get rate limited:
                with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
                    address = 'mm' + ('x' * 32) + '@example.com'
                    event = dict(
                        message=u'\xf3test',
                        time=time.time(),
                        rcpt_to=address
                    )
                    fake_client.queue.append(('email_mirror', event))
                    worker.start()
                    self.assertEqual(mock_mirror_email.call_count, 3)

                # After some time passes, emails get accepted again:
                with patch('time.time', return_value=(start_time + 11.0)):
                    fake_client.queue.append(('email_mirror', data[0]))
                    worker.start()
                    self.assertEqual(mock_mirror_email.call_count, 4)

                    # If RateLimiterLockingException is thrown, we rate-limit the new message:
                    with patch('zerver.lib.rate_limiter.incr_ratelimit',
                               side_effect=RateLimiterLockingException):
                        fake_client.queue.append(('email_mirror', data[0]))
                        worker.start()
                        self.assertEqual(mock_mirror_email.call_count, 4)
                        expected_warn = "Deadlock trying to incr_ratelimit for RateLimitedRealmMirror:zulip"
                        mock_warn.assert_called_with(expected_warn)

    def test_email_sending_worker_retries(self) -> None:
        """Tests the retry_send_email_failures decorator to make sure it
        retries sending the email 3 times and then gives up."""
        fake_client = self.FakeClient()

        data = {
            'template_prefix': 'zerver/emails/confirm_new_email',
            'to_emails': [self.example_email("hamlet")],
            'from_name': 'Zulip Account Security',
            'from_address': FromAddress.NOREPLY,
            'context': {}
        }
        fake_client.queue.append(('email_senders', data))
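
        # fake_publish re-queues retried events back onto the FakeClient,
        # so each retry gets consumed again and 'failed_tries' keeps
        # incrementing until the worker gives up.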
        def fake_publish(queue_name: str,
                         event: Dict[str, Any],
                         processor: Callable[[Any], None]) -> None:
            fake_client.queue.append((queue_name, event))

        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.EmailSendingWorker()
            worker.setup()
            with patch('zerver.lib.send_email.build_email',
                       side_effect=smtplib.SMTPServerDisconnected), \
                    patch('zerver.lib.queue.queue_json_publish',
                          side_effect=fake_publish), \
                    patch('logging.exception'):
                worker.start()

        self.assertEqual(data['failed_tries'], 1 + MAX_REQUEST_RETRIES)

    def test_signups_worker_retries(self) -> None:
        """Tests the retry logic of signups queue."""
        fake_client = self.FakeClient()

        user_id = self.example_user('hamlet').id
        data = {'user_id': user_id, 'id': 'test_missed'}
        fake_client.queue.append(('signups', data))

        def fake_publish(queue_name: str, event: Dict[str, Any], processor: Callable[[Any], None]) -> None:
            fake_client.queue.append((queue_name, event))

        fake_response = MagicMock()
        fake_response.status_code = 400
        fake_response.text = ujson.dumps({'title': ''})
        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.SignupWorker()
            worker.setup()
            with patch('zerver.worker.queue_processors.requests.post',
                       return_value=fake_response), \
                    patch('zerver.lib.queue.queue_json_publish',
                          side_effect=fake_publish), \
                    patch('logging.info'), \
                    self.settings(MAILCHIMP_API_KEY='one-two',
                                  PRODUCTION=True,
                                  ZULIP_FRIENDS_LIST_ID='id'):
                worker.start()

        self.assertEqual(data['failed_tries'], 1 + MAX_REQUEST_RETRIES)

    def test_signups_worker_existing_member(self) -> None:
        fake_client = self.FakeClient()

        user_id = self.example_user('hamlet').id
        data = {'user_id': user_id,
                'id': 'test_missed',
                'email_address': 'foo@bar.baz'}
        fake_client.queue.append(('signups', data))

        fake_response = MagicMock()
        fake_response.status_code = 400
        fake_response.text = ujson.dumps({'title': 'Member Exists'})
        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.SignupWorker()
            worker.setup()
            with patch('zerver.worker.queue_processors.requests.post',
                       return_value=fake_response), \
                    self.settings(MAILCHIMP_API_KEY='one-two',
                                  PRODUCTION=True,
                                  ZULIP_FRIENDS_LIST_ID='id'):
                with patch('logging.warning') as logging_warning_mock:
                    worker.start()
                    logging_warning_mock.assert_called_once_with(
                        "Attempted to sign up already existing email to list: foo@bar.baz")

    def test_signups_bad_request(self) -> None:
        fake_client = self.FakeClient()

        user_id = self.example_user('hamlet').id
        data = {'user_id': user_id, 'id': 'test_missed'}
        fake_client.queue.append(('signups', data))

        fake_response = MagicMock()
        fake_response.status_code = 444  # Any non-400 bad request code.
        fake_response.text = ujson.dumps({'title': 'Member Exists'})
        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.SignupWorker()
            worker.setup()
            with patch('zerver.worker.queue_processors.requests.post',
                       return_value=fake_response), \
                    self.settings(MAILCHIMP_API_KEY='one-two',
                                  PRODUCTION=True,
                                  ZULIP_FRIENDS_LIST_ID='id'):
                worker.start()
            fake_response.raise_for_status.assert_called_once()

    def test_invites_worker(self) -> None:
        fake_client = self.FakeClient()
        invitor = self.example_user('iago')
        prereg_alice = PreregistrationUser.objects.create(
            email=self.nonreg_email('alice'), referred_by=invitor, realm=invitor.realm)
        PreregistrationUser.objects.create(
            email=self.nonreg_email('bob'), referred_by=invitor, realm=invitor.realm)
        data = [
            dict(prereg_id=prereg_alice.id, referrer_id=invitor.id, email_body=None),
            # Nonexistent prereg_id, as if the invitation was deleted
            dict(prereg_id=-1, referrer_id=invitor.id, email_body=None),
            # Form with `email` is from versions up to Zulip 1.7.1
            dict(email=self.nonreg_email('bob'), referrer_id=invitor.id, email_body=None),
        ]
        for element in data:
            fake_client.queue.append(('invites', element))

        with simulated_queue_client(lambda: fake_client):
            worker = queue_processors.ConfirmationEmailWorker()
            worker.setup()
            with patch('zerver.lib.actions.send_email'), \
                    patch('zerver.worker.queue_processors.send_future_email') \
                    as send_mock, \
                    patch('logging.info'):
                worker.start()
            self.assertEqual(send_mock.call_count, 2)

    def test_error_handling(self) -> None:
        processed = []

        @queue_processors.assign_queue('unreliable_worker')
        class UnreliableWorker(queue_processors.QueueProcessingWorker):
            def consume(self, data: Mapping[str, Any]) -> None:
                if data["type"] == 'unexpected behaviour':
                    raise Exception('Worker task not performing as expected!')
                processed.append(data["type"])

        fake_client = self.FakeClient()
        for msg in ['good', 'fine', 'unexpected behaviour', 'back to normal']:
            fake_client.queue.append(('unreliable_worker', {'type': msg}))
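
        # Events that raise during consume() are appended to a per-queue
        # <queue_name>.errors file under QUEUE_ERROR_DIR; each line is
        # tab-separated, with the failed event's JSON in the second field.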
        fn = os.path.join(settings.QUEUE_ERROR_DIR, 'unreliable_worker.errors')
        try:
            os.remove(fn)
        except OSError:  # nocoverage # error handling for the directory not existing
            pass

        with simulated_queue_client(lambda: fake_client):
            worker = UnreliableWorker()
            worker.setup()
            with patch('logging.exception') as logging_exception_mock:
                worker.start()
                logging_exception_mock.assert_called_once_with(
                    "Problem handling data on queue unreliable_worker")

        self.assertEqual(processed, ['good', 'fine', 'back to normal'])
        with open(fn, 'r') as f:
            line = f.readline().strip()
        event = ujson.loads(line.split('\t')[1])
        self.assertEqual(event["type"], 'unexpected behaviour')

    def test_worker_noname(self) -> None:
        class TestWorker(queue_processors.QueueProcessingWorker):
            def __init__(self) -> None:
                super().__init__()

            def consume(self, data: Mapping[str, Any]) -> None:
                pass  # nocoverage # this is intentionally not called
        with self.assertRaises(queue_processors.WorkerDeclarationException):
            TestWorker()

    def test_worker_noconsume(self) -> None:
        @queue_processors.assign_queue('test_worker')
        class TestWorker(queue_processors.QueueProcessingWorker):
            def __init__(self) -> None:
                super().__init__()

        with self.assertRaises(queue_processors.WorkerDeclarationException):
            worker = TestWorker()
            worker.consume({})

    def test_get_active_worker_queues(self) -> None:
        worker_queue_count = (len(QueueProcessingWorker.__subclasses__()) +
                              len(EmailSendingWorker.__subclasses__()) +
                              len(LoopQueueProcessingWorker.__subclasses__()) - 1)
        self.assertEqual(worker_queue_count, len(get_active_worker_queues()))
        self.assertEqual(1, len(get_active_worker_queues(queue_type='test')))