diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 072d66aa8a..9e6e1b8697 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -26,6 +26,7 @@ from zerver.models import ( PreregistrationUser, ScheduledMessageNotificationEmail, UserActivity, + UserProfile, get_client, get_realm, get_stream, @@ -345,6 +346,33 @@ class WorkerTest(ZulipTestCase): {"where art thou, othello?"}, ) + with send_mock as sm, timer_mock as tm: + with simulated_queue_client(lambda: fake_client): + time_zero = datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc) + # Verify that we make forward progress if one of the messages throws an exception + fake_client.enqueue("missedmessage_emails", hamlet_event1) + fake_client.enqueue("missedmessage_emails", hamlet_event2) + fake_client.enqueue("missedmessage_emails", othello_event) + with patch("zerver.worker.queue_processors.timezone_now", return_value=time_zero): + mmw.setup() + mmw.start() + + def fail_some(user: UserProfile, *args: Any) -> None: + if user.id == hamlet.id: + raise RuntimeError + + sm.side_effect = fail_some + one_minute_overdue = expected_scheduled_timestamp + datetime.timedelta(seconds=60) + with patch( + "zerver.worker.queue_processors.timezone_now", return_value=one_minute_overdue + ), self.assertLogs(level="ERROR") as error_logs: + mmw.maybe_send_batched_emails() + self.assertIn( + "ERROR:root:Failed to process 2 missedmessage_emails for user 10", + error_logs.output[0], + ) + self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 0) + def test_push_notifications_worker(self) -> None: """ The push notifications system has its own comprehensive test suite, diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index a9f3a3fdae..5e9de28bf0 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -659,7 +659,20 @@ class MissedMessageWorker(QueueProcessingWorker): len(events), user_profile_id, ) - handle_missedmessage_emails(user_profile_id, events) + try: + # Because we process events in batches, an + # escaped exception here would lead to + # duplicate messages being sent for other + # users in the same events_to_process batch, + # and no guarantee of forward progress. + handle_missedmessage_emails(user_profile_id, events) + except Exception: + logging.exception( + "Failed to process %d missedmessage_emails for user %s", + len(events), + user_profile_id, + stack_info=True, + ) events_to_process.delete()