#!/usr/bin/env python3
# vim: ts=4 et:
"""Run the Alpine image build steps (configs, state, local, import, publish).

The script bootstraps its own virtual environment in ``work/``, re-executes
itself with that environment's interpreter, prepares the work directory
(overlays, UEFI firmware) and finally drives Packer.
"""

# Ensure we're using the Python virtual env with our installed dependencies

import os
import sys
import subprocess

sys.pycache_prefix = 'work/__pycache__'

# Create the work environment if it doesn't exist.
if not os.path.exists('work'):
    import shutil
    import venv

    PIP_LIBS = [
        'mergedeep',
        'pyhocon',
        'python-dateutil',
        'ruamel.yaml',
    ]
    print('Work environment does not exist, creating...', file=sys.stderr)
    try:
        venv.create('work', with_pip=True)
        subprocess.run(
            ['work/bin/pip', 'install', '-U', 'pip', 'wheel'], check=True)
        subprocess.run(
            ['work/bin/pip', 'install', '-U', *PIP_LIBS], check=True)
    except (OSError, subprocess.CalledProcessError) as err:
        # The mere existence of 'work' is what marks the environment as
        # ready, so a half-built one must not survive a failed bootstrap.
        shutil.rmtree('work', ignore_errors=True)
        print(f'Unable to create work environment: {err}', file=sys.stderr)
        sys.exit(1)

# Re-execute using the right virtual environment, if necessary.
venv_args = [os.path.join('work', 'bin', 'python3')] + sys.argv
if os.path.join(os.getcwd(), venv_args[0]) != sys.executable:
    print("Re-executing with work environment's Python...\n", file=sys.stderr)
    os.execv(venv_args[0], venv_args)

# We're now in the right Python environment...

import argparse
import io
import logging
import shutil
import time

from glob import glob
from subprocess import Popen, PIPE
from urllib.request import urlopen

import clouds
from alpine import Alpine
from image_configs import ImageConfigManager


### Constants & Variables

STEPS = ['configs', 'state', 'local', 'import', 'publish']
LOGFORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# entries of 'work' that clean_work() leaves in place
WORK_CLEAN = {'bin', 'include', 'lib', 'pyvenv.cfg', '__pycache__'}
WORK_OVERLAYS = ['configs', 'scripts']
# per-arch path of the firmware image inside the 'ovmf' package
OVMF_FIRMWARE = {
    'aarch64': 'usr/share/OVMF/QEMU_EFI.fd',
    'x86_64': 'usr/share/OVMF/OVMF.fd'
}

alpine = Alpine()


### Functions

# ensure list has unique values, preserving order
def unique_list(x):
    """Return the items of *x* as a list, dropping repeats.

    The first occurrence of each item keeps its position.  Items must be
    hashable.
    """
    return list(dict.fromkeys(x))


def remove_dupe_args():
    """Return an argparse Action class that stores de-duplicated values."""

    class RemoveDupeArgs(argparse.Action):
        def __call__(self, parser, args, values, option_string=None):
            setattr(args, self.dest, unique_list(values))

    return RemoveDupeArgs


def are_args_valid(checker):
    """Return an argparse Action class that de-duplicates and validates.

    Every value is passed to *checker*; the first value it rejects aborts
    argument parsing through ``parser.error()``.
    """

    class AreArgsValid(argparse.Action):
        def __call__(self, parser, args, values, option_string=None):
            # remove duplicates
            values = unique_list(values)
            for x in values:
                if not checker(x):
                    parser.error(
                        f"{option_string} value is not a {self.metavar}: {x}")

            setattr(args, self.dest, values)

    return AreArgsValid


def clean_work():
    """Remove everything under 'work' except the entries in WORK_CLEAN."""
    log.info('Cleaning work environment')
    for x in (set(os.listdir('work')) - WORK_CLEAN):
        x = os.path.join('work', x)
        log.debug('removing %s', x)
        # a symlink to a directory is unlinked, never followed
        if os.path.isdir(x) and not os.path.islink(x):
            shutil.rmtree(x)
        else:
            os.unlink(x)


def is_images_conf(o, x):
    """Tell whether *x* is an 'images.conf' symlink of the 'configs' overlay.

    *o* is the overlay name and *x* the path being examined.  The link must
    also point at a bare file name, i.e. a file in its own directory.
    """
    if not all([
        o == 'configs',
        x.endswith('/images.conf'),
        os.path.islink(x),
    ]):
        return False

    # must also link to file in the same directory
    x_link = os.path.normpath(os.readlink(x))
    return x_link == os.path.basename(x_link)


