Kinesis Firehose - 在转换记录时拆分为多个记录

问题描述 投票:0回答:1

是否可以选择在 Kinesis Firehose 中转换期间从单个输入记录创建多个记录并以 parquet 格式存储在 S3 中?

例如:

{
    "key1": "value1",
    "key2_array": [
        {
            "subkey1": "1",
            "subkey2": "2"
        },
        {
            "subkey1": "3",
            "subkey2": "4"
        }
    ]
}

应该为每个 key2_array 元素生成两条记录。示例如下:

{
    "key1": "value1",
    "key2_split": 
        {
            "subkey1": "1",
            "subkey2": "2"
        }
}

我尝试将输入记录拆分为具有相同输入记录的 recordId 的多条记录,但转换失败并出现错误

返回了具有相同记录 ID 的多条记录。确保 Lambda 函数为每条记录返回唯一的记录 ID。

到目前为止,我的理解是 Kinesis FireHose 转换管道不支持此选项。这个问题有解决方法或替代方法吗?

注意 - This是一个类似的问题,但它涉及红移。

amazon-web-services amazon-kinesis amazon-kinesis-firehose
1个回答
0
投票

是的,这是可能的,下面是我如何针对传入 json 对象的爆炸嵌套列表的类似用例执行此操作的相关部分:

def multiline_json_dumps(json_list):
    result = ""
    for json_object in json_list:
        result += json.dumps(json_object) + "\n"
    return result

[...]
def lambda_handler(event, context):
    records = event["records"]
    response_records = []
    for record in records:
        items = flatten_explode_json(record)  # this function returns a list of JSON objects
        response_records.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(multiline_json_dumps(items).encode("utf-8")),
            }
        )
    [...]
    return {"records": response_records}

警告: Lambda 的响应大小限制为 6Mb。确保您的转换不会将响应大小放大到超过该限制。看这个: https://zaccharles.medium.com/deep-dive-lambdas-response-payload-size-limit-8aedba9530ed 它被嵌入到 Firehose 中以依赖 lambda 的响应,并且 Firehose 本身似乎没有办法解决这个问题。

© www.soinside.com 2019 - 2024. All rights reserved.