diff --git a/scripts/log-search b/scripts/log-search
index 9e2085fe92..4d5f903a9c 100755
--- a/scripts/log-search
+++ b/scripts/log-search
@@ -8,7 +8,7 @@ import re
 import signal
 import sys
 from enum import Enum, auto
-from typing import Callable, List, TextIO
+from typing import Callable, List, TextIO, Tuple
 
 ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 sys.path.append(ZULIP_PATH)
@@ -164,58 +164,7 @@ class FilterType(Enum):
 
 def main() -> None:
     args = parser().parse_args()
-    # The heuristics below are not intended to be precise -- they
-    # certainly count things as "IPv4" or "IPv6" addresses that are
-    # invalid. However, we expect the input here to already be
-    # reasonably well-formed.
-    filter = args.filter
-
-    if re.match(r"[1-5][0-9][0-9]$", filter):
-        string_filter = lambda m: m["code"] == filter
-        filter_type = FilterType.STATUS
-        if not args.nginx and filter == "502":
-            logging.warning("Adding --nginx -- 502's do not appear in Django logs.")
-            args.nginx = True
-    elif re.match(r"[1-5]xx$", filter):
-        filter = filter[0]
-        string_filter = lambda m: m["code"].startswith(filter)
-        filter_type = FilterType.STATUS
-    elif re.match(r"\d+$", filter):
-        if args.nginx:
-            raise parser().error("Cannot parse user-ids with nginx logs; try without --nginx")
-        string_filter = lambda m: m["user_id"] == filter
-        filter_type = FilterType.USER_ID
-    elif re.match(r"\d{1,3}(\.\d{1,3}){3}$", filter):
-        string_filter = lambda m: m["ip"] == filter
-        filter_type = FilterType.CLIENT_IP
-    elif re.match(r"([a-f0-9:]+:+){1,7}[a-f0-9]+$", filter):
-        string_filter = lambda m: m["ip"] == filter
-        filter_type = FilterType.CLIENT_IP
-    elif re.match(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?$", filter.lower()):
-        filter = filter.lower()
-        if args.nginx:
-            string_filter = lambda m: m["hostname"].startswith(filter + ".")
-        else:
-            string_filter = lambda m: m["hostname"] == filter
-        filter_type = FilterType.HOSTNAME
-    elif re.match(r"[a-z0-9-]+(\.[a-z0-9-]+)+$", filter.lower()) and re.search(
-        r"[a-z-]", filter.lower()
-    ):
-        if not args.nginx:
-            raise parser().error("Cannot parse full domains with Python logs; try --nginx")
-        filter = filter.lower()
-        string_filter = lambda m: m["hostname"] == filter
-        filter_type = FilterType.HOSTNAME
-    elif re.match(r"/\S*$", filter):
-        string_filter = lambda m: m["path"] == filter
-        filter_type = FilterType.PATH
-        args.all_lines = True
-    else:
-        raise RuntimeError(
-            f"Can't parse {filter} as an IP, hostname, user-id, path, or status code."
-        )
-    assert filter_type is not None
-
+    (filter_type, filter_func) = parse_filters(args)
     logfile_names = parse_logfile_names(args)
 
     try:
@@ -224,7 +173,7 @@ def main() -> None:
                 for logline in logfile:
                     # As a performance optimization, just do a substring
                     # check before we parse the line fully
-                    if filter not in logline.lower():
+                    if args.filter not in logline.lower():
                         continue
 
                     if args.nginx:
@@ -236,7 +185,7 @@ def main() -> None:
                         if args.nginx:
                             print(f"! Failed to parse:\n{logline}", file=sys.stderr)
                         continue
-                    if passes_filters(string_filter, match, args):
+                    if passes_filters(filter_func, match, args):
                         print_line(
                             match,
                             args,
@@ -277,6 +226,80 @@ def parse_logfile_names(args: argparse.Namespace) -> List[str]:
     return logfile_names
 
 
+def parse_filters(
+    args: argparse.Namespace,
+) -> Tuple[FilterType, Callable[[re.Match], bool]]:  # type: ignore[type-arg] # Requires Python 3.9
+    """Classify args.filter and build the predicate that implements it.
+
+    Returns a (filter_type, filter_func) pair; filter_func takes a parsed
+    log-line match and returns whether that line satisfies the filter.
+
+    Side effects on args: sets args.nginx for "502", sets args.all_lines
+    for path filters, and stores the normalized term back in args.filter
+    so the substring pre-check in main() tests the string the predicate
+    actually compares against.
+
+    Calls parser().error() for user-ids with --nginx and for full domains
+    without it; raises RuntimeError if the filter has no recognized shape.
+    """
+    # The heuristics below are not intended to be precise -- they
+    # certainly count things as "IPv4" or "IPv6" addresses that are
+    # invalid. However, we expect the input here to already be
+    # reasonably well-formed.
+
+    filter = args.filter
+
+    if re.match(r"[1-5][0-9][0-9]$", filter):
+        filter_func = lambda m: m["code"] == filter
+        filter_type = FilterType.STATUS
+        if not args.nginx and filter == "502":
+            logging.warning("Adding --nginx -- 502's do not appear in Django logs.")
+            args.nginx = True
+    elif re.match(r"[1-5]xx$", filter):
+        filter = filter[0]
+        filter_func = lambda m: m["code"].startswith(filter)
+        filter_type = FilterType.STATUS
+    elif re.match(r"\d+$", filter):
+        if args.nginx:
+            raise parser().error("Cannot parse user-ids with nginx logs; try without --nginx")
+        filter_func = lambda m: m["user_id"] == filter
+        filter_type = FilterType.USER_ID
+    elif re.match(r"\d{1,3}(\.\d{1,3}){3}$", filter):
+        filter_func = lambda m: m["ip"] == filter
+        filter_type = FilterType.CLIENT_IP
+    elif re.match(r"([a-f0-9:]+:+){1,7}[a-f0-9]+$", filter):
+        filter_func = lambda m: m["ip"] == filter
+        filter_type = FilterType.CLIENT_IP
+    elif re.match(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?$", filter.lower()):
+        filter = filter.lower()
+        if args.nginx:
+            filter_func = lambda m: m["hostname"].startswith(filter + ".")
+        else:
+            filter_func = lambda m: m["hostname"] == filter
+        filter_type = FilterType.HOSTNAME
+    elif re.match(r"[a-z0-9-]+(\.[a-z0-9-]+)+$", filter.lower()) and re.search(
+        r"[a-z-]", filter.lower()
+    ):
+        if not args.nginx:
+            raise parser().error("Cannot parse full domains with Python logs; try --nginx")
+        filter = filter.lower()
+        filter_func = lambda m: m["hostname"] == filter
+        filter_type = FilterType.HOSTNAME
+    elif re.match(r"/\S*$", filter):
+        filter_func = lambda m: m["path"] == filter
+        filter_type = FilterType.PATH
+        args.all_lines = True
+    else:
+        raise RuntimeError(
+            f"Can't parse {filter} as an IP, hostname, user-id, path, or status code."
+        )
+    # Some branches above normalize the term ("4xx" -> "4", hostnames are
+    # lowercased). Store it back so main()'s substring pre-check against
+    # logline.lower() does not discard every line before it is parsed.
+    args.filter = filter
+    return (filter_type, filter_func)
+
+
 def passes_filters(
     string_filter: Callable[[re.Match], bool],  # type: ignore[type-arg] # Requires Python 3.9
     match: re.Match,  # type: ignore[type-arg] # Requires Python 3.9