Firehose 转换输出的格式化

问题描述 投票:0回答:4

我正在使用 AWS Kinesis Firehose 和自定义数据转换。 Lambda 使用 Python 3.6 编写,返回如下所示的字符串:

{
    "records": [
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        },
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        },
        {
            "recordId": "...",
            "result": "Ok",
            "data": "..."
        }
    ]
}

这个 Lambda 非常高兴,并且在将输出返回到 Firehose 之前记录与上面类似的输出。但是,Firehose 的 S3 日志随后显示错误:

Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed.

查看网络上流传的 JS 和 Java 示例,我不清楚我需要做哪些不同的事情;我很困惑。

amazon-web-services amazon-kinesis amazon-kinesis-firehose
4个回答
11
投票

如果您的数据是json对象,您可以尝试以下

import base64
import json
def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # your own business logic.
        json_object = {...} 
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}

base64.b64encode 函数仅适用于 b'xxx' 字符串,而 output_record 的 'data' 属性需要普通的 'xxx' 字符串。


5
投票

我使用 Node.js 发现了同样的错误。

阅读文档http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html我的错误不是对每条记录的数据字段进行base64编码。

我决定这样做:

{
    recordId: record.recordId,
    result: 'Ok',
    data: new Buffer(JSON.stringify(data)).toString('base64')
}

0
投票

有同样的问题。就我而言,有一个 http 端点作为 firehose (URL Lambda) 的目标。

数据处理前我的输出:

[{'recordId': '45678',
  'result': 'Ok',
  'data': '***'},
 {'recordId': '98765',
  'result': 'Ok',
  'data': '***'},
 {'recordId': '123456',
  'result': 'Ok',
  'data': '***'}]

我用来获取结果的代码:

    for i, row in df_despacho_destino_final.iterrows():
        output_record = {
            'recordId': str(row['recordId']),
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(row.to_json(date_format='iso')).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

logging.info(f"Output data: {output}")

return {'records': output}

消防水带错误:

即使将“return”作为值为“Ok”的字符串传递,我也接受“result”字段不能为空的错误。


-1
投票

您可以在我的仓库中查看代码。 https://github.com/hixichen/golang_lamda_decode_protobuf_firehose

© www.soinside.com 2019 - 2024. All rights reserved.