我正在使用 AWS Kinesis Firehose 和自定义数据转换。 Lambda 使用 Python 3.6 编写,返回如下所示的字符串:
{
"records": [
{
"recordId": "...",
"result": "Ok",
"data": "..."
},
{
"recordId": "...",
"result": "Ok",
"data": "..."
},
{
"recordId": "...",
"result": "Ok",
"data": "..."
}
]
}
这个 Lambda 非常高兴,并且在将输出返回到 Firehose 之前记录与上面类似的输出。但是,Firehose 的 S3 日志随后显示错误:
Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.
查看网络上流传的 JS 和 Java 示例,我不清楚我需要做哪些不同的事情;我很困惑。
如果您的数据是json对象,您可以尝试以下
import base64
import json
def lambda_handler(event, context):
output = []
for record in event['records']:
# your own business logic.
json_object = {...}
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
}
output.append(output_record)
return {'records': output}
base64.b64encode 函数仅适用于 b'xxx' 字符串,而 output_record 的 'data' 属性需要普通的 'xxx' 字符串。
我使用 Node.js 发现了同样的错误。
阅读文档http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html我的错误不是对每条记录的数据字段进行base64编码。
我决定这样做:
{
recordId: record.recordId,
result: 'Ok',
data: new Buffer(JSON.stringify(data)).toString('base64')
}
有同样的问题。就我而言,有一个 http 端点作为 firehose (URL Lambda) 的目标。
数据处理前我的输出:
[{'recordId': '45678',
'result': 'Ok',
'data': '***'},
{'recordId': '98765',
'result': 'Ok',
'data': '***'},
{'recordId': '123456',
'result': 'Ok',
'data': '***'}]
我用来获取结果的代码:
for i, row in df_despacho_destino_final.iterrows():
output_record = {
'recordId': str(row['recordId']),
'result': 'Ok',
'data': base64.b64encode(json.dumps(row.to_json(date_format='iso')).encode('utf-8')).decode('utf-8')
}
output.append(output_record)
logging.info(f"Output data: {output}")
return {'records': output}
即使将“return”作为值为“Ok”的字符串传递,我也接受“result”字段不能为空的错误。