Toolset to render and manage AWS CloudFormation ( https://pypi.org/project/cloudbender )
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

69 lines
1.7 KiB

import os
import sys
import subprocess
import tempfile
import shutil
from functools import wraps
from .exceptions import InvalidHook
import logging
logger = logging.getLogger(__name__)
def execute_hooks(hooks, stack):
for hook in hooks:
tokens = hook.split()
if tokens[0] in dir(sys.modules[__name__]):
logger.info("Executing hook: {}".format(hook))
globals()[tokens[0]](arguments=tokens[1:], stack=stack)
else:
logger.warning("Unknown hook: {}".format(hook))
def exec_hooks(func):
@wraps(func)
def decorated(self, *args, **kwargs):
execute_hooks(self.hooks.get("pre_" + func.__name__, []), self)
response = func(self, *args, **kwargs)
# Only execute post hook for successful actions
if response == "COMPLETE":
execute_hooks(self.hooks.get("post_" + func.__name__, []), self)
return response
return decorated
def pulumi_ws(func):
@wraps(func)
def decorated(self, *args, **kwargs):
# setup temp workspace
self.work_dir = tempfile.mkdtemp(
dir=tempfile.gettempdir(), prefix="cloudbender-"
)
response = func(self, *args, **kwargs)
# Cleanup temp workspace
if os.path.exists(self.work_dir):
shutil.rmtree(self.work_dir)
return response
return decorated
# Various hooks
def cmd(stack, arguments):
"""
Generic command via subprocess
"""
try:
hook = subprocess.run(arguments, stdout=subprocess.PIPE)
logger.info(hook.stdout.decode("utf-8"))
except TypeError:
raise InvalidHook("Invalid argument {}".format(arguments))