Source code for dml_util.dags.batch
#!/usr/bin/env python3
import json
import os
import zipfile
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from dml_util import __version__
from dml_util.aws.s3 import S3Store
_here_ = Path(__file__).parent
LOCAL_TEST = os.getenv("DML_TESTING")
[docs]
def zipit(directory_path, output_zip):
# FIXME: Use reproducible zip
with zipfile.ZipFile(output_zip, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, directory_path)
zipf.write(file_path, arcname)
[docs]
def zip_up(s3, filepath):
with TemporaryDirectory() as tmpd:
os.system(f"cp {filepath} {tmpd}")
if LOCAL_TEST:
root = _here_.parent.parent.parent.parent
os.system(f"pip install {root} -t {tmpd}")
else:
os.system(f"pip install 'dml-util=={__version__}' -t {tmpd}")
with NamedTemporaryFile(suffix=".zip") as tmpf:
zipit(tmpd, tmpf.name)
tmpf.flush()
return s3.put(filepath=tmpf.name, suffix=".zip")
[docs]
def load():
s3 = S3Store()
with open(_here_ / "cf.json") as f:
js = json.load(f)
zipfile = zip_up(s3, _here_ / "impl.py")
print(f"zipfile: {zipfile.uri}")
code_data = dict(zip(["S3Bucket", "S3Key"], s3.parse_uri(zipfile.uri)))
js["Resources"]["Fn"]["Properties"]["Code"] = code_data
params = {"Bucket": s3.bucket, "Prefix": "opt/dml/exec/batch"}
return js, params, "LambdaFunctionArn", "dml-util-lambda-adapter"