diff --git a/scripts/lib/check_rabbitmq_queue.py b/scripts/lib/check_rabbitmq_queue.py
new file mode 100644
index 0000000000..ece364b1aa
--- /dev/null
+++ b/scripts/lib/check_rabbitmq_queue.py
@@ -0,0 +1,217 @@
+import os
+import re
+import time
+import subprocess
+import json
+
+from collections import defaultdict
+from typing import Any, DefaultDict, Dict, List
+
+ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+normal_queues = [
+    'deferred_work',
+    'digest_emails',
+    'email_mirror',
+    'embed_links',
+    'embedded_bots',
+    'error_reports',
+    'invites',
+    'email_senders',
+    'missedmessage_emails',
+    'missedmessage_mobile_notifications',
+    'outgoing_webhooks',
+    'signups',
+    'slow_queries',
+    'user_activity',
+    'user_activity_interval',
+    'user_presence',
+]
+
+OK = 0
+WARNING = 1
+CRITICAL = 2
+UNKNOWN = 3
+
+states = {
+    0: "OK",
+    1: "WARNING",
+    2: "CRITICAL",
+    3: "UNKNOWN"
+}
+
+MAX_SECONDS_TO_CLEAR_FOR_BURSTS = defaultdict(
+    lambda: 120,
+    digest_emails=600,
+    slow_queries=600,
+)  # type: DefaultDict[str, int]
+MAX_SECONDS_TO_CLEAR_NORMAL = defaultdict(
+    lambda: 30,
+    digest_emails=120,
+    slow_queries=120,
+)  # type: DefaultDict[str, int]
+CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS = defaultdict(
+    lambda: 240,
+    digest_emails=1200,
+    slow_queries=1200,
+)  # type: DefaultDict[str, int]
+CRITICAL_SECONDS_TO_CLEAR_NORMAL = defaultdict(
+    lambda: 60,
+    digest_emails=600,
+    slow_queries=600,
+)  # type: DefaultDict[str, int]
+
+def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
+                        queue_count_rabbitmqctl: int) -> Dict[str, Any]:
+    now = int(time.time())
+    if stats == {}:
+        return dict(status=UNKNOWN,
+                    name=queue_name,
+                    message='invalid or no stats data')
+
+    if now - stats['update_time'] > 180 and queue_count_rabbitmqctl > 10:
+        # The queue isn't updating the stats file and has some events
+        # in the backlog, so it's likely stuck.
+        #
+        # TODO: There's an unfortunate race where if the queue has
+        # been empty for the last hour (because there haven't been 50
+        # new events in the last hour), and then gets a burst, this
+        # condition will be true for the first (event_handling_time *
+        # 50).
+        return dict(status=CRITICAL,
+                    name=queue_name,
+                    message='queue appears to be stuck, last update %s' % (stats['update_time'],))
+
+    current_size = stats['current_queue_size']
+    average_consume_time = stats['recent_average_consume_time']
+    if average_consume_time is None:
+        # The queue just started; we can't effectively estimate anything.
+        #
+        # If the queue is stuck in this state and not processing
+        # anything, eventually the `update_time` rule above will fire.
+        return dict(status=OK,
+                    name=queue_name,
+                    message='')
+
+    expected_time_to_clear_backlog = current_size * average_consume_time
+    time_since_emptied = now - stats['queue_last_emptied_timestamp']
+    if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
+        # We need the max() expression in case the rules for the queue
+        # permit longer processing times than 300s - to prevent
+        # incorrectly throwing an error by changing the classification
+        # of the backlog from "burst" to "not burst" after 300s,
+        # while the worker is still processing it and staying below
+        # the CRITICAL threshold.
+        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
+            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
+                status = CRITICAL
+            else:
+                status = WARNING
+
+            return dict(status=status,
+                        name=queue_name,
+                        message='clearing the backlog will take too long: %ss, size: %s' % (
+                            expected_time_to_clear_backlog, current_size))
+    else:
+        # We slept recently, so treat this as a burst.
+        if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
+            if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
+                status = CRITICAL
+            else:
+                status = WARNING
+
+            return dict(status=status,
+                        name=queue_name,
+                        message='clearing the burst will take too long: %ss, size: %s' % (
+                            expected_time_to_clear_backlog, current_size))
+
+    return dict(status=OK,
+                name=queue_name,
+                message='')
+
+WARN_COUNT_THRESHOLD_DEFAULT = 10
+CRITICAL_COUNT_THRESHOLD_DEFAULT = 50
+def check_other_queues(queue_counts_dict: Dict[str, int]) -> List[Dict[str, Any]]:
+    """Do a simple queue size check for queues whose workers don't publish stats files."""
+
+    results = []
+    for queue, count in queue_counts_dict.items():
+        if queue in normal_queues:
+            continue
+
+        if count > CRITICAL_COUNT_THRESHOLD_DEFAULT:
+            results.append(dict(status=CRITICAL, name=queue,
+                                message='count critical: %s' % (count,)))
+        elif count > WARN_COUNT_THRESHOLD_DEFAULT:
+            results.append(dict(status=WARNING, name=queue,
+                                message='count warning: %s' % (count,)))
+        else:
+            results.append(dict(status=OK, name=queue, message=''))
+
+    return results
+
+def check_rabbitmq_queues() -> None:
+    pattern = re.compile(r'(\w+)\t(\d+)')
+    if 'USER' in os.environ and os.environ['USER'] not in ['root', 'rabbitmq']:
+        print("This script must be run as the root or rabbitmq user")
+
+    list_queues_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_queues'],
+                                                 universal_newlines=True)
+    list_consumers_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_consumers'],
+                                                    universal_newlines=True)
+
+    queue_counts_rabbitmqctl = dict()
+    for line in list_queues_output.split("\n"):
+        line = line.strip()
+        m = pattern.match(line)
+        if m:
+            queue = m.group(1)
+            count = int(m.group(2))
+            queue_counts_rabbitmqctl[queue] = count
+
+    queues_with_consumers = []
+    for line in list_consumers_output.split('\n'):
+        parts = line.split('\t')
+        if len(parts) >= 2:
+            queue_name = parts[0]
+            if queue_name.startswith("notify_tornado"):
+                continue
+            queues_with_consumers.append(queue_name)
+
+    queue_stats_dir = subprocess.check_output([os.path.join(ZULIP_PATH, 'scripts/get-django-setting'),
+                                               'QUEUE_STATS_DIR'],
+                                              universal_newlines=True).strip()
+    queue_stats = dict()  # type: Dict[str, Dict[str, Any]]
+    queues_to_check = set(normal_queues).intersection(set(queues_with_consumers))
+    for queue in queues_to_check:
+        fn = queue + ".stats"
+        file_path = os.path.join(queue_stats_dir, fn)
+        if not os.path.exists(file_path):
+            queue_stats[queue] = {}
+            continue
+
+        with open(file_path, "r") as f:
+            try:
+                queue_stats[queue] = json.load(f)
+            except json.decoder.JSONDecodeError:
+                queue_stats[queue] = {}
+
+    results = []
+    for queue_name, stats in queue_stats.items():
+        results.append(analyze_queue_stats(queue_name, stats, queue_counts_rabbitmqctl[queue_name]))
+
+    results.extend(check_other_queues(queue_counts_rabbitmqctl))
+
+    status = max(result['status'] for result in results)
+
+    now = int(time.time())
+
+    if status > 0:
+        queue_error_template = "queue {} problem: {}:{}"
+        error_message = '; '.join([
+            queue_error_template.format(result['name'], states[result['status']], result['message'])
+            for result in results if result['status'] > 0
+        ])
+        print("%s|%s|%s|%s" % (now, status, states[status], error_message))
+    else:
+        print("%s|%s|%s|queues normal" % (now, status, states[status]))
diff --git a/scripts/nagios/check-rabbitmq-queue b/scripts/nagios/check-rabbitmq-queue
index 1e2272eec4..39bb8452fa 100755
--- a/scripts/nagios/check-rabbitmq-queue
+++ b/scripts/nagios/check-rabbitmq-queue
@@ -1,80 +1,12 @@
 #!/usr/bin/env python3
-import re
-import time
 import os
-import subprocess
+import sys
 
-# The WARN_THRESHOLD/CRIT_THRESHOLD settings makes it possible to
-# configure specific queues to have a higher or lower limit then the
-# default.
-WARN_THRESHOLD_DEFAULT = 10
-WARN_THRESHOLD = {
-    'missedmessage_emails': WARN_THRESHOLD_DEFAULT,
-    # The user_activity worker has high throughput and uses a
-    # LoopQueueProcessingWorker, so it's normal to have a moderate
-    # backlog.
-    'user_activity': 1000,
-}
-CRIT_THRESHOLD_DEFAULT = 50
-CRIT_THRESHOLD = {
-    'missedmessage_emails': CRIT_THRESHOLD_DEFAULT,
-    # A backlog of hundreds of events for user_activity likely
-    # indicates an outage of the processor.
-    'user_activity': 5000,
-}
+ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.append(ZULIP_PATH)
 
-states = {
-    0: "OK",
-    1: "WARNING",
-    2: "CRITICAL",
-    3: "UNKNOWN"
-}
+from scripts.lib.check_rabbitmq_queue import check_rabbitmq_queues
 
-pattern = re.compile(r'(\w+)\t(\d+)')
-if 'USER' in os.environ and not os.environ['USER'] in ['root', 'rabbitmq']:
-    print("This script must be run as the root or rabbitmq user")
-
-output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_queues'], universal_newlines=True)
-
-status = 0
-max_count = 0
-warn_queues = []
-
-for line in output.split("\n"):
-    line = line.strip()
-    m = pattern.match(line)
-    if m:
-        queue = m.group(1)
-        count = int(m.group(2))
-        this_status = 0
-        if count > CRIT_THRESHOLD.get(queue, CRIT_THRESHOLD_DEFAULT):
-            this_status = 2
-            warn_queues.append(queue)
-        elif count > WARN_THRESHOLD.get(queue, WARN_THRESHOLD_DEFAULT):
-            this_status = max(status, 1)
-            warn_queues.append(queue)
-
-        status = max(status, this_status)
-        max_count = max(max_count, count)
-
-warn_about = ", ".join(warn_queues)
-now = int(time.time())
-now_struct = time.gmtime(now)
-
-# While we are sending digest emails, at 1800 hrs (UTC) each weekday, the mail
-# queues can get backed up; don't alert on those. Additionally, certain workers
-# (slow_queries and digest_emails) have a polling algorithm that means it's
-# normal for them to accumulate items.
-if not set(warn_queues) - set(("digest_emails", "slow_queries")) and \
-        now_struct.tm_hour == 18 and now_struct.tm_min < 25:
-    status = 0
-    print("%s|%s|%s|processing digests, not alerting on elevated mail queues" % (
-        now, status, states[status]))
-    exit(0)
-
-if status > 0:
-    print("%s|%s|%s|max count %s, queues affected: %s" % (
-        now, status, states[status], max_count, warn_about))
-else:
-    print("%s|%s|%s|queues normal, max count %s" % (now, status, states[status], max_count))
+
+if __name__ == "__main__":
+    check_rabbitmq_queues()
diff --git a/tools/tests/test_check_rabbitmq_queue.py b/tools/tests/test_check_rabbitmq_queue.py
new file mode 100644
index 0000000000..0810aa8752
--- /dev/null
+++ b/tools/tests/test_check_rabbitmq_queue.py
@@ -0,0 +1,120 @@
+import mock
+from unittest import TestCase
+
+from scripts.lib.check_rabbitmq_queue import (
+    analyze_queue_stats,
+    OK,
+    WARNING,
+    CRITICAL,
+    UNKNOWN,
+)
+import time
+
+class AnalyzeQueueStatsTests(TestCase):
+    def test_no_stats_available(self) -> None:
+        result = analyze_queue_stats('name', {}, 0)
+        self.assertEqual(result['status'], UNKNOWN)
+
+    def test_queue_stuck(self) -> None:
+        """Last update > 5 minutes ago and there are events in the queue.
+
+        In theory, we could be having bad luck with a race where in
+        the last (event_handling_time * 50) a burst was added, but it's
+        unlikely and shouldn't fail 2 in a row for Nagios anyway.
+        """
+        result = analyze_queue_stats('name', {'update_time': time.time() - 301}, 100)
+        self.assertEqual(result['status'], CRITICAL)
+        self.assertIn('queue appears to be stuck', result['message'])
+
+    def test_queue_just_started(self) -> None:
+        """
+        We just started processing a burst of events, and haven't processed enough
+        to log productivity statistics yet.
+        """
+        result = analyze_queue_stats('name', {'update_time': time.time(),
+                                              'current_queue_size': 10000,
+                                              'recent_average_consume_time': None}, 10000)
+        self.assertEqual(result['status'], OK)
+
+    def test_queue_normal(self) -> None:
+        """10000 events and each takes a second => it'll take a long time to empty."""
+        result = analyze_queue_stats('name', {'update_time': time.time(),
+                                              'current_queue_size': 10000,
+                                              'queue_last_emptied_timestamp': time.time() - 10000,
+                                              'recent_average_consume_time': 1}, 10000)
+        self.assertEqual(result['status'], CRITICAL)
+        self.assertIn('clearing the backlog', result['message'])
+
+        # If we're doing 10K/sec, it's OK.
+        result = analyze_queue_stats('name', {'update_time': time.time(),
+                                              'current_queue_size': 10000,
+                                              'queue_last_emptied_timestamp': time.time() - 10000,
+                                              'recent_average_consume_time': 0.0001}, 10000)
+        self.assertEqual(result['status'], OK)
+
+        # Verify logic around whether it'll take MAX_SECONDS_TO_CLEAR_NORMAL to clear the queue.
+        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_NORMAL',
+                             {'name': 10}):
+            result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                  'current_queue_size': 11,
+                                                  'queue_last_emptied_timestamp': time.time() - 10000,
+                                                  'recent_average_consume_time': 1}, 11)
+            self.assertEqual(result['status'], WARNING)
+            self.assertIn('clearing the backlog', result['message'])
+
+            result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                  'current_queue_size': 9,
+                                                  'queue_last_emptied_timestamp': time.time() - 10000,
+                                                  'recent_average_consume_time': 1}, 9)
+            self.assertEqual(result['status'], OK)
+
+    def test_queue_burst(self) -> None:
+        """Test logic for just after a large number of events were added
+        to an empty queue. Happens routinely for digest emails, for example."""
+        result = analyze_queue_stats('name', {'update_time': time.time(),
+                                              'current_queue_size': 10000,
+                                              'queue_last_emptied_timestamp': time.time() - 1,
+                                              'recent_average_consume_time': 1}, 10000)
+        self.assertEqual(result['status'], CRITICAL)
+        self.assertIn('clearing the burst', result['message'])
+
+        # Verify logic around MAX_SECONDS_TO_CLEAR_FOR_BURSTS.
+        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
+                             {'name': 10}):
+            result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                  'current_queue_size': 11,
+                                                  'queue_last_emptied_timestamp': time.time() - 1,
+                                                  'recent_average_consume_time': 1}, 11)
+            self.assertEqual(result['status'], WARNING)
+            self.assertIn('clearing the burst', result['message'])
+
+            result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                  'current_queue_size': 9,
+                                                  'queue_last_emptied_timestamp': time.time() - 1,
+                                                  'recent_average_consume_time': 1}, 9)
+            self.assertEqual(result['status'], OK)
+
+    def test_queue_burst_long_time_to_clear_allowed(self) -> None:
+        """
+        For a queue that is allowed > 300s to clear a burst of events,
+        we need to verify that the checker will not stop categorizing this as a burst
+        while the worker is still processing the events, within the allowed time limit.
+        """
+        start_time = time.time()
+        with mock.patch.dict('scripts.lib.check_rabbitmq_queue.CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS',
+                             {'name': 600}), \
+                mock.patch.dict('scripts.lib.check_rabbitmq_queue.MAX_SECONDS_TO_CLEAR_FOR_BURSTS',
+                                {'name': 600}):
+            with mock.patch('time.time', return_value=start_time + 599):
+                result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                      'current_queue_size': 599,
+                                                      'queue_last_emptied_timestamp': start_time,
+                                                      'recent_average_consume_time': 1}, 599)
+                self.assertEqual(result['status'], OK)
+
+            with mock.patch('time.time', return_value=start_time + 601):
+                result = analyze_queue_stats('name', {'update_time': time.time(),
+                                                      'current_queue_size': 599,
+                                                      'queue_last_emptied_timestamp': start_time,
+                                                      'recent_average_consume_time': 1}, 599)
+                self.assertEqual(result['status'], CRITICAL)
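
Reviewer note (not part of the diff): a minimal usage sketch of the new checker's core helper, for anyone unfamiliar with the stats-file interface. The dictionary keys match the fields analyze_queue_stats() reads above; the queue name and numeric values are invented for illustration only.

import time

from scripts.lib.check_rabbitmq_queue import analyze_queue_stats, states

# Hypothetical snapshot of a worker's stats file: the queue last emptied
# 10 minutes ago, holds 500 events, and each event takes ~0.5s to consume.
sample_stats = {
    'update_time': time.time(),
    'current_queue_size': 500,
    'queue_last_emptied_timestamp': time.time() - 600,
    'recent_average_consume_time': 0.5,
}

# 500 * 0.5s = 250s to clear; since the queue has not emptied recently, this is
# measured against CRITICAL_SECONDS_TO_CLEAR_NORMAL (default 60s) and reported
# as CRITICAL.
result = analyze_queue_stats('user_activity', sample_stats, 500)
print(states[result['status']], result['message'])

Had the queue emptied within the last 300 seconds, the same backlog would instead be judged against the more lenient MAX_SECONDS_TO_CLEAR_FOR_BURSTS / CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS limits, which is the burst case the tests above exercise.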