Source code for container_collection.batch.get_batch_logs
import boto3
LOG_GROUP_NAME = "/aws/batch/job"
"""AWS Batch log group name."""
[docs]def get_batch_logs(job_arn: str, log_filter: str) -> str:
"""
Get logs for AWS Batch job.
Parameters
----------
job_arn
Job ARN.
log_filter
Filter for log events.
Returns
-------
:
All filtered log events.
"""
client = boto3.client("batch")
response = client.describe_jobs(jobs=[job_arn])["jobs"][0]
log_stream = response["container"]["logStreamName"]
client = boto3.client("logs")
log_events: list[str] = []
response = client.filter_log_events(
logGroupName=LOG_GROUP_NAME,
logStreamNames=[log_stream],
filterPattern=log_filter,
)
if response["events"]:
log_events = log_events + [event["message"] for event in response["events"]]
while "nextToken" in response:
response = client.filter_log_events(
logGroupName="/aws/batch/job",
logStreamNames=[log_stream],
filterPattern=log_filter,
nextToken=response["nextToken"],
)
log_events = log_events + [event["message"] for event in response["events"]]
return "\n".join(log_events)