queue: Fix a race condition in monitoring after queue stops being idle.

The race condition is described in the comment block removed by this
commit. This leaves room for another, remaining race condition
that should be virtually impossible, but nevertheless it seems
worthwhile to have it documented in the code, so we put a new comment
describing it.
As a final note, this is not a new race condition,
it was hypothetically possible with the old code as well.
This commit is contained in:
Mateusz Mandera 2020-09-06 18:26:27 +02:00 committed by Tim Abbott
parent a72e9476ee
commit 2365a53496
3 changed files with 27 additions and 10 deletions

View File

@ -69,11 +69,16 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
# Queue isn't updating the stats file and has some events in # Queue isn't updating the stats file and has some events in
# the backlog, it's likely stuck. # the backlog, it's likely stuck.
# #
# TODO: There's an unfortunate race where if the queue has # TODO: There's an unlikely race condition here - if the queue
# been empty for the last hour (because there haven't been 50 # was fully emptied and was idle due to no new events coming
# new events in the last hour), and then gets a burst, this # for over 180 seconds, suddenly gets a burst of events and
# condition will be true for the first (event_handling_time * # this code runs exactly in the very small time window between
# 50). # those events popping up and the queue beginning to process
# the first one (which will refresh the stats file at the very
# start), we'll incorrectly return the CRITICAL status. The
# chance of that happening should be negligible because the queue
# worker should wake up immediately and log statistics before
# starting to process the first event.
return dict(status=CRITICAL, return dict(status=CRITICAL,
name=queue_name, name=queue_name,
message='queue appears to be stuck, last update {}, queue size {}'.format( message='queue appears to be stuck, last update {}, queue size {}'.format(

View File

@ -10,12 +10,8 @@ class AnalyzeQueueStatsTests(TestCase):
self.assertEqual(result['status'], UNKNOWN) self.assertEqual(result['status'], UNKNOWN)
def test_queue_stuck(self) -> None: def test_queue_stuck(self) -> None:
"""Last update > 5 minutes ago and there's events in the queue. """Last update > 5 minutes ago and there's events in the queue."""
In theory, we could be having bad luck with a race where in
the last (event_handing_time * 50) a burst was added, but it's
unlikely and shouldn't fail 2 in a row for Nagios anyway.
"""
result = analyze_queue_stats('name', {'update_time': time.time() - 301}, 100) result = analyze_queue_stats('name', {'update_time': time.time() - 301}, 100)
self.assertEqual(result['status'], CRITICAL) self.assertEqual(result['status'], CRITICAL)
self.assertIn('queue appears to be stuck', result['message']) self.assertIn('queue appears to be stuck', result['message'])

View File

@ -178,6 +178,7 @@ class QueueProcessingWorker(ABC):
self.consumed_since_last_emptied = 0 self.consumed_since_last_emptied = 0
self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50) self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50)
self.consume_interation_counter = 0 self.consume_interation_counter = 0
self.idle = True
self.update_statistics(0) self.update_statistics(0)
@ -233,6 +234,14 @@ class QueueProcessingWorker(ABC):
data={"events": events, "queue_size": self.get_remaining_queue_size()}, data={"events": events, "queue_size": self.get_remaining_queue_size()},
) )
try: try:
if self.idle:
# We're reactivating after having gone idle due to emptying the queue.
# We should update the stats file to keep it fresh and to make it clear
# that the queue started processing, in case the event we're about to process
# makes us freeze.
self.idle = False
self.update_statistics(self.get_remaining_queue_size())
time_start = time.time() time_start = time.time()
consume_func(events) consume_func(events)
consume_time_seconds = time.time() - time_start consume_time_seconds = time.time() - time_start
@ -250,6 +259,13 @@ class QueueProcessingWorker(ABC):
if remaining_queue_size == 0: if remaining_queue_size == 0:
self.queue_last_emptied_timestamp = time.time() self.queue_last_emptied_timestamp = time.time()
self.consumed_since_last_emptied = 0 self.consumed_since_last_emptied = 0
# We've cleared all the events from the queue, so we don't
# need to worry about the small overhead of doing a disk write.
# We take advantage of this to update the stats file to keep it fresh,
# especially since the queue might go idle until new events come in.
self.update_statistics(0)
self.idle = True
return
self.consume_interation_counter += 1 self.consume_interation_counter += 1
if self.consume_interation_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM: if self.consume_interation_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM: