check-rabbitmq-queue: Add a simple algorithm to analyze queue stats.

This new algorithm is designed to avoid monitoring paging when a queue
simply has bursty behavior.
Mateusz Mandera 2020-03-21 13:10:22 +01:00 committed by Tim Abbott
parent 0e57975643
commit 122d0bca83
3 changed files with 343 additions and 74 deletions

scripts/lib/check_rabbitmq_queue.py (new file)

@@ -0,0 +1,217 @@
import os
import re
import time
import subprocess
import json
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
normal_queues = [
'deferred_work',
'digest_emails',
'email_mirror',
'embed_links',
'embedded_bots',
'error_reports',
'invites',
'email_senders',
'missedmessage_emails',
'missedmessage_mobile_notifications',
'outgoing_webhooks',
'signups',
'slow_queries',
'user_activity',
'user_activity_interval',
'user_presence',
]
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3
states = {
0: "OK",
1: "WARNING",
2: "CRITICAL",
3: "UNKNOWN"
}
MAX_SECONDS_TO_CLEAR_FOR_BURSTS = defaultdict(
lambda: 120,
digest_emails=600,
slow_queries=600,
) # type: DefaultDict[str, int]
MAX_SECONDS_TO_CLEAR_NORMAL = defaultdict(
lambda: 30,
digest_emails=120,
slow_queries=120,
) # type: DefaultDict[str, int]
CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS = defaultdict(
lambda: 240,
digest_emails=1200,
slow_queries=1200,
) # type: DefaultDict[str, int]
CRITICAL_SECONDS_TO_CLEAR_NORMAL = defaultdict(
lambda: 60,
digest_emails=600,
slow_queries=600,
) # type: DefaultDict[str, int]
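# analyze_queue_stats() below distinguishes two regimes for a backlog: if the
# queue was emptied recently, the backlog is treated as a burst and judged
# against the more lenient *_FOR_BURSTS thresholds above; otherwise the
# estimated time to drain it is judged against the stricter *_NORMAL thresholds.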
def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
queue_count_rabbitmqctl: int) -> Dict[str, Any]:
now = int(time.time())
if stats == {}:
return dict(status=UNKNOWN,
name=queue_name,
message='invalid or no stats data')
if now - stats['update_time'] > 180 and queue_count_rabbitmqctl > 10:
# Queue isn't updating the stats file and has some events in
        # the backlog; it's likely stuck.
#
# TODO: There's an unfortunate race where if the queue has
# been empty for the last hour (because there haven't been 50
# new events in the last hour), and then gets a burst, this
# condition will be true for the first (event_handling_time *
# 50).
return dict(status=CRITICAL,
name=queue_name,
message='queue appears to be stuck, last update %s' % (stats['update_time'],))
current_size = stats['current_queue_size']
average_consume_time = stats['recent_average_consume_time']
if average_consume_time is None:
# Queue just started; we can't effectively estimate anything.
#
# If the queue is stuck in this state and not processing
# anything, eventually the `update_time` rule above will fire.
return dict(status=OK,
name=queue_name,
message='')
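    # Estimate how long the worker would need, at its recent average pace,
    # to drain everything currently in the queue.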
expected_time_to_clear_backlog = current_size * average_consume_time
time_since_emptied = now - stats['queue_last_emptied_timestamp']
if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
# We need the max() expression in case the rules for the queue
# permit longer processing times than 300s - to prevent
# incorrectly throwing an error by changing the classification
        # of the backlog from "burst" to "not burst" after 300s,
# while the worker is still processing it and staying below
# the CRITICAL threshold.
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
status = CRITICAL
else:
status = WARNING
return dict(status=status,
name=queue_name,
message='clearing the backlog will take too long: %ss, size: %s' % (
expected_time_to_clear_backlog, current_size))
else:
        # The queue was emptied recently, so treat this backlog as a burst.
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
status = CRITICAL
else:
status = WARNING
return dict(status=status,
name=queue_name,
message='clearing the burst will take too long: %ss, size: %s' % (
expected_time_to_clear_backlog, current_size))
return dict(status=OK,
name=queue_name,
message='')
WARN_COUNT_THRESHOLD_DEFAULT = 10
CRITICAL_COUNT_THRESHOLD_DEFAULT = 50
def check_other_queues(queue_counts_dict: Dict[str, int]) -> List[Dict[str, Any]]:
""" Do a simple queue size check for queues whose workers don't publish stats files."""
results = []
for queue, count in queue_counts_dict.items():
if queue in normal_queues:
continue
if count > CRITICAL_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=CRITICAL, name=queue,
message='count critical: %s' % (count,)))
elif count > WARN_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=WARNING, name=queue,
message='count warning: %s' % (count,)))
else:
results.append(dict(status=OK, name=queue, message=''))
return results
def check_rabbitmq_queues() -> None:
pattern = re.compile(r'(\w+)\t(\d+)')
if 'USER' in os.environ and not os.environ['USER'] in ['root', 'rabbitmq']:
print("This script must be run as the root or rabbitmq user")
list_queues_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_queues'],
universal_newlines=True)
list_consumers_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_consumers'],
universal_newlines=True)
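    # Each rabbitmqctl list_queues line has the form "<queue_name>\t<count>";
    # collect the message counts into a dict keyed by queue name.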
queue_counts_rabbitmqctl = dict()
for line in list_queues_output.split("\n"):
line = line.strip()
m = pattern.match(line)
if m:
queue = m.group(1)
count = int(m.group(2))
queue_counts_rabbitmqctl[queue] = count
queues_with_consumers = []
for line in list_consumers_output.split('\n'):
parts = line.split('\t')
if len(parts) >= 2:
queue_name = parts[0]
if queue_name.startswith("notify_tornado"):
continue
queues_with_consumers.append(queue_name)
queue_stats_dir = subprocess.check_output([os.path.join(ZULIP_PATH, 'scripts/get-django-setting'),
'QUEUE_STATS_DIR'],
universal_newlines=True).strip()
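    # Workers for the queues in normal_queues are expected to publish their
    # statistics as JSON files named "<queue_name>.stats" in QUEUE_STATS_DIR;
    # a missing or unparseable file is treated as empty stats.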
queue_stats = dict() # type: Dict[str, Dict[str, Any]]
queues_to_check = set(normal_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check:
fn = queue + ".stats"
file_path = os.path.join(queue_stats_dir, fn)
if not os.path.exists(file_path):
queue_stats[queue] = {}
continue
with open(file_path, "r") as f:
try:
queue_stats[queue] = json.load(f)
except json.decoder.JSONDecodeError:
queue_stats[queue] = {}
results = []
for queue_name, stats in queue_stats.items():
        results.append(analyze_queue_stats(queue_name, stats, queue_counts_rabbitmqctl[queue_name]))
results.extend(check_other_queues(queue_counts_rabbitmqctl))
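    # Collapse to the single worst status across all queues and print one
    # "timestamp|status|state|message" line.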
status = max(result['status'] for result in results)
now = int(time.time())
if status > 0:
queue_error_template = "queue {} problem: {}:{}"
error_message = '; '.join([
queue_error_template.format(result['name'], states[result['status']], result['message'])
for result in results if result['status'] > 0
])
print("%s|%s|%s|%s" % (now, status, states[status], error_message))
else:
print("%s|%s|%s|queues normal" % (now, status, states[status]))

check-rabbitmq-queue (Nagios plugin)

@@ -1,80 +1,12 @@
 #!/usr/bin/env python3
-import re
-import time
 import os
-import subprocess
+import sys
 
-# The WARN_THRESHOLD/CRIT_THRESHOLD settings makes it possible to
-# configure specific queues to have a higher or lower limit then the
-# default.
-WARN_THRESHOLD_DEFAULT = 10
-WARN_THRESHOLD = {
-    'missedmessage_emails': WARN_THRESHOLD_DEFAULT,
-    # The user_activity worker has high throughput and uses a
-    # LoopQueueProcessingWorker, so it's normal to have a moderate
-    # backlog.
-    'user_activity': 1000,
-}
-CRIT_THRESHOLD_DEFAULT = 50
-CRIT_THRESHOLD = {
-    'missedmessage_emails': CRIT_THRESHOLD_DEFAULT,
-    # A backlog of hundreds of events for user_activity likely
-    # indicates an outage of the processor.
-    'user_activity': 5000,
-}
+ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.append(ZULIP_PATH)
 
-states = {
-    0: "OK",
-    1: "WARNING",
-    2: "CRITICAL",
-    3: "UNKNOWN"
-}
+from scripts.lib.check_rabbitmq_queue import check_rabbitmq_queues
 
-pattern = re.compile(r'(\w+)\t(\d+)')
-
-if 'USER' in os.environ and not os.environ['USER'] in ['root', 'rabbitmq']:
-    print("This script must be run as the root or rabbitmq user")
-
-output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_queues'], universal_newlines=True)
-
-status = 0
-max_count = 0
-warn_queues = []
-for line in output.split("\n"):
-    line = line.strip()
-    m = pattern.match(line)
-    if m:
-        queue = m.group(1)
-        count = int(m.group(2))
-        this_status = 0
-        if count > CRIT_THRESHOLD.get(queue, CRIT_THRESHOLD_DEFAULT):
-            this_status = 2
-            warn_queues.append(queue)
-        elif count > WARN_THRESHOLD.get(queue, WARN_THRESHOLD_DEFAULT):
-            this_status = max(status, 1)
-            warn_queues.append(queue)
-        status = max(status, this_status)
-        max_count = max(max_count, count)
-
-warn_about = ", ".join(warn_queues)
-now = int(time.time())
-now_struct = time.gmtime(now)
-
-# While we are sending digest emails, at 1800 hrs (UTC) each weekday, the mail
-# queues can get backed up; don't alert on those. Additionally, certain workers
-# (slow_queries and digest_emails) have a polling algorithm that means it's
-# normal for them to accumulate items.
-if not set(warn_queues) - set(("digest_emails", "slow_queries")) and \
-        now_struct.tm_hour == 18 and now_struct.tm_min < 25:
-    status = 0
-    print("%s|%s|%s|processing digests, not alerting on elevated mail queues" % (
-        now, status, states[status]))
-    exit(0)
-
-if status > 0:
-    print("%s|%s|%s|max count %s, queues affected: %s" % (
-        now, status, states[status], max_count, warn_about))
-else:
-    print("%s|%s|%s|queues normal, max count %s" % (now, status, states[status], max_count))
+if __name__ == "__main__":
+    check_rabbitmq_queues()

Tests for scripts/lib/check_rabbitmq_queue.py (new file)

@@ -0,0 +1,120 @@
import mock
from unittest import TestCase
from scripts.lib.check_rabbitmq_queue import (
analyze_queue_stats,
OK,
WARNING,
CRITICAL,
UNKNOWN,
)
import time
class AnalyzeQueueStatsTests(TestCase):
def test_no_stats_available(self) -> None:
result = analyze_queue_stats('name', {}, 0)
self.assertEqual(result['status'], UNKNOWN)
def test_queue_stuck(self) -> None:
"""Last update > 5 minutes ago and there's events in the queue.
In theory, we could be having bad luck with a race where in
        the last (event_handling_time * 50) a burst was added, but it's
unlikely and shouldn't fail 2 in a row for Nagios anyway.
"""
result = analyze_queue_stats('name', {'update_time': time.time() - 301}, 100)
self.assertEqual(result['status'], CRITICAL)
self.assertIn('queue appears to be stuck', result['message'])
def test_queue_just_started(self) -> None:
"""
We just started processing a burst of events, and haven't processed enough
to log productivity statistics yet.
"""
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 10000,
'recent_average_consume_time': None}, 10000)
self.assertEqual(result['status'], OK)
def test_queue_normal(self) -> None:
"""10000 events and each takes a second => it'll take a long time to empty."""
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 10000,
'queue_last_emptied_timestamp': time.time() - 10000,
'recent_average_consume_time': 1}, 10000)
self.assertEqual(result['status'], CRITICAL)
self.assertIn('clearing the backlog', result['message'])
# If we're doing 10K/sec, it's OK.
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 10000,
'queue_last_emptied_timestamp': time.time() - 10000,
'recent_average_consume_time': 0.0001}, 10000)
self.assertEqual(result['status'], OK)
        # Verify the logic around whether clearing the queue will take more than MAX_SECONDS_TO_CLEAR_NORMAL.
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_NORMAL',
{'name': 10}):
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 11,
'queue_last_emptied_timestamp': time.time() - 10000,
'recent_average_consume_time': 1}, 11)
self.assertEqual(result['status'], WARNING)
self.assertIn('clearing the backlog', result['message'])
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 9,
'queue_last_emptied_timestamp': time.time() - 10000,
'recent_average_consume_time': 1}, 9)
self.assertEqual(result['status'], OK)
def test_queue_burst(self) -> None:
"""Test logic for just after a large number of events were added
to an empty queue. Happens routinely for digest emails, for example."""
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 10000,
'queue_last_emptied_timestamp': time.time() - 1,
'recent_average_consume_time': 1}, 10000)
self.assertEqual(result['status'], CRITICAL)
self.assertIn('clearing the burst', result['message'])
# verify logic around MAX_SECONDS_TO_CLEAR_FOR_BURSTS.
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
{'name': 10}):
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 11,
'queue_last_emptied_timestamp': time.time() - 1,
'recent_average_consume_time': 1}, 11)
self.assertEqual(result['status'], WARNING)
self.assertIn('clearing the burst', result['message'])
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 9,
'queue_last_emptied_timestamp': time.time() - 1,
'recent_average_consume_time': 1}, 9)
self.assertEqual(result['status'], OK)
def test_queue_burst_long_time_to_clear_allowed(self) -> None:
"""
For a queue that is allowed > 300s to clear a burst of events,
we need to verify that the checker will not stop categorizing this as a burst
while the worker is still processing the events, within the allowed time limit.
"""
start_time = time.time()
with mock.patch.dict('scripts.lib.check_rabbitmq_queue.CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS',
{'name': 600}), \
mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
{'name': 600}):
with mock.patch('time.time', return_value=start_time + 599):
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 599,
'queue_last_emptied_timestamp': start_time,
'recent_average_consume_time': 1}, 599)
self.assertEqual(result['status'], OK)
with mock.patch('time.time', return_value=start_time + 601):
result = analyze_queue_stats('name', {'update_time': time.time(),
'current_queue_size': 599,
'queue_last_emptied_timestamp': start_time,
'recent_average_consume_time': 1}, 599)
self.assertEqual(result['status'], CRITICAL)