diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py
index 328674b3b2..bfb5df063b 100644
--- a/zerver/tests/test_queue_worker.py
+++ b/zerver/tests/test_queue_worker.py
@@ -6,14 +6,15 @@ import smtplib
 from django.conf import settings
 from django.http import HttpResponse
-from django.test import TestCase
+from django.test import TestCase, override_settings
 from mock import patch, MagicMock
 from typing import Any, Callable, Dict, List, Mapping, Tuple
 
 from zerver.lib.send_email import FromAddress
 from zerver.lib.test_helpers import simulated_queue_client
 from zerver.lib.test_classes import ZulipTestCase
-from zerver.models import get_client, UserActivity, PreregistrationUser
+from zerver.models import get_client, UserActivity, PreregistrationUser, \
+    get_system_bot
 from zerver.worker import queue_processors
 from zerver.worker.queue_processors import (
     get_active_worker_queues,
@@ -21,6 +22,7 @@ from zerver.worker.queue_processors import (
     EmailSendingWorker,
     LoopQueueProcessingWorker,
     MissedMessageWorker,
+    SlowQueryWorker,
 )
 
 Event = Dict[str, Any]
@@ -35,7 +37,7 @@ class WorkerTest(ZulipTestCase):
     class FakeClient:
         def __init__(self) -> None:
             self.consumers = {}  # type: Dict[str, Callable[[Dict[str, Any]], None]]
-            self.queue = []  # type: List[Tuple[str, Dict[str, Any]]]
+            self.queue = []  # type: List[Tuple[str, Any]]
 
         def register_json_consumer(self,
                                    queue_name: str,
@@ -46,6 +48,7 @@ class WorkerTest(ZulipTestCase):
             for queue_name, data in self.queue:
                 callback = self.consumers[queue_name]
                 callback(data)
+            self.queue = []
 
         def drain_queue(self, queue_name: str, json: bool) -> List[Event]:
             assert json
@@ -62,6 +65,47 @@ class WorkerTest(ZulipTestCase):
 
             return events
 
+    @override_settings(SLOW_QUERY_LOGS_STREAM="errors")
+    def test_slow_queries_worker(self) -> None:
+        error_bot = get_system_bot(settings.ERROR_BOT)
+        fake_client = self.FakeClient()
+        events = [
+            'test query (data)',
+            'second test query (data)',
+        ]
+        for event in events:
+            fake_client.queue.append(('slow_queries', event))
+
+        worker = SlowQueryWorker()
+
+        time_mock = patch(
+            'zerver.worker.queue_processors.time.sleep',
+            side_effect=AbortLoop,
+        )
+
+        send_mock = patch(
+            'zerver.worker.queue_processors.internal_send_message'
+        )
+
+        with send_mock as sm, time_mock as tm:
+            with simulated_queue_client(lambda: fake_client):
+                try:
+                    worker.setup()
+                    worker.start()
+                except AbortLoop:
+                    pass
+
+        self.assertEqual(tm.call_args[0][0], 60)  # should sleep 60 seconds
+
+        sm.assert_called_once()
+        args = [c[0] for c in sm.call_args_list][0]
+        self.assertEqual(args[0], error_bot.realm)
+        self.assertEqual(args[1], error_bot.email)
+        self.assertEqual(args[2], "stream")
+        self.assertEqual(args[3], "errors")
+        self.assertEqual(args[4], "testserver: slow queries")
+        self.assertEqual(args[5], " test query (data)\n second test query (data)\n")
+
     def test_missed_message_worker(self) -> None:
         cordelia = self.example_user('cordelia')
         hamlet = self.example_user('hamlet')
@@ -79,6 +123,12 @@ class WorkerTest(ZulipTestCase):
             content='goodbye hamlet',
         )
 
+        hamlet3_msg_id = self.send_personal_message(
+            from_email=cordelia.email,
+            to_email=hamlet.email,
+            content='hello again hamlet',
+        )
+
         othello_msg_id = self.send_personal_message(
             from_email=cordelia.email,
             to_email=othello.email,
@@ -97,24 +147,48 @@ class WorkerTest(ZulipTestCase):
 
         mmw = MissedMessageWorker()
 
+        class MockTimer():
+            is_running = False
+
+            def is_alive(self) -> bool:
+                return self.is_running
+
+            def start(self) -> None:
+                self.is_running = True
+
+            def cancel(self) -> None:
+                self.is_running = False
+
+        timer = MockTimer()
         time_mock = patch(
-            'zerver.worker.queue_processors.time.sleep',
-            side_effect=AbortLoop,
+            'zerver.worker.queue_processors.Timer',
+            return_value=timer,
         )
 
         send_mock = patch(
             'zerver.lib.notifications.do_send_missedmessage_events_reply_in_zulip'
         )
+        mmw.BATCH_DURATION = 0
+
+        bonus_event = dict(user_profile_id=hamlet.id, message_id=hamlet3_msg_id)
 
         with send_mock as sm, time_mock as tm:
             with simulated_queue_client(lambda: fake_client):
-                try:
-                    mmw.setup()
-                    mmw.start()
-                except AbortLoop:
-                    pass
+                self.assertFalse(timer.is_alive())
+                mmw.setup()
+                mmw.start()
+                self.assertTrue(timer.is_alive())
+                fake_client.queue.append(('missedmessage_emails', bonus_event))
 
-        self.assertEqual(tm.call_args[0][0], 120)  # should sleep two minutes
+                # Double-calling start is our way to get it to run again
+                self.assertTrue(timer.is_alive())
+                mmw.start()
+
+                # Now, we actually send the emails.
+                mmw.maybe_send_batched_emails()
+                self.assertFalse(timer.is_alive())
+
+        self.assertEqual(tm.call_args[0][0], 5)  # should sleep 5 seconds
 
         args = [c[0] for c in sm.call_args_list]
         arg_dict = {
@@ -126,10 +200,10 @@ class WorkerTest(ZulipTestCase):
         }
 
         hamlet_info = arg_dict[hamlet.id]
-        self.assertEqual(hamlet_info['count'], 2)
+        self.assertEqual(hamlet_info['count'], 3)
         self.assertEqual(
             {m['message'].content for m in hamlet_info['missed_messages']},
-            {'hi hamlet', 'goodbye hamlet'},
+            {'hi hamlet', 'goodbye hamlet', 'hello again hamlet'},
         )
 
         othello_info = arg_dict[othello.id]
diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py
index 1946d3f22e..afce74b4f5 100644
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -4,6 +4,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, cast, TypeVar,
 import copy
 import signal
 from functools import wraps
+from threading import Timer
 
 import smtplib
 import socket
@@ -263,22 +264,67 @@ class UserPresenceWorker(QueueProcessingWorker):
         do_update_user_presence(user_profile, client, log_time, status)
 
 @assign_queue('missedmessage_emails', queue_type="loop")
-class MissedMessageWorker(LoopQueueProcessingWorker):
-    # Aggregate all messages received every 2 minutes to let someone finish sending a batch
-    # of messages
-    sleep_delay = 2 * 60
+class MissedMessageWorker(QueueProcessingWorker):
+    # Aggregate all messages received over the last BATCH_DURATION
+    # seconds to let someone finish sending a batch of messages and/or
+    # editing them before they are sent out as emails to recipients.
+    #
+    # The timer is running whenever there are unsent events; we poll at
+    # most every TIMER_FREQUENCY seconds, to avoid excessive activity.
+    #
+    # TODO: Since this process keeps events in memory for up to 2
+    # minutes, it now will lose approximately BATCH_DURATION worth of
+    # missed_message emails whenever it is restarted as part of a
+    # server restart.  We should probably add some sort of save/reload
+    # mechanism for that case.
+    TIMER_FREQUENCY = 5
+    BATCH_DURATION = 120
+    timer_event = None  # type: Optional[Timer]
+    events_by_recipient = defaultdict(list)  # type: Dict[int, List[Dict[str, Any]]]
+    batch_start_by_recipient = {}  # type: Dict[int, float]
 
-    def consume_batch(self, missed_events: List[Dict[str, Any]]) -> None:
-        by_recipient = defaultdict(list)  # type: Dict[int, List[Dict[str, Any]]]
+    def consume(self, event: Dict[str, Any]) -> None:
+        logging.debug("Received missedmessage_emails event: %s" % (event,))
 
-        for event in missed_events:
-            logging.debug("Received missedmessage_emails event: %s" % (event,))
-            by_recipient[event['user_profile_id']].append(event)
+        # When we process an event, just put it into the queue and ensure we have a timer going.
+        user_profile_id = event['user_profile_id']
+        if user_profile_id not in self.batch_start_by_recipient:
+            self.batch_start_by_recipient[user_profile_id] = time.time()
+        self.events_by_recipient[user_profile_id].append(event)
 
-        for user_profile_id, events in by_recipient.items():
+        self.ensure_timer()
+
+    def ensure_timer(self) -> None:
+        if self.timer_event is not None:
+            return
+        self.timer_event = Timer(self.TIMER_FREQUENCY, MissedMessageWorker.maybe_send_batched_emails, [self])
+        self.timer_event.start()
+
+    def stop_timer(self) -> None:
+        if self.timer_event and self.timer_event.is_alive():  # type: ignore # Report mypy bug.
+            self.timer_event.cancel()
+            self.timer_event = None
+
+    def maybe_send_batched_emails(self) -> None:
+        self.stop_timer()
+
+        current_time = time.time()
+        for user_profile_id, timestamp in list(self.batch_start_by_recipient.items()):
+            if current_time - timestamp < self.BATCH_DURATION:
+                logging.info("%s: %s - %s" % (user_profile_id, current_time, timestamp))
+                continue
+            events = self.events_by_recipient[user_profile_id]
             logging.info("Batch-processing %s missedmessage_emails events for user %s" %
                          (len(events), user_profile_id))
             handle_missedmessage_emails(user_profile_id, events)
+            del self.events_by_recipient[user_profile_id]
+            del self.batch_start_by_recipient[user_profile_id]
+
+        # By only restarting the timer if there are actually events in
+        # the queue, we ensure this queue processor is idle when there
+        # are no missed-message emails to process.
+        if len(self.batch_start_by_recipient) > 0:
+            self.ensure_timer()
 
 @assign_queue('email_senders')
 class EmailSendingWorker(QueueProcessingWorker):
@@ -332,7 +378,9 @@ class SlowQueryWorker(LoopQueueProcessingWorker):
     # Sleep 1 minute between checking the queue
     sleep_delay = 60 * 1
 
-    def consume_batch(self, slow_queries: List[Dict[str, Any]]) -> None:
+    # TODO: The type annotation here should be List[str], but that
+    # creates conflicts with other users in the file.
+    def consume_batch(self, slow_queries: List[Any]) -> None:
         for query in slow_queries:
             logging.info("Slow query: %s" % (query))
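
For context on the batching approach this patch adopts, here is a minimal, standalone sketch of the same threading.Timer pattern outside Zulip's queue-processor framework. It is an illustration only, under the assumption that one object receives events and flushes them later; the names BatchingWorker, flush, and handle_batch are hypothetical and are not part of the patch, which implements the idea directly in MissedMessageWorker above.

# Standalone sketch of Timer-based batching (illustrative names, not Zulip APIs).
import time
from collections import defaultdict
from threading import Timer
from typing import Any, Dict, List, Optional


class BatchingWorker:
    TIMER_FREQUENCY = 5    # seconds between checks for batches that are ready
    BATCH_DURATION = 120   # how long to keep accumulating events per recipient

    def __init__(self) -> None:
        self.timer = None  # type: Optional[Timer]
        self.events_by_recipient = defaultdict(list)  # type: Dict[int, List[Dict[str, Any]]]
        self.batch_start_by_recipient = {}  # type: Dict[int, float]

    def consume(self, event: Dict[str, Any]) -> None:
        # Record the event and make sure a timer is running to flush it later.
        recipient = event['user_profile_id']
        if recipient not in self.batch_start_by_recipient:
            self.batch_start_by_recipient[recipient] = time.time()
        self.events_by_recipient[recipient].append(event)
        self.ensure_timer()

    def ensure_timer(self) -> None:
        if self.timer is None:
            self.timer = Timer(self.TIMER_FREQUENCY, self.flush)
            self.timer.start()

    def flush(self) -> None:
        # Runs on the Timer thread: send any batch older than BATCH_DURATION.
        self.timer = None
        now = time.time()
        for recipient, started in list(self.batch_start_by_recipient.items()):
            if now - started < self.BATCH_DURATION:
                continue
            self.handle_batch(recipient, self.events_by_recipient.pop(recipient))
            del self.batch_start_by_recipient[recipient]
        # Re-arm the timer only while batches are still pending, so the
        # worker stays idle when there is nothing left to send.
        if self.batch_start_by_recipient:
            self.ensure_timer()

    def handle_batch(self, recipient: int, events: List[Dict[str, Any]]) -> None:
        print("sending %d batched events to user %d" % (len(events), recipient))

The test above exercises the same shape: it swaps threading.Timer for a MockTimer via mock.patch and sets mmw.BATCH_DURATION = 0 so that maybe_send_batched_emails flushes the pending events immediately.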