From 2365a53496aaed59b759d8a59ce86068e1d3661f Mon Sep 17 00:00:00 2001
From: Mateusz Mandera
Date: Sun, 6 Sep 2020 18:26:27 +0200
Subject: [PATCH] queue: Fix a race condition in monitoring after queue stops being idle.

The race condition is described in the comment block removed by this
commit. This leaves room for another, remaining race condition that
should be virtually impossible, but nevertheless it seems worthwhile
to have it documented in the code, so we put a new comment describing
it.

As a final note, this is not a new race condition, it was
hypothetically possible with the old code as well.
---
 scripts/lib/check_rabbitmq_queue.py      | 15 ++++++++++-----
 tools/tests/test_check_rabbitmq_queue.py |  6 +-----
 zerver/worker/queue_processors.py        | 16 ++++++++++++++++
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/scripts/lib/check_rabbitmq_queue.py b/scripts/lib/check_rabbitmq_queue.py
index e95a22b0ca..7dd4e9dc0c 100644
--- a/scripts/lib/check_rabbitmq_queue.py
+++ b/scripts/lib/check_rabbitmq_queue.py
@@ -69,11 +69,16 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
         # Queue isn't updating the stats file and has some events in
         # the backlog, it's likely stuck.
         #
-        # TODO: There's an unfortunate race where if the queue has
-        # been empty for the last hour (because there haven't been 50
-        # new events in the last hour), and then gets a burst, this
-        # condition will be true for the first (event_handling_time *
-        # 50).
+        # TODO: There's an unlikely race condition here - if the queue
+        # was fully emptied and was idle due to no new events coming
+        # for over 180 seconds, suddenly gets a burst of events and
+        # this code runs exactly in the very small time window between
+        # those events popping up and the queue beginning to process
+        # the first one (which will refresh the stats file at the very
+        # start), we'll incorrectly return the CRITICAL status. The
+        # chance of that happening should be negligible because the queue
+        # worker should wake up immediately and log statistics before
+        # starting to process the first event.
         return dict(status=CRITICAL,
                     name=queue_name,
                     message='queue appears to be stuck, last update {}, queue size {}'.format(
diff --git a/tools/tests/test_check_rabbitmq_queue.py b/tools/tests/test_check_rabbitmq_queue.py
index 2eb531e1d9..f7a6f323c3 100644
--- a/tools/tests/test_check_rabbitmq_queue.py
+++ b/tools/tests/test_check_rabbitmq_queue.py
@@ -10,12 +10,8 @@ class AnalyzeQueueStatsTests(TestCase):
         self.assertEqual(result['status'], UNKNOWN)
 
     def test_queue_stuck(self) -> None:
-        """Last update > 5 minutes ago and there's events in the queue.
+        """Last update > 5 minutes ago and there's events in the queue."""
 
-        In theory, we could be having bad luck with a race where in
-        the last (event_handing_time * 50) a burst was added, but it's
-        unlikely and shouldn't fail 2 in a row for Nagios anyway.
-        """
         result = analyze_queue_stats('name', {'update_time': time.time() - 301}, 100)
         self.assertEqual(result['status'], CRITICAL)
         self.assertIn('queue appears to be stuck', result['message'])
diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py
index 82c43a1095..7081118108 100644
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -178,6 +178,7 @@ class QueueProcessingWorker(ABC):
         self.consumed_since_last_emptied = 0
         self.recent_consume_times: MutableSequence[Tuple[int, float]] = deque(maxlen=50)
         self.consume_interation_counter = 0
+        self.idle = True
 
         self.update_statistics(0)
 
@@ -233,6 +234,14 @@ class QueueProcessingWorker(ABC):
             data={"events": events, "queue_size": self.get_remaining_queue_size()},
         )
         try:
+            if self.idle:
+                # We're reactivating after having gone idle due to emptying the queue.
+                # We should update the stats file to keep it fresh and to make it clear
+                # that the queue started processing, in case the event we're about to process
+                # makes us freeze.
+                self.idle = False
+                self.update_statistics(self.get_remaining_queue_size())
+
             time_start = time.time()
             consume_func(events)
             consume_time_seconds = time.time() - time_start
@@ -250,6 +259,13 @@ class QueueProcessingWorker(ABC):
             if remaining_queue_size == 0:
                 self.queue_last_emptied_timestamp = time.time()
                 self.consumed_since_last_emptied = 0
+                # We've cleared all the events from the queue, so we don't
+                # need to worry about the small overhead of doing a disk write.
+                # We take advantage of this to update the stats file to keep it fresh,
+                # especially since the queue might go idle until new events come in.
+                self.update_statistics(0)
+                self.idle = True
+                return
 
             self.consume_interation_counter += 1
             if self.consume_interation_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM: