How to query AWS Batch CloudWatch logs with boto3


I'm trying to implement a small AWS Lambda function that captures the CloudWatch logs created by my AWS Batch job, scans them for errors, and, if any are found, sends an SNS notification by email.

Somehow I can't get hold of the latest batch job id, because the job ID changes on every AWS Batch run. I had hard-coded it in the Lambda function's environment variables, but that doesn't work, since the batchId is unique for each batch execution.

Here is the code I have so far.

import re
import os
import boto3
import urllib3
urllib3.disable_warnings()
logs_client = boto3.client('logs')
sns_client = boto3.client('sns')

def get_log_stream_name(batch_job_id):
    response = logs_client.describe_jobs(jobs=[batch_job_id])
    if response['jobs']:
        container = response['jobs'][0].get('container', {})
        log_stream_name = container.get('logStreamName')
        return log_stream_name
    else:
        print(f"No job found with ID {batch_job_id}")
        return None

def get_cloudwatch_logs(log_group_name, log_stream_name, start_time=None,
                        end_time=None, limit=100):
    # Retrieve log events from the latest log stream
    kwargs = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        'limit': limit
    }
    if start_time:
        kwargs['startTime'] = int(start_time)
    if end_time:
        kwargs['endTime'] = int(end_time)

    response = logs_client.get_log_events(**kwargs)
    return response['events']

def check_for_errors(log_events, error_keywords=None):
    if error_keywords is None:
        error_keywords = ["ERROR", "Exception", "FAILED"]
    error_pattern = re.compile("|".join(error_keywords), re.IGNORECASE)
    error_logs = [event for event in log_events
                  if error_pattern.search(event['message'])]
    return error_logs

def send_sns_notification(topic_arn, subject, message):
    # Send a notification to the SNS topic
    response = sns_client.publish(
        TopicArn=topic_arn,
        Subject=subject,
        Message=message
    )
    return response

def lambda_handler(event, context):
    # Get the latest log stream
    print("event:", event)
    print("context:", context)
    batch_job_id = os.getenv('BATCH_JOB_ID')
    print('batch_job_id:', batch_job_id)
    log_group_name = os.getenv('LOG_GROUP_NAME')
    print('log_group_name: ', log_group_name)
    sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
    print('sns_topic_arn: ', sns_topic_arn)

    assert batch_job_id is not None
    assert log_group_name is not None
    assert sns_topic_arn is not None

    log_stream_name = get_log_stream_name(batch_job_id)
    assert log_stream_name is not None

    log_events = get_cloudwatch_logs(log_group_name=log_group_name,
                                     log_stream_name=log_stream_name)
    assert log_events is not None

    # Check for errors in logs
    error_logs = check_for_errors(log_events=log_events)
    assert error_logs is not None
    if error_logs:
        error_messages = "\n".join(
            f"{log['timestamp']}: {log['message']}" for log in error_logs)
        # Send SNS notification
        subject = f"Errors found in AWS Batch job {batch_job_id} logs"
        message = f"Errors detected in AWS Batch job {batch_job_id}:\n\n{error_messages}"
        send_sns_notification(sns_topic_arn, subject, message)
        print(f"Sent SNS notification for errors in job {batch_job_id}")
    else:
        print("No errors found in CW logs")

In my CFN template, I have this for creating the Lambda function.

TradeFileTestLambdaFunction:
  Type: AWS::Serverless::Function
  Properties:
    Description: >
      Handles error events from summa-trade-file-test events.
    Handler: app.lambda_handler
    Runtime: python3.11
    FunctionName:
      !Sub
      - '${TheAppNameForResources}-${TheEnv}'
      - TheEnv: !Ref Env
        TheAppNameForResources: !Ref AppNameForResources
    EphemeralStorage:
      Size: 10240
    Timeout: 900
    Role: !GetAtt MyLambdaExecutionRole.Arn
    Policies:
      - 'AWSLambdaVPCAccessExecutionRole'
      - 'AWSXRayDaemonWriteAccess'
      - CloudWatchLogsReadOnlyAccess
      - SNSPublishPolicy
    Environment:
      Variables:
        ENV: !Ref Env
        SES_IDENTITY_ARN: !Ref SesIdentityArn
        SNS_TOPIC_ARN: !Ref SnsAlertTopic
        LOG_GROUP_NAME: !Ref MyBatchLogGroupName
        BATCH_JOB_ID: !Ref MyBatchJobId
    EventInvokeConfig:
      MaximumRetryAttempts: 0
    Events:
      LogMonitorEvent:
        Type: Schedule
        Properties:
          Schedule: "rate(30 minutes)"
    ReservedConcurrentExecutions: 10

SnsAlertTopic:
  Type: AWS::SNS::Topic
  Properties:
    DisplayName:
      !Sub
        - '${TheEnv}-${TheAppNameForResources}-alert-${TheRegion}'
        - TheEnv: !Ref Env
          TheAppNameForResources: !Ref AppNameForResources
          TheRegion: !Ref AWS::Region
    FifoTopic: false

Also, it looks like I can't use describe_jobs... I get this error:

{
  "errorMessage": "'CloudWatchLogs' object has no attribute 'describe_jobs'",
  "errorType": "AttributeError",
  "requestId": "44bc882e-7463-46d0-95bc-1660461d2cbd",
  "stackTrace": [
    "  File \"/var/task/app.py\", line 131, in lambda_handler\n    log_stream_name = get_log_stream_name(batch_job_id)\n",
    "  File \"/var/task/app.py\", line 71, in get_log_stream_name\n    response = logs_client.describe_jobs(jobs=[batch_job_id])\n",
    "  File \"/var/lang/lib/python3.11/site-packages/botocore/client.py\", line 918, in __getattr__\n    raise AttributeError(\n"
  ]
}

Any help resolving this is greatly appreciated.

amazon-web-services aws-lambda aws-cloudformation
1 Answer

You need to query the id of the last batch execution with the Batch client, and then retrieve the log stream with the CloudWatch Logs client via logs_client.describe_log_streams:

import boto3
import re

batch_client = boto3.client('batch')
JOB_QUEUE = 'batch-job-queue'  

def get_latest_batch_job_id():
    try:
        response = batch_client.list_jobs(
            jobQueue=JOB_QUEUE,
            jobStatus='SUCCEEDED',  # Change this as needed (e.g. 'FAILED')
            maxResults=1  
        )
        jobs = response.get('jobSummaryList', [])
        if jobs:
            latest_job = jobs[0]  # The most recent job
            batch_id = latest_job['jobId']  
            return batch_id
        else:
            print(f"No jobs found in {JOB_QUEUE}")
            return None
    except Exception as e:
        print(f"Error retrieving latest batch job: {e}")
        return None
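Putting the pieces together, here is a minimal sketch of the full path from job queue to log events. It assumes a queue named batch-job-queue and that the first entry of list_jobs is the most recent job for that status (worth verifying against your queue); extract_log_stream_name is a hypothetical helper introduced here for illustration. Note that describe_jobs is called on the Batch client, which is the fix for the AttributeError above:

```python
JOB_QUEUE = 'batch-job-queue'  # assumption: replace with your queue name

def extract_log_stream_name(describe_jobs_response):
    """Pull container.logStreamName out of a batch.describe_jobs() response."""
    jobs = describe_jobs_response.get('jobs', [])
    if not jobs:
        return None
    return jobs[0].get('container', {}).get('logStreamName')

def fetch_latest_job_logs(log_group_name, limit=100):
    # boto3 imported lazily so the pure helper above has no AWS dependency.
    import boto3
    batch_client = boto3.client('batch')
    logs_client = boto3.client('logs')

    # 1. Latest job on the queue for the given status.
    jobs = batch_client.list_jobs(
        jobQueue=JOB_QUEUE, jobStatus='SUCCEEDED', maxResults=1
    ).get('jobSummaryList', [])
    if not jobs:
        return []

    # 2. Resolve the job id to its CloudWatch log stream via the *Batch*
    #    client -- describe_jobs lives on batch, not on logs.
    job_detail = batch_client.describe_jobs(jobs=[jobs[0]['jobId']])
    log_stream_name = extract_log_stream_name(job_detail)
    if log_stream_name is None:
        return []

    # 3. Read the events with the logs client.
    response = logs_client.get_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
        limit=limit,
    )
    return response['events']
```

lambda_handler could then call fetch_latest_job_logs(log_group_name) instead of depending on a hard-coded BATCH_JOB_ID environment variable.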