mirror of https://github.com/zulip/zulip.git
worker: Split out worker sampling rate, and add Sentry transactions.
parent f64b9475c1
commit 9451d08bb9
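Each batch a queue worker consumes is now wrapped in a Sentry transaction
(op="task", named after the queue), with the statistics bookkeeping recorded
as a child span, and the sampling rate for those transactions is split out of
SENTRY_TRACE_RATE into a new SENTRY_TRACE_WORKER_RATE setting that can be
either a single float or a per-queue dict. A minimal configuration sketch
(the queue name "deferred_work" is illustrative, not taken from this diff):

    # Hypothetical settings fragment: cheap global tracing, but a close
    # look at one worker queue.
    SENTRY_TRACE_RATE = 0.01                  # non-worker transactions: 1%
    SENTRY_TRACE_WORKER_RATE = {
        "deferred_work": 0.5,                 # trace half of this queue's batches
    }                                         # unlisted queues fall back to 0.0

A plain float (e.g. SENTRY_TRACE_WORKER_RATE = 0.1) applies one rate to every
queue.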
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -44,7 +44,6 @@ from django.utils.translation import gettext as _
 from django.utils.translation import override as override_language
 from psycopg2.sql import SQL, Literal
 from returns.curry import partial
-from sentry_sdk import add_breadcrumb, configure_scope
 from typing_extensions import override
 from zulip_bots.lib import extract_query_without_mention
@@ -248,6 +247,7 @@ class QueueProcessingWorker(ABC):
 
         self.update_statistics()
 
+    @sentry_sdk.trace
     def update_statistics(self) -> None:
         total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
         total_events = sum(events_number for events_number, _ in self.recent_consume_times)
@@ -292,71 +292,75 @@ class QueueProcessingWorker(ABC):
         self, consume_func: Callable[[List[Dict[str, Any]]], None], events: List[Dict[str, Any]]
     ) -> None:
         consume_time_seconds: Optional[float] = None
-        with configure_scope() as scope:
-            scope.clear_breadcrumbs()
-            add_breadcrumb(
+        with sentry_sdk.start_transaction(
+            op="task",
+            name=f"consume {self.queue_name}",
+            custom_sampling_context={"queue": self.queue_name},
+        ):
+            sentry_sdk.add_breadcrumb(
                 type="debug",
                 category="queue_processor",
                 message=f"Consuming {self.queue_name}",
                 data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()},
             )
-        try:
-            if self.idle:
-                # We're reactivating after having gone idle due to emptying the queue.
-                # We should update the stats file to keep it fresh and to make it clear
-                # that the queue started processing, in case the event we're about to process
-                # makes us freeze.
-                self.idle = False
-                self.update_statistics()
+            try:
+                if self.idle:
+                    # We're reactivating after having gone idle due to emptying the queue.
+                    # We should update the stats file to keep it fresh and to make it clear
+                    # that the queue started processing, in case the event we're about to process
+                    # makes us freeze.
+                    self.idle = False
+                    self.update_statistics()
 
-            time_start = time.time()
-            if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
-                try:
-                    signal.signal(
-                        signal.SIGALRM,
-                        partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
-                    )
-                    try:
-                        signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
-                        consume_func(events)
-                    finally:
-                        signal.alarm(0)
-                finally:
-                    signal.signal(signal.SIGALRM, signal.SIG_DFL)
-            else:
-                consume_func(events)
-            consume_time_seconds = time.time() - time_start
-            self.consumed_since_last_emptied += len(events)
-        except Exception as e:
-            self._handle_consume_exception(events, e)
-        finally:
-            flush_per_request_caches()
-            reset_queries()
+                time_start = time.time()
+                if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
+                    try:
+                        signal.signal(
+                            signal.SIGALRM,
+                            partial(self.timer_expired, self.MAX_CONSUME_SECONDS, events),
+                        )
+                        try:
+                            signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
+                            consume_func(events)
+                        finally:
+                            signal.alarm(0)
+                    finally:
+                        signal.signal(signal.SIGALRM, signal.SIG_DFL)
+                else:
+                    consume_func(events)
+                consume_time_seconds = time.time() - time_start
+                self.consumed_since_last_emptied += len(events)
+            except Exception as e:
+                self._handle_consume_exception(events, e)
+            finally:
+                flush_per_request_caches()
+                reset_queries()
 
-            if consume_time_seconds is not None:
-                self.recent_consume_times.append((len(events), consume_time_seconds))
+                with sentry_sdk.start_span(description="statistics"):
+                    if consume_time_seconds is not None:
+                        self.recent_consume_times.append((len(events), consume_time_seconds))
 
-            remaining_local_queue_size = self.get_remaining_local_queue_size()
-            if remaining_local_queue_size == 0:
-                self.queue_last_emptied_timestamp = time.time()
-                self.consumed_since_last_emptied = 0
-                # We've cleared all the events from the queue, so we don't
-                # need to worry about the small overhead of doing a disk write.
-                # We take advantage of this to update the stats file to keep it fresh,
-                # especially since the queue might go idle until new events come in.
-                self.update_statistics()
-                self.idle = True
-            else:
-                self.consume_iteration_counter += 1
-                if (
-                    self.consume_iteration_counter
-                    >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
-                    or time.time() - self.last_statistics_update_time
-                    >= self.MAX_SECONDS_BEFORE_UPDATE_STATS
-                ):
-                    self.consume_iteration_counter = 0
-                    self.update_statistics()
+                    remaining_local_queue_size = self.get_remaining_local_queue_size()
+                    if remaining_local_queue_size == 0:
+                        self.queue_last_emptied_timestamp = time.time()
+                        self.consumed_since_last_emptied = 0
+                        # We've cleared all the events from the queue, so we don't
+                        # need to worry about the small overhead of doing a disk write.
+                        # We take advantage of this to update the stats file to keep it fresh,
+                        # especially since the queue might go idle until new events come in.
+                        self.update_statistics()
+                        self.idle = True
+                    else:
+                        self.consume_iteration_counter += 1
+                        if (
+                            self.consume_iteration_counter
+                            >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
+                            or time.time() - self.last_statistics_update_time
+                            >= self.MAX_SECONDS_BEFORE_UPDATE_STATS
+                        ):
+                            self.consume_iteration_counter = 0
+                            self.update_statistics()
 
     def consume_single_event(self, event: Dict[str, Any]) -> None:
         consume_func = lambda events: self.consume(events[0])
         self.do_consume(consume_func, [event])
@@ -372,7 +376,7 @@ class QueueProcessingWorker(ABC):
             # is needed and the worker can proceed.
             return
 
-        with configure_scope() as scope:
+        with sentry_sdk.configure_scope() as scope:
             scope.set_context(
                 "events",
                 {
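The custom_sampling_context passed to sentry_sdk.start_transaction() above is
how the queue name reaches sampling decisions: the SDK merges those keys into
the sampling_context handed to the traces_sampler configured at init time (see
zproject/sentry.py below). A self-contained sketch of the mechanism, assuming
only sentry-sdk (the empty DSN disables sending; demo_sampler and the "demo"
queue name are invented for illustration):

    import sentry_sdk

    def demo_sampler(sampling_context):
        # Keys from custom_sampling_context arrive here verbatim.
        return 1.0 if sampling_context.get("queue") == "demo" else 0.0

    sentry_sdk.init(dsn="", traces_sampler=demo_sampler)

    with sentry_sdk.start_transaction(
        op="task", name="consume demo", custom_sampling_context={"queue": "demo"}
    ):
        with sentry_sdk.start_span(description="statistics"):
            pass  # work here is recorded as a child span of the transaction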
--- a/zproject/default_settings.py
+++ b/zproject/default_settings.py
@@ -1,6 +1,6 @@
 import os
 from email.headerregistry import Address
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Tuple, Union
 
 from django_auth_ldap.config import GroupOfUniqueNamesType, LDAPGroupType
 
@@ -138,6 +138,7 @@ LOGGING_SHOW_PID = False
 
 # Sentry.io error defaults to off
 SENTRY_DSN: Optional[str] = None
+SENTRY_TRACE_WORKER_RATE: Union[float, Dict[str, float]] = 0.0
 SENTRY_TRACE_RATE: float = 0.0
 SENTRY_PROFILE_RATE: float = 0.1
 SENTRY_FRONTEND_DSN: Optional[str] = None
--- a/zproject/sentry.py
+++ b/zproject/sentry.py
@@ -1,5 +1,5 @@
 import os
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional, Union
 
 import sentry_sdk
 from django.utils.translation import override as override_language
@@ -50,6 +50,19 @@ def add_context(event: "Event", hint: "Hint") -> Optional["Event"]:
     return event
 
 
+def traces_sampler(sampling_context: Dict[str, Any]) -> Union[float, bool]:
+    from django.conf import settings
+
+    queue = sampling_context.get("queue")
+    if queue is not None and isinstance(queue, str):
+        if isinstance(settings.SENTRY_TRACE_WORKER_RATE, float):
+            return settings.SENTRY_TRACE_WORKER_RATE
+        else:
+            return settings.SENTRY_TRACE_WORKER_RATE.get(queue, 0.0)
+    else:
+        return settings.SENTRY_TRACE_RATE
+
+
 def setup_sentry(dsn: Optional[str], environment: str) -> None:
     from django.conf import settings
 
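Concretely, with hypothetical settings SENTRY_TRACE_WORKER_RATE =
{"deferred_work": 0.2} and SENTRY_TRACE_RATE = 0.01, the sampler above
dispatches as:

    traces_sampler({"queue": "deferred_work"})  # -> 0.2  (per-queue override)
    traces_sampler({"queue": "other_queue"})    # -> 0.0  (dict miss defaults to 0.0)
    traces_sampler({})                          # -> 0.01 (no "queue" key: non-worker trace)

Only transactions started with custom_sampling_context={"queue": ...} in
do_consume carry a "queue" key, so ordinary (non-worker) tracing continues to
follow SENTRY_TRACE_RATE.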
@@ -82,7 +95,7 @@ def setup_sentry(dsn: Optional[str], environment: str) -> None:
         # PII while having the identifiers needed to determine that an
         # exception only affects a small subset of users or realms.
         send_default_pii=True,
-        traces_sample_rate=settings.SENTRY_TRACE_RATE,
+        traces_sampler=traces_sampler,
         profiles_sample_rate=settings.SENTRY_PROFILE_RATE,
     )
 