import os import re import hashlib import yaml import time import pathlib import pprint from datetime import datetime, timedelta from import tzutc from botocore.exceptions import ClientError from .utils import dict_merge, search_refs, ensure_dir, get_s3_url from .connection import BotoConnection from .jinja import JinjaEnv, read_config_file from . import __version__ from .exceptions import ParameterNotFound, ParameterIllegalValue from .hooks import exec_hooks import cfnlint.core import cfnlint.template import cfnlint.graph try: import importlib.resources as pkg_resources except ImportError: import importlib_resources as pkg_resources from . import templates import logging logger = logging.getLogger(__name__) # Ignore any ! during re-loading of CFN templates class SafeLoaderIgnoreUnknown(yaml.SafeLoader): def ignore_unknown(self, node): return node.tag SafeLoaderIgnoreUnknown.add_constructor(None, SafeLoaderIgnoreUnknown.ignore_unknown) class Stack(object): def __init__(self, name, template, path, rel_path, ctx): self.stackname = name self.template = template self.path = pathlib.Path(path) self.rel_path = rel_path self.ctx = ctx self.tags = {} self.parameters = {} self.outputs = {} self.options = {} self.region = 'global' self.profile = '' self.onfailure = 'DELETE' self.notfication_sns = [] = (self.profile, self.region, self.stackname) self.aws_stackid = None self.md5 = None self.mode = 'CloudBender' self.provides = template self.cfn_template = None self.cfn_parameters = [] self.cfn_data = None self.connection_manager = BotoConnection(self.profile, self.region) self.status = None self.store_outputs = False self.dependencies = set() self.hooks = {'post_create': [], 'post_update': [], 'pre_create': [], 'pre_update': []} self.default_lock = None self.multi_delete = True self.template_bucket_url = None def dump_config(self): logger.debug("".format(, pprint.pformat(vars(self)))) def read_config(self, sg_config={}): """ reads stack config """ # First set various attributes based on parent stackgroup config self.tags.update(sg_config.get('tags', {})) self.parameters.update(sg_config.get('parameters', {})) self.options.update(sg_config.get('options', {})) # by default inherit parent group settings for p in ['region', 'profile', 'notfication_sns', 'template_bucket_url']: if p in sg_config: setattr(self, p, sg_config[p]) # now override stack specific settings _config = read_config_file(self.path, sg_config.get('variables', {})) for p in ["region", "stackname", "template", "default_lock", "multi_delete", "provides", "onfailure", "notification_sns", "template_bucket_url"]: if p in _config: setattr(self, p, _config[p]) for p in ["parameters", "tags"]: if p in _config: setattr(self, p, dict_merge(getattr(self, p), _config[p])) # Inject Artifact if not explicitly set if 'Artifact' not in self.tags: self.tags['Artifact'] = self.provides if 'options' in _config: self.options = dict_merge(self.options, _config['options']) if 'Mode' in self.options: self.mode = self.options['Mode'] if 'StoreOutputs' in self.options and self.options['StoreOutputs']: self.store_outputs = True if 'dependencies' in _config: for dep in _config['dependencies']: self.dependencies.add(dep) # Some sanity checks if self.onfailure not in ["DO_NOTHING", "ROLLBACK", "DELETE"]: raise ParameterIllegalValue("onfailure must be one of DO_NOTHING | ROLLBACK | DELETE") logger.debug("Stack {} added.".format( def render(self): """Renders the cfn jinja template for this stack""" template_metadata = { 'Template.Name': self.template, 'Template.Hash': "__HASH__", 'CloudBender.Version': __version__ } _config = {'mode': self.mode, 'options': self.options, 'metadata': template_metadata} jenv = JinjaEnv(self.ctx['artifact_paths']) jenv.globals['_config'] = _config template = jenv.get_template('{0}{1}'.format(self.template, '.yaml.jinja'))'Rendering %s', template.filename) try: self.cfn_template = template.render(_config) self.cfn_data = yaml.load(self.cfn_template, Loader=SafeLoaderIgnoreUnknown) except Exception as e: # In case we rendered invalid yaml this helps to debug if self.cfn_template: _output = "" for i, line in enumerate(self.cfn_template.splitlines(), start=1): _output = _output + '{}: {}\n'.format(i, line) logger.error(_output) raise e if not'CloudBender::', self.cfn_template) and not'Iterate:', self.cfn_template):"CloudBender not required -> removing Transform and Conglomerate parameter") self.cfn_template = self.cfn_template.replace('Transform: [CloudBender]', '') _res = """ Conglomerate: Type: String Description: Project / Namespace this stack is part of """ self.cfn_template = re.sub(_res, '', self.cfn_template) include = [] search_refs(self.cfn_data, include, self.mode) if self.mode == "Piped" and len(include): _res = "" for attr in include: _res = _res + """ {0}: Type: String Description: Parameter to provide remote stack attribute {0}""".format(attr) self.cfn_template = re.sub(r'Parameters:', r'Parameters:' + _res + '\n', self.cfn_template)"Piped mode: Added parameters for remote stack references") # Re-read updated template self.cfn_data = yaml.load(self.cfn_template, Loader=SafeLoaderIgnoreUnknown) # Check for empty top level Parameters, Outputs and Conditions and remove for key in ['Parameters', 'Outputs', 'Conditions']: if key in self.cfn_data and not self.cfn_data[key]: del self.cfn_data[key] self.cfn_template = self.cfn_template.replace('\n' + key + ":", '') # Remove and condense multiple empty lines self.cfn_template = re.sub(r'\n\s*\n', '\n\n', self.cfn_template) self.cfn_template = re.sub(r'^\s*', '', self.cfn_template) self.cfn_template = re.sub(r'\s*$', '', self.cfn_template) # set md5 last self.md5 = hashlib.md5(self.cfn_template.encode('utf-8')).hexdigest() self.cfn_template = self.cfn_template.replace('__HASH__', self.md5) # Update internal data structures self._parse_metadata() def _parse_metadata(self): # Extract dependencies try: for dep in self.cfn_data['Metadata']['CloudBender']['Dependencies']: self.dependencies.add(dep) except KeyError: pass # Add CloudBender dependencies include = [] search_refs(self.cfn_data, include, self.mode) for ref in include: if self.mode != "Piped": self.dependencies.add(ref.split('.')[0]) else: self.dependencies.add(ref.split('DoT')[0]) # Extract hooks try: for hook, func in self.cfn_data['Metadata']['Hooks'].items(): if hook in ['post_update', 'post_create', 'pre_create', 'pre_update']: if isinstance(func, list): self.hooks[hook].extend(func) else: self.hooks[hook].append(func) except KeyError: pass def write_template_file(self): if self.cfn_template: yaml_file = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml") ensure_dir(os.path.join(self.ctx['template_path'], self.rel_path)) with open(yaml_file, 'w') as yaml_contents: yaml_contents.write(self.cfn_template)'Wrote %s to %s', self.template, yaml_file) # upload template to s3 if set if self.template_bucket_url: try: (bucket, path) = get_s3_url(self.template_bucket_url, self.rel_path, self.stackname + ".yaml") 's3', 'put_object', {'Bucket': bucket, 'Key': path, 'Body': self.cfn_template, 'ServerSideEncryption': 'AES256'}, profile=self.profile, region=self.region)"Uploaded template to s3://{}/{}".format(bucket, path)) except ClientError as e: logger.error("Error trying to upload template so S3: {}, {}".format(self.template_bucket_url, e)) else: if len(self.cfn_template) > 51200: logger.warning("template_bucket_url not set and rendered template exceeds maximum allowed size of 51200, actual size: {} !".format(len(self.cfn_template))) else: logger.error('No cfn template rendered yet for stack {}.'.format(self.stackname)) def delete_template_file(self): yaml_file = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml") try: os.remove(yaml_file) logger.debug('Deleted cfn template %s.', yaml_file) except OSError: pass if self.template_bucket_url: try: (bucket, path) = get_s3_url(self.template_bucket_url, self.rel_path, self.stackname + ".yaml") 's3', 'delete_object', {'Bucket': bucket, 'Key': path}, profile=self.profile, region=self.region)"Deleted template from s3://{}/{}".format(bucket, path)) except ClientError as e: logger.error("Error trying to delete template from S3: {}, {}".format(self.template_bucket_url, e)) def read_template_file(self): """ Reads rendered yaml template from disk or s3 and extracts metadata """ if not self.cfn_template: if self.template_bucket_url: try: (bucket, path) = get_s3_url(self.template_bucket_url, self.rel_path, self.stackname + ".yaml") template = 's3', 'get_object', {'Bucket': bucket, 'Key': path}, profile=self.profile, region=self.region) logger.debug("Got template from s3://{}/{}".format(bucket, path)) self.cfn_template = template['Body'].read().decode('utf-8') # Overwrite local copy yaml_file = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml") ensure_dir(os.path.join(self.ctx['template_path'], self.rel_path)) with open(yaml_file, 'w') as yaml_contents: yaml_contents.write(self.cfn_template) except ClientError as e: logger.error("Could not find template file on S3: {}/{}, {}".format(bucket, path, e)) else: yaml_file = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml") try: with open(yaml_file, 'r') as yaml_contents: self.cfn_template = logger.debug('Read cfn template %s.', yaml_file) except FileNotFoundError as e: logger.warn("Could not find template file: {}".format(yaml_file)) raise e self.cfn_data = yaml.load(self.cfn_template, Loader=SafeLoaderIgnoreUnknown) self._parse_metadata() else: logger.debug('Using cached cfn template %s.', self.stackname) def validate(self): """Validates the rendered template via cfn-lint""" self.read_template_file() try: ignore_checks = self.cfn_data['Metadata']['cfnlint_ignore'] except KeyError: ignore_checks = [] # Ignore some more checks around injected parameters as we generate these if self.mode == "Piped": ignore_checks = ignore_checks + ['W2505', 'W2509', 'W2507'] # Ignore checks regarding overloaded properties if self.mode == "CloudBender": ignore_checks = ignore_checks + ['E3035', 'E3002', 'E3012', 'W2001', 'E3001', 'E0002', 'E1012'] filename = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml")'Validating {0}'.format(filename)) lint_args = ['--template', filename] if ignore_checks: lint_args.append('--ignore-checks') lint_args = lint_args + ignore_checks'Ignoring checks: {}'.format(','.join(ignore_checks))) (args, filenames, formatter) = cfnlint.core.get_args_filenames(lint_args) (template, rules, matches) = cfnlint.core.get_template_rules(filename, args) if not matches: matches.extend(cfnlint.core.run_checks(filename, template, rules, [self.region])) if len(matches): for match in matches: logger.error(formatter._format(match)) else:"Passed.") def get_outputs(self, include='.*', values=False): """ gets outputs of the stack """ self.read_template_file() try: stacks = "cloudformation", "describe_stacks", {'StackName': self.stackname}, profile=self.profile, region=self.region)['Stacks'] try: for output in stacks[0]['Outputs']: self.outputs[output['OutputKey']] = output['OutputValue'] logger.debug("Stack outputs for {} in {}: {}".format(self.stackname, self.region, self.outputs)) except KeyError: pass except ClientError: logger.warn("Could not get outputs of {}".format(self.stackname)) pass if self.outputs:'{} {} Outputs:\n{}'.format(self.region, self.stackname, pprint.pformat(self.outputs, indent=2))) if self.store_outputs: try: filename = self.cfn_data['Metadata']['CustomOutputs']['Name'] my_template = self.cfn_data['Metadata']['CustomOutputs']['Template'] except (TypeError, KeyError): filename = self.stackname + ".yaml" my_template = pkg_resources.read_text(templates, 'outputs.yaml') output_file = os.path.join(self.ctx['outputs_path'], self.rel_path, filename) ensure_dir(os.path.join(self.ctx['outputs_path'], self.rel_path)) jenv = JinjaEnv() template = jenv.from_string(my_template) data = {'stackname': "/".join([self.rel_path, self.stackname]), 'timestamp': datetime.strftime(, "%d/%m/%y %H:%M"), 'outputs': self.outputs, 'parameters': self.parameters} with open(output_file, 'w') as output_contents: output_contents.write(template.render(**data))'Wrote outputs for %s to %s', self.stackname, output_file) def create_docs(self, template=False, graph=False): """ Read rendered template, parse documentation fragments, eg. parameter description and create a mardown doc file for the stack same idea as eg. helm-docs for values.yaml """ try: self.read_template_file() except FileNotFoundError: return if not template: doc_template = pkg_resources.read_text(templates, '') jenv = JinjaEnv() template = jenv.from_string(doc_template) data = {} else: doc_template = template data['name'] = self.stackname data['description'] = self.cfn_data['Description'] data['dependencies'] = self.dependencies if 'Parameters' in self.cfn_data: data['parameters'] = self.cfn_data['Parameters'] set_parameters = self.resolve_parameters() for p in set_parameters: data['parameters'][p]['value'] = set_parameters[p] if 'Outputs' in self.cfn_data: data['outputs'] = self.cfn_data['Outputs'] # Check for existing outputs yaml, if found add current value column and set header to timestamp from outputs file output_file = os.path.join(self.ctx['outputs_path'], self.rel_path, self.stackname + ".yaml") try: with open(output_file, 'r') as yaml_contents: outputs = yaml.safe_load( for p in outputs['Outputs']: data['outputs'][p]['last_value'] = outputs['Outputs'][p] data['timestamp'] = outputs['TimeStamp'] except (FileNotFoundError, KeyError, TypeError): pass doc_file = os.path.join(self.ctx['docs_path'], self.rel_path, self.stackname + ".md") ensure_dir(os.path.join(self.ctx['docs_path'], self.rel_path)) with open(doc_file, 'w') as doc_contents: doc_contents.write(template.render(**data))'Wrote documentation for %s to %s', self.stackname, doc_file) # Write Graph in Dot format if graph: filename = os.path.join(self.ctx['template_path'], self.rel_path, self.stackname + ".yaml") lint_args = ['--template', filename] (args, filenames, formatter) = cfnlint.core.get_args_filenames(lint_args) (template, rules, matches) = cfnlint.core.get_template_rules(filename, args) template_obj = cfnlint.template.Template(filename, template, [self.region]) path = os.path.join(self.ctx['docs_path'], self.rel_path, self.stackname + ".dot") g = cfnlint.graph.Graph(template_obj) try: g.to_dot(path)'DOT representation of the graph written to %s', path) except ImportError: logger.error( 'Could not write the graph in DOT format. Please install either `pygraphviz` or `pydot` modules.') def resolve_parameters(self): """ Renders parameters for the stack based on the source template and the environment configuration """ self.read_template_file() # if we run in Piped Mode, inspect all outputs of the running Conglomerate members if self.mode == "Piped": stack_outputs = {} try: stack_outputs = self._inspect_stacks(self.tags['Conglomerate']) except KeyError: pass _found = {} if 'Parameters' in self.cfn_data: _errors = [] self.cfn_parameters = [] for p in self.cfn_data['Parameters']: # In Piped mode we try to resolve all Paramters first via stack_outputs if self.mode == "Piped": try: # first reverse the rename due to AWS alphanumeric restriction for parameter names _p = p.replace('DoT', '.') value = str(stack_outputs[_p]) self.cfn_parameters.append({'ParameterKey': p, 'ParameterValue': value})'Got {} = {} from running stack'.format(p, value)) continue except KeyError: pass # Key name in config tree is: stacks..parameters. if p in self.parameters: value = str(self.parameters[p]) self.cfn_parameters.append({'ParameterKey': p, 'ParameterValue': value}) # Hide NoEcho parameters in shell output if 'NoEcho' in self.cfn_data['Parameters'][p] and self.cfn_data['Parameters'][p]['NoEcho']: value = '****' _found[p] = value else: # If we have a Default defined in the CFN skip, as AWS will use it if 'Default' not in self.cfn_data['Parameters'][p]: _errors.append(p) if _errors: raise ParameterNotFound('Cannot find value for parameters: {0}'.format(_errors)) # Warning of excessive parameters, might be useful to spot typos early _warnings = [] for p in self.parameters.keys(): if p not in self.cfn_data['Parameters']: _warnings.append(p)'{} {} set parameters:\n{}'.format(self.region, self.stackname, pprint.pformat(_found, indent=2))) if _warnings: logger.warning('Ignored additional parameters: {}.'.format(_warnings)) # Return dict of explicitly set parameters return _found @exec_hooks def create(self): """Creates a stack """ # Prepare parameters self.resolve_parameters()'Creating {0} {1}'.format(self.region, self.stackname)) kwargs = {'StackName': self.stackname, 'Parameters': self.cfn_parameters, 'OnFailure': self.onfailure, 'NotificationARNs': self.notfication_sns, 'Tags': [{"Key": str(k), "Value": str(v)} for k, v in self.tags.items()], 'Capabilities': ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND']} kwargs = self._add_template_arg(kwargs) self.aws_stackid = 'cloudformation', 'create_stack', kwargs, profile=self.profile, region=self.region) status = self._wait_for_completion() self.get_outputs() return status @exec_hooks def update(self): """Updates an existing stack """ # Prepare parameters self.resolve_parameters()'Updating {0} {1}'.format(self.region, self.stackname)) try: kwargs = {'StackName': self.stackname, 'Parameters': self.cfn_parameters, 'NotificationARNs': self.notfication_sns, 'Tags': [{"Key": str(k), "Value": str(v)} for k, v in self.tags.items()], 'Capabilities': ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND']} kwargs = self._add_template_arg(kwargs) self.aws_stackid = 'cloudformation', 'update_stack', kwargs, profile=self.profile, region=self.region) except ClientError as e: if 'No updates are to be performed' in e.response['Error']['Message']:'No updates for {0}'.format(self.stackname)) return "COMPLETE" else: raise e status = self._wait_for_completion() self.get_outputs() return status @exec_hooks def delete(self): """Deletes a stack """'Deleting {0} {1}'.format(self.region, self.stackname)) self.aws_stackid = 'cloudformation', 'delete_stack', {'StackName': self.stackname}, profile=self.profile, region=self.region) status = self._wait_for_completion() return status def create_change_set(self, change_set_name): """ Creates a Change Set with the name ``change_set_name``. """ # Prepare parameters self.resolve_parameters() self.read_template_file()'Creating change set {0} for stack {1}'.format(change_set_name, self.stackname)) kwargs = {'StackName': self.stackname, 'ChangeSetName': change_set_name, 'Parameters': self.cfn_parameters, 'Tags': [{"Key": str(k), "Value": str(v)} for k, v in self.tags.items()], 'Capabilities': ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM']} kwargs = self._add_template_arg(kwargs) 'cloudformation', 'create_change_set', kwargs, profile=self.profile, region=self.region) return self._wait_for_completion() def get_status(self): """ Returns the stack's status. :returns: The stack's status. """ try: status = "cloudformation", "describe_stacks", {"StackName": self.stackname}, profile=self.profile, region=self.region)["Stacks"][0]["StackStatus"] except ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return None else: raise e return status def describe_events(self): """ Returns a dictionary contianing the stack events. :returns: The CloudFormation events for a stack. """ try: status = "cloudformation", "describe_stack_events", {"StackName": self.stackname}, profile=self.profile, region=self.region) except ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return None else: raise e return status def _wait_for_completion(self, timeout=0): """ Waits for a stack operation to finish. Prints CloudFormation events while it waits. :param timeout: Timeout before returning :returns: The final stack status. """ def timed_out(elapsed): return elapsed >= timeout if timeout else False status = "IN_PROGRESS" self.most_recent_event_datetime = ( - timedelta(seconds=3) ) elapsed = 0 while status == "IN_PROGRESS" and not timed_out(elapsed): status = self._get_simplified_status(self.get_status()) if not status: return None self._log_new_events() time.sleep(4) elapsed += 4 return status @staticmethod def _get_simplified_status(status): """ Returns the simplified Stack Status. """ if status: if status.endswith("ROLLBACK_COMPLETE"): return "FAILED" elif status.endswith("_COMPLETE"): return "COMPLETE" elif status.endswith("_IN_PROGRESS"): return "IN_PROGRESS" elif status.endswith("_FAILED"): return "FAILED" else: return 'Unknown' def _log_new_events(self): """ Log the latest stack events while the stack is being built. """ events = self.describe_events() if events: events = events["StackEvents"] events.reverse() new_events = [ event for event in events if event["Timestamp"] > self.most_recent_event_datetime ] for event in new_events:" ".join([ self.region, self.stackname, event["LogicalResourceId"], event["ResourceType"], event["ResourceStatus"], event.get("ResourceStatusReason", "") ])) self.most_recent_event_datetime = event["Timestamp"] # stackoutput inspection def _inspect_stacks(self, conglomerate): # Get all stacks of the conglomertate running_stacks = "cloudformation", "describe_stacks", profile=self.profile, region=self.region) stacks = [] for stack in running_stacks['Stacks']: for tag in stack['Tags']: if tag['Key'] == 'Conglomerate' and tag['Value'] == conglomerate: stacks.append(stack) break # Gather stack outputs, use Tag['Artifact'] as name space: Artifact.OutputName stack_outputs = {} for stack in stacks: # If stack has an Artifact Tag put resources into the namespace Artifact.Resource artifact = None for tag in stack['Tags']: if tag['Key'] == 'Artifact': artifact = tag['Value'] if artifact: key_prefix = "{}.".format(artifact) else: key_prefix = "" try: for output in stack['Outputs']: # Gather all outputs of the stack into one dimensional key=value structure stack_outputs[key_prefix + output['OutputKey']] = output['OutputValue'] except KeyError: pass # Add outputs from stacks into the data for jinja under StackOutput return stack_outputs def _add_template_arg(self, kwargs): if self.template_bucket_url: # name # so we need the region, AWS as usual (bucket, path) = get_s3_url(self.template_bucket_url, self.rel_path, self.stackname + ".yaml") bucket_region ='s3', 'get_bucket_location', {'Bucket': bucket}, profile=self.profile, region=self.region)['LocationConstraint'] kwargs['TemplateURL'] = 'https://{}.s3.{}{}'.format(bucket, bucket_region, path) else: kwargs['TemplateBody'] = self.cfn_template return kwargs