alpine-zdt-images/scripts/prune-amis.py.in

169 lines
5.8 KiB
Python

#@PYTHON@
# vim: ts=4 et:
import os
import sys
import argparse
from datetime import datetime
import yaml
import boto3
from botocore.exceptions import ClientError
LEVEL_HELP = """\
revision - keep only the latest revision per release
release - keep only the latest release per version
version - keep only the versions that aren't end-of-life
"""
def find_repo_root():
path = os.getcwd()
while ".git" not in set(os.listdir(path)) and path != "/":
path = os.path.dirname(path)
if path == "/":
raise Exception("No repo found, stopping at /")
return path
def main(args):
parser = argparse.ArgumentParser(
description="Prune AMIs from AWS",
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
"level", choices=["revision", "release", "version"], help=LEVEL_HELP)
parser.add_argument("profile", help="profile to prune")
parser.add_argument(
"build", nargs="?", help="build within profile to prune")
args = parser.parse_args()
now = datetime.utcnow()
release_yaml = os.path.join(
find_repo_root() "releases", f"{args.profile}.yaml")
with open(release_yaml, "r") as data:
before = yaml.safe_load(data)
known = {}
prune = {}
after = {}
# for all builds in the profile...
for build_name, releases in before.items():
# this is not the build that was specified
if args.build is not None and args.build != build_name:
print(f"< skipping {args.profile}/{build_name}")
# ensure its release data remains intact
after[build_name] = before[build_name]
continue
else:
print(f"> PRUNING {args.profile}/{build_name} for {args.level}")
criteria = {}
# scan releases for pruning criteria
for release, amis in releases.items():
for ami_name, info in amis.items():
version = info["version"]
built = info["build_time"]
if info["end_of_life"]:
eol = datetime.fromisoformat(info["end_of_life"])
else:
eol = None
for region, ami_id in info["artifacts"].items():
if region not in known:
known[region] = []
known[region].append(ami_id)
if args.level == "revision":
# find build timestamp of most recent revision, per release
if release not in criteria or built > criteria[release]:
criteria[release] = built
elif args.level == "release":
# find build timestamp of most recent revision, per version
if version not in criteria or built > criteria[version]:
criteria[version] = built
elif args.level == "version":
# find latest EOL date, per version
if (version not in criteria or not criteria[version]) or (
eol and eol > criteria[version]):
criteria[version] = eol
# rescan again to determine what doesn't make the cut
for release, amis in releases.items():
for ami_name, info in amis.items():
version = info["version"]
built = info["build_time"]
if info["end_of_life"]:
eol = datetime.fromisoformat(info["end_of_life"])
else:
eol = None
if ((args.level == "revision" and built < criteria[release]) or
(args.level == "release" and built < criteria[version]) or
(args.level == "version" and criteria[version] and (
(version != "edge" and criteria[version] < now) or
(version == "edge" and ((not eol) or (eol < now)))
))):
for region, ami_id in info["artifacts"].items():
if region not in prune:
prune[region] = []
prune[region].append(ami_id)
else:
if build_name not in after:
after[build_name] = {}
if release not in after[build_name]:
after[build_name][release] = {}
after[build_name][release][ami_name] = info
# scan all regions for AMIs
AWS = boto3.session.Session()
for region in AWS.get_available_regions("ec2"):
print(f"* scanning: {region} ...")
EC2 = AWS.client("ec2", region_name=region)
try:
for image in EC2.describe_images(Owners=["self"])["Images"]:
action = "? UNKNOWN"
if region in prune and image["ImageId"] in prune[region]:
action = "- REMOVING"
elif region in known and image["ImageId"] in known[region]:
action = "+ KEEPING"
print(f" {action}: {image['Name']}\n = {image['ImageId']}",
end="", flush=True)
if action[0] == "-":
EC2.deregister_image(ImageId=image["ImageId"])
for blockdev in image["BlockDeviceMappings"]:
if "Ebs" in blockdev:
print(", {blockdev['Ebs']['SnapshotId']}",
end="", flush=True)
if action[0] == "-":
EC2.delete_snapshot(
SnapshotId=blockdev["Ebs"]["SnapshotId"])
print()
except ClientError as e:
print(e)
# update releases/<profile>.yaml
with open(release_yaml, "w") as data:
yaml.dump(after, data, sort_keys=False)
if __name__ == "__main__":
main(sys.argv)