Coverage for src/container_collection/fargate/register_fargate_task.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-09-25 18:23 +0000

1import boto3 

2from deepdiff import DeepDiff 

3 

4 

5def register_fargate_task(task_definition: dict) -> str: 

6 """ 

7 Register task definition to ECS Fargate. 

8 

9 If a definition for the given task definition name already exists, and the 

10 contents of the definition are not changed, then the method will return the 

11 existing task definition ARN rather than creating a new revision. 

12 

13 Parameters 

14 ---------- 

15 task_definition 

16 Fargate task definition. 

17 

18 Returns 

19 ------- 

20 : 

21 Task definition ARN. 

22 """ 

23 

24 client = boto3.client("ecs") 

25 response = client.list_task_definitions(familyPrefix=task_definition["family"]) 

26 

27 if len(response["taskDefinitionArns"]) > 0: 

28 response = client.describe_task_definition(taskDefinition=task_definition["family"]) 

29 existing_definition = response["taskDefinition"] 

30 diff = DeepDiff(task_definition, existing_definition, ignore_order=True) 

31 

32 if "values_changed" not in diff: 

33 return existing_definition["taskDefinitionArn"] 

34 

35 response = client.register_task_definition(**task_definition) 

36 return response["taskDefinition"]["taskDefinitionArn"]