missedmessage_emails: Ensure forward progress.

maybe_send_batched_emails handles batches of emails from different
users at once; as it processes each user's batch, it enqueues messages
onto the `email_senders` queue.  If `handle_missedmessage_emails`
raises an exception when processing a single user's email, no events
are marked as handled -- including those that were already handled and
enqueued onto `email_senders`.  This results in an increasing number
of users being sent repeated emails about the same missed messages.

Catch and log any exceptions when handling an individual user's
events.  This guarantees forward progress, and that notifications are
sent at-most-once, not at-least-once.
This commit is contained in:
Alex Vandiver 2021-08-11 06:22:19 +00:00 committed by Tim Abbott
parent c9861b3c74
commit 4d98b0552e
2 changed files with 42 additions and 1 deletions

View File

@ -26,6 +26,7 @@ from zerver.models import (
PreregistrationUser,
ScheduledMessageNotificationEmail,
UserActivity,
UserProfile,
get_client,
get_realm,
get_stream,
@ -345,6 +346,33 @@ class WorkerTest(ZulipTestCase):
{"where art thou, othello?"},
)
with send_mock as sm, timer_mock as tm:
with simulated_queue_client(lambda: fake_client):
time_zero = datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc)
# Verify that we make forward progress if one of the messages throws an exception
fake_client.enqueue("missedmessage_emails", hamlet_event1)
fake_client.enqueue("missedmessage_emails", hamlet_event2)
fake_client.enqueue("missedmessage_emails", othello_event)
with patch("zerver.worker.queue_processors.timezone_now", return_value=time_zero):
mmw.setup()
mmw.start()
def fail_some(user: UserProfile, *args: Any) -> None:
if user.id == hamlet.id:
raise RuntimeError
sm.side_effect = fail_some
one_minute_overdue = expected_scheduled_timestamp + datetime.timedelta(seconds=60)
with patch(
"zerver.worker.queue_processors.timezone_now", return_value=one_minute_overdue
), self.assertLogs(level="ERROR") as error_logs:
mmw.maybe_send_batched_emails()
self.assertIn(
"ERROR:root:Failed to process 2 missedmessage_emails for user 10",
error_logs.output[0],
)
self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 0)
def test_push_notifications_worker(self) -> None:
"""
The push notifications system has its own comprehensive test suite,

View File

@ -659,7 +659,20 @@ class MissedMessageWorker(QueueProcessingWorker):
len(events),
user_profile_id,
)
handle_missedmessage_emails(user_profile_id, events)
try:
# Because we process events in batches, an
# escaped exception here would lead to
# duplicate messages being sent for other
# users in the same events_to_process batch,
# and no guarantee of forward progress.
handle_missedmessage_emails(user_profile_id, events)
except Exception:
logging.exception(
"Failed to process %d missedmessage_emails for user %s",
len(events),
user_profile_id,
stack_info=True,
)
events_to_process.delete()