zulip/scripts/lib/check_rabbitmq_queue.py

209 lines
7.0 KiB
Python

import json
import os
import re
import subprocess
import sys
import time
from collections import defaultdict
from typing import Any
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ZULIP_PATH)
from scripts.lib.zulip_tools import atomic_nagios_write, get_config, get_config_file
normal_queues = [
"deferred_work",
"digest_emails",
"email_mirror",
"email_senders",
"embed_links",
"embedded_bots",
"missedmessage_emails",
"missedmessage_mobile_notifications",
"outgoing_webhooks",
"user_activity",
"user_activity_interval",
"user_presence",
]
mobile_notification_shards = int(
get_config(get_config_file(), "application_server", "mobile_notification_shards", "1")
)
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3
states = {
0: "OK",
1: "WARNING",
2: "CRITICAL",
3: "UNKNOWN",
}
MAX_SECONDS_TO_CLEAR: defaultdict[str, int] = defaultdict(
lambda: 30,
deferred_work=600,
digest_emails=1200,
missedmessage_mobile_notifications=120,
embed_links=60,
)
CRITICAL_SECONDS_TO_CLEAR: defaultdict[str, int] = defaultdict(
lambda: 60,
deferred_work=900,
missedmessage_mobile_notifications=180,
digest_emails=1800,
embed_links=90,
)
def analyze_queue_stats(
queue_name: str, stats: dict[str, Any], queue_count_rabbitmqctl: int
) -> dict[str, Any]:
now = int(time.time())
if stats == {}:
return dict(status=UNKNOWN, name=queue_name, message="invalid or no stats data")
if now - stats["update_time"] > 180 and queue_count_rabbitmqctl > 10:
# Queue isn't updating the stats file and has some events in
# the backlog, it's likely stuck.
#
# TODO: There's an unlikely race condition here - if the queue
# was fully emptied and was idle due to no new events coming
# for over 180 seconds, suddenly gets a burst of events and
# this code runs exactly in the very small time window between
# those events popping up and the queue beginning to process
# the first one (which will refresh the stats file at the very
# start), we'll incorrectly return the CRITICAL status. The
# chance of that happening should be negligible because the queue
# worker should wake up immediately and log statistics before
# starting to process the first event.
return dict(
status=CRITICAL,
name=queue_name,
message="queue appears to be stuck, last update {}, queue size {}".format(
stats["update_time"], queue_count_rabbitmqctl
),
)
current_size = queue_count_rabbitmqctl
average_consume_time = stats["recent_average_consume_time"]
if average_consume_time is None:
# Queue just started; we can't effectively estimate anything.
#
# If the queue is stuck in this state and not processing
# anything, eventually the `update_time` rule above will fire.
return dict(status=OK, name=queue_name, message="")
expected_time_to_clear_backlog = current_size * average_consume_time
if expected_time_to_clear_backlog > MAX_SECONDS_TO_CLEAR[queue_name]:
if expected_time_to_clear_backlog > CRITICAL_SECONDS_TO_CLEAR[queue_name]:
status = CRITICAL
else:
status = WARNING
return dict(
status=status,
name=queue_name,
message=f"clearing the backlog will take too long: {expected_time_to_clear_backlog}s, size: {current_size}",
)
return dict(status=OK, name=queue_name, message="")
WARN_COUNT_THRESHOLD_DEFAULT = 10
CRITICAL_COUNT_THRESHOLD_DEFAULT = 50
def check_other_queues(queue_counts_dict: dict[str, int]) -> list[dict[str, Any]]:
"""Do a simple queue size check for queues whose workers don't publish stats files."""
results = []
for queue, count in queue_counts_dict.items():
if queue in normal_queues:
continue
if count > CRITICAL_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=CRITICAL, name=queue, message=f"count critical: {count}"))
elif count > WARN_COUNT_THRESHOLD_DEFAULT:
results.append(dict(status=WARNING, name=queue, message=f"count warning: {count}"))
else:
results.append(dict(status=OK, name=queue, message=""))
return results
def check_rabbitmq_queues() -> None:
pattern = re.compile(r"(\w+)\t(\d+)\t(\d+)")
if "USER" in os.environ and os.environ["USER"] not in ["root", "rabbitmq"]:
print("This script must be run as the root or rabbitmq user")
list_queues_output = subprocess.check_output(
["/usr/sbin/rabbitmqctl", "list_queues", "name", "messages", "consumers"],
text=True,
)
queue_counts_rabbitmqctl = {}
queues_with_consumers = []
for line in list_queues_output.split("\n"):
line = line.strip()
m = pattern.match(line)
if m:
queue = m.group(1)
count = int(m.group(2))
consumers = int(m.group(3))
queue_counts_rabbitmqctl[queue] = count
if consumers > 0 and not queue.startswith("notify_tornado"):
queues_with_consumers.append(queue)
queue_stats_dir = subprocess.check_output(
[os.path.join(ZULIP_PATH, "scripts/get-django-setting"), "QUEUE_STATS_DIR"],
text=True,
).strip()
queue_stats: dict[str, dict[str, Any]] = {}
check_queues = normal_queues
if mobile_notification_shards > 1:
check_queues += [
f"missedmessage_mobile_notifications_shard{d}"
for d in range(1, mobile_notification_shards + 1)
]
queues_to_check = set(check_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check:
fn = queue + ".stats"
file_path = os.path.join(queue_stats_dir, fn)
if not os.path.exists(file_path):
queue_stats[queue] = {}
continue
with open(file_path) as f:
try:
queue_stats[queue] = json.load(f)
except json.decoder.JSONDecodeError:
queue_stats[queue] = {}
results = []
for queue_name, stats in queue_stats.items():
results.append(analyze_queue_stats(queue_name, stats, queue_counts_rabbitmqctl[queue_name]))
results.extend(check_other_queues(queue_counts_rabbitmqctl))
status = max(result["status"] for result in results)
if status > 0:
queue_error_template = "queue {} problem: {}:{}"
error_message = "; ".join(
queue_error_template.format(result["name"], states[result["status"]], result["message"])
for result in results
if result["status"] > 0
)
sys.exit(
atomic_nagios_write(
"check-rabbitmq-results",
"critical" if status == CRITICAL else "warning",
error_message,
)
)
else:
atomic_nagios_write("check-rabbitmq-results", "ok", "queues normal")