Coverage for src/container_collection/batch/terminate_batch_job.py: 100%

8 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-09-25 18:23 +0000

1from time import sleep 

2 

3import boto3 

4 

5TERMINATION_REASON = "Termination requested by workflow." 

6"""Reason sent for terminating jobs from a workflow.""" 

7 

8 

9def terminate_batch_job(job_arn: str) -> None: 

10 """ 

11 Terminate job on AWS Batch. 

12 

13 Task will sleep for 1 minute after sending the termination request. 

14 

15 Parameters 

16 ---------- 

17 job_arn 

18 Job ARN. 

19 """ 

20 

21 client = boto3.client("batch") 

22 client.terminate_job(jobId=job_arn, reason=TERMINATION_REASON) 

23 sleep(60)