Coverage for src/container_collection/fargate/check_fargate_task.py: 100%

27 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-09-25 18:23 +0000

1from __future__ import annotations 

2 

3from time import sleep 

4 

5import boto3 

6from prefect.context import TaskRunContext 

7from prefect.states import Failed, State 

8 

RETRIES_EXCEEDED_EXIT_CODE: int = 80
"""Exit code reported once the task run count goes past the allowed number of retries."""

11 

12 

_POLL_INTERVAL_SECONDS = 10
"""Seconds to wait between successive ``describe_tasks`` polls."""


def _describe_task(client, cluster: str, task_arn: str) -> dict | None:
    """
    Describe a single ECS task.

    Parameters
    ----------
    client
        Boto3 ECS client.
    cluster
        ECS cluster name.
    task_arn
        Task ARN.

    Returns
    -------
    :
        The task description, or ``None`` if ECS did not return exactly one
        task (for example, because the task is not yet visible).
    """

    tasks = client.describe_tasks(cluster=cluster, tasks=[task_arn])["tasks"]
    return tasks[0] if len(tasks) == 1 else None


def check_fargate_task(cluster: str, task_arn: str, max_retries: int) -> int | State:
    """
    Check for exit code of an AWS Fargate task.

    If this task is running within a Prefect flow, it will use the task run
    context to get the current run count. Once the run count exceeds the
    maximum number of retries, ``RETRIES_EXCEEDED_EXIT_CODE`` is returned
    without querying ECS. Otherwise, if the Fargate task is still running, a
    ``Failed`` state is returned so the Prefect task can be retried (with a
    retry delay) to periodically check the status of the task.

    If this task is not running within a Prefect flow, the ``max_retries``
    parameter is ignored and tasks that are still running raise an exception.

    Parameters
    ----------
    cluster
        ECS cluster name.
    task_arn
        Task ARN.
    max_retries
        Maximum number of retries.

    Returns
    -------
    :
        Exit code of the first container if the task is stopped,
        ``RETRIES_EXCEEDED_EXIT_CODE`` if the retry limit was exceeded, or a
        ``Failed`` state if the task is still running within a Prefect flow.

    Raises
    ------
    RuntimeError
        If the task is still running outside a Prefect flow, or if the task
        stopped without reporting a container exit code.
    """

    context = TaskRunContext.get()

    if context is not None and context.task_run.run_count > max_retries:
        return RETRIES_EXCEEDED_EXIT_CODE

    client = boto3.client("ecs")
    task = _describe_task(client, cluster, task_arn)

    # Task descriptions are not immediately available after submission, and a
    # new task first reports intermediate statuses (e.g. PROVISIONING,
    # PENDING). Poll until the task is visible and RUNNING or STOPPED.
    while task is None or task["lastStatus"] not in ("RUNNING", "STOPPED"):
        sleep(_POLL_INTERVAL_SECONDS)
        task = _describe_task(client, cluster, task_arn)

    if task["lastStatus"] == "RUNNING":
        # Within Prefect, a Failed state lets task retries re-check later.
        if context is not None:
            return Failed()
        message = "Task is in RUNNING state and does not have exit code."
        raise RuntimeError(message)

    # A task can stop before its container ever starts (e.g. an image pull
    # failure), in which case ECS reports no exit code for the container.
    containers = task.get("containers") or []
    if not containers or "exitCode" not in containers[0]:
        reason = task.get("stoppedReason", "unknown")
        message = (
            "Task is in STOPPED state but does not have exit code "
            f"(stopped reason: {reason})."
        )
        raise RuntimeError(message)

    return containers[0]["exitCode"]