From cd9b194d88a099da99dc4bee0fbf82e7d7341a6b Mon Sep 17 00:00:00 2001
From: Mateusz Mandera
Date: Sun, 20 Sep 2020 13:35:35 +0200
Subject: [PATCH] queue: Eliminate useless "burst" concept in monitoring.

The reason higher expected_time_to_clear_backlog values were allowed
for queues during "bursts" was, in simpler terms, that the queues to
which this happens intrinsically have a higher acceptable "time until
cleared" for new events. E.g. digest_emails, where it's completely
fine to take a long time to send the emails out after they are put in
the queue. And that is already configurable without a normal/burst
distinction.

Thanks to this we can remove a bunch of overly complicated, and
ultimately useless, logic.
---
 scripts/lib/check_rabbitmq_queue.py      | 49 +++++----------------
 tools/tests/test_check_rabbitmq_queue.py | 55 +-----------------------
 2 files changed, 13 insertions(+), 91 deletions(-)

diff --git a/scripts/lib/check_rabbitmq_queue.py b/scripts/lib/check_rabbitmq_queue.py
index 7dd4e9dc0c..5898917406 100644
--- a/scripts/lib/check_rabbitmq_queue.py
+++ b/scripts/lib/check_rabbitmq_queue.py
@@ -38,23 +38,15 @@ states = {
     3: "UNKNOWN",
 }
 
-MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
-    lambda: 120,
-    digest_emails=600,
-)
-MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
+MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
     lambda: 30,
     digest_emails=1200,
     missedmessage_mobile_notifications=120,
 )
-CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
-    lambda: 240,
-    digest_emails=1200,
-)
-CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
+CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
     lambda: 60,
     missedmessage_mobile_notifications=180,
-    digest_emails=600,
+    digest_emails=1800,
 )
 
 def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
@@ -96,34 +88,15 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
                     message='')
 
     expected_time_to_clear_backlog = current_size * average_consume_time
-    time_since_emptied = now - stats['queue_last_emptied_timestamp']
-    if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
-        # We need the max() expression in case the rules for the queue
-        # permit longer processing times than 300s - to prevent
-        # incorrectly throwing an error by changing the classification
-        # of the the backlog from "burst" to "not burst" after 300s,
-        # while the worker is still processing it and staying below
-        # the CRITICAL threshold.
-        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
-            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
-                status = CRITICAL
-            else:
-                status = WARNING
+    if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
+        if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
+            status = CRITICAL
+        else:
+            status = WARNING
 
-            return dict(status=status,
-                        name=queue_name,
-                        message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
-    else:
-        # We slept recently, so treat this as a burst.
-        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
-            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
-                status = CRITICAL
-            else:
-                status = WARNING
-
-            return dict(status=status,
-                        name=queue_name,
-                        message=f'clearing the burst will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
+        return dict(status=status,
+                    name=queue_name,
+                    message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
 
     return dict(status=OK,
                 name=queue_name,
diff --git a/tools/tests/test_check_rabbitmq_queue.py b/tools/tests/test_check_rabbitmq_queue.py
index f7a6f323c3..daf9d910c5 100644
--- a/tools/tests/test_check_rabbitmq_queue.py
+++ b/tools/tests/test_check_rabbitmq_queue.py
@@ -42,8 +42,8 @@ class AnalyzeQueueStatsTests(TestCase):
                                               'recent_average_consume_time': 0.0001}, 10000)
         self.assertEqual(result['status'], OK)
 
-        # Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR_NORMAL to clear queue.
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_NORMAL',
+        # Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR to clear queue.
+        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR',
                              {'name': 10}):
             result = analyze_queue_stats('name', {'update_time': time.time(),
                                                   'current_queue_size': 11,
@@ -57,54 +57,3 @@ class AnalyzeQueueStatsTests(TestCase):
                                               'queue_last_emptied_timestamp': time.time() - 10000,
                                               'recent_average_consume_time': 1}, 9)
         self.assertEqual(result['status'], OK)
-
-    def test_queue_burst(self) -> None:
-        """Test logic for just after a large number of events were added
-        to an empty queue. Happens routinely for digest emails, for example."""
-        result = analyze_queue_stats('name', {'update_time': time.time(),
-                                              'current_queue_size': 10000,
-                                              'queue_last_emptied_timestamp': time.time() - 1,
-                                              'recent_average_consume_time': 1}, 10000)
-        self.assertEqual(result['status'], CRITICAL)
-        self.assertIn('clearing the burst', result['message'])
-
-        # verify logic around MAX_SECONDS_TO_CLEAR_FOR_BURSTS.
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
-                             {'name': 10}):
-            result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                  'current_queue_size': 11,
-                                                  'queue_last_emptied_timestamp': time.time() - 1,
-                                                  'recent_average_consume_time': 1}, 11)
-            self.assertEqual(result['status'], WARNING)
-            self.assertIn('clearing the burst', result['message'])
-
-            result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                  'current_queue_size': 9,
-                                                  'queue_last_emptied_timestamp': time.time() - 1,
-                                                  'recent_average_consume_time': 1}, 9)
-            self.assertEqual(result['status'], OK)
-
-    def test_queue_burst_long_time_to_clear_allowed(self) -> None:
-        """
-        For a queue that is allowed > 300s to clear a burst of events,
-        we need to verify that the checker will not stop categorizing this as a burst
-        while the worker is still processing the events, within the allowed time limit.
-        """
-        start_time = time.time()
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS',
-                             {'name': 600}), \
-                mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
-                                {'name': 600}):
-            with mock.patch('time.time', return_value=start_time + 599):
-                result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                      'current_queue_size': 599,
-                                                      'queue_last_emptied_timestamp': start_time,
-                                                      'recent_average_consume_time': 1}, 599)
-                self.assertEqual(result['status'], OK)
-
-            with mock.patch('time.time', return_value=start_time + 601):
-                result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                      'current_queue_size': 599,
-                                                      'queue_last_emptied_timestamp': start_time,
-                                                      'recent_average_consume_time': 1}, 599)
-                self.assertEqual(result['status'], CRITICAL)
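
After this change, the check reduces to a single pair of per-queue
thresholds. Below is a minimal standalone sketch of that remaining
logic, not code from this patch: the classify_backlog helper and the
'some_other_queue' name are hypothetical, the threshold values are
copied from the new defaults above, and the integer status codes
assume the usual Nagios convention behind the states mapping.

    from collections import defaultdict
    from typing import DefaultDict

    # Assumed Nagios-style status codes (0/1/2 for OK/WARNING/CRITICAL),
    # as implied by the `states` mapping in check_rabbitmq_queue.py.
    OK, WARNING, CRITICAL = 0, 1, 2

    # Per-queue thresholds as introduced by this patch; the lambda default
    # applies to any queue not listed explicitly.
    MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
        lambda: 30,
        digest_emails=1200,
        missedmessage_mobile_notifications=120,
    )
    CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
        lambda: 60,
        missedmessage_mobile_notifications=180,
        digest_emails=1800,
    )

    def classify_backlog(queue_name: str, current_queue_size: int,
                         recent_average_consume_time: float) -> int:
        # Estimate how long the worker needs to drain the backlog at its
        # recent per-event consume time, then compare the estimate against
        # the per-queue thresholds; no burst/normal distinction is needed.
        expected_time_to_clear_backlog = current_queue_size * recent_average_consume_time
        if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
            return CRITICAL
        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
            return WARNING
        return OK

    # 10,000 digest emails at 0.1s each clear in ~1000s, under that queue's
    # 1200s limit, so no alert; the same backlog on an unconfigured queue
    # (30s warning / 60s critical) is CRITICAL.
    assert classify_backlog('digest_emails', 10000, 0.1) == OK
    assert classify_backlog('some_other_queue', 10000, 0.1) == CRITICAL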