mirror of https://github.com/zulip/zulip.git
queue: Eliminate useless "burst" concept in monitoring.
The reason higher expected_time_to_clear_backlog values were allowed for queues during "bursts" was, in simpler terms, that the queues to which this happens intrinsically have a higher acceptable "time until cleared" for new events. E.g. digest_emails, where it's completely fine to take a long time to send them out after putting them in the queue. And that's already configurable without a normal/burst distinction. Thanks to this we can remove a bunch of overly complicated, and ultimately useless, logic.
This commit is contained in:
parent
810514dd9d
commit
cd9b194d88
|
@ -38,23 +38,15 @@ states = {
|
||||||
3: "UNKNOWN",
|
3: "UNKNOWN",
|
||||||
}
|
}
|
||||||
|
|
||||||
MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
|
MAX_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
|
||||||
lambda: 120,
|
|
||||||
digest_emails=600,
|
|
||||||
)
|
|
||||||
MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
|
|
||||||
lambda: 30,
|
lambda: 30,
|
||||||
digest_emails=1200,
|
digest_emails=1200,
|
||||||
missedmessage_mobile_notifications=120,
|
missedmessage_mobile_notifications=120,
|
||||||
)
|
)
|
||||||
CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
|
CRITICAL_SECONDS_TO_CLEAR: DefaultDict[str, int] = defaultdict(
|
||||||
lambda: 240,
|
|
||||||
digest_emails=1200,
|
|
||||||
)
|
|
||||||
CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
|
|
||||||
lambda: 60,
|
lambda: 60,
|
||||||
missedmessage_mobile_notifications=180,
|
missedmessage_mobile_notifications=180,
|
||||||
digest_emails=600,
|
digest_emails=1800,
|
||||||
)
|
)
|
||||||
|
|
||||||
def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
|
def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
|
||||||
|
@ -96,16 +88,8 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
|
||||||
message='')
|
message='')
|
||||||
|
|
||||||
expected_time_to_clear_backlog = current_size * average_consume_time
|
expected_time_to_clear_backlog = current_size * average_consume_time
|
||||||
time_since_emptied = now - stats['queue_last_emptied_timestamp']
|
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
|
||||||
if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
|
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
|
||||||
# We need the max() expression in case the rules for the queue
|
|
||||||
# permit longer processing times than 300s - to prevent
|
|
||||||
# incorrectly throwing an error by changing the classification
|
|
||||||
# of the the backlog from "burst" to "not burst" after 300s,
|
|
||||||
# while the worker is still processing it and staying below
|
|
||||||
# the CRITICAL threshold.
|
|
||||||
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
|
|
||||||
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
|
|
||||||
status = CRITICAL
|
status = CRITICAL
|
||||||
else:
|
else:
|
||||||
status = WARNING
|
status = WARNING
|
||||||
|
@ -113,17 +97,6 @@ def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
|
||||||
return dict(status=status,
|
return dict(status=status,
|
||||||
name=queue_name,
|
name=queue_name,
|
||||||
message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
|
message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
|
||||||
else:
|
|
||||||
# We slept recently, so treat this as a burst.
|
|
||||||
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
|
|
||||||
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
|
|
||||||
status = CRITICAL
|
|
||||||
else:
|
|
||||||
status = WARNING
|
|
||||||
|
|
||||||
return dict(status=status,
|
|
||||||
name=queue_name,
|
|
||||||
message=f'clearing the burst will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
|
|
||||||
|
|
||||||
return dict(status=OK,
|
return dict(status=OK,
|
||||||
name=queue_name,
|
name=queue_name,
|
||||||
|
|
|
@ -42,8 +42,8 @@ class AnalyzeQueueStatsTests(TestCase):
|
||||||
'recent_average_consume_time': 0.0001}, 10000)
|
'recent_average_consume_time': 0.0001}, 10000)
|
||||||
self.assertEqual(result['status'], OK)
|
self.assertEqual(result['status'], OK)
|
||||||
|
|
||||||
# Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR_NORMAL to clear queue.
|
# Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR to clear queue.
|
||||||
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_NORMAL',
|
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR',
|
||||||
{'name': 10}):
|
{'name': 10}):
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
result = analyze_queue_stats('name', {'update_time': time.time(),
|
||||||
'current_queue_size': 11,
|
'current_queue_size': 11,
|
||||||
|
@ -57,54 +57,3 @@ class AnalyzeQueueStatsTests(TestCase):
|
||||||
'queue_last_emptied_timestamp': time.time() - 10000,
|
'queue_last_emptied_timestamp': time.time() - 10000,
|
||||||
'recent_average_consume_time': 1}, 9)
|
'recent_average_consume_time': 1}, 9)
|
||||||
self.assertEqual(result['status'], OK)
|
self.assertEqual(result['status'], OK)
|
||||||
|
|
||||||
def test_queue_burst(self) -> None:
|
|
||||||
"""Test logic for just after a large number of events were added
|
|
||||||
to an empty queue. Happens routinely for digest emails, for example."""
|
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
|
||||||
'current_queue_size': 10000,
|
|
||||||
'queue_last_emptied_timestamp': time.time() - 1,
|
|
||||||
'recent_average_consume_time': 1}, 10000)
|
|
||||||
self.assertEqual(result['status'], CRITICAL)
|
|
||||||
self.assertIn('clearing the burst', result['message'])
|
|
||||||
|
|
||||||
# verify logic around MAX_SECONDS_TO_CLEAR_FOR_BURSTS.
|
|
||||||
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
|
|
||||||
{'name': 10}):
|
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
|
||||||
'current_queue_size': 11,
|
|
||||||
'queue_last_emptied_timestamp': time.time() - 1,
|
|
||||||
'recent_average_consume_time': 1}, 11)
|
|
||||||
self.assertEqual(result['status'], WARNING)
|
|
||||||
self.assertIn('clearing the burst', result['message'])
|
|
||||||
|
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
|
||||||
'current_queue_size': 9,
|
|
||||||
'queue_last_emptied_timestamp': time.time() - 1,
|
|
||||||
'recent_average_consume_time': 1}, 9)
|
|
||||||
self.assertEqual(result['status'], OK)
|
|
||||||
|
|
||||||
def test_queue_burst_long_time_to_clear_allowed(self) -> None:
|
|
||||||
"""
|
|
||||||
For a queue that is allowed > 300s to clear a burst of events,
|
|
||||||
we need to verify that the checker will not stop categorizing this as a burst
|
|
||||||
while the worker is still processing the events, within the allowed time limit.
|
|
||||||
"""
|
|
||||||
start_time = time.time()
|
|
||||||
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS',
|
|
||||||
{'name': 600}), \
|
|
||||||
mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
|
|
||||||
{'name': 600}):
|
|
||||||
with mock.patch('time.time', return_value=start_time + 599):
|
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
|
||||||
'current_queue_size': 599,
|
|
||||||
'queue_last_emptied_timestamp': start_time,
|
|
||||||
'recent_average_consume_time': 1}, 599)
|
|
||||||
self.assertEqual(result['status'], OK)
|
|
||||||
|
|
||||||
with mock.patch('time.time', return_value=start_time + 601):
|
|
||||||
result = analyze_queue_stats('name', {'update_time': time.time(),
|
|
||||||
'current_queue_size': 599,
|
|
||||||
'queue_last_emptied_timestamp': start_time,
|
|
||||||
'recent_average_consume_time': 1}, 599)
|
|
||||||
self.assertEqual(result['status'], CRITICAL)
|
|
||||||
|
|
Loading…
Reference in New Issue