diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 8e48b51a93..4943184680 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -44,7 +44,6 @@ from django.utils.translation import gettext as _ from django.utils.translation import override as override_language from psycopg2.sql import SQL, Literal from returns.curry import partial -from sentry_sdk import add_breadcrumb, configure_scope from typing_extensions import override from zulip_bots.lib import extract_query_without_mention @@ -248,6 +247,7 @@ class QueueProcessingWorker(ABC): self.update_statistics() + @sentry_sdk.trace def update_statistics(self) -> None: total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times) @@ -292,71 +292,75 @@ class QueueProcessingWorker(ABC): self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]] ) -> None: consume_time_seconds: Optional[float] = None - with configure_scope() as scope: - scope.clear_breadcrumbs() - add_breadcrumb( + with sentry_sdk.start_transaction( + op="task", + name=f"consume {self.queue_name}", + custom_sampling_context={"queue": self.queue_name}, + ): + sentry_sdk.add_breadcrumb( type="debug", category="queue_processor", message=f"Consuming {self.queue_name}", data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()}, ) - try: - if self.idle: - # We're reactivating after having gone idle due to emptying the queue. - # We should update the stats file to keep it fresh and to make it clear - # that the queue started processing, in case the event we're about to process - # makes us freeze. 
- self.idle = False - self.update_statistics() - - time_start = time.time() - if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout: - try: - signal.signal( - signal.SIGALRM, - partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events), - ) - try: - signal.alarm(self.MAX_CONSUME_SECONDS * len(events)) - consume_func(events) - finally: - signal.alarm(0) - finally: - signal.signal(signal.SIGALRM, signal.SIG_DFL) - else: - consume_func(events) - consume_time_seconds = time.time() - time_start - self.consumed_since_last_emptied += len(events) - except Exception as e: - self._handle_consume_exception(events, e) - finally: - flush_per_request_caches() - reset_queries() - - if consume_time_seconds is not None: - self.recent_consume_times.append((len(events), consume_time_seconds)) - - remaining_local_queue_size = self.get_remaining_local_queue_size() - if remaining_local_queue_size == 0: - self.queue_last_emptied_timestamp = time.time() - self.consumed_since_last_emptied = 0 - # We've cleared all the events from the queue, so we don't - # need to worry about the small overhead of doing a disk write. - # We take advantage of this to update the stats file to keep it fresh, - # especially since the queue might go idle until new events come in. - self.update_statistics() - self.idle = True - else: - self.consume_iteration_counter += 1 - if ( - self.consume_iteration_counter - >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM - or time.time() - self.last_statistics_update_time - >= self.MAX_SECONDS_BEFORE_UPDATE_STATS - ): - self.consume_iteration_counter = 0 + try: + if self.idle: + # We're reactivating after having gone idle due to emptying the queue. + # We should update the stats file to keep it fresh and to make it clear + # that the queue started processing, in case the event we're about to process + # makes us freeze. 
+ self.idle = False self.update_statistics() + time_start = time.time() + if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout: + try: + signal.signal( + signal.SIGALRM, + partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events), + ) + try: + signal.alarm(self.MAX_CONSUME_SECONDS * len(events)) + consume_func(events) + finally: + signal.alarm(0) + finally: + signal.signal(signal.SIGALRM, signal.SIG_DFL) + else: + consume_func(events) + consume_time_seconds = time.time() - time_start + self.consumed_since_last_emptied += len(events) + except Exception as e: + self._handle_consume_exception(events, e) + finally: + flush_per_request_caches() + reset_queries() + + with sentry_sdk.start_span(description="statistics"): + if consume_time_seconds is not None: + self.recent_consume_times.append((len(events), consume_time_seconds)) + + remaining_local_queue_size = self.get_remaining_local_queue_size() + if remaining_local_queue_size == 0: + self.queue_last_emptied_timestamp = time.time() + self.consumed_since_last_emptied = 0 + # We've cleared all the events from the queue, so we don't + # need to worry about the small overhead of doing a disk write. + # We take advantage of this to update the stats file to keep it fresh, + # especially since the queue might go idle until new events come in. + self.update_statistics() + self.idle = True + else: + self.consume_iteration_counter += 1 + if ( + self.consume_iteration_counter + >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM + or time.time() - self.last_statistics_update_time + >= self.MAX_SECONDS_BEFORE_UPDATE_STATS + ): + self.consume_iteration_counter = 0 + self.update_statistics() + def consume_single_event(self, event: Dict[str, Any]) -> None: consume_func = lambda events: self.consume(events[0]) self.do_consume(consume_func, [event]) @@ -372,7 +376,7 @@ class QueueProcessingWorker(ABC): # is needed and the worker can proceed. 
return - with configure_scope() as scope: + with sentry_sdk.configure_scope() as scope: scope.set_context( "events", { diff --git a/zproject/default_settings.py b/zproject/default_settings.py index 84db1f19ca..c00c1d7186 100644 --- a/zproject/default_settings.py +++ b/zproject/default_settings.py @@ -1,6 +1,6 @@ import os from email.headerregistry import Address -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple, Union from django_auth_ldap.config import GroupOfUniqueNamesType, LDAPGroupType @@ -138,6 +138,7 @@ LOGGING_SHOW_PID = False # Sentry.io error defaults to off SENTRY_DSN: Optional[str] = None +SENTRY_TRACE_WORKER_RATE: Union[float, Dict[str, float]] = 0.0 SENTRY_TRACE_RATE: float = 0.0 SENTRY_PROFILE_RATE: float = 0.1 SENTRY_FRONTEND_DSN: Optional[str] = None diff --git a/zproject/sentry.py b/zproject/sentry.py index 9ebbcb0b79..71c1a50c7c 100644 --- a/zproject/sentry.py +++ b/zproject/sentry.py @@ -1,5 +1,5 @@ import os -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional, Union import sentry_sdk from django.utils.translation import override as override_language @@ -50,6 +50,27 @@ def add_context(event: "Event", hint: "Hint") -> Optional["Event"]: return event +def traces_sampler(sampling_context: Dict[str, Any]) -> Union[float, bool]: + """Return the Sentry trace sample rate for a new transaction. + + Transactions whose sampling context carries a string "queue" are + sampled at SENTRY_TRACE_WORKER_RATE, which is either one rate for + every queue or a dict of per-queue rates (unlisted queues get 0.0); + all other transactions use SENTRY_TRACE_RATE. + """ + from django.conf import settings + + queue = sampling_context.get("queue") + if isinstance(queue, str): + worker_rate = settings.SENTRY_TRACE_WORKER_RATE + # Test for the dict form, not for float: an integer rate such + # as 0 or 1 is not a float and must not reach .get(). + if isinstance(worker_rate, dict): + return worker_rate.get(queue, 0.0) + return worker_rate + return settings.SENTRY_TRACE_RATE + + def setup_sentry(dsn: Optional[str], environment: str) -> None: from django.conf import settings @@ -82,7 +103,7 @@ def setup_sentry(dsn: Optional[str], environment: str) -> None: # PII while having the identifiers 
needed to determine that an # exception only affects a small subset of users or realms. send_default_pii=True, - traces_sample_rate=settings.SENTRY_TRACE_RATE, + traces_sampler=traces_sampler, profiles_sample_rate=settings.SENTRY_PROFILE_RATE, )