#!/usr/bin/env python3
import argparse
import calendar
import gzip
import logging
import os
import re
import signal
import sys
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import List, Match, Optional, Set, TextIO, Tuple

# Make the Zulip root importable so scripts.lib and zproject can be found.
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(ZULIP_PATH)
from scripts.lib.setup_path import setup_path

setup_path()
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"

from typing import Protocol

from django.conf import settings
from scripts.lib.zulip_tools import (
    BOLD,
    CYAN,
    ENDC,
    FAIL,
    GRAY,
    OKBLUE,
    get_config,
    get_config_file,
)


def parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Search logfiles, ignoring commonly-fetched URLs."
    )

    log_selection = parser.add_argument_group("File selection")
    log_selection_options = log_selection.add_mutually_exclusive_group()
    access_log_retention_days = int(
        get_config(get_config_file(), "application_server", "access_log_retention_days", "14")
    )
    log_selection_options.add_argument(
        "--log-files",
        "-n",
        help="Number of log files to search",
        choices=range(1, access_log_retention_days + 2),
        metavar=f"[1-{access_log_retention_days + 1}]",
        type=int,
    )
    log_selection_options.add_argument(
        "--all-logs",
        "-A",
        help="Parse all logfiles, not just the most recent",
        action="store_true",
    )
    log_selection_options.add_argument(
        "--min-hours",
        "-H",
        help="Estimated minimum number of hours of logs to search; includes the previous log file if the most recent is estimated to cover less than this",
        type=int,
        choices=range(24),
        default=3,
    )
    log_selection.add_argument(
        "--nginx",
        "-N",
        help="Parse from NGINX logs, not server.log",
        action="store_true",
    )

    filtering = parser.add_argument_group("Filtering")
    filtering.add_argument(
        "filter_terms",
        help="IP address, hostname, user-id, HTTP method, path, or status code to search for; multiple are AND'ed together",
        nargs="+",
    )
    filtering.add_argument(
        "--all-lines",
        "-L",
        help="Show all matching lines; equivalent to -suaemtpr",
        action="store_true",
    )
    filtering.add_argument("--static", "-s", help="Include static file paths", action="store_true")
    filtering.add_argument("--uploads", "-u", help="Include file upload paths", action="store_true")
    filtering.add_argument("--avatars", "-a", help="Include avatar paths", action="store_true")
    filtering.add_argument("--events", "-e", help="Include event fetch paths", action="store_true")
    filtering.add_argument("--messages", "-m", help="Include message paths", action="store_true")
    filtering.add_argument(
        "--typing",
        "-t",
        help="Include typing notification path",
        action="store_true",
    )
    filtering.add_argument("--presence", "-p", help="Include presence paths", action="store_true")
    filtering.add_argument(
        "--report", "-r", help="Include Sentry reporting paths", action="store_true"
    )
    filtering.add_argument(
        "--no-other", "-O", help="Exclude paths not explicitly included", action="store_true"
    )
    filtering.add_argument(
        "--client",
        "--user-agent",
        "-C",
        help="Only include requests whose client/user-agent contains this string",
    )

    output = parser.add_argument_group("Output")
    output.add_argument("--full-line", "-F", help="Show full matching line", action="store_true")
    output.add_argument("--timeline", "-T", help="Show start, end, and gaps", action="store_true")

    return parser


def maybe_gzip(logfile_name: str) -> TextIO:
    # Open a log file as text, transparently decompressing .gz files.
    if logfile_name.endswith(".gz"):
        return gzip.open(logfile_name, "rt")
    return open(logfile_name)  # noqa: SIM115


# Parses nginx access log lines (combined-style format).
NGINX_LOG_LINE_RE = re.compile(
    r"""
      (?P<ip> \S+ )
      \s+ - \s+
      (?P<user> \S+ )
      \s+
      \[
         (?P<date> \d+/\w+/\d+ )
         :
         (?P