Source code for dml_util.runners.lambda_
"""Lambda Runner."""
import json
import logging
import traceback
from dataclasses import dataclass
from dml_util.aws.dynamodb import DynamoState
from dml_util.aws.s3 import S3Store
from dml_util.runners.base import RunnerBase
logger = logging.getLogger(__name__)
[docs]
@dataclass
class LambdaRunner(RunnerBase):
"""Runner for AWS Lambda functions."""
state_class = DynamoState
def __post_init__(self):
super().__post_init__()
self.s3 = S3Store(bucket=self.config.s3_bucket, prefix=f"{self.prefix}/jobs/{self.input.cache_key}")
logger.info(f"Initialized LambdaRunner. Writing execution data to s3://{self.s3.bucket}/{self.s3.prefix}")
@property
def output_loc(self):
return self.s3._name2uri("output.dump")
[docs]
@classmethod
def handler(cls, event, context):
try:
event["input"] = json.loads(event.pop("dump"))
logger.info(f"Lambda event: {json.dumps(event)}")
response, msg = cls(**event).run()
status = 200 if response else 201
return {"status": status, "response": response, "message": msg}
except Exception as e:
msg = f"Error in lambda: {e}\n\n{traceback.format_exc()}"
return {"status": 400, "response": {}, "message": msg}
[docs]
def gc(self, state):
"""Garbage collect resources used by the runner."""
logger.info("Cleaning up resources for key: %s", self.input.cache_key)
objs = self.s3.ls(recursive=True)
if objs:
logger.info("Found %d objects to clean up in S3: s3://%s/%s/", len(objs), self.s3.bucket, self.s3.prefix)
self.s3.rm(*objs)
logger.info("Cleanup completed for key: %s", self.input.cache_key)