zulip/scripts/lib/check_rabbitmq_queue.py

213 lines
7.8 KiB
Python

import json
import os
import re
import subprocess
import time
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
normal_queues = [
'deferred_work',
'digest_emails',
'email_mirror',
'embed_links',
'embedded_bots',
'error_reports',
'invites',
'email_senders',
'missedmessage_emails',
'missedmessage_mobile_notifications',
'outgoing_webhooks',
'signups',
'user_activity',
'user_activity_interval',
'user_presence',
]
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3
states = {
0: "OK",
1: "WARNING",
2: "CRITICAL",
3: "UNKNOWN",
}
MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
lambda: 120,
digest_emails=600,
)
MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
lambda: 30,
digest_emails=1200,
missedmessage_mobile_notifications=120,
)
CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
lambda: 240,
digest_emails=1200,
)
CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
lambda: 60,
missedmessage_mobile_notifications=180,
digest_emails=600,
)
def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],
queue_count_rabbitmqctl: int) -> Dict[str, Any]:
now = int(time.time())
if stats == {}:
return dict(status=UNKNOWN,
name=queue_name,
message='invalid or no stats data')
if now - stats['update_time'] > 180 and queue_count_rabbitmqctl > 10:
# Queue isn't updating the stats file and has some events in
# the backlog, it's likely stuck.
#
# TODO: There's an unfortunate race where if the queue has
# been empty for the last hour (because there haven't been 50
# new events in the last hour), and then gets a burst, this
# condition will be true for the first (event_handling_time *
# 50).
return dict(status=CRITICAL,
name=queue_name,
message='queue appears to be stuck, last update {}, queue size {}'.format(
stats['update_time'], queue_count_rabbitmqctl))
current_size = stats['current_queue_size']
average_consume_time = stats['recent_average_consume_time']
if average_consume_time is None:
# Queue just started; we can't effectively estimate anything.
#
# If the queue is stuck in this state and not processing
# anything, eventually the `update_time` rule above will fire.
return dict(status=OK,
name=queue_name,
message='')
expected_time_to_clear_backlog = current_size * average_consume_time
time_since_emptied = now - stats['queue_last_emptied_timestamp']
if time_since_emptied > max(300, CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]):
# We need the max() expression in case the rules for the queue
# permit longer processing times than 300s - to prevent
# incorrectly throwing an error by changing the classification
# of the the backlog from "burst" to "not burst" after 300s,
# while the worker is still processing it and staying below
# the CRITICAL threshold.
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_NORMAL[queue_name]:
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_NORMAL[queue_name]:
status = CRITICAL
else:
status = WARNING
return dict(status=status,
name=queue_name,
message=f'clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
else:
# We slept recently, so treat this as a burst.
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS[queue_name]:
status = CRITICAL
else:
status = WARNING
return dict(status=status,
name=queue_name,
message=f'clearing the burst will take too long: {expected_time_to_clear_backlog}s, size: {current_size}')
return dict(status=OK,
name=queue_name,
message='')
WARN_COUNT_THRESHOLD_DEFAULT = 10
CRITICAL_COUNT_THRESHOLD_DEFAULT = 50
def check_other_queues(queue_counts_dict: Dict[str, int]) -> List[Dict[str, Any]]:
""" Do a simple queue size check for queues whose workers don't publish stats files."""
results = []
for queue, count in queue_counts_dict.items():
if queue in normal_queues:
continue
if count > CRITICAL_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=CRITICAL, name=queue,
message=f'count critical: {count}'))
elif count > WARN_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=WARNING, name=queue,
message=f'count warning: {count}'))
else:
results.append(dict(status=OK, name=queue, message=''))
return results
def check_rabbitmq_queues() -> None:
pattern = re.compile(r'(\w+)\t(\d+)')
if 'USER' in os.environ and not os.environ['USER'] in ['root', 'rabbitmq']:
print("This script must be run as the root or rabbitmq user")
list_queues_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_queues'],
universal_newlines=True)
list_consumers_output = subprocess.check_output(['/usr/sbin/rabbitmqctl', 'list_consumers'],
universal_newlines=True)
queue_counts_rabbitmqctl = dict()
for line in list_queues_output.split("\n"):
line = line.strip()
m = pattern.match(line)
if m:
queue = m.group(1)
count = int(m.group(2))
queue_counts_rabbitmqctl[queue] = count
queues_with_consumers = []
for line in list_consumers_output.split('\n'):
parts = line.split('\t')
if len(parts) >= 2:
queue_name = parts[0]
if queue_name.startswith("notify_tornado"):
continue
queues_with_consumers.append(queue_name)
queue_stats_dir = subprocess.check_output([os.path.join(ZULIP_PATH, 'scripts/get-django-setting'),
'QUEUE_STATS_DIR'],
universal_newlines=True).strip()
queue_stats: Dict[str, Dict[str, Any]] = dict()
queues_to_check = set(normal_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check:
fn = queue + ".stats"
file_path = os.path.join(queue_stats_dir, fn)
if not os.path.exists(file_path):
queue_stats[queue] = {}
continue
with open(file_path) as f:
try:
queue_stats[queue] = json.load(f)
except json.decoder.JSONDecodeError:
queue_stats[queue] = {}
results = []
for queue_name, stats in queue_stats.items():
results.append(analyze_queue_stats(queue_name, stats, queue_counts_rabbitmqctl[queue_name]))
results.extend(check_other_queues(queue_counts_rabbitmqctl))
status = max(result['status'] for result in results)
now = int(time.time())
if status > 0:
queue_error_template = "queue {} problem: {}:{}"
error_message = '; '.join([
queue_error_template.format(result['name'], states[result['status']], result['message'])
for result in results if result['status'] > 0
])
print(f"{now}|{status}|{states[status]}|{error_message}")
else:
print(f"{now}|{status}|{states[status]}|queues normal")