#!/usr/bin/env python3 # This bit has to stay at the very top of the script. It exists to ensure that # running this script all by itself uses the python virtual environment with # our dependencies installed. If will create that environment if it doesn't # exist. import os import sys import subprocess args = [os.path.join("build", "bin", "python3")] + sys.argv # Create the build root if it doesn't exist if not os.path.exists("build"): import venv print("Build environment does not exist, creating...", file=sys.stderr) venv.create("build", with_pip=True) subprocess.run(["build/bin/pip", "install", "-U", "pip", "pyhocon", "boto3", "python-dateutil", "PyYAML"]) print("Re-executing with builder python...", file=sys.stderr) os.execv(args[0], args) else: # If the build root python is not running this script re-execute it with # that python instead to ensure all of our dependencies exist. if os.path.join(os.getcwd(), args[0]) != sys.executable: print("Re-executing with builder python...", file=sys.stderr) os.execv(args[0], args) # Below here is the real script import io import os import re import sys import glob import json import time import shutil import logging import argparse import textwrap import subprocess import urllib.error import dateutil.parser from enum import Enum from collections import defaultdict from datetime import datetime, timedelta from distutils.version import StrictVersion from urllib.request import Request, urlopen import yaml import boto3 import pyhocon # allows us to set values deep within an object that might not be fully defined def dictfactory(): return defaultdict(dictfactory) # undo dictfactory() objects to normal objects def undictfactory(o): if isinstance(o, defaultdict): o = {k: undictfactory(v) for k, v in o.items()} return o # This is an ugly hack. We occasionally need the region name but it's not # attached to anything publicly exposed on the client objects. Hide this here. def region_from_client(client): return client._client_config.region_name # version sorting def sortable_version(x): v = x.split('_rc')[0] return StrictVersion("0.0" if v == "edge" else v) class EC2Architecture(Enum): I386 = "i386" X86_64 = "x86_64" ARM64 = "arm64" class AMIState(Enum): PENDING = "pending" AVAILABLE = "available" INVALID = "invalid" DEREGISTERED = "deregistered" TRANSIENT = "transient" FAILED = "failed" ERROR = "error" class EC2SnapshotState(Enum): PENDING = "pending" COMPLETED = "completed" ERROR = "error" class TaggedAWSObject: """Base class for AWS API models that support tagging """ EDGE = StrictVersion("0.0") missing_known_tags = None _identity = lambda x: x _known_tags = { "Name": _identity, "profile": _identity, "revision": _identity, "profile_build": _identity, "source_ami": _identity, "source_region": _identity, "arch": lambda x: EC2Architecture(x), "end_of_life": lambda x: datetime.fromisoformat(x), "release": lambda v: EDGE if v == "edge" else StrictVersion(v), "version": lambda v: EDGE if v == "edge" else StrictVersion(v), } def __repr__(self): attrs = [] for k, v in self.__dict__.items(): if isinstance(v, TaggedAWSObject): attrs.append(f"{k}=" + object.__repr__(v)) elif not k.startswith("_"): attrs.append(f"{k}={v!r}") attrs = ", ".join(attrs) return f"{self.__class__.__name__}({attrs})" __str__ = __repr__ @property def aws_tags(self): """Convert python tags to AWS API tags See AMI.aws_permissions for rationale. """ for key, values in self.tags.items(): for value in values: yield { "Key": key, "Value": value } @aws_tags.setter def aws_tags(self, values): """Convert AWS API tags to python tags See AMI.aws_permissions for rationale. """ if not getattr(self, "tags", None): self.tags = {} tags = defaultdict(list) for tag in values: tags[tag["Key"]].append(tag["Value"]) self.tags.update(tags) self._transform_known_tags() # XXX(mcrute): The second paragraph might be considered a bug and worth # fixing at some point. For now those are all read-only attributes though. def _transform_known_tags(self): """Convert well known tags into python attributes Some tags have special meanings for the model objects that they're attached to. This copies those tags, transforms them, then sets them in the model attributes. It doesn't touch the tag itself so if that attribute needs updated and re-saved the tag must be updated in addition to the model. """ self.missing_known_tags = [] for k, tf in self._known_tags.items(): v = self.tags.get(k, []) if not v: self.missing_known_tags.append(k) continue if len(v) > 1: raise Exception(f"multiple instances of tag {k}") setattr(self, k, v[0]) class AMI(TaggedAWSObject): @property def aws_permissions(self): """Convert python permissions to AWS API permissions The permissions model for the API makes more sense for a web service but is overly verbose for working with in Python. This and the setter allow transforming to/from the API syntax. The python code should consume the allowed_groups and allowed_users lists directly. """ perms = [] for g in self.allowed_groups: perms.append({"Group": g}) for i in self.allowed_users: perms.append({"UserId": i}) return perms @aws_permissions.setter def aws_permissions(self, perms): """Convert AWS API permissions to python permissions """ for perm in perms: group = perm.get("Group") if group: self.allowed_groups.append(group) user = perm.get("UserId") if user: self.allowed_users.append(user) @classmethod def from_aws_model(cls, ob, region): self = cls() self.linked_snapshot = None self.allowed_groups = [] self.allowed_users = [] self.region = region self.architecture = EC2Architecture(ob["Architecture"]) self.creation_date = ob["CreationDate"] self.description = ob.get("Description", None) self.image_id = ob["ImageId"] self.name = ob.get("Name") self.owner_id = int(ob["OwnerId"]) self.public = ob["Public"] self.state = AMIState(ob["State"]) self.virtualization_type = ob["VirtualizationType"] self.state_reason = ob.get("StateReason", {}).get("Message", None) self.aws_tags = ob.get("Tags", []) # XXX(mcrute): Assumes we only ever have one device mapping, which is # valid for Alpine AMIs but not a good general assumption. # # This should always resolve for AVAILABLE images but any part of the # data structure may not yet exist for images that are still in the # process of copying. if ob.get("BlockDeviceMappings"): self.snapshot_id = \ ob["BlockDeviceMappings"][0]["Ebs"].get("SnapshotId") return self class EC2Snapshot(TaggedAWSObject): @classmethod def from_aws_model(cls, ob, region): self = cls() self.linked_ami = None self.region = region self.snapshot_id = ob["SnapshotId"] self.description = ob.get("Description", None) self.owner_id = int(ob["OwnerId"]) self.progress = int(ob["Progress"].rstrip("%")) / 100 self.start_time = ob["StartTime"] self.state = EC2SnapshotState(ob["State"]) self.volume_size = ob["VolumeSize"] self.aws_tags = ob.get("Tags", []) return self class ColoredFormatter(logging.Formatter): """Log formatter that colors output based on level """ _colors = { "red": "31", "green": "32", "yellow": "33", "blue": "34", "magenta": "35", "cyan": "36", "white": "37", } def _color_wrap(self, text, color, bold=False): code = self._colors[color] if bold: code = "1;{}".format(code) return "\033[{}m{}\033[0m".format(code, text) def format(self, record): msg = super().format(record) # Levels: CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET if record.levelno in {logging.ERROR, logging.CRITICAL}: return self._color_wrap(msg, "red") elif record.levelno == logging.WARNING: return self._color_wrap(msg, "yellow") else: return self._color_wrap(msg, "green") class IdentityBrokerClient: """Client for identity broker Export IDENTITY_BROKER_ENDPOINT to override the default broker endpoint. Export IDENTITY_BROKER_API_KEY to specify an API key for the broker. See README_BROKER.md for more information and a spec. """ _DEFAULT_ENDPOINT = "https://aws-access.crute.us/api/account" _DEFAULT_ACCOUNT = "alpine-amis-user" def __init__(self, endpoint=None, key=None, account=None): self.endpoint = endpoint or self._DEFAULT_ENDPOINT self.account = account or self._DEFAULT_ACCOUNT self.key = key self._logger = logging.getLogger() override_endpoint = os.environ.get("IDENTITY_BROKER_ENDPOINT") if override_endpoint: self.endpoint = override_endpoint if not self.key: self.key = os.environ.get("IDENTITY_BROKER_API_KEY") if not self.key: raise Exception("No identity broker key found") def _get(self, path): while True: # to handle rate limits try: res = urlopen(Request(path, headers={"X-API-Key": self.key})) except urllib.error.HTTPError as ex: if ex.headers.get("Location") == "/logout": raise Exception("Identity broker token is expired") if ex.status == 429: self._logger.warning( "Rate-limited by identity broker, sleeping 30 seconds") time.sleep(30) continue raise ex if res.status not in {200, 429}: raise Exception(res.reason) return json.load(res) def get_credentials_url(self): for account in self._get(self.endpoint): if account["short_name"] == self.account: return account["credentials_url"] raise Exception("No account found") def get_regions(self): out = {} for region in self._get(self.get_credentials_url()): if region["enabled"]: out[region["name"]] = region["credentials_url"] return out def get_credentials(self, region): return self._get(self.get_regions()[region]) def _boto3_session_from_creds(self, creds, region): return boto3.session.Session( aws_access_key_id=creds["access_key"], aws_secret_access_key=creds["secret_key"], aws_session_token=creds["session_token"], region_name=region) def boto3_session_for_region(self, region): return self._boto3_session_from_creds( self.get_credentials(region), region) def iter_regions(self): for region, cred_url in self.get_regions().items(): yield self._boto3_session_from_creds(self._get(cred_url), region) class ConfigBuilder: now = datetime.utcnow() tomorrow = now + timedelta(days=1) @staticmethod def unquote(x): return x.strip('"') @staticmethod def force_iso_date(input): return datetime.fromisoformat(input).isoformat(timespec="seconds") @classmethod def resolve_release(cls, input, cfg): if input: # release is explicitly defined, run with it return input if cfg['version'] == 'edge': return 'edge' # hack to determine release value from version's alpine-base APK pkgs_url = f"https://dl-cdn.alpinelinux.org/alpine/v{cfg['version']}/main/{cfg['arch']}/" try: res = urlopen(pkgs_url) except urllib.error.HTTPError as ex: print(f"Unable to urlopen {pkgs_url} - {ex}", file=sys.stderr) exit(1) line = res.readline().decode('utf-8') while line: if line.startswith(' 1: raise Exception(f"Too many images for query {tags!r}") elif len(images) == 0: return None else: return images[0] def get_all_images(client): return get_images_with_tags(client) class ReleaseAMIs: """Copy one or more AMIs to other regions and/or set AMI permissions. By default, source AMI permissions are applied to their copies, unless --public, --private, or --allow-account options are specified. If the source AMI's permissions are different than the options provided, its permissions will be updated to match. """ command_name = "release" @staticmethod def add_args(parser): parser.add_argument("--source-region", default="us-west-2", metavar="REGION", help="region of source AMI(s)") rgroup = parser.add_mutually_exclusive_group(required=True) rgroup.add_argument("--use-broker", action="store_true", help="identity broker provides destination regions and credentials") rgroup.add_argument("--region", "-r", action="append", dest="regions", metavar="REGION", help="destination region (multiple OK)") pgroup = parser.add_mutually_exclusive_group() pgroup.add_argument("--public", action="store_true", default=None, help="make source and copied AMIs public") pgroup.add_argument("--private", dest="public", action="store_false", help="make source and copied AMIs private") pgroup.add_argument("--allow-account", dest="allow_accounts", action="append", metavar="ID", help="make source and copied AMIs " "accessible by AWS account id (multiple OK)") parser.add_argument("amis", metavar="AMI", nargs="+", help="AMI id(s) to copy") def get_source_region_client(self, use_broker, source_region): if use_broker: return IdentityBrokerClient().boto3_session_for_region( source_region).client("ec2") else: return boto3.session.Session(region_name=source_region).client( "ec2") def copy_image(self, from_client, to_client, image_id): source = get_image(from_client, image_id) res = to_client.copy_image( Name=source.name, Description=source.description, SourceImageId=source.image_id, SourceRegion=source.region) tags = [ { "Key": "source_ami", "Value": source.image_id, }, { "Key": "source_region", "Value": source.region, } ] tags.extend(source.aws_tags) to_client.create_tags(Resources=[res["ImageId"]], Tags=tags) return get_image(to_client, res["ImageId"]) def has_incorrect_perms(self, image, perms): if (set(image.allowed_groups) != set(perms['groups']) or set(image.allowed_users) != set(perms['users'])): return True def update_image_permissions(self, client, image): client.reset_image_attribute( Attribute="launchPermission", ImageId=image.image_id) client.modify_image_attribute( Attribute="launchPermission", ImageId=image.image_id, LaunchPermission={"Add": image.aws_permissions}) def run(self, args, root, log): source_perms = {} pending_copy = [] pending_perms = [] source_region = args.source_region log.info(f"Source region {source_region}") source_client = self.get_source_region_client( args.use_broker, source_region) # resolve source ami perms, queue for fixing if necessary for ami in args.amis: image = get_image(source_client, ami) if args.public is True: source_perms[ami] = { 'groups': ['all'], 'users': [] } elif args.public is False: source_perms[ami] = { 'groups': [], 'users': [] } elif args.allow_accounts: source_perms[ami] = { 'groups': [], 'users': args.allow_accounts } else: log.warning(f"Will apply {source_region} {ami} permissions to its copies") source_perms[ami] = { 'groups': image.allowed_groups, 'users': image.allowed_users } if self.has_incorrect_perms(image, source_perms[ami]): log.warning(f"Will update source {source_region} {ami} permissions") pending_perms.append((source_client, ami, source_perms[ami])) # Copy image to regions where it's missing, queue images that need # permission fixes log.info('') for client in iter_regions(args.use_broker, args.regions): region_name = region_from_client(client) # For logging # Don't bother copying to source region if region_name == args.source_region: continue log.info(f"Destination region {region_name}...") for ami in args.amis: src_log = f"* source {ami}" image = get_image_with_tags(client, source_ami=ami, source_region=args.source_region) if not image: log.info(f"{src_log} - copying to {region_name}") ami_copy = self.copy_image(source_client, client, ami) pending_copy.append( (client, ami_copy.image_id, source_perms[ami])) elif self.has_incorrect_perms(image, source_perms[ami]): log.info(f"{src_log} - will update {image.image_id} perms") pending_perms.append( (client, image.image_id, source_perms[ami])) else: log.info(f"{src_log} - verified {image.image_id}") log.info('') if pending_copy: # seems to take at least 3m pending_copy.append(('sleep', 180, '')) # Wait for images to copy while pending_copy: client, id, perms = pending_copy.pop(0) # emulate a FIFO queue if client == 'sleep': if not pending_copy: continue log.info(f"Sleeping {id}s...") time.sleep(id) # recheck every 30s pending_copy.append(('sleep', 30, '')) continue region_name = region_from_client(client) # For logging image = get_image(client, id) if image.state != AMIState.AVAILABLE: log.info(f"- copying: {id} ({region_name})") pending_copy.append((client, id, perms)) else: done_log = f"+ completed: {id} ({region_name})" if self.has_incorrect_perms(image, perms): log.info(f"{done_log} - will update perms") pending_perms.append((client, id, perms)) else: log.info(f"{done_log} - verified perms") # Update all permissions for client, id, perms in pending_perms: region_name = region_from_client(client) # For logging log.info(f"% updating perms: {id} ({region_name})") image = get_image(client, id) image.allowed_groups = perms['groups'] image.allowed_users = perms['users'] self.update_image_permissions(client, image) if pending_perms: log.info('') log.info('Release Completed') class Releases: RELEASE_FIELDS = [ 'description', 'profile', 'profile_build', 'version', 'release', 'arch', 'revision', 'creation_date', 'end_of_life' ] def __init__(self, profile=None, use_broker=None, regions=None): self.profile = profile self.tags = { 'profile': profile } self.use_broker = use_broker self.regions = regions self.clients = {} self.reset_images() self.reset_releases() def reset_images(self): self.images = defaultdict(list) def reset_releases(self): self.releases = dictfactory() # TODO: separate Clients class? def iter_clients(self): if not self.clients: for client in iter_regions(self.use_broker, self.regions): region = region_from_client(client) self.clients[region] = client yield (region, client) else: for region, client in self.clients.items(): yield (region, client) # when we're just interested in the profile's images def load_profile_images(self, log=None): for region, client in self.iter_clients(): if log: log.info(f"Loading '{self.profile}' profile images from {region}...") self.images[region] = get_images_with_tags(client, **self.tags) # not belonging to any profile def load_unknown_images(self, log=None): for region, client in self.iter_clients(): if log: log.info(f"Loading unknown images from {region}...") for image in get_all_images(client): if 'profile' not in image.tags: self.images[region].append(image) # build profile releases object based on loaded self.images def build_releases(self, log=None, trim=None): now = datetime.utcnow() versions = dictfactory() for region, amis in self.images.items(): if log: log.info(f"{region}") for ami in amis: eol = datetime.fromisoformat(ami.end_of_life) # if we're trimming, we're not interested in EOL images if trim and eol < now: continue version = ami.version release = ami.release build = ami.profile_build name = ami.name id = ami.image_id build_time = int(dateutil.parser.parse(ami.creation_date).strftime('%s')) if log: log.info(f" * {ami.image_id} {ami.name}") version_obj = versions[version][release][build][name] for field in self.RELEASE_FIELDS: if field not in version_obj: version_obj[field] = getattr(ami, field) # ensure earliest build_time is used if ('build_time' not in version_obj or build_time < version_obj['build_time']): version_obj['build_time'] = build_time version_obj['creation_date'] = ami.creation_date version_obj['artifacts'][region] = id for version, releases in versions.items(): for release, builds in sorted(releases.items(), reverse=True, key=lambda x: sortable_version(x[0])): for build, revisions in builds.items(): for revision, info in sorted(revisions.items(), reverse=True, key=lambda x: x[1]['build_time']): self.releases[release][build][revision] = info # if we are trimming, we want only the most recent revisions if trim: break # if we are trimming releases, we want only the most recent release if trim == 'release': break class ReleasesYAML: """Update releases/.yaml with profile's currently existing AMIs """ command_name = "release-yaml" @staticmethod def add_args(parser): TRIM_HELP=""" revision = keep last x.y.z-r# of non-EOL releases, release = keep last x.y.# of non-EOL versions """ rgroup = parser.add_mutually_exclusive_group(required=True) rgroup.add_argument("--use-broker", action="store_true", help="identity broker provides destination regions and credentials") rgroup.add_argument("--region", "-r", action="append", dest="regions", metavar="REGION", help="destination region (multiple OK)") parser.add_argument("--trim", "-t", choices=['revision','release'], help=TRIM_HELP) parser.add_argument("profile", metavar="PROFILE", help="profile name") def run(self, args, root, log): release_dir = os.path.join(root, 'releases') if not os.path.exists(release_dir): os.makedirs(release_dir) release_yaml = os.path.join(release_dir, f"{args.profile}.yaml") r = Releases( profile = args.profile, use_broker = args.use_broker, regions = args.regions) r.load_profile_images(log) r.build_releases(trim=args.trim) log.info(f"Writing new {release_yaml}") with open(release_yaml, 'w') as data: yaml.dump(undictfactory(r.releases), data, sort_keys=False) class ReleasesReadme: """Build releases/README_.md from releases/.yaml """ command_name = "release-readme" SECTION_TPL = textwrap.dedent(""" ### Alpine Linux {release} ({date})
click to show/hide

