queue: Eliminate useless "burst" concept in monitoring.

The reason a higher expected_time_to_clear_backlog was allowed for
queues during "bursts" was, in simpler terms, that the queues to which
this happens intrinsically have a higher acceptable "time until
cleared" for new events. E.g. digest_emails, where it's completely
fine to take a long time to send them out after they're put in the
queue. And that's already configurable per queue, without a
normal/burst distinction.
Thanks to this we can remove a bunch of overly complicated, and
ultimately useless, logic.
Mateusz Mandera 2020-09-20 13:35:35 +02:00 committed by Tim Abbott
parent 810514dd9d
commit cd9b194d88
2 changed files with 13 additions and 91 deletions
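
In effect, the check now reduces to comparing one number - current
queue size times recent average consume time - against a WARNING and a
CRITICAL threshold for that queue. A minimal sketch of the simplified
logic, using the names from the diff below (check_backlog is a
hypothetical condensation of the relevant branch of
analyze_queue_stats; the staleness and bookkeeping checks around it
are omitted):

from collections import defaultdict
from typing import Any, DefaultDict, Dict

OK, WARNING, CRITICAL = 0, 1, 2

# Per-queue limits, in seconds; queues without an explicit entry fall
# back to the defaultdict default.
MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
    lambda: 30,
    digest_emails=1200,
    missedmessage_mobile_notifications=120,
)
CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
    lambda: 60,
    missedmessage_mobile_notifications=180,
    digest_emails=1800,
)

def check_backlog(queue_name: str, stats: Dict[str, Any]) -> Dict[str, Any]:
    # Estimated seconds to drain the backlog at the recent consume rate.
    expected = stats['current_queue_size'] * stats['recent_average_consume_time']
    if expected > MAX_SECONDS_TO_CLEAR[queue_name]:
        status = CRITICAL if expected > CRITICAL_SECONDS_TO_CLEAR[queue_name] else WARNING
        return dict(status=status, name=queue_name,
                    message=f'clearing the backlog will take too long: {expected}s')
    return dict(status=OK, name=queue_name, message='')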

scripts/lib/check_rabbitmq_queue.py

@@ -38,23 +38,15 @@ states = {
     3: "UNKNOWN",
 }
 
-MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
-    lambda: 120,
-    digest_emails=600,
-)
-MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
+MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
     lambda: 30,
     digest_emails=1200,
     missedmessage_mobile_notifications=120,
 )
-CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
-    lambda: 240,
-    digest_emails=1200,
-)
-CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
+CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
     lambda: 60,
     missedmessage_mobile_notifications=180,
-    digest_emails=600,
+    digest_emails=1800,
 )
 
 def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
@@ -96,34 +88,15 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
                     message='')
 
     expected_time_to_clear_backlog = current_size * average_consume_time
-    time_since_emptied = now - stats['queue_last_emptied_timestamp']
-    if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
-        # We need the max() expression in case the rules for the queue
-        # permit longer processing times than 300s - to prevent
-        # incorrectly throwing an error by changing the classification
-        # of the the backlog from "burst" to "not burst" after 300s,
-        # while the worker is still processing it and staying below
-        # the CRITICAL threshold.
-        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
-            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
-                status = CRITICAL
-            else:
-                status = WARNING
+    if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
+        if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
+            status = CRITICAL
+        else:
+            status = WARNING
 
-            return dict(status=status,
-                        name=queue_name,
-                        message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
-    else:
-        # We slept recently, so treat this as a burst.
-        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
-            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
-                status = CRITICAL
-            else:
-                status = WARNING
-
-            return dict(status=status,
-                        name=queue_name,
-                        message=f'clearing the burst will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
+        return dict(status=status,
+                    name=queue_name,
+                    message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
 
     return dict(status=OK,
                 name=queue_name,
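
A note on the threshold tables: since they are plain defaultdicts, any
queue without an explicit keyword entry resolves to the fallback value,
so tuning a queue's acceptable drain time is a one-line change.
Hypothetical lookups, assuming the definitions in the hunk above
('some_other_queue' is a made-up name, and note that a defaultdict
lookup also inserts the fallback entry for that key):

MAX_SECONDS_TO_CLEAR['digest_emails']          # 1200 (explicit entry)
MAX_SECONDS_TO_CLEAR['some_other_queue']       # 30 (fallback)
CRITICAL_SECONDS_TO_CLEAR['some_other_queue']  # 60 (fallback)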

tools/tests/test_check_rabbitmq_queue.py

@@ -42,8 +42,8 @@ class AnalyzeQueueStatsTests(TestCase):
                                               'recent_average_consume_time': 0.0001}, 10000)
         self.assertEqual(result['status'], OK)
 
-        # Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR_NORMAL to clear queue.
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_NORMAL',
+        # Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR to clear queue.
+        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR',
                              {'name': 10}):
             result = analyze_queue_stats('name', {'update_time': time.time(),
                                                   'current_queue_size': 11,
@@ -57,54 +57,3 @@ class AnalyzeQueueStatsTests(TestCase):
                                               'queue_last_emptied_timestamp': time.time() - 10000,
                                               'recent_average_consume_time': 1}, 9)
         self.assertEqual(result['status'], OK)
-
-    def test_queue_burst(self) -> None:
-        """Test logic for just after a large number of events were added
-        to an empty queue. Happens routinely for digest emails, for example."""
-        result = analyze_queue_stats('name', {'update_time': time.time(),
-                                              'current_queue_size': 10000,
-                                              'queue_last_emptied_timestamp': time.time() - 1,
-                                              'recent_average_consume_time': 1}, 10000)
-        self.assertEqual(result['status'], CRITICAL)
-        self.assertIn('clearing the burst', result['message'])
-
-        # verify logic around MAX_SECONDS_TO_CLEAR_FOR_BURSTS.
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
-                             {'name': 10}):
-            result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                  'current_queue_size': 11,
-                                                  'queue_last_emptied_timestamp': time.time() - 1,
-                                                  'recent_average_consume_time': 1}, 11)
-            self.assertEqual(result['status'], WARNING)
-            self.assertIn('clearing the burst', result['message'])
-
-            result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                  'current_queue_size': 9,
-                                                  'queue_last_emptied_timestamp': time.time() - 1,
-                                                  'recent_average_consume_time': 1}, 9)
-            self.assertEqual(result['status'], OK)
-
-    def test_queue_burst_long_time_to_clear_allowed(self) -> None:
-        """
-        For a queue that is allowed > 300s to clear a burst of events,
-        we need to verify that the checker will not stop categorizing this as a burst
-        while the worker is still processing the events, within the allowed time limit.
-        """
-        start_time = time.time()
-        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS',
-                             {'name': 600}), \
-                mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
-                                {'name': 600}):
-            with mock.patch('time.time', return_value=start_time + 599):
-                result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                      'current_queue_size': 599,
-                                                      'queue_last_emptied_timestamp': start_time,
-                                                      'recent_average_consume_time': 1}, 599)
-                self.assertEqual(result['status'], OK)
-
-            with mock.patch('time.time', return_value=start_time + 601):
-                result = analyze_queue_stats('name', {'update_time': time.time(),
-                                                      'current_queue_size': 599,
-                                                      'queue_last_emptied_timestamp': start_time,
-                                                      'recent_average_consume_time': 1}, 599)
-                self.assertEqual(result['status'], CRITICAL)
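
Under the unified logic, the burst scenario these removed tests
covered - a queue that was empty a second ago and just received a pile
of events - is now judged purely by estimated drain time against the
per-queue thresholds. A sketch in the shape of the remaining test
fixtures (the queue size and timings here are made up for
illustration):

import time

from scripts.lib.check_rabbitmq_queue import OK, analyze_queue_stats

result = analyze_queue_stats('digest_emails', {
    'update_time': time.time(),
    'current_queue_size': 1000,        # a large batch was just enqueued
    'queue_last_emptied_timestamp': time.time() - 1,
    'recent_average_consume_time': 1,  # ~1s per event => ~1000s to drain
}, 1000)

# 1000s is below MAX_SECONDS_TO_CLEAR['digest_emails'] == 1200, so this
# backlog is acceptable for digest_emails; a queue on the 30s/60s
# defaults would report CRITICAL for the same numbers.
assert result['status'] == OK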