我正在尝试实现一个小型 AWS lambda 函数,该函数将捕获由我的 AWS 批处理程序创建的 cloudwatch 日志并查找其中的任何错误,如果有,则通过电子邮件发送 SNS 通知。
不知何故,我无法获取最新的batch_id,因为每次AWS Batch运行时Batch ID都会不断变化。我在 AWS lambda 的环境变量中硬编码了这个,但它不起作用,因为每次执行批处理时,batchId 都是唯一的。
这是我到目前为止的代码。
import re
import os
import boto3
import urllib3
urllib3.disable_warnings()
logs_client = boto3.client('logs')
sns_client = boto3.client('sns')
def get_log_stream_name(batch_job_id):
response = logs_client.describe_jobs(jobs=[batch_job_id])
if response['jobs']:
container = response['jobs'][0].get('container', {})
log_stream_name = container.get('logStreamName')
return log_stream_name
else:
print(f"No job found with ID {batch_job_id}")
return None
def get_cloudwatch_logs(log_group_name, log_stream_name, start_time=None,
end_time=None, limit=100):
# Retrieve log events from the latest log stream
kwargs = {
'logGroupName': log_group_name,
'logStreamName': log_stream_name,
'limit': limit
}
if start_time:
kwargs['startTime'] = int(start_time)
if end_time:
kwargs['endTime'] = int(end_time)
response = logs_client.get_log_events(**kwargs)
return response['events']
def check_for_errors(log_events, error_keywords=None):
if error_keywords is None:
error_keywords = ["ERROR", "Exception", "FAILED"]
error_pattern = re.compile("|".join(error_keywords), re.IGNORECASE)
error_logs = [event for event in log_events if
error_pattern.search(event['message'])]
return error_logs
def send_sns_notification(topic_arn, subject, message):
# Send a notification to SNS topic
response = sns_client.publish(
TopicArn=topic_arn,
Subject=subject,
Message=message
)
return response
def lambda_handler(event, context):
# Get the latest log stream
print("event:", event)
print("context:", context)
batch_job_id = os.getenv('BATCH_JOB_ID')
print('batch_job_id:', batch_job_id)
log_group_name = os.getenv('LOG_GROUP_NAME')
print('log_group_name: ', log_group_name)
sns_topic_arn = event.getenv('SNS_TOPIC_ARN')
print('sns_topic_arn: ', sns_topic_arn)
assert batch_job_id is not None
assert log_group_name is not None
assert sns_topic_arn is not None
log_stream_name = get_log_stream_name(batch_job_id)
assert log_stream_name is not None
log_events = get_cloudwatch_logs(log_group_name=log_group_name,
log_stream_name=log_stream_name)
assert log_events is not None
# Check for errors in logs
error_logs = check_for_errors(log_events=log_events)
assert error_logs is not None
error_messages = None
if error_logs:
error_messages = "\n".join([f"{log['timestamp']}: {log['message']}" for log in
error_logs])
# Send SNS notification
subject = f"Errors found in AWS Batch job {batch_job_id} logs"
message = f"Errors detected in AWS Batch job {batch_job_id}:\n\n{error_messages}"
body = None
send_sns_notification(sns_topic_arn, subject, message)
print(f"Sent SNS notification for errors in job {batch_job_id}")
else:
print("No errors found in CW logs")
在我的 CFN 模板中,我有这个用于创建 lambda 函数。
TradeFileTestLambdaFunction:
Type: AWS::Serverless::Function
Properties:
Description: >
Handles error events from summa-trade-file-test events.
Handler: app.lambda_handler
Runtime: python3.11
FunctionName:
!Sub
- '${TheAppNameForResources}-${TheEnv}'
- TheEnv: !Ref Env
TheAppNameForResources: !Ref AppNameForResources
EphemeralStorage:
Size: 10240
Timeout: 900
Role: !GetAtt MyLambdaExecutionRole.Arn
Policies:
- 'AWSLambdaVPCAccessExecutionRole'
- 'AWSXRayDaemonWriteAccess'
- CloudWatchLogsReadOnlyAccess
- SNSPublishPolicy
Environment:
Variables:
ENV: !Ref Env
SES_IDENTITY_ARN: !Ref SesIdentityArn
SNS_TOPIC_ARN: !Ref SnsAlertTopic
LOG_GROUP_NAME: !Ref MyBatchLogGroupName
BATCH_JOB_ID: !Ref MyBatchJobId
EventInvokeConfig:
MaximumRetryAttempts: 0
Events:
LogMonitorEvent:
Type: Schedule
Properties:
Schedule: "rate(30 minutes)"
ReservedConcurrentExecutions: 10
SnsAlertTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName:
!Sub
- '${TheEnv}-${TheAppNameForResources}-alert-${TheRegion}'
- TheEnv: !Ref Env
TheAppNameForResources: !Ref AppNameForResources
TheRegion: !Ref AWS::Region
FifoTopic: false
而且看起来我无法使用describe_jobs...我收到此错误:
{ "errorMessage": "'CloudWatchLogs' 对象没有属性 'describe_jobs'", "errorType": "属性错误", "requestId": "44bc882e-7463-46d0-95bc-1660461d2cbd", “堆栈跟踪”:[ 文件“/var/task/app.py”,第 131 行,位于 lambda_handler log_stream_name = get_log_stream_name(batch_job_id) ”, 文件“/var/task/app.py”,第 71 行,在 get_log_stream_name 中 响应=logs_client.describe_jobs(jobs=[batch_job_id]) ”, “ 文件“/var/lang/lib/python3.11/site-packages/botocore/client.py”,第 918 行,在 getattr 引发属性错误( ” ] }
任何解决此问题的帮助都非常感谢。
您需要使用 Batch 客户端查询批量执行的最后一个 id,然后使用 cloudwatch 客户端通过 logs_client.describe_log_streams 检索日志流:
import boto3
import re
batch_client = boto3.client('batch')
JOB_QUEUE = 'batch-job-queue'
def get_latest_batch_job_id():
try:
response = batch_client.list_jobs(
jobQueue=JOB_QUEUE,
jobStatus='SUCCEEDED', # You can change this as you need
maxResults=1
)
jobs = response.get('jobSummaryList', [])
if jobs:
latest_job = jobs[0] # The most recent job
batch_id = latest_job['jobId']
return batch_id
else:
print(f"No jobs found in {JOB_QUEUE}")
return None
except Exception as e:
print(f"Error retrieving latest batch job: {e}")
return None