Coverage for src/container_collection/batch/register_batch_job.py: 100%
12 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
1import boto3
2from deepdiff import DeepDiff
def register_batch_job(job_definition: dict) -> str:
    """
    Register job definition to AWS Batch.

    If an active definition for the given job definition name already exists,
    and the contents of the definition are not changed, then the method will
    return the existing job definition ARN rather than creating a new revision.

    The given job definition is compared against the highest active revision.
    Dictionary keys that exist only in the existing revision (for example the
    ARN and revision number included in the describe response) are not treated
    as changes. Any value, type, or list item difference, and any key present
    in the given definition but missing from the existing revision, results in
    a new revision being registered.

    Parameters
    ----------
    job_definition
        Batch job definition. Must contain the "jobDefinitionName" key.

    Returns
    -------
    :
        Job definition ARN.
    """

    # DeepDiff report sections that indicate the requested definition differs
    # from the existing revision. "dictionary_item_added" is deliberately left
    # out: it lists keys found only in the existing revision, which always
    # includes response-only fields such as the ARN and the revision number.
    change_keys = (
        "values_changed",
        "type_changes",
        "dictionary_item_removed",
        "iterable_item_removed",
        "iterable_item_added",
    )

    client = boto3.client("batch")

    # Only ACTIVE revisions are candidates for reuse; returning the ARN of a
    # deregistered revision would hand back a definition that cannot be used.
    request = {
        "jobDefinitionName": job_definition["jobDefinitionName"],
        "status": "ACTIVE",
    }

    # The describe call is paginated, so follow nextToken until every page has
    # been collected; otherwise the highest revision could be missed.
    existing_definitions = []

    while True:
        response = client.describe_job_definitions(**request)
        existing_definitions.extend(response["jobDefinitions"])
        next_token = response.get("nextToken")

        if not next_token:
            break

        request["nextToken"] = next_token

    if existing_definitions:
        latest_definition = max(existing_definitions, key=lambda d: d["revision"])
        diff = DeepDiff(job_definition, latest_definition, ignore_order=True)

        if not any(key in diff for key in change_keys):
            return latest_definition["jobDefinitionArn"]

    response = client.register_job_definition(**job_definition)
    return response["jobDefinitionArn"]