Source code for leruli.task

from sys import intern
from . import internal
import os
import io
import tarfile
import gzip
import docker
import requests as rq
import uuid


def _build_task_archive(directory: str, command) -> io.BytesIO:
    """Pack *directory* and a generated ``run.sh`` into an in-memory .tgz.

    Args:
        directory: Directory whose content is added to the archive root.
        command: Command to run, either as one string or as a sequence of
            arguments that are joined with single spaces.

    Returns:
        A ``BytesIO`` positioned at offset 0 holding the gzipped tar archive.
    """
    # A plain string must be used as-is: " ".join("abc") would yield "a b c".
    if isinstance(command, str):
        command_line = command
    else:
        command_line = " ".join(command)
    script = ("#!/bin/bash\n" + command_line).encode("ascii")

    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        # Archive the requested directory (not the current working directory)
        # and store its content relative to the archive root.
        tar.add(directory, arcname=".")
        tarinfo = tarfile.TarInfo(name="run.sh")
        tarinfo.size = len(script)
        tar.addfile(tarinfo=tarinfo, fileobj=io.BytesIO(script))
    # The gzip stream is only complete once the tar file has been closed.
    buffer.seek(0)
    return buffer


def task_submit(
    directory: str,
    code: str,
    version: str,
    command: str,
    cores: int = 1,
    memorymb: int = 4000,
    timeseconds: int = 24 * 60 * 60,
):
    """Submits a given directory content as job to Leruli Queue/Cloud.

    The directory is packed together with a generated ``run.sh`` into
    ``run.tgz``, uploaded to a freshly created bucket, and then registered
    with the ``task-submit`` API endpoint. On success the job id and bucket
    name are written to ``leruli.job`` and ``leruli.bucket`` inside
    *directory*.

    Args:
        directory: Directory to submit.
        code: Name of the code; sent as ``"{code}:{version}"``.
        version: Version of the code.
        command: Command written to ``run.sh``; a string, or a sequence of
            arguments that is joined with spaces.
        cores: Number of cores requested.
        memorymb: Memory requested, passed as ``memorymb``.
        timeseconds: Time limit, passed as ``timelimit``.

    Returns:
        The decoded JSON body of the API response (the job id), or ``None``
        if no API secret or S3 client is available or the API did not answer
        with HTTP 200.

    Raises:
        ValueError: If *directory* already contains a ``leruli.job`` file.
    """
    api_secret = internal.get_api_secret()
    if api_secret is None:
        return
    s3_client = internal.get_s3_client()
    if s3_client is None:
        return

    if os.path.exists(f"{directory}/leruli.job"):
        raise ValueError("Directory already submitted.")

    # Build the archive before touching remote state so that a local failure
    # (unreadable directory, non-ASCII command) does not leave an empty bucket.
    buffer = _build_task_archive(directory, command)

    bucket = str(uuid.uuid4())
    s3_client.make_bucket(bucket)

    # upload
    s3_client.put_object(bucket, "run.tgz", buffer, buffer.getbuffer().nbytes)

    # submit to API
    codeversion = f"{code}:{version}"
    payload = {
        "secret": api_secret,
        "bucketid": bucket,
        "name": "default",
        "codeversion": codeversion,
        "cores": cores,
        "memorymb": memorymb,
        "timelimit": timeseconds,
    }
    res = rq.post(f"{internal.BASEURL}/v22_1/task-submit", json=payload)
    if res.status_code != 200:
        print("Cannot submit jobs. Please check the input.")
        return
    jobid = res.json()

    # local handle
    with open(f"{directory}/leruli.job", "w") as fh:
        fh.write(f"{jobid}\n")
    with open(f"{directory}/leruli.bucket", "w") as fh:
        fh.write(f"{bucket}\n")
    return jobid
def task_status(jobid: str):
    """Queries the status of a job at Leruli Queue/Cloud.

    Args:
        jobid: Identifier of the job to look up.

    Returns:
        The decoded JSON body of the ``task-status`` response. If that body
        is a list, its items are joined with ``": "`` into a single string.
    """
    api_secret = internal.get_api_secret()
    payload = {
        "secret": api_secret,
        "jobid": jobid,
    }
    status = rq.post(f"{internal.BASEURL}/v22_1/task-status", json=payload).json()
    if isinstance(status, list):
        # ``status`` is already decoded JSON; it has no ``.json()`` of its own.
        return ": ".join(map(str, status))
    return status
def task_get(directory: str, bucket: str):
    """Downloads the input and output files of a Leruli Queue/Cloud task into a directory.

    Every object in *bucket* is listed recursively and written to
    ``{directory}/{object_name}``; missing parent directories are created.

    Args:
        directory: Local directory that receives the files.
        bucket: Name of the bucket to download.
    """
    s3_client = internal.get_s3_client()
    # Recursive listing, as in task_prune, so objects under a prefix are
    # fetched as well.
    for obj in s3_client.list_objects(bucket, recursive=True):
        object_name = obj.object_name
        # Request outside the ``try``: if it fails there is no response to
        # release, and the cleanup must not hide the original error.
        response = s3_client.get_object(bucket, object_name)
        try:
            content = response.read()
        finally:
            response.close()
            response.release_conn()

        target = f"{directory}/{object_name}"
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "wb") as fh:
            fh.write(content)
def task_cancel(jobid: str):
    """Cancels a task on Leruli Queue/Cloud.

    Sends the API secret and *jobid* to the ``task-cancel`` endpoint and
    returns the decoded JSON body of the response.
    """
    request_body = {"secret": internal.get_api_secret(), "jobid": jobid}
    endpoint = f"{internal.BASEURL}/v22_1/task-cancel"
    response = rq.post(endpoint, json=request_body)
    return response.json()
def task_publish_code(code: str, version: str):
    """Uploads a local docker image to use with Leruli Queue/Cloud.

    The image ``{code}:{version}`` is exported from the local Docker daemon,
    gzip-compressed in memory and stored as ``{code}-{version}.tgz`` in the
    bucket ``code-{api_secret}``.

    Args:
        code: Repository name of the local image.
        version: Tag of the local image.

    Returns:
        ``None``. Nothing is uploaded if no S3 client or API secret is
        available.
    """
    s3_client = internal.get_s3_client()
    if s3_client is None:
        return
    api_secret = internal.get_api_secret()
    if api_secret is None:
        # Without a secret the target bucket would be named "code-None".
        return

    client = docker.from_env()
    image = client.images.get(f"{code}:{version}")

    # Compress chunk by chunk so the uncompressed image is never held in
    # memory as a whole.
    archive = io.BytesIO()
    with gzip.GzipFile(fileobj=archive, mode="wb") as compressed:
        for chunk in image.save():
            compressed.write(chunk)
    size = archive.tell()
    archive.seek(0)

    # Same upload call as task_submit: (bucket, object name, stream, length).
    s3_client.put_object(
        f"code-{api_secret}", f"{code}-{version}.tgz", archive, size
    )
def task_prune(bucket: str):
    """Irreversibly deletes the Leruli Queue/Cloud store of input and output files.

    Removes every object found by a recursive listing of *bucket* and then
    removes the bucket itself.
    """
    store = internal.get_s3_client()
    listing = store.list_objects(bucket, recursive=True)
    for entry in listing:
        store.remove_object(bucket, entry.object_name)
    store.remove_bucket(bucket)