Coverage for src/container_collection/fargate/register_fargate_task.py: 100%
13 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2024-09-25 18:23 +0000
1import boto3
2from deepdiff import DeepDiff
def register_fargate_task(task_definition: dict) -> str:
    """
    Register a task definition with ECS Fargate, reusing an unchanged one.

    The task definition family is looked up first. When at least one task
    definition ARN is listed for the family, the definition returned for the
    family name is compared against the given definition. If that comparison
    reports no changed values, the ARN of the existing definition is returned
    and no new revision is created. Otherwise, the given definition is
    registered and the ARN of the newly registered definition is returned.

    Parameters
    ----------
    task_definition
        Fargate task definition. Must contain a ``family`` entry; the whole
        mapping is passed as keyword arguments when registering.

    Returns
    -------
    :
        Task definition ARN.
    """

    ecs = boto3.client("ecs")
    family = task_definition["family"]

    existing_arns = ecs.list_task_definitions(familyPrefix=family)["taskDefinitionArns"]

    if len(existing_arns) > 0:
        described = ecs.describe_task_definition(taskDefinition=family)
        current = described["taskDefinition"]

        # NOTE(review): only the "values_changed" category of the diff is
        # consulted here. Other difference categories (for example, entries
        # present in only one of the two definitions) do not by themselves
        # lead to a new revision -- confirm this is the intended behavior.
        changes = DeepDiff(task_definition, current, ignore_order=True)

        if "values_changed" not in changes:
            return current["taskDefinitionArn"]

    registered = ecs.register_task_definition(**task_definition)
    return registered["taskDefinition"]["taskDefinitionArn"]