The following AWS CLI command works fine:
aws firehose put-record --delivery-stream-name 52N-STA-DF-ICBG --cli-binary-format raw-in-base64-out --record='{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"
}'
However, when I try to do the same thing from a Java application, Firehose fails to deliver the record to the Iceberg table:
public void icebergMerge(ObjectNode dataNode, String tableName, String operation)
        throws STACRUDException {
    ObjectNode rootNode = mapper.createObjectNode();
    ObjectNode adf_metadata = rootNode.putObject(FirehoseConstants.ADF_METADATA);
    ObjectNode otf_metadata = adf_metadata.putObject(FirehoseConstants.OTF_METADATA);
    otf_metadata.put(FirehoseConstants.TABLE, tableName.toLowerCase());
    otf_metadata.put(FirehoseConstants.DATABASE, FirehoseConstants.DATABASE_NAME);
    otf_metadata.put(FirehoseConstants.OPERATION, operation);
    ObjectNode adf_record = rootNode.putObject(FirehoseConstants.ADF_RECORD);
    adf_record.setAll(dataNode);
    try {
        String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
        ObjectNode outerNode = mapper.createObjectNode();
        outerNode.put("Data", dataPayload);
        streamToFirehose(mapper.writeValueAsString(outerNode));
    } catch (JsonProcessingException e) {
        throw new STACRUDException("Bad request: cannot parse payload for table " + tableName);
    }
}
private void streamToFirehose(String jsonPayload) {
    try {
        Record record = Record.builder()
                .data(SdkBytes.fromUtf8String(jsonPayload))
                .build();
        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(FirehoseConstants.DELIVERY_STREAM_NAME)
                .record(record)
                .build();
        PutRecordResponse resp = firehoseClient.putRecord(putRecordRequest);
        LOGGER.debug("Record sent successfully to Firehose.");
    } catch (Exception e) {
        LOGGER.error("Error sending to Firehose: " + e.getMessage(), e);
    }
}
This is what jsonPayload looks like in the debugger inside streamToFirehose():
{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}
AWS SDK version: 2.27.17
The payload format follows the AWS documentation: https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-format-input-record.html
The table is defined in the Glue Data Catalog.
I know the trailing "\n" is redundant, but the problem is the same with or without it.
The problem was the format of the record: the Java putRecord API already wraps the payload as { "Data": payload }, so the outerNode object is redundant.
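To make the failure concrete, here is a minimal stdlib-only sketch (no AWS SDK or Jackson; the string literals are simplified stand-ins for the real payload) of how wrapping the JSON a second time changes the bytes the destination receives. The "Data" field in the CLI's --record shorthand and the Record.data() bytes in the SDK play the same role, so adding your own {"Data": ...} envelope in Java double-wraps the record:

```java
public class DoubleWrapDemo {
    // Minimal JSON string escaping for the demo (backslashes and quotes only).
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // What the Iceberg destination expects to find in the record body:
        String inner = "{\"ADF_Metadata\":{},\"ADF_Record\":{}}";

        // CLI case: --record='{"Data":"..."}' -- the "Data" field is just the
        // CLI's way of passing the record bytes, so downstream sees `inner`.
        String cliDelivers = inner;

        // Java case: Record.data() already carries the record bytes, so an
        // extra {"Data": ...} envelope means downstream sees this instead:
        String javaDelivers = "{\"Data\":\"" + escape(inner) + "\"}";

        System.out.println(cliDelivers);   // parses with ADF_Metadata at the top level
        System.out.println(javaDelivers);  // parses with "Data" at the top level
    }
}
```

The destination then looks for ADF_Metadata at the top level of the double-wrapped record, fails to find it, and the delivery fails. The fix is to pass mapper.writeValueAsString(rootNode) straight to streamToFirehose() and drop outerNode.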