Coverage for src/container_collection/fargate/terminate_fargate_task.py: 100%

8 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-09-25 18:23 +0000

1from time import sleep 

2 

3import boto3 

4 

5TERMINATION_REASON = "Termination requested by workflow." 

6"""Reason sent for terminating jobs from a workflow.""" 

7 

8 

def terminate_fargate_task(cluster: str, task_arn: str) -> None:
    """
    Request that a task running on AWS Fargate be stopped.

    A stop request carrying ``TERMINATION_REASON`` is sent through the ECS
    API, after which the call blocks for a fixed 60 seconds before returning.
    The task's status is not checked afterwards.

    Parameters
    ----------
    cluster
        ECS cluster name.
    task_arn
        Task ARN.
    """

    stop_request = {
        "cluster": cluster,
        "task": task_arn,
        "reason": TERMINATION_REASON,
    }
    boto3.client("ecs").stop_task(**stop_request)

    # Fixed pause after the stop request; presumably gives ECS time to wind
    # the task down before the caller proceeds -- confirm against the workflow.
    sleep(60)