Source code for dml_util.runners.cloudformation

import json
import logging
from dataclasses import dataclass

import boto3
from botocore.exceptions import ClientError
from daggerml import Dml

from dml_util.runners.base import RunnerBase

logger = logging.getLogger(__name__)


[docs] @dataclass class CfnRunner(RunnerBase): """Runner for CloudFormation templates."""
[docs] @classmethod def funkify(cls, **data): return data
[docs] def fmt(self, stack_id, status, raw_status): return f"{stack_id} : {status} ({raw_status})"
[docs] def describe_stack(self, client, name, StackId): try: stack = client.describe_stacks(StackName=name)["Stacks"][0] except ClientError as e: if "does not exist" in str(e): return None, None raise raw_status = stack["StackStatus"] state = {"StackId": stack["StackId"], "name": name} if StackId is not None and state["StackId"] != StackId: raise RuntimeError(f"stack ID changed from {StackId} to {state['StackId']}!") if raw_status in ["CREATE_COMPLETE", "UPDATE_COMPLETE"]: status = "success" state["outputs"] = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])} elif raw_status in [ "ROLLBACK_COMPLETE", "ROLLBACK_FAILED", "CREATE_FAILED", "DELETE_FAILED", ]: events = client.describe_stack_events(StackName=name)["StackEvents"] status = "failed" failure_events = [e for e in events if "ResourceStatusReason" in e] state["failure_reasons"] = [e["ResourceStatusReason"] for e in failure_events] if StackId is not None: # create failed msg = "Stack failed:\n\n" + json.dumps(state, default=str, indent=2) raise RuntimeError(msg) elif StackId is None: raise RuntimeError("Cfn cannot create new stack while stack is currently being created") else: status = "creating" return state, self.fmt(state["StackId"], status, raw_status)
[docs] def submit(self, client): assert Dml is not None, "dml is not installed..." with Dml.temporary() as dml: with dml.new(data=self.input.dump) as dag: name, js, params = dag.argv[1:4].value() old_state, msg = self.describe_stack(client, name, None) fn = client.create_stack if old_state is None else client.update_stack try: resp = fn( StackName=name, TemplateBody=json.dumps(js), Parameters=[{"ParameterKey": k, "ParameterValue": v} for k, v in params.items()], Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"], ) except ClientError as e: if not e.response["Error"]["Message"].endswith("No updates are to be performed."): raise resp = old_state state = {"name": name, "StackId": resp["StackId"]} msg = self.fmt(state["StackId"], "creating", None) return state, msg
[docs] def update(self, state): client = boto3.client("cloudformation") result = {} if state == {}: state, msg = self.submit(client) else: state, msg = self.describe_stack(client, **state) if "outputs" in state: def _handler(dump): nonlocal result result["dump"] = dump try: with Dml.temporary() as dml: with dml.new(data=self.input.dump, message_handler=_handler) as dag: for k, v in state["outputs"].items(): dag[k] = v dag.stack_id = state["StackId"] dag.stack_name = state["name"] dag.outputs = state["outputs"] dag.result = dag.outputs except KeyboardInterrupt: raise except Exception: pass state.clear() return state, msg, result.get("dump")