diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 4943184680..f5a21d0755 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -620,6 +620,7 @@ class MissedMessageWorker(QueueProcessingWorker): # The main thread, which handles the RabbitMQ connection and creates # database rows from them. @override + @sentry_sdk.trace def consume(self, event: Dict[str, Any]) -> None: logging.debug("Processing missedmessage_emails event: %s", event) # When we consume an event, check if there are existing pending emails @@ -693,17 +694,22 @@ class MissedMessageWorker(QueueProcessingWorker): def work(self) -> None: while True: - flush_per_request_caches() - reset_queries() - try: - finished = self.background_loop() - if finished: - break - except Exception: - logging.exception( - "Exception in MissedMessage background worker; restarting the loop", - stack_info=True, - ) + with sentry_sdk.start_transaction( + op="task", + name=f"{self.queue_name} worker thread", + custom_sampling_context={"queue": self.queue_name}, + ): + flush_per_request_caches() + reset_queries() + try: + finished = self.background_loop() + if finished: + break + except Exception: + logging.exception( + "Exception in MissedMessage background worker; restarting the loop", + stack_info=True, + ) def background_loop(self) -> bool: with self.cv: @@ -747,7 +753,10 @@ class MissedMessageWorker(QueueProcessingWorker): # re-checking the condition. return False - was_notified = self.cv.wait_for(wait_condition, timeout=timeout) + with sentry_sdk.start_span(description="condvar wait") as span: + span.set_data("timeout", timeout) + was_notified = self.cv.wait_for(wait_condition, timeout=timeout) + span.set_data("was_notified", was_notified) # Being notified means that we are in conditions (1) or # (2), above. In neither case do we need to look at if @@ -759,6 +768,7 @@ class MissedMessageWorker(QueueProcessingWorker): return False + @sentry_sdk.trace def maybe_send_batched_emails(self) -> None: current_time = timezone_now() @@ -782,20 +792,25 @@ class MissedMessageWorker(QueueProcessingWorker): len(events), user_profile_id, ) - try: - # Because we process events in batches, an - # escaped exception here would lead to - # duplicate messages being sent for other - # users in the same events_to_process batch, - # and no guarantee of forward progress. - handle_missedmessage_emails(user_profile_id, events) - except Exception: - logging.exception( - "Failed to process %d missedmessage_emails for user %s", - len(events), - user_profile_id, - stack_info=True, - ) + with sentry_sdk.start_span( + description="sending missedmessage_emails to user" + ) as span: + span.set_data("user_profile_id", user_profile_id) + span.set_data("event_count", len(events)) + try: + # Because we process events in batches, an + # escaped exception here would lead to + # duplicate messages being sent for other + # users in the same events_to_process batch, + # and no guarantee of forward progress. + handle_missedmessage_emails(user_profile_id, events) + except Exception: + logging.exception( + "Failed to process %d missedmessage_emails for user %s", + len(events), + user_profile_id, + stack_info=True, + ) events_to_process.delete()