是否可以选择在 Kinesis Firehose 中转换期间从单个输入记录创建多个记录并以 parquet 格式存储在 S3 中?
例如:
{
"key1": "value1",
"key2_array": [
{
"subkey1": "1",
"subkey2": "2"
},
{
"subkey1": "3",
"subkey2": "4"
}
]
}
应该为每个 key2_array 元素生成两条记录。示例如下:
{
"key1": "value1",
"key2_split":
{
"subkey1": "1",
"subkey2": "2"
}
}
我尝试将输入记录拆分为具有相同输入记录的 recordId 的多条记录,但转换失败并出现错误
返回了具有相同记录 ID 的多条记录。确保 Lambda 函数为每条记录返回唯一的记录 ID。
到目前为止,我的理解是 Kinesis FireHose 转换管道不支持此选项。这个问题有解决方法或替代方法吗?
注意 - This是一个类似的问题,但它涉及红移。
是的,这是可能的,下面是我如何针对传入 json 对象的爆炸嵌套列表的类似用例执行此操作的相关部分:
def multiline_json_dumps(json_list):
result = ""
for json_object in json_list:
result += json.dumps(json_object) + "\n"
return result
[...]
def lambda_handler(event, context):
records = event["records"]
response_records = []
for record in records:
items = flatten_explode_json(record) # this function returns a list of JSON objects
response_records.append(
{
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(multiline_json_dumps(items).encode("utf-8")),
}
)
[...]
return {"records": response_records}
警告: Lambda 的响应大小限制为 6Mb。确保您的转换不会将响应大小放大到超过该限制。看这个: https://zaccharles.medium.com/deep-dive-lambdas-response-payload-size-limit-8aedba9530ed 它被嵌入到 Firehose 中以依赖 lambda 的响应,并且 Firehose 本身似乎没有办法解决这个问题。