queue_processors: Make timer_expired() a method.

This allows specific queue workers to override the defaut behavior and
implement their own response to the timer expiring. We will want to use
this for embed_links queue at least.
This commit is contained in:
Mateusz Mandera 2021-07-06 14:36:27 +02:00 committed by Tim Abbott
parent c13f0626e3
commit c101f3acd6
1 changed files with 6 additions and 5 deletions

View File

@ -197,10 +197,6 @@ def retry_send_email_failures(
return wrapper
def timer_expired(limit: int, event_count: int, signal: int, frame: FrameType) -> None:
raise WorkerTimeoutException(limit, event_count)
class QueueProcessingWorker(ABC):
queue_name: str
MAX_CONSUME_SECONDS: Optional[int] = 30
@ -292,7 +288,9 @@ class QueueProcessingWorker(ABC):
try:
signal.signal(
signal.SIGALRM,
functools.partial(timer_expired, self.MAX_CONSUME_SECONDS, len(events)),
functools.partial(
self.timer_expired, self.MAX_CONSUME_SECONDS, len(events)
),
)
try:
signal.alarm(self.MAX_CONSUME_SECONDS * len(events))
@ -339,6 +337,9 @@ class QueueProcessingWorker(ABC):
consume_func = lambda events: self.consume(events[0])
self.do_consume(consume_func, [event])
def timer_expired(self, limit: int, event_count: int, signal: int, frame: FrameType) -> None:
raise WorkerTimeoutException(limit, event_count)
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
with configure_scope() as scope:
scope.set_context(