def install_overlay(overlay):
    """Populate 'work/<overlay>' from '.' and each --custom directory.

    Directories are merged.  An 'images.conf' symlink (see is_images_conf)
    is recreated as a symlink and may replace an earlier symlink.  Any other
    attempt to overwrite an existing destination ends the program.
    """
    log.info("Installing '%s' overlay in work environment", overlay)
    dest_dir = os.path.join('work', overlay)
    os.makedirs(dest_dir, exist_ok=True)
    for src in unique_list(['.'] + args.custom):
        src_dir = os.path.join(src, overlay)
        for x in glob(os.path.join(src_dir, '**'), recursive=True):
            # work with the path relative to the overlay directory
            x = x.removeprefix(src_dir + '/')
            src_x = os.path.join(src_dir, x)
            dest_x = os.path.join(dest_dir, x)

            if is_images_conf(overlay, src_x):
                rel_x = os.readlink(src_x)
                if os.path.islink(dest_x):
                    log.debug('overriding %s', dest_x)
                    os.unlink(dest_x)

                log.debug('ln -s %s %s', rel_x, dest_x)
                os.symlink(rel_x, dest_x)
                continue

            if os.path.isdir(src_x):
                if not os.path.exists(dest_x):
                    log.debug('makedirs %s', dest_x)
                    os.makedirs(dest_x)

                if os.path.isdir(dest_x):
                    continue

            if os.path.exists(dest_x):
                log.critical(
                    'Unallowable destination overwrite detected: %s', dest_x)
                sys.exit(1)

            log.debug('cp -p %s %s', src_x, dest_x)
            shutil.copy(src_x, dest_x)


def install_overlays():
    """Install each overlay in WORK_OVERLAYS that 'work' does not have yet."""
    for overlay in WORK_OVERLAYS:
        if not os.path.isdir(os.path.join('work', overlay)):
            install_overlay(overlay)
        else:
            log.info("Using existing '%s' in work environment", overlay)


