zulip/scripts/purge-old-deployments

69 lines
2.5 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import datetime
import os
import subprocess
import sys
if False:
from typing import Set, Text
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(ZULIP_PATH)
from scripts.lib.zulip_tools import DEPLOYMENTS_DIR, get_recent_deployments, \
may_be_perform_purging, TIMESTAMP_FORMAT
def parse_args():
# type: () -> argparse.Namespace
parser = argparse.ArgumentParser(
description="This script can be used for cleaning old unused deployments.",
epilog="Orphaned/unused caches older than threshold days will be automatically "
"examined and removed.")
parser.add_argument(
"--threshold", dest="threshold_days", type=int, default=14,
nargs="?", metavar="<days>", help="Deployments older than "
"threshold days will be deleted. (defaults to 14)")
parser.add_argument(
"--dry-run", dest="dry_run", action="store_true",
help="If specified then script will only print the deployments and "
"caches that it will delete/keep back. It will not delete anything.")
args = parser.parse_args()
return args
def get_deployments_to_be_purged(recent_deployments):
# type: (Set[Text]) -> Set[Text]
all_deployments = set([os.path.join(DEPLOYMENTS_DIR, deployment)
for deployment in os.listdir(DEPLOYMENTS_DIR)])
deployments_to_purge = set()
for deployment in all_deployments:
if not os.path.isdir(deployment):
# Skip things like uwsgi sockets.
continue
try:
datetime.datetime.strptime(deployment, TIMESTAMP_FORMAT)
except ValueError:
# Never purge deployments whose name is not in the format of a timestamp.
continue
if deployment not in recent_deployments:
deployments_to_purge.add(deployment)
return deployments_to_purge
def main():
# type: () -> None
args = parse_args()
deployments_to_keep = get_recent_deployments(args.threshold_days)
deployments_to_purge = get_deployments_to_be_purged(deployments_to_keep)
may_be_perform_purging(deployments_to_purge, deployments_to_keep, "deployment", args.dry_run)
if not args.dry_run:
print("Deployments cleaned successfully...")
print("Cleaning orphaned/unused caches...")
# Call 'clean-unused-caches' script to clean any orphaned/unused caches.
subprocess.check_call(["./scripts/clean-unused-caches"] + sys.argv[1:])
print("Done!\n")
if __name__ == "__main__":
main()