#!/usr/bin/env python3 import argparse import gzip import logging import os import re import signal import sys from enum import Enum, auto from typing import List, Match, Set, TextIO, Tuple ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(ZULIP_PATH) from scripts.lib.setup_path import setup_path setup_path() os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings" from typing import Protocol from django.conf import settings from scripts.lib.zulip_tools import BOLD, CYAN, ENDC, FAIL, GRAY, OKBLUE def parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Search logfiles, ignoring commonly-fetched URLs.") log_selection = parser.add_argument_group("File selection") log_selection_options = log_selection.add_mutually_exclusive_group() log_selection_options.add_argument( "--log-files", "-n", help="Number of log files to search", choices=range(1, 16), type=int, ) log_selection_options.add_argument( "--all-logs", "-A", help="Parse all logfiles, not just most recent", action="store_true", ) log_selection_options.add_argument( "--min-hours", "-H", help="Estimated minimum number of hours; includes previous log file, if estimated less than this", type=int, choices=range(0, 24), default=3, ) log_selection.add_argument( "--nginx", "-N", help="Parse from NGINX logs, not server.log", action="store_true", ) filtering = parser.add_argument_group("Filtering") filtering.add_argument( "filter_terms", help="IP address, hostname, user-id, path, or status code to search for; multiple are AND'ed together", nargs="+", ) filtering.add_argument( "--all-lines", "-L", help="Show all matching lines; equivalent to -suemtpr", action="store_true", ) filtering.add_argument("--static", "-s", help="Include static file paths", action="store_true") filtering.add_argument("--uploads", "-u", help="Include file upload paths", action="store_true") filtering.add_argument("--avatars", "-a", help="Include avatar paths", action="store_true") filtering.add_argument("--events", "-e", help="Include event fetch paths", action="store_true") filtering.add_argument("--messages", "-m", help="Include message paths", action="store_true") filtering.add_argument( "--typing", "-t", help="Include typing notification path", action="store_true", ) filtering.add_argument("--presence", "-p", help="Include presence paths", action="store_true") filtering.add_argument( "--report", "-r", help="Include timing report paths", action="store_true" ) filtering.add_argument( "--no-other", "-O", help="Exclude paths not explicitly included", action="store_true" ) output = parser.add_argument_group("Output") output.add_argument("--full-line", "-F", help="Show full matching line", action="store_true") return parser def maybe_gzip(logfile_name: str) -> TextIO: if logfile_name.endswith(".gz"): return gzip.open(logfile_name, "rt") return open(logfile_name) NGINX_LOG_LINE_RE = re.compile( r""" (?P \S+ ) \s+ - \s+ (?P \S+ ) \s+ \[ (?P \d+/\w+/\d+ ) : (?P