queue_processors: Rewrite MissedMessageWorker to always wait.

Previously, MissedMessageWorker used a batching strategy of just
grabbing all the events from the last 2 minutes, and then sending them
off as emails.  This suffered from the problem that you had a random
time, between 0s and 120s, to edit your message before it would be
sent out via an email.

Additionally, this made the queue had to monitor, because it was
expected to pile up large numbers of events, even if everything was
fine.

We fix this by batching together the events using a timer; the queue
processor itself just tracks the items, and then a timer-handler
process takes care of ensuring that the emails get sent at least 120s
(and at most 130s) after the first triggering message was sent in Zulip.

This introduces a new unpleasant bug, namely that when we restart a
Zulip server, we can now lose some missed_message email events;
further work is required on this point.

Fixes #6839.
This commit is contained in:
Tim Abbott 2018-10-24 12:08:38 -07:00
parent 9ed3fe3596
commit 5cec566cb9
2 changed files with 146 additions and 24 deletions

View File

@ -6,14 +6,15 @@ import smtplib
from django.conf import settings from django.conf import settings
from django.http import HttpResponse from django.http import HttpResponse
from django.test import TestCase from django.test import TestCase, override_settings
from mock import patch, MagicMock from mock import patch, MagicMock
from typing import Any, Callable, Dict, List, Mapping, Tuple from typing import Any, Callable, Dict, List, Mapping, Tuple
from zerver.lib.send_email import FromAddress from zerver.lib.send_email import FromAddress
from zerver.lib.test_helpers import simulated_queue_client from zerver.lib.test_helpers import simulated_queue_client
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.models import get_client, UserActivity, PreregistrationUser from zerver.models import get_client, UserActivity, PreregistrationUser, \
get_system_bot
from zerver.worker import queue_processors from zerver.worker import queue_processors
from zerver.worker.queue_processors import ( from zerver.worker.queue_processors import (
get_active_worker_queues, get_active_worker_queues,
@ -21,6 +22,7 @@ from zerver.worker.queue_processors import (
EmailSendingWorker, EmailSendingWorker,
LoopQueueProcessingWorker, LoopQueueProcessingWorker,
MissedMessageWorker, MissedMessageWorker,
SlowQueryWorker,
) )
Event = Dict[str, Any] Event = Dict[str, Any]
@ -35,7 +37,7 @@ class WorkerTest(ZulipTestCase):
class FakeClient: class FakeClient:
def __init__(self) -> None: def __init__(self) -> None:
self.consumers = {} # type: Dict[str, Callable[[Dict[str, Any]], None]] self.consumers = {} # type: Dict[str, Callable[[Dict[str, Any]], None]]
self.queue = [] # type: List[Tuple[str, Dict[str, Any]]] self.queue = [] # type: List[Tuple[str, Any]]
def register_json_consumer(self, def register_json_consumer(self,
queue_name: str, queue_name: str,
@ -46,6 +48,7 @@ class WorkerTest(ZulipTestCase):
for queue_name, data in self.queue: for queue_name, data in self.queue:
callback = self.consumers[queue_name] callback = self.consumers[queue_name]
callback(data) callback(data)
self.queue = []
def drain_queue(self, queue_name: str, json: bool) -> List[Event]: def drain_queue(self, queue_name: str, json: bool) -> List[Event]:
assert json assert json
@ -62,6 +65,47 @@ class WorkerTest(ZulipTestCase):
return events return events
@override_settings(SLOW_QUERY_LOGS_STREAM="errors")
def test_slow_queries_worker(self) -> None:
error_bot = get_system_bot(settings.ERROR_BOT)
fake_client = self.FakeClient()
events = [
'test query (data)',
'second test query (data)',
]
for event in events:
fake_client.queue.append(('slow_queries', event))
worker = SlowQueryWorker()
time_mock = patch(
'zerver.worker.queue_processors.time.sleep',
side_effect=AbortLoop,
)
send_mock = patch(
'zerver.worker.queue_processors.internal_send_message'
)
with send_mock as sm, time_mock as tm:
with simulated_queue_client(lambda: fake_client):
try:
worker.setup()
worker.start()
except AbortLoop:
pass
self.assertEqual(tm.call_args[0][0], 60) # should sleep 60 seconds
sm.assert_called_once()
args = [c[0] for c in sm.call_args_list][0]
self.assertEqual(args[0], error_bot.realm)
self.assertEqual(args[1], error_bot.email)
self.assertEqual(args[2], "stream")
self.assertEqual(args[3], "errors")
self.assertEqual(args[4], "testserver: slow queries")
self.assertEqual(args[5], " test query (data)\n second test query (data)\n")
def test_missed_message_worker(self) -> None: def test_missed_message_worker(self) -> None:
cordelia = self.example_user('cordelia') cordelia = self.example_user('cordelia')
hamlet = self.example_user('hamlet') hamlet = self.example_user('hamlet')
@ -79,6 +123,12 @@ class WorkerTest(ZulipTestCase):
content='goodbye hamlet', content='goodbye hamlet',
) )
hamlet3_msg_id = self.send_personal_message(
from_email=cordelia.email,
to_email=hamlet.email,
content='hello again hamlet',
)
othello_msg_id = self.send_personal_message( othello_msg_id = self.send_personal_message(
from_email=cordelia.email, from_email=cordelia.email,
to_email=othello.email, to_email=othello.email,
@ -97,24 +147,48 @@ class WorkerTest(ZulipTestCase):
mmw = MissedMessageWorker() mmw = MissedMessageWorker()
class MockTimer():
is_running = False
def is_alive(self) -> bool:
return self.is_running
def start(self) -> None:
self.is_running = True
def cancel(self) -> None:
self.is_running = False
timer = MockTimer()
time_mock = patch( time_mock = patch(
'zerver.worker.queue_processors.time.sleep', 'zerver.worker.queue_processors.Timer',
side_effect=AbortLoop, return_value=timer,
) )
send_mock = patch( send_mock = patch(
'zerver.lib.notifications.do_send_missedmessage_events_reply_in_zulip' 'zerver.lib.notifications.do_send_missedmessage_events_reply_in_zulip'
) )
mmw.BATCH_DURATION = 0
bonus_event = dict(user_profile_id=hamlet.id, message_id=hamlet3_msg_id)
with send_mock as sm, time_mock as tm: with send_mock as sm, time_mock as tm:
with simulated_queue_client(lambda: fake_client): with simulated_queue_client(lambda: fake_client):
try: self.assertFalse(timer.is_alive())
mmw.setup() mmw.setup()
mmw.start() mmw.start()
except AbortLoop: self.assertTrue(timer.is_alive())
pass fake_client.queue.append(('missedmessage_emails', bonus_event))
self.assertEqual(tm.call_args[0][0], 120) # should sleep two minutes # Double-calling start is our way to get it to run again
self.assertTrue(timer.is_alive())
mmw.start()
# Now, we actually send the emails.
mmw.maybe_send_batched_emails()
self.assertFalse(timer.is_alive())
self.assertEqual(tm.call_args[0][0], 5) # should sleep 5 seconds
args = [c[0] for c in sm.call_args_list] args = [c[0] for c in sm.call_args_list]
arg_dict = { arg_dict = {
@ -126,10 +200,10 @@ class WorkerTest(ZulipTestCase):
} }
hamlet_info = arg_dict[hamlet.id] hamlet_info = arg_dict[hamlet.id]
self.assertEqual(hamlet_info['count'], 2) self.assertEqual(hamlet_info['count'], 3)
self.assertEqual( self.assertEqual(
{m['message'].content for m in hamlet_info['missed_messages']}, {m['message'].content for m in hamlet_info['missed_messages']},
{'hi hamlet', 'goodbye hamlet'}, {'hi hamlet', 'goodbye hamlet', 'hello again hamlet'},
) )
othello_info = arg_dict[othello.id] othello_info = arg_dict[othello.id]

View File

@ -4,6 +4,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, cast, TypeVar,
import copy import copy
import signal import signal
from functools import wraps from functools import wraps
from threading import Timer
import smtplib import smtplib
import socket import socket
@ -263,22 +264,67 @@ class UserPresenceWorker(QueueProcessingWorker):
do_update_user_presence(user_profile, client, log_time, status) do_update_user_presence(user_profile, client, log_time, status)
@assign_queue('missedmessage_emails', queue_type="loop") @assign_queue('missedmessage_emails', queue_type="loop")
class MissedMessageWorker(LoopQueueProcessingWorker): class MissedMessageWorker(QueueProcessingWorker):
# Aggregate all messages received every 2 minutes to let someone finish sending a batch # Aggregate all messages received over the last BATCH_DURATION
# of messages # seconds to let someone finish sending a batch of messages and/or
sleep_delay = 2 * 60 # editing them before they are sent out as emails to recipients.
#
# The timer is running whenever; we poll at most every TIMER_FREQUENCY
# seconds, to avoid excessive activity.
#
# TODO: Since this process keeps events in memory for up to 2
# minutes, it now will lose approximately BATCH_DURATION worth of
# missed_message emails whenever it is restarted as part of a
# server restart. We should probably add some sort of save/reload
# mechanism for that case.
TIMER_FREQUENCY = 5
BATCH_DURATION = 120
timer_event = None # type: Optional[Timer]
events_by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]]
batch_start_by_recipient = {} # type: Dict[int, float]
def consume_batch(self, missed_events: List[Dict[str, Any]]) -> None: def consume(self, event: Dict[str, Any]) -> None:
by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]] logging.debug("Received missedmessage_emails event: %s" % (event,))
for event in missed_events: # When we process an event, just put it into the queue and ensure we have a timer going.
logging.debug("Received missedmessage_emails event: %s" % (event,)) user_profile_id = event['user_profile_id']
by_recipient[event['user_profile_id']].append(event) if user_profile_id not in self.batch_start_by_recipient:
self.batch_start_by_recipient[user_profile_id] = time.time()
self.events_by_recipient[user_profile_id].append(event)
for user_profile_id, events in by_recipient.items(): self.ensure_timer()
def ensure_timer(self) -> None:
if self.timer_event is not None:
return
self.timer_event = Timer(self.TIMER_FREQUENCY, MissedMessageWorker.maybe_send_batched_emails, [self])
self.timer_event.start()
def stop_timer(self) -> None:
if self.timer_event and self.timer_event.is_alive(): # type: ignore # Report mypy bug.
self.timer_event.cancel()
self.timer_event = None
def maybe_send_batched_emails(self) -> None:
self.stop_timer()
current_time = time.time()
for user_profile_id, timestamp in list(self.batch_start_by_recipient.items()):
if current_time - timestamp < self.BATCH_DURATION:
logging.info("%s: %s - %s" % (user_profile_id, current_time, timestamp))
continue
events = self.events_by_recipient[user_profile_id]
logging.info("Batch-processing %s missedmessage_emails events for user %s" % logging.info("Batch-processing %s missedmessage_emails events for user %s" %
(len(events), user_profile_id)) (len(events), user_profile_id))
handle_missedmessage_emails(user_profile_id, events) handle_missedmessage_emails(user_profile_id, events)
del self.events_by_recipient[user_profile_id]
del self.batch_start_by_recipient[user_profile_id]
# By only restarting the timer if there are actually events in
# the queue, we ensure this queue processor is idle when there
# are no missed-message emails to process.
if len(self.batch_start_by_recipient) > 0:
self.ensure_timer()
@assign_queue('email_senders') @assign_queue('email_senders')
class EmailSendingWorker(QueueProcessingWorker): class EmailSendingWorker(QueueProcessingWorker):
@ -332,7 +378,9 @@ class SlowQueryWorker(LoopQueueProcessingWorker):
# Sleep 1 minute between checking the queue # Sleep 1 minute between checking the queue
sleep_delay = 60 * 1 sleep_delay = 60 * 1
def consume_batch(self, slow_queries: List[Dict[str, Any]]) -> None: # TODO: The type annotation here should be List[str], but that
# creates conflicts with other users in the file.
def consume_batch(self, slow_queries: List[Any]) -> None:
for query in slow_queries: for query in slow_queries:
logging.info("Slow query: %s" % (query)) logging.info("Slow query: %s" % (query))