import json
import logging
from dataclasses import dataclass
import boto3
from botocore.exceptions import ClientError
from daggerml import Dml
from dml_util.runners.base import RunnerBase
logger = logging.getLogger(__name__)
@dataclass
class CfnRunner(RunnerBase):
    """Runner for CloudFormation templates.

    ``update`` is the entry point: with an empty state it submits a stack
    create/update via ``submit``; otherwise it polls the stack recorded in
    the state via ``describe_stack``. Once the stack reports outputs, they
    are written into a temporary Dml DAG and the state is cleared.
    """

    @classmethod
    def funkify(cls, **data):
        """Return the given keyword arguments unchanged, as a dict."""
        return data

    def fmt(self, stack_id, status, raw_status):
        """Format a one-line status message: ``"<id> : <status> (<raw>)"``."""
        return f"{stack_id} : {status} ({raw_status})"

    def describe_stack(self, client, name, StackId):
        """Look up a stack by name and classify its current status.

        Parameters
        ----------
        client
            CloudFormation client; ``describe_stacks`` and
            ``describe_stack_events`` are called on it.
        name
            Stack name passed as ``StackName``.
        StackId
            Stack id the caller is already tracking, or ``None`` when the
            caller is about to create/update the stack.

        Returns
        -------
        tuple
            ``(None, None)`` when the stack does not exist. Otherwise
            ``(state, message)``, where ``state`` holds ``"StackId"`` and
            ``"name"``, plus ``"outputs"`` on success or
            ``"failure_reasons"`` on failure, and ``message`` comes from
            ``fmt``.

        Raises
        ------
        RuntimeError
            If the stack id differs from ``StackId``; if the stack is in a
            failed state while ``StackId`` is set; or if an operation is
            still in progress while ``StackId`` is ``None``.
        botocore.exceptions.ClientError
            Any client error other than "does not exist".
        """
        try:
            stack = client.describe_stacks(StackName=name)["Stacks"][0]
        except ClientError as e:
            if "does not exist" in str(e):
                return None, None
            raise
        raw_status = stack["StackStatus"]
        state = {"StackId": stack["StackId"], "name": name}
        if StackId is not None and state["StackId"] != StackId:
            raise RuntimeError(f"stack ID changed from {StackId} to {state['StackId']}!")
        if raw_status in ("CREATE_COMPLETE", "UPDATE_COMPLETE"):
            status = "success"
            state["outputs"] = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
        elif raw_status in (
            "ROLLBACK_COMPLETE",
            "ROLLBACK_FAILED",
            "CREATE_FAILED",
            "DELETE_FAILED",
            # Terminal states of a failed update. Without these, a failed
            # update fell through to the in-progress branch below and was
            # reported as "creating" on every poll.
            "UPDATE_ROLLBACK_COMPLETE",
            "UPDATE_ROLLBACK_FAILED",
            "UPDATE_FAILED",
        ):
            events = client.describe_stack_events(StackName=name)["StackEvents"]
            status = "failed"
            state["failure_reasons"] = [
                event["ResourceStatusReason"] for event in events if "ResourceStatusReason" in event
            ]
            if StackId is not None:
                # The caller is tracking this stack, so the operation it
                # submitted has failed. With StackId=None the failed state
                # is simply returned to the caller.
                msg = "Stack failed:\n\n" + json.dumps(state, default=str, indent=2)
                raise RuntimeError(msg)
        elif StackId is None:
            # Any remaining status is treated as an operation in progress.
            raise RuntimeError("Cfn cannot create new stack while stack is currently being created")
        else:
            status = "creating"
        return state, self.fmt(state["StackId"], status, raw_status)

    def submit(self, client):
        """Create the stack, or update it if it already exists.

        The stack name, template and parameters are read from
        ``dag.argv[1:4]`` of a temporary DAG built from ``self.input.dump``.

        Returns
        -------
        tuple
            ``(state, message)`` where ``state`` is
            ``{"name": ..., "StackId": ...}``.

        Raises
        ------
        botocore.exceptions.ClientError
            Any client error other than "No updates are to be performed.".
        RuntimeError
            Propagated from ``describe_stack``.
        """
        assert Dml is not None, "dml is not installed..."
        with Dml.temporary() as dml:
            with dml.new(data=self.input.dump) as dag:
                name, js, params = dag.argv[1:4].value()
        old_state, _ = self.describe_stack(client, name, None)
        fn = client.create_stack if old_state is None else client.update_stack
        try:
            resp = fn(
                StackName=name,
                TemplateBody=json.dumps(js),
                Parameters=[{"ParameterKey": k, "ParameterValue": v} for k, v in params.items()],
                Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
            )
        except ClientError as e:
            if not e.response["Error"]["Message"].endswith("No updates are to be performed."):
                raise
            # Nothing to change: reuse the existing stack's id.
            resp = old_state
        state = {"name": name, "StackId": resp["StackId"]}
        return state, self.fmt(state["StackId"], "creating", None)

    def update(self, state):
        """Advance the runner by one step.

        Parameters
        ----------
        state : dict
            ``{}`` to submit a new create/update; otherwise a state
            previously returned by this method, whose items are passed to
            ``describe_stack`` as keyword arguments.

        Returns
        -------
        tuple
            ``(state, message, dump)``. ``dump`` is whatever the DAG's
            ``message_handler`` received, or ``None`` if it was never
            called. Once outputs are available the state is cleared, so an
            empty dict is returned.

        Raises
        ------
        RuntimeError
            If the tracked stack no longer exists, or propagated from
            ``describe_stack`` / ``submit``.
        """
        client = boto3.client("cloudformation")
        result = {}
        if state == {}:
            state, msg = self.submit(client)
        else:
            tracked = dict(state)
            state, msg = self.describe_stack(client, **tracked)
            if state is None:
                # describe_stack returns (None, None) for a missing stack;
                # fail clearly instead of hitting a TypeError below.
                raise RuntimeError(
                    f"stack {tracked.get('name')} ({tracked.get('StackId')}) no longer exists"
                )
        if "outputs" in state:

            def _handler(dump):
                result["dump"] = dump

            try:
                with Dml.temporary() as dml:
                    with dml.new(data=self.input.dump, message_handler=_handler) as dag:
                        for k, v in state["outputs"].items():
                            dag[k] = v
                        dag.stack_id = state["StackId"]
                        dag.stack_name = state["name"]
                        dag.outputs = state["outputs"]
                        dag.result = dag.outputs
            except Exception:
                # Best-effort: the error is still swallowed, but no longer
                # silently.
                # NOTE(review): presumably the dag context reports the
                # failure through message_handler -- confirm.
                logger.exception("failed to populate result dag for stack %s", state.get("name"))
            state.clear()
        return state, msg, result.get("dump")