#!/usr/bin/env python from __future__ import print_function from __future__ import absolute_import import sys import time import optparse import os import random import logging import subprocess import hashlib from six.moves import range if False: from typing import Any, Dict, List, Set, Tuple parser = optparse.OptionParser() parser.add_option('--verbose', dest='verbose', default=False, action='store_true') parser.add_option('--site', dest='site', default="https://api.zulip.com", action='store') parser.add_option('--sharded', default=False, action='store_true') parser.add_option('--root-path', dest='root_path', default="/home/zulip", action='store') (options, args) = parser.parse_args() # The 'api' directory needs to go first, so that 'import zulip' won't pick up # some other directory named 'zulip'. pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%s/" % (os.uname()[4], sys.version[0:3]) sys.path[:0] = [os.path.join(options.root_path, "api/"), os.path.join(options.root_path, "python-zephyr"), os.path.join(options.root_path, pyzephyr_lib_path), options.root_path] mit_user = 'tabbott/extra@ATHENA.MIT.EDU' zulip_user = 'tabbott/extra@mit.edu' sys.path.append(".") import zulip zulip_client = zulip.Client( email=zulip_user, api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", verbose=True, client="ZulipMonitoring/0.1", site=options.site) # Configure logging log_file = "/var/log/zulip/check-mirroring-log" log_format = "%(asctime)s: %(message)s" logging.basicConfig(format=log_format) formatter = logging.Formatter(log_format) file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) # Initialize list of streams to test if options.sharded: # NOTE: Streams in this list must be in zulip_user's Zulip # subscriptions, or we won't receive messages via Zulip. # The sharded stream list has a bunch of pairs # (stream, shard_name), where sha1sum(stream).startswith(shard_name) test_streams = [ ("message", "p"), ("tabbott-nagios-test-32", "0"), ("tabbott-nagios-test-33", "1"), ("tabbott-nagios-test-2", "2"), ("tabbott-nagios-test-5", "3"), ("tabbott-nagios-test-13", "4"), ("tabbott-nagios-test-7", "5"), ("tabbott-nagios-test-22", "6"), ("tabbott-nagios-test-35", "7"), ("tabbott-nagios-test-4", "8"), ("tabbott-nagios-test-3", "9"), ("tabbott-nagios-test-1", "a"), ("tabbott-nagios-test-49", "b"), ("tabbott-nagios-test-34", "c"), ("tabbott-nagios-test-12", "d"), ("tabbott-nagios-test-11", "e"), ("tabbott-nagios-test-9", "f"), ] for (stream, test) in test_streams: if stream == "message": continue assert(hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test)) else: test_streams = [ ("message", "p"), ("tabbott-nagios-test", "a"), ] def print_status_and_exit(status): # type: (int) -> None # The output of this script is used by Nagios. Various outputs, # e.g. true success and punting due to a SERVNAK, result in a # non-alert case, so to give us something unambiguous to check in # Nagios, print the exit status. print(status) sys.exit(status) def send_zulip(message): # type: (Dict[str, str]) -> None result = zulip_client.send_message(message) if result["result"] != "success": logger.error("Error sending zulip, args were:") logger.error(str(message)) logger.error(result) print_status_and_exit(1) # Returns True if and only if we "Detected server failure" sending the zephyr. def send_zephyr(zwrite_args, content): # type: (List[str], str) -> bool p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate(input=content.encode("utf-8")) if p.returncode != 0: if "Detected server failure while receiving acknowledgement for" in stdout: logger.warning("Got server failure error sending zephyr; retrying") logger.warning(stderr) return True logger.error("Error sending zephyr:") logger.info(stdout) logger.error(stderr) print_status_and_exit(1) return False # Subscribe to Zulip try: res = zulip_client.register(event_types=["message"]) if 'error' in res.get('result'): logging.error("Error subscribing to Zulips!") logging.error(res['msg']) print_status_and_exit(1) queue_id, last_event_id = (res['queue_id'], res['last_event_id']) except Exception: logger.exception("Unexpected error subscribing to Zulips") print_status_and_exit(1) # Subscribe to Zephyrs import zephyr zephyr_subs_to_add = [] for (stream, test) in test_streams: if stream == "message": zephyr_subs_to_add.append((stream, 'personal', mit_user)) else: zephyr_subs_to_add.append((stream, '*', '*')) actually_subscribed = False for tries in range(10): try: zephyr.init() zephyr._z.subAll(zephyr_subs_to_add) zephyr_subs = zephyr._z.getSubscriptions() missing = 0 for elt in zephyr_subs_to_add: if elt not in zephyr_subs: logging.error("Failed to subscribe to %s" % (elt,)) missing += 1 if missing == 0: actually_subscribed = True break except IOError as e: if "SERVNAK received" in e: # type: ignore # https://github.com/python/mypy/issues/2118 logger.error("SERVNAK repeatedly received, punting rest of test") else: logger.exception("Exception subscribing to zephyrs") if not actually_subscribed: logger.error("Failed to subscribe to zephyrs") print_status_and_exit(1) # Prepare keys zhkeys = {} # type: Dict[str, Tuple[str, str]] hzkeys = {} # type: Dict[str, Tuple[str, str]] def gen_key(key_dict): # type: (Dict[str, Any]) -> str bits = str(random.getrandbits(32)) while bits in key_dict: # Avoid the unlikely event that we get the same bits twice bits = str(random.getrandbits(32)) return bits def gen_keys(key_dict): # type: (Dict[str, Tuple[str, str]]) -> None for (stream, test) in test_streams: key_dict[gen_key(key_dict)] = (stream, test) gen_keys(zhkeys) gen_keys(hzkeys) notices = [] # We check for new zephyrs multiple times, to avoid filling the zephyr # receive queue with 30+ messages, which might result in messages # being dropped. def receive_zephyrs(): # type: () -> None while True: try: notice = zephyr.receive(block=False) except Exception: logging.exception("Exception receiving zephyrs:") notice = None if notice is None: break if notice.opcode != "": continue notices.append(notice) logger.info("Starting sending messages!") # Send zephyrs zsig = "Timothy Good Abbott" for key, (stream, test) in zhkeys.items(): if stream == "message": zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user] else: zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"] server_failure = send_zephyr(zwrite_args, str(key)) if server_failure: # Replace the key we're not sure was delivered with a new key value = zhkeys.pop(key) new_key = gen_key(zhkeys) zhkeys[new_key] = value server_failure_again = send_zephyr(zwrite_args, str(new_key)) if server_failure_again: logging.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." % (key, new_key)) print_status_and_exit(1) else: logging.warning("Replaced key %s with %s due to Zephyr server failure." % (key, new_key)) receive_zephyrs() receive_zephyrs() logger.info("Sent Zephyr messages!") # Send Zulips for key, (stream, test) in hzkeys.items(): if stream == "message": send_zulip({ "type": "private", "content": str(key), "to": zulip_user, }) else: send_zulip({ "type": "stream", "subject": "test", "content": str(key), "to": stream, }) receive_zephyrs() logger.info("Sent Zulip messages!") # Normally messages manage to forward through in under 3 seconds, but # sleep 10 to give a safe margin since the messages do need to do 2 # round trips. This alert is for correctness, not performance, and so # we want it to reliably alert only when messages aren't being # delivered at all. time.sleep(10) receive_zephyrs() logger.info("Starting receiving messages!") # receive zulips res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id) if 'error' in res.get('result'): logging.error("Error subscribing to Zulips!") logging.error(res['msg']) print_status_and_exit(1) messages = [event['message'] for event in res['events']] logger.info("Finished receiving Zulip messages!") receive_zephyrs() logger.info("Finished receiving Zephyr messages!") all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys())) def process_keys(content_list): # type: (List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool] # Start by filtering out any keys that might have come from # concurrent check-mirroring processes content_keys = [key for key in content_list if key in all_keys] key_counts = {} # type: Dict[str, int] for key in all_keys: key_counts[key] = 0 for key in content_keys: key_counts[key] += 1 z_missing = set(key for key in zhkeys.keys() if key_counts[key] == 0) h_missing = set(key for key in hzkeys.keys() if key_counts[key] == 0) duplicates = any(val > 1 for val in key_counts.values()) success = all(val == 1 for val in key_counts.values()) return key_counts, z_missing, h_missing, duplicates, success # The h_foo variables are about the messages we _received_ in Zulip # The z_foo variables are about the messages we _received_ in Zephyr h_contents = [message["content"] for message in messages] z_contents = [notice.message.split('\0')[1] for notice in notices] (h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents) (z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents) if z_success and h_success: logger.info("Success!") print_status_and_exit(0) elif z_success: logger.info("Received everything correctly in Zephyr!") elif h_success: logger.info("Received everything correctly in Zulip!") logger.error("Messages received the wrong number of times:") for key in all_keys: if z_key_counts[key] == 1 and h_key_counts[key] == 1: continue if key in zhkeys: (stream, test) = zhkeys[key] logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \ (key, z_key_counts[key], h_key_counts[key], test, stream)) if key in hzkeys: (stream, test) = hzkeys[key] logger.warning("%10s: z got %s. h got %s. Sent via Zulip(%s): class %s" % \ (key, z_key_counts[key], h_key_counts[key], test, stream)) logger.error("") logger.error("Summary of specific problems:") if h_duplicates: logger.error("zulip: Received duplicate messages!") logger.error("zulip: This is probably a bug in our message loop detection.") logger.error("zulip: where Zulips go zulip=>zephyr=>zulip") if z_duplicates: logger.error("zephyr: Received duplicate messages!") logger.error("zephyr: This is probably a bug in our message loop detection.") logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr") if z_missing_z: logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!") logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.") if h_missing_h: logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!") logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.") if z_missing_h: logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!") if z_missing_h == h_missing_h: logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.") logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.") logger.error("zephyr: aka the personals mirroring script has issues.") if h_missing_z: logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!") if h_missing_z == z_missing_z: logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.") logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.") logger.error("zulip: aka the global class mirroring script has issues.") zulip_client.deregister(queue_id) print_status_and_exit(1)