mirror of https://github.com/zulip/zulip.git
log-search: Factor out argument parsing.
This commit is contained in:
parent
8eab5f6931
commit
bd73e7d411
|
@ -8,7 +8,7 @@ import re
|
|||
import signal
|
||||
import sys
|
||||
from enum import Enum, auto
|
||||
from typing import Callable, List, TextIO
|
||||
from typing import Callable, List, TextIO, Tuple
|
||||
|
||||
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.append(ZULIP_PATH)
|
||||
|
@ -164,58 +164,7 @@ class FilterType(Enum):
|
|||
def main() -> None:
|
||||
args = parser().parse_args()
|
||||
|
||||
# The heuristics below are not intended to be precise -- they
|
||||
# certainly count things as "IPv4" or "IPv6" addresses that are
|
||||
# invalid. However, we expect the input here to already be
|
||||
# reasonably well-formed.
|
||||
filter = args.filter
|
||||
|
||||
if re.match(r"[1-5][0-9][0-9]$", filter):
|
||||
string_filter = lambda m: m["code"] == filter
|
||||
filter_type = FilterType.STATUS
|
||||
if not args.nginx and filter == "502":
|
||||
logging.warning("Adding --nginx -- 502's do not appear in Django logs.")
|
||||
args.nginx = True
|
||||
elif re.match(r"[1-5]xx$", filter):
|
||||
filter = filter[0]
|
||||
string_filter = lambda m: m["code"].startswith(filter)
|
||||
filter_type = FilterType.STATUS
|
||||
elif re.match(r"\d+$", filter):
|
||||
if args.nginx:
|
||||
raise parser().error("Cannot parse user-ids with nginx logs; try without --nginx")
|
||||
string_filter = lambda m: m["user_id"] == filter
|
||||
filter_type = FilterType.USER_ID
|
||||
elif re.match(r"\d{1,3}(\.\d{1,3}){3}$", filter):
|
||||
string_filter = lambda m: m["ip"] == filter
|
||||
filter_type = FilterType.CLIENT_IP
|
||||
elif re.match(r"([a-f0-9:]+:+){1,7}[a-f0-9]+$", filter):
|
||||
string_filter = lambda m: m["ip"] == filter
|
||||
filter_type = FilterType.CLIENT_IP
|
||||
elif re.match(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?$", filter.lower()):
|
||||
filter = filter.lower()
|
||||
if args.nginx:
|
||||
string_filter = lambda m: m["hostname"].startswith(filter + ".")
|
||||
else:
|
||||
string_filter = lambda m: m["hostname"] == filter
|
||||
filter_type = FilterType.HOSTNAME
|
||||
elif re.match(r"[a-z0-9-]+(\.[a-z0-9-]+)+$", filter.lower()) and re.search(
|
||||
r"[a-z-]", filter.lower()
|
||||
):
|
||||
if not args.nginx:
|
||||
raise parser().error("Cannot parse full domains with Python logs; try --nginx")
|
||||
filter = filter.lower()
|
||||
string_filter = lambda m: m["hostname"] == filter
|
||||
filter_type = FilterType.HOSTNAME
|
||||
elif re.match(r"/\S*$", filter):
|
||||
string_filter = lambda m: m["path"] == filter
|
||||
filter_type = FilterType.PATH
|
||||
args.all_lines = True
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"Can't parse {filter} as an IP, hostname, user-id, path, or status code."
|
||||
)
|
||||
assert filter_type is not None
|
||||
|
||||
(filter_type, filter_func) = parse_filters(args)
|
||||
logfile_names = parse_logfile_names(args)
|
||||
|
||||
try:
|
||||
|
@ -224,7 +173,7 @@ def main() -> None:
|
|||
for logline in logfile:
|
||||
# As a performance optimization, just do a substring
|
||||
# check before we parse the line fully
|
||||
if filter not in logline.lower():
|
||||
if args.filter not in logline.lower():
|
||||
continue
|
||||
|
||||
if args.nginx:
|
||||
|
@ -236,7 +185,7 @@ def main() -> None:
|
|||
if args.nginx:
|
||||
print(f"! Failed to parse:\n{logline}", file=sys.stderr)
|
||||
continue
|
||||
if passes_filters(string_filter, match, args):
|
||||
if passes_filters(filter_func, match, args):
|
||||
print_line(
|
||||
match,
|
||||
args,
|
||||
|
@ -277,6 +226,63 @@ def parse_logfile_names(args: argparse.Namespace) -> List[str]:
|
|||
return logfile_names
|
||||
|
||||
|
||||
def parse_filters(
    args: argparse.Namespace,
) -> Tuple[FilterType, Callable[[re.Match], bool]]:  # type: ignore[type-arg] # Requires Python 3.9
    """Classify ``args.filter`` and build the predicate that applies it.

    The search term is matched against a sequence of regular expressions,
    in order, to decide whether it looks like a status code, a user-id,
    an IP address, a hostname, or a path.  The first pattern that matches
    wins.

    Args:
        args: Parsed command-line arguments.  Reads ``args.filter`` and
            ``args.nginx``.  May be modified in place:

            * ``args.nginx`` is set to ``True`` when filtering on ``502``
              without ``--nginx``.
            * ``args.all_lines`` is set to ``True`` for path filters.
            * ``args.filter`` is replaced by the normalized search term
              (lower-cased for hostnames, the leading digit for ``Nxx``
              status classes).

    Returns:
        A ``(filter_type, filter_func)`` tuple, where ``filter_func``
        takes a regex match for a log line and returns whether that line
        satisfies the filter.

    Raises:
        RuntimeError: If the term matches none of the known shapes.

    Incompatible combinations (a user-id with ``--nginx``, or a full
    domain without it) are reported through ``parser().error(...)``.
    """
    # The heuristics below are not intended to be precise -- they
    # certainly count things as "IPv4" or "IPv6" addresses that are
    # invalid.  However, we expect the input here to already be
    # reasonably well-formed.

    # Named "term" rather than "filter" so the builtin is not shadowed.
    term = args.filter

    if re.match(r"[1-5][0-9][0-9]$", term):
        filter_func = lambda m: m["code"] == term
        filter_type = FilterType.STATUS
        if not args.nginx and term == "502":
            logging.warning("Adding --nginx -- 502's do not appear in Django logs.")
            args.nginx = True
    elif re.match(r"[1-5]xx$", term):
        # Keep only the leading digit; the predicate is a prefix match.
        term = term[0]
        filter_func = lambda m: m["code"].startswith(term)
        filter_type = FilterType.STATUS
    elif re.match(r"\d+$", term):
        if args.nginx:
            raise parser().error("Cannot parse user-ids with nginx logs; try without --nginx")
        filter_func = lambda m: m["user_id"] == term
        filter_type = FilterType.USER_ID
    elif re.match(r"\d{1,3}(\.\d{1,3}){3}$", term):
        filter_func = lambda m: m["ip"] == term
        filter_type = FilterType.CLIENT_IP
    elif re.match(r"([a-f0-9:]+:+){1,7}[a-f0-9]+$", term):
        filter_func = lambda m: m["ip"] == term
        filter_type = FilterType.CLIENT_IP
    elif re.match(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?$", term.lower()):
        term = term.lower()
        if args.nginx:
            # With --nginx a bare label is compared as the leading
            # component of the logged hostname.
            filter_func = lambda m: m["hostname"].startswith(term + ".")
        else:
            filter_func = lambda m: m["hostname"] == term
        filter_type = FilterType.HOSTNAME
    elif re.match(r"[a-z0-9-]+(\.[a-z0-9-]+)+$", term.lower()) and re.search(
        r"[a-z-]", term.lower()
    ):
        if not args.nginx:
            raise parser().error("Cannot parse full domains with Python logs; try --nginx")
        term = term.lower()
        filter_func = lambda m: m["hostname"] == term
        filter_type = FilterType.HOSTNAME
    elif re.match(r"/\S*$", term):
        filter_func = lambda m: m["path"] == term
        filter_type = FilterType.PATH
        args.all_lines = True
    else:
        raise RuntimeError(
            f"Can't parse {term} as an IP, hostname, user-id, path, or status code."
        )

    # The caller pre-screens each log line with a substring test of
    # args.filter against the lower-cased line.  Publish the normalized
    # term so that test agrees with filter_func; otherwise a mixed-case
    # hostname or an "Nxx" status class would never match any line.
    args.filter = term

    return (filter_type, filter_func)
|
||||
|
||||
|
||||
def passes_filters(
|
||||
string_filter: Callable[[re.Match], bool], # type: ignore[type-arg] # Requires Python 3.9
|
||||
match: re.Match, # type: ignore[type-arg] # Requires Python 3.9
|
||||
|
|
Loading…
Reference in New Issue