AWS Firehose (Java SDK): unable to deliver data to an Iceberg table defined in Glue


The following AWS CLI command works fine:

aws firehose put-record --delivery-stream-name 52N-STA-DF-ICBG --cli-binary-format raw-in-base64-out --record='{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}'

However, when I try to do the same from a Java application, Firehose fails to deliver the record to the Iceberg table:

public void icebergMerge(ObjectNode dataNode, String tableName, String operation)
        throws STACRUDException {

    ObjectNode rootNode = mapper.createObjectNode();

    ObjectNode adf_metadata = rootNode.putObject(FirehoseConstants.ADF_METADATA);
    ObjectNode otf_metadata = adf_metadata.putObject(FirehoseConstants.OTF_METADATA);
    otf_metadata.put(FirehoseConstants.TABLE, tableName.toLowerCase());
    otf_metadata.put(FirehoseConstants.DATABASE, FirehoseConstants.DATABASE_NAME);
    otf_metadata.put(FirehoseConstants.OPERATION, operation);

    ObjectNode adf_record = rootNode.putObject(FirehoseConstants.ADF_RECORD);
    adf_record.setAll(dataNode);

    try {
        String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
        ObjectNode outerNode = mapper.createObjectNode();
        outerNode.put("Data", dataPayload);
        streamToFirehose(mapper.writeValueAsString(outerNode));
    } catch (JsonProcessingException e) {
        throw new STACRUDException("Bad request: cannot parse payload for table " + operation);
    }
}

private void streamToFirehose(String jsonPayload) {
    try {
        Record record = Record.builder()
                .data(SdkBytes.fromUtf8String(jsonPayload))
                .build();

        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(FirehoseConstants.DELIVERY_STREAM_NAME)
                .record(record)
                .build();

        PutRecordResponse resp = firehoseClient.putRecord(putRecordRequest);
        LOGGER.debug("Record sent successfully to Firehose.");
    } catch (Exception e) {
        LOGGER.debug("Error sending to Firehose: " + e.getMessage());
    }
}

(Screenshot: Firehose dashboard)

Here is what jsonPayload looks like in the debugger inside streamToFirehose():

{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}
1 Answer

The problem is the format of the record. In the Java SDK, putRecord wraps the payload in the { "Data": payload } envelope itself, so the outerNode object is redundant: the record delivered to Firehose ends up as {"Data": "..."} instead of the ADF_Metadata/ADF_Record document that the Iceberg destination expects. The wrapper appears in the CLI example only because --record takes the whole Record structure, whereas Record.builder().data(...) takes just the payload bytes.
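
A minimal sketch of the fix, assuming the same mapper, rootNode, and streamToFirehose() helper as in the question: serialize rootNode directly and drop the outer wrapper.

try {
    // Send the ADF document itself; the SDK's putRecord supplies the
    // {"Data": ...} envelope, so no outerNode is needed.
    String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
    streamToFirehose(dataPayload);
} catch (JsonProcessingException e) {
    throw new STACRUDException("Bad request: cannot parse payload for table " + operation);
}

With this change the payload reaching Firehose matches the working CLI example, so the routing metadata in ADF_Metadata.OTF_Metadata is parsed as intended.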