{rows}

""") AMI_TPL = ( " [{id}](https://{r}.console.aws.amazon.com/ec2/home" "#Images:visibility=public-images;imageId={id}) " "([launch](https://{r}.console.aws.amazon.com/ec2/home" "#launchAmi={id})) |" ) @staticmethod def add_args(parser): parser.add_argument("profile", metavar="PROFILE", help="profile name") @staticmethod def extract_ver(x): return sortable_version(x['release']) def resolve_sections(self, release_data, log): sects = dictfactory() for release, builds in sorted(release_data.items(), reverse=True): version = '.'.join(release.split('.')[0:2]) if version in sects: continue for build, revisions in builds.items(): ver = sects[version] ver['release'] = release name, info = sorted( revisions.items(), key=lambda x: x[1]['build_time'], reverse=True)[0] if name in ver['builds']: log.warning( f"Duplicate AMI '{name}' in builds " f"'{info['profile_build']} and " f"'{ver['builds'][name]['build']}") ver['builds'][name] = { 'build': info['profile_build'], 'built': int(info['build_time']), 'amis': info['artifacts'] } self.sections = sorted( undictfactory(sects).values(), key=self.extract_ver, reverse=True) def get_ami_markdown(self): ami_list = "## AMIs\n" for section in self.sections: built = 0 regions = [] rows = ["| Region |", "| ------ |"] for name, info in sorted(section['builds'].items()): rows[0] += f" {name} |" rows[1] += " --- |" regions = set(regions) | set(info['amis'].keys()) built = max(built, info['built']) for region in sorted(regions): row = f"| {region} |" for name, info in sorted(section['builds'].items()): amis = info['amis'] if region in amis: row += self.AMI_TPL.format(r=region, id=amis[region]) else: row += ' |' rows.append(row) ami_list += self.SECTION_TPL.format( release=section['release'].capitalize(), date=datetime.utcfromtimestamp(built).date(), rows="\n".join(rows)) return ami_list def run(self, args, root, log): profile = args.profile release_dir = os.path.join(root, "releases") profile_file = os.path.join(release_dir, f"{profile}.yaml") with open(profile_file, "r") as data: self.resolve_sections(yaml.safe_load(data), log) ami_markdown = self.get_ami_markdown() readme = "" readme_md = os.path.join(release_dir, f"README_{profile}.md") action = "Updated" if os.path.exists(readme_md): with open(readme_md, "r") as file: readme = file.read() else: action = "Created" re_images = re.compile(r"## AMIs.*\Z", flags=re.DOTALL) if re_images.search(readme): readme = re_images.sub(ami_markdown, readme) else: log.warning("appending") readme += "\n" + ami_markdown with open(readme_md, "w") as file: file.write(readme) log.info(f"{action} {readme_md}") class PruneAMIs: """Prune Released AMIs """ command_name = "prune" @staticmethod def add_args(parser): LEVEL_HELP = """ revision = x.y.z-r#, release = x.y.#, end-of-life = EOL versions (#.#), UNKNOWN = AMIs with no profile tag """ parser.add_argument("level", choices=["revision", "release", "end-of-life", "UNKNOWN"], help=LEVEL_HELP) rgroup = parser.add_mutually_exclusive_group(required=True) rgroup.add_argument("--use-broker", action="store_true", help="use identity broker to obtain per-region credentials") rgroup.add_argument("--region", "-r", metavar='REGION', dest='regions', action="append", help="regions to prune, may be specified multiple times") parser.add_argument("profile", metavar='PROFILE', help="profile to prune") parser.add_argument("builds", metavar='BUILD', nargs="*", help="build(s) within profile to prune") agroup = parser.add_mutually_exclusive_group() agroup.add_argument( '--keep', metavar='NUM', type=int, default=0, help='keep NUM most-recent additional otherwise-pruneable per LEVEL') agroup.add_argument('--defer-eol', metavar='DAYS', type=int, default=0, help='defer end-of-life pruning for additional days') parser.add_argument( '--no-pretend', action='store_true', help='actually prune images') @staticmethod def check_args(args): if args.level != 'end-of-life' and args.defer_eol != 0: return ["--defer-eol may only be used with 'end-of-life' pruning."] if args.keep < 0: return ["Only non-negative integers are valid for --keep."] def __init__(self): self.pruneable = dictfactory() def find_pruneable(self, r, args, log): level = args.level builds = args.builds keep = args.keep defer_eol = args.defer_eol now = datetime.utcnow() - timedelta(days=args.defer_eol) # build releases from profile images r.load_profile_images(log) r.build_releases() # scan for pruning criteria criteria = dictfactory() for release, rdata in r.releases.items(): for build, bdata in rdata.items(): if builds and build not in builds: continue for ami_name, info in bdata.items(): version = info['version'] built = info['build_time'] eol = datetime.fromisoformat(info['end_of_life']) # default: level == 'release' basis = version if level == 'revision': basis = release elif level == 'end-of-life': # not enough in common with revision/release if build not in criteria[version]: criteria[version][build] = [now] if eol < now and eol not in criteria[version][build]: criteria[version][build].append(eol) criteria[version][build].sort(reverse=True) continue # revsion/release have enough commonality if build not in criteria[basis]: criteria[basis][build] = [built] elif built not in criteria[basis][build]: criteria[basis][build].append(built) criteria[basis][build].sort(reverse=True) # scan again to determine what doesn't make the cut for release, rdata in r.releases.items(): for build, bdata in rdata.items(): if builds and build not in builds: continue for ami_name, info in bdata.items(): version = info['version'] built = info['build_time'] eol = datetime.fromisoformat(info['end_of_life']) # default: level == 'release' basis = version value = built if level == 'revision': basis = release elif level == 'end-of-life': value = eol c = criteria[basis][build] if keep < len(c) and value < c[keep]: for region, ami in info['artifacts'].items(): self.pruneable[region][ami] = { 'name': ami_name, } # populate AMI creation_date and snapshot_id from Release().images for region, images in r.images.items(): for image in images: if image.image_id in self.pruneable[region]: p = self.pruneable[region][image.image_id] p['name'] = image.name p['creation_date'] = dateutil.parser.parse( image.creation_date).strftime('%Y-%m-%d') p['snapshot_id'] = image.snapshot_id def all_unknown_pruneable(self, r, log): r.load_unknown_images(log) for region, images in r.images.items(): for image in images: self.pruneable[region][image.image_id] = { 'name': image.name, 'creation_date': dateutil.parser.parse( image.creation_date).strftime('%Y-%m-%d'), 'snapshot_id': image.snapshot_id } def run(self, args, root, log): # instantiate Releases object r = Releases( profile=args.profile, use_broker=args.use_broker, regions=args.regions) if args.level == 'UNKNOWN': self.all_unknown_pruneable(r, log) else: self.find_pruneable(r, args, log) for region, amis in sorted(self.pruneable.items()): r_str = f"{args.level} AMIs in {region}" if not amis: log.info(f"No pruneable {r_str}.") continue if args.no_pretend: log.error(f"REMOVING {r_str}:") else: log.warning(f"Removable {r_str}:") for ami, info in sorted(amis.items(), key=lambda x: x[1]['creation_date']): a_str = f" * {ami} ({info['creation_date']}) {info['name']}" if args.no_pretend: log.warning(a_str) r.clients[region].deregister_image(ImageId=ami) r.clients[region].delete_snapshot(SnapshotId=info['snapshot_id']) else: log.info(a_str) def find_repo_root(): """Find the root of the repo, which contains a .git folder """ path = os.getcwd() while ".git" not in set(os.listdir(path)) and path != "/": path = os.path.dirname(path) if path == "/": raise Exception("No repo found, stopping at /") return path def main(): """An introspective main method Just some silly metaprogramming to make commands really easy to write and to avoid needing to hand register them. Commands have a specific interface, per below, but should be really easy to create and will be auto discovered. Commands are objects that have the following attributes: __doc__ (python docstring) used as help text in the CLI command_name (string) name of the command as invoked by the cli add_args(parser) (class or static method) passed an argparse subparser at setup time that will ultimately handle the arguments for the command at runtime. Should add any configuration necessary for the command to use later. Must not rely on object state as it is not invoked with an instance of the object. check_args(self, args) (class method, optional) passed the set of arguments that were parsed before the command was invoked. This function should return a list of errors or None. Non-empty lists will cause the command to not be executed and help being printed. run(self, args, root, log) (instance method) passed the arguments object as parsed by argparse as well as a string indicating the root of the repository (the folder containing the .git folder) and an instance of a stanard libary logger for output. Should throw exceptions on error and return when completed. Should *not* execute sys.exit """ dispatch = {} parser = argparse.ArgumentParser() subs = parser.add_subparsers(dest="command_name", required=True) # Configure logger logger = logging.getLogger() handler = logging.StreamHandler() handler.setFormatter(ColoredFormatter(fmt="%(message)s")) logger.addHandler(handler) logger.setLevel(logging.INFO) for command in sys.modules[__name__].__dict__.values(): if not hasattr(command, "command_name"): continue dispatch[command.command_name] = command() doc = getattr(command, "__doc__", "") subparser = subs.add_parser( command.command_name, help=doc, description=doc) add_args = getattr(command, "add_args", None) if add_args: command.add_args(subparser) args = parser.parse_args() command = dispatch[args.command_name] errors = getattr(command, "check_args", lambda x: [])(args) if errors: logger.error("\n".join(errors)) # Ugly hack, gets the help for the subcommand, no public API for this parser._actions[1]._name_parser_map[args.command_name].print_help() else: command.run(args, find_repo_root(), logger) if __name__ == "__main__": main()