I have a working setup that sends CloudWatch Logs to an S3 bucket via Kinesis Firehose. Unfortunately, the files in S3 do not contain properly formatted JSON. A well-formed JSON array of objects would look like this: [{}, {}, {}]. However, the S3 files created by Firehose look like this: {}{}{}. I tried to fix this by modifying the Kinesis Firehose data transformation Lambda blueprint so that it adds square brackets at the beginning and end and commas between the JSON objects. The blueprint I modified is "Process CloudWatch logs sent to Kinesis Firehose". Here are the relevant parts:
import base64
import json
import gzip
import boto3


def transformLogEvent(log_event):
    """Transform each log event.

    The default implementation below just extracts the message and appends a newline to it.

    Args:
        log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
        str: The transformed log event.
    """
    return log_event['message'] + ',\n'


def processRecords(records):
    for r in records:
        data = loadJsonGzipBase64(r['data'])
        recId = r['recordId']
        # CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        # They do not contain actual data.
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            joinedData = ''.join([transformLogEvent(e) for e in data['logEvents']])
            dataBytes = joinedData.encode("utf-8")
            encodedData = base64.b64encode(dataBytes).decode('utf-8')
            yield {
                'data': encodedData,
                'result': 'Ok',
                'recordId': recId
            }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }


def loadJsonGzipBase64(base64Data):
    return json.loads(gzip.decompress(base64.b64decode(base64Data)))


def lambda_handler(event, context):
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    recordListsToReingest = []

    for idx, rec in enumerate(records):
        originalRecord = event['records'][idx]
        if rec['result'] != 'Ok':
            continue

        # If a single record is too large after processing, split the original CWL data into two, each containing half
        # the log events, and re-ingest both of them (note that it is the original data that is re-ingested, not the
        # processed data). If it's not possible to split because there is only one log event, then mark the record as
        # ProcessingFailed, which sends it to error output.
        if len(rec['data']) > 6000000:
            cwlRecord = loadJsonGzipBase64(originalRecord['data'])
            if len(cwlRecord['logEvents']) > 1:
                rec['result'] = 'Dropped'
                recordListsToReingest.append(
                    [createReingestionRecord(isSas, originalRecord, data) for data in splitCWLRecord(cwlRecord)])
            else:
                rec['result'] = 'ProcessingFailed'
                print(('Record %s contains only one log event but is still too large after processing (%d bytes), ' +
                       'marking it as %s') % (rec['recordId'], len(rec['data']), rec['result']))
            del rec['data']
        else:
            projectedSize += len(rec['data']) + len(rec['recordId'])
            # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
            if projectedSize > 6000000:
                recordListsToReingest.append([createReingestionRecord(isSas, originalRecord)])
                del rec['data']
                rec['result'] = 'Dropped'

    # call putRecordBatch/putRecords for each group of up to 500 records to be re-ingested
    if recordListsToReingest:
        recordsReingestedSoFar = 0
        client = boto3.client('kinesis' if isSas else 'firehose', region_name=region)
        maxBatchSize = 500
        flattenedList = [r for sublist in recordListsToReingest for r in sublist]
        for i in range(0, len(flattenedList), maxBatchSize):
            recordBatch = flattenedList[i:i + maxBatchSize]
            # last argument is maxAttempts
            args = [streamName, recordBatch, client, 0, 20]
            if isSas:
                putRecordsToKinesisStream(*args)
            else:
                putRecordsToFirehoseStream(*args)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d' % (recordsReingestedSoFar, len(flattenedList)))

    print('%d input records, %d returned as Ok or ProcessingFailed, %d split and re-ingested, %d re-ingested as-is' % (
        len(event['records']),
        len([r for r in records if r['result'] != 'Dropped']),
        len([l for l in recordListsToReingest if len(l) > 1]),
        len([l for l in recordListsToReingest if len(l) == 1])))

    # encapsulate in square brackets for proper JSON formatting
    last = len(event['records'])-1
    start = '['+base64.b64decode(records[0]['data']).decode('utf-8')
    end = base64.b64decode(records[last]['data']).decode('utf-8')+']'
    records[0]['data'] = base64.b64encode(start.encode('utf-8'))
    records[last]['data'] = base64.b64encode(end.encode('utf-8'))

    return {'records': records}
I added the comma earlier, in the transformLogEvent method, and added the square brackets in these lines at the end of the lambda_handler method:
last = len(event['records'])-1
start = '['+base64.b64decode(records[0]['data']).decode('utf-8')
end = base64.b64decode(records[last]['data']).decode('utf-8')+']'
records[0]['data'] = base64.b64encode(start.encode('utf-8'))
records[last]['data'] = base64.b64encode(end.encode('utf-8'))
When I test this Lambda function I get this error:
{
"errorMessage": "'data'",
"errorType": "KeyError",
"requestId": "04ac151c-c429-484d-813f-ffd5d65286e2",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 270, in lambda_handler\n start = '['+base64.b64decode(records[0]['data']).decode('utf-8')\n"
]
}
I think the problem is the way I am referencing the Python list. I would like to know how to fix this error, and/or whether anyone has a better solution to the JSON formatting problem, keeping in mind that CloudWatch Logs events are sent to Kinesis Data Firehose in compressed gzip format.
It fails because your processRecords() function can return "records" that have no 'data' key. Specifically, if an input record is a CONTROL_MESSAGE, or is neither a CONTROL_MESSAGE nor a DATA_MESSAGE, the function yields a dict without a 'data' key. If that is the case for the first or last record (the first record in this instance), the Python script throws the exception you received.
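One way to make the bracket-wrapping step tolerant of those records is to operate only on the entries that actually carry data, for example by locating the first and last records whose result is 'Ok'. Here is a minimal sketch of that idea; the dataIdx lookup and the trailing .decode('utf-8') on the re-encoded values are my additions, not part of the blueprint:

# Hypothetical replacement for the bracket-wrapping lines at the end of lambda_handler.
# Only records that came back 'Ok' from processRecords() and still hold data are touched.
dataIdx = [i for i, rec in enumerate(records) if rec['result'] == 'Ok' and 'data' in rec]
if dataIdx:
    first, last = dataIdx[0], dataIdx[-1]
    start = '[' + base64.b64decode(records[first]['data']).decode('utf-8')
    # b64encode returns bytes, so decode back to str before returning the record to Firehose
    records[first]['data'] = base64.b64encode(start.encode('utf-8')).decode('utf-8')
    # This also works when first == last: the second pass re-reads the '['-prefixed data and appends ']'.
    end = base64.b64decode(records[last]['data']).decode('utf-8') + ']'
    records[last]['data'] = base64.b64encode(end.encode('utf-8')).decode('utf-8')

Two caveats: this only wraps the output of a single Lambda invocation, so if Firehose buffers the results of several invocations into one S3 object, that file can still contain multiple arrays back to back; and transformLogEvent leaves a trailing comma after the last object, which most JSON parsers will reject even once the brackets are in place.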