def _install_arch_firmware(firm_dir, arch, firm_path):
    """Download the 'ovmf' package for *arch* and set up its firmware image.

    *firm_path* is extracted below *firm_dir*, linked as 'uefi-<arch>.bin'
    and padded.  Ends the program if extraction or padding fails.
    """
    v = alpine.apk_version('community', arch, 'ovmf')
    ovmf_url = f"{alpine.repo_url('community', arch)}/ovmf-{v}.apk"
    with urlopen(ovmf_url) as response:
        data = response.read()

    # Python tarfile library can't extract from APKs
    tar_cmd = ['tar', '-zxf', '-', '-C', firm_dir, firm_path]
    p = Popen(tar_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate(input=data)
    if p.returncode:
        log.critical('Unable to untar %s to get %s', ovmf_url, firm_path)
        log.error('%s = %s', p.returncode, ' '.join(tar_cmd))
        log.error('STDOUT:\n%s', out.decode('utf8'))
        log.error('STDERR:\n%s', err.decode('utf8'))
        sys.exit(1)

    firm_bin = os.path.join(firm_dir, f"uefi-{arch}.bin")
    # relative link target, resolved from within firm_dir
    os.symlink(firm_path, firm_bin)

    # 67108864 bytes = 64 MiB
    log.info('Padding "%s" to 67108864 bytes', firm_bin)
    pad = subprocess.run(['truncate', '-s', '67108864', firm_bin])
    if pad.returncode:
        log.critical('Unable to pad %s', firm_bin)
        sys.exit(1)


def install_qemu_firmware():
    """Install the UEFI firmware images in 'work/firmware' if it is missing.

    An existing directory is taken as a complete installation, so the
    directory is removed again whenever the installation does not finish.
    """
    firm_dir = 'work/firmware'
    if os.path.isdir(firm_dir):
        log.info('Using existing UEFI firmware in work environment')
        return

    log.info('Installing UEFI firmware in work environment')
    os.makedirs(firm_dir)
    try:
        for arch, firm_path in OVMF_FIRMWARE.items():
            _install_arch_firmware(firm_dir, arch, firm_path)
    except BaseException:
        # also covers sys.exit() and Ctrl-C: never leave a partial install
        shutil.rmtree(firm_dir, ignore_errors=True)
        raise


### Command Line & Logging

parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# general options
parser.add_argument(
    '--debug', action='store_true', help='enable debug output')
parser.add_argument(
    '--clean', action='store_true', help='start with a clean work environment')
# config options
parser.add_argument(
    '--custom', metavar='DIR', nargs='+', action=are_args_valid(os.path.isdir),
    default=[], help='overlay custom directory in work environment')
# state options
parser.add_argument(
    '--skip', metavar='KEY', nargs='+', action=remove_dupe_args(),
    default=[], help='skip variants with dimension key(s)')
parser.add_argument(
    '--only', metavar='KEY', nargs='+', action=remove_dupe_args(),
    default=[], help='only variants with dimension key(s)')
parser.add_argument(
    '--revise', action='store_true',
    help='remove existing local/imported image, or bump revision and rebuild '
    'if published')
parser.add_argument(
    '--use-broker', action='store_true',
    help='use the identity broker to get credentials')
# packer options
parser.add_argument(
    '--no-color', action='store_true', help='turn off Packer color output')
parser.add_argument(
    '--parallel', metavar='N', type=int, default=1,
    help='build N images in parallel')
parser.add_argument(
    '--vars', metavar='FILE', nargs='+', action=are_args_valid(os.path.isfile),
    default=[], help='supply Packer with -vars-file(s)')
# positional argument
parser.add_argument(
    'step', choices=STEPS, help='build up to and including this step')
args = parser.parse_args()

log = logging.getLogger('build')
log.setLevel(logging.DEBUG if args.debug else logging.INFO)
console = logging.StreamHandler()
logfmt = logging.Formatter(LOGFORMAT, datefmt='%FT%TZ')
# timestamps are rendered in UTC to match the literal 'Z' suffix
logfmt.converter = time.gmtime
console.setFormatter(logfmt)
log.addHandler(console)
log.debug(args)

# set up credential provider, if we're going to use it
if args.use_broker:
    clouds.set_credential_provider(debug=args.debug)

### Setup Configs

latest = alpine.version_info()
log.info('Latest Alpine version %s and release %s', latest['version'], latest['release'])

if args.clean:
    clean_work()

# install overlay(s) if missing
install_overlays()

image_configs = ImageConfigManager(
    conf_path='work/configs/images.conf',
    yaml_path='work/images.yaml',
    log='build',
    alpine=alpine,
)

log.info('Configuration Complete')
if args.step == 'configs':
    sys.exit(0)

### What needs doing?
if not image_configs.refresh_state(
        step=args.step, only=args.only, skip=args.skip, revise=args.revise):
    log.info('No pending actions to take at this time.')
    sys.exit(0)

if args.step == 'state':
    sys.exit(0)

# install firmware if missing
install_qemu_firmware()

### Build/Import/Publish with Packer

# Packer inherits our environment, plus a fixed timezone and cache location
env = os.environ | {
    'TZ': 'UTC',
    'PACKER_CACHE_DIR': 'work/packer_cache'
}

packer_cmd = [
    'packer', 'build', '-timestamp-ui',
    '-parallel-builds', str(args.parallel)
]
if args.no_color:
    packer_cmd.append('-color=false')

if args.use_broker:
    packer_cmd += ['-var', 'USE_BROKER=1']

if args.debug:
    # do not add '-debug', it will pause between steps
    packer_cmd += ['-var', 'DEBUG=1']

for var_file in args.vars:
    packer_cmd.append(f"-var-file={var_file}")

packer_cmd += ['.']

log.info('Executing Packer...')
log.debug(packer_cmd)

# 'out' keeps a copy of everything Packer prints
out = io.StringIO()
# Read until EOF instead of polling for process exit: polling stops as soon
# as Packer terminates and would drop whatever is still buffered in the pipe.
# Leaving the 'with' block closes the pipe and waits, which sets returncode.
with Popen(packer_cmd, stdout=PIPE, encoding='utf8', env=env) as p:
    for text in p.stdout:
        out.write(text)
        print(text, end="")

if p.returncode != 0:
    log.critical('Packer Failure')
    sys.exit(p.returncode)

log.info('Packer Completed')

# update final state in work/images.yaml
image_configs.refresh_state(
    step='final',
    only=args.only,
    skip=args.skip
)

log.info('Build Finished')