queue_processors: Shut down background missedmessage_emails thread.

Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit.  In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds.  As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled.  This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.

Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.

When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock.  As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.

With the lock in hand, cancel any outstanding timers.  A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock.  Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
This commit is contained in:
Alex Vandiver 2021-11-22 14:52:05 -08:00 committed by Tim Abbott
parent acbeeac037
commit 3efed5f1e6
1 changed files with 32 additions and 4 deletions

View File

@ -16,7 +16,7 @@ from abc import ABC, abstractmethod
from collections import deque
from email.message import EmailMessage
from functools import wraps
from threading import Lock, Timer
from threading import RLock, Timer
from types import FrameType
from typing import (
Any,
@ -582,8 +582,10 @@ class MissedMessageWorker(QueueProcessingWorker):
# This lock protects access to all of the data structures declared
# above. A lock is required because maybe_send_batched_emails, as
# the argument to Timer, runs in a separate thread from the rest
# of the consumer.
lock = Lock()
# of the consumer. This is a _re-entrant_ lock because we may
# need to take the lock when we already have it during shutdown
# (see the stop method).
lock = RLock()
# Because the background `maybe_send_batched_email` thread can
# hold the lock for an indeterminate amount of time, the `consume`
@ -640,7 +642,10 @@ class MissedMessageWorker(QueueProcessingWorker):
# self.timer_event just triggered execution of this
# function in a thread, so now that we hold the lock, we
# clear the timer_event attribute to record that no Timer
# is active.
# is active. If it is already None, stop() is shutting us
# down.
if self.timer_event is None:
return
self.timer_event = None
current_time = timezone_now()
@ -696,6 +701,29 @@ class MissedMessageWorker(QueueProcessingWorker):
if ScheduledMessageNotificationEmail.objects.exists():
self.ensure_timer()
def stop(self) -> None:
# This may be called from a signal handler when we _already_
# have the lock. Python doesn't give us a way to check if our
# thread has the lock, so we instead use a re-entrant lock to
# always take it.
with self.lock:
# With the lock,we can safely inspect the timer_event and
# cancel it if it is still pending.
if self.timer_event is not None:
# We cancel and then join the timer with a timeout to
# prevent deadlock, where we took the lock, the timer
# then ran out and started maybe_send_batched_emails,
# and then it started waiting for the lock. The timer
# isn't running anymore so can't be canceled, and the
# thread is blocked on the lock, so will never join().
self.timer_event.cancel()
self.timer_event.join(timeout=1)
# In case we did hit this deadlock, we signal to
# maybe_send_batched_emails that it should abort by,
# before releasing the lock, unsetting the timer.
self.timer_event = None
super().stop()
@assign_queue("email_senders")
class EmailSendingWorker(LoopQueueProcessingWorker):