Toolset to render and manage AWS CloudFormation ( https://pypi.org/project/cloudbender )
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
69 lines
1.7 KiB
69 lines
1.7 KiB
import os |
|
import sys |
|
import subprocess |
|
import tempfile |
|
import shutil |
|
from functools import wraps |
|
|
|
from .exceptions import InvalidHook |
|
|
|
import logging |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def execute_hooks(hooks, stack): |
|
for hook in hooks: |
|
tokens = hook.split() |
|
if tokens[0] in dir(sys.modules[__name__]): |
|
logger.info("Executing hook: {}".format(hook)) |
|
globals()[tokens[0]](arguments=tokens[1:], stack=stack) |
|
else: |
|
logger.warning("Unknown hook: {}".format(hook)) |
|
|
|
|
|
def exec_hooks(func): |
|
@wraps(func) |
|
def decorated(self, *args, **kwargs): |
|
execute_hooks(self.hooks.get("pre_" + func.__name__, []), self) |
|
response = func(self, *args, **kwargs) |
|
|
|
# Only execute post hook for successful actions |
|
if response == "COMPLETE": |
|
execute_hooks(self.hooks.get("post_" + func.__name__, []), self) |
|
|
|
return response |
|
|
|
return decorated |
|
|
|
|
|
def pulumi_ws(func): |
|
@wraps(func) |
|
def decorated(self, *args, **kwargs): |
|
# setup temp workspace |
|
self.work_dir = tempfile.mkdtemp( |
|
dir=tempfile.gettempdir(), prefix="cloudbender-" |
|
) |
|
|
|
response = func(self, *args, **kwargs) |
|
|
|
# Cleanup temp workspace |
|
if os.path.exists(self.work_dir): |
|
shutil.rmtree(self.work_dir) |
|
|
|
return response |
|
|
|
return decorated |
|
|
|
|
|
# Various hooks |
|
def cmd(stack, arguments): |
|
""" |
|
Generic command via subprocess |
|
""" |
|
|
|
try: |
|
hook = subprocess.run(arguments, stdout=subprocess.PIPE) |
|
logger.info(hook.stdout.decode("utf-8")) |
|
except TypeError: |
|
raise InvalidHook("Invalid argument {}".format(arguments))
|
|
|