Coverage for src/container_collection/batch/check_batch_job.py: 100%
27 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
1from __future__ import annotations
3from time import sleep
5import boto3
6from prefect.context import TaskRunContext
7from prefect.states import Failed, State
9RETRIES_EXCEEDED_EXIT_CODE = 80
10"""Exit code used when task run retries exceed the maximum retries."""
def check_batch_job(job_arn: str, max_retries: int) -> int | State | bool:
    """
    Check for exit code of an AWS Batch job.

    If this task is running within a Prefect flow, it will use the task run
    context to get the current run count. While the run count is below the
    maximum number of retries, the task will continue to attempt to get the exit
    code, and can be called with a retry delay to periodically check the status
    of jobs. Once the run count exceeds the maximum number of retries, the
    retries-exceeded exit code is returned without querying AWS Batch.

    If this task is not running within a Prefect flow, the ``max_retries``
    parameter is ignored. Jobs that are still running will throw an exception.

    Parameters
    ----------
    job_arn
        Job ARN.
    max_retries
        Maximum number of retries.

    Returns
    -------
    :
        Exit code of the most recent job attempt if the job is complete,
        ``RETRIES_EXCEEDED_EXIT_CODE`` if the task run count exceeds
        ``max_retries``, or a ``Failed`` state if the job is still running
        within a Prefect task run.

    Raises
    ------
    RuntimeError
        If the job is still running and there is no Prefect task run context.
    """

    context = TaskRunContext.get()

    # Stop checking once the Prefect task has been run more than max_retries
    # times; no request is made to AWS Batch in that case.
    if context is not None and context.task_run.run_count > max_retries:
        return RETRIES_EXCEEDED_EXIT_CODE

    client = boto3.client("batch")
    ready_states = ("RUNNING", "SUCCEEDED", "FAILED")

    # Job responses are not immediately available, and a new job is not yet
    # running when first described. Poll every 10 seconds until exactly one
    # job is returned and that job is running or completed. Checking both
    # conditions on every poll avoids indexing into an empty response.
    while True:
        jobs = client.describe_jobs(jobs=[job_arn])["jobs"]
        if len(jobs) == 1 and jobs[0]["status"] in ready_states:
            break
        sleep(10)

    job = jobs[0]

    # For jobs that are running, signal "not finished yet" in the way that
    # suits the caller: a Failed state inside a Prefect task run, otherwise
    # an exception.
    if job["status"] == "RUNNING":
        if context is not None:
            return Failed()
        message = "Job is in RUNNING state and does not have exit code."
        raise RuntimeError(message)

    # A job with a retry strategy can have several attempts; the last attempt
    # is the one that determined the final job status, so report its exit code.
    return job["attempts"][-1]["container"]["exitCode"]