diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 20e0e3db59..7f4cc5e9e8 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -657,56 +657,69 @@ class MissedMessageWorker(QueueProcessingWorker): def work(self) -> None: while True: - with self.cv: + try: + finished = self.background_loop() + if finished: + break + except Exception: + logging.exception( + "Exception in MissedMessage background worker; restarting the loop", + stack_info=True, + ) + + def background_loop(self) -> bool: + with self.cv: + if self.stopping: + return True + # There are three conditions which we wait for: + # + # 1. We are being explicitly asked to stop; see the + # notify() call in stop() + # + # 2. We have no ScheduledMessageNotificationEmail + # objects currently (has_timeout = False) and the + # first one was just enqueued; see the notify() + # call in consume(). We break out so that we can + # come back around the loop and re-wait with a + # timeout (see next condition). + # + # 3. One or more ScheduledMessageNotificationEmail + # exist in the database, so we need to re-check + # them regularly; this happens by hitting the + # timeout and calling maybe_send_batched_emails(). + # There is no explicit notify() for this. + timeout: Optional[int] = None + if ScheduledMessageNotificationEmail.objects.exists(): + timeout = self.CHECK_FREQUENCY_SECONDS + self.has_timeout = timeout is not None + + def wait_condition() -> bool: if self.stopping: - return - # There are three conditions which we wait for: - # - # 1. We are being explicitly asked to stop; see the - # notify() call in stop() - # - # 2. We have no ScheduledMessageNotificationEmail - # objects currently (has_timeout = False) and the - # first one was just enqueued; see the notify() - # call in consume(). We break out so that we can - # come back around the loop and re-wait with a - # timeout (see next condition). - # - # 3. One or more ScheduledMessageNotificationEmail - # exist in the database, so we need to re-check - # them regularly; this happens by hitting the - # timeout and calling maybe_send_batched_emails(). - # There is no explicit notify() for this. - timeout: Optional[int] = None - if ScheduledMessageNotificationEmail.objects.exists(): - timeout = self.CHECK_FREQUENCY_SECONDS - self.has_timeout = timeout is not None + # Condition (1) + return True + if timeout is None: + # Condition (2). We went to sleep with no + # ScheduledMessageNotificationEmail existing, + # and one has just been made. We re-check + # that is still true now that we have the + # lock, and if we see it, we stop waiting. + return ScheduledMessageNotificationEmail.objects.exists() + # This should only happen at the start or end of + # the wait, when we haven't been notified, but are + # re-checking the condition. + return False - def wait_condition() -> bool: - if self.stopping: - # Condition (1) - return True - if timeout is None: - # Condition (2). We went to sleep with no - # ScheduledMessageNotificationEmail existing, - # and one has just been made. We re-check - # that is still true now that we have the - # lock, and if we see it, we stop waiting. - return ScheduledMessageNotificationEmail.objects.exists() - # This should only happen at the start or end of - # the wait, when we haven't been notified, but are - # re-checking the condition. - return False + was_notified = self.cv.wait_for(wait_condition, timeout=timeout) - was_notified = self.cv.wait_for(wait_condition, timeout=timeout) + # Being notified means that we are in conditions (1) or + # (2), above. In neither case do we need to look at if + # there are batches to send -- (2) means that the + # ScheduledMessageNotificationEmail was _just_ created, so + # there is no need to check it now. + if not was_notified: + self.maybe_send_batched_emails() - # Being notified means that we are in conditions (1) or - # (2), above. In neither case do we need to look at if - # there are batches to send -- (2) means that the - # ScheduledMessageNotificationEmail was _just_ created, so - # there is no need to check it now. - if not was_notified: - self.maybe_send_batched_emails() + return False def maybe_send_batched_emails(self) -> None: current_time = timezone_now()