将 Flink DataStream<POJOs> 转换为 DataStream<RowData> 以供 Apache Iceberg 使用的最佳方法

问题描述 投票:0回答:1

我是 Flink 的新手,尝试使用 Flink 和 Kafka 作为数据输入,并使用 Iceberg 来存储数据。 这是我已完成的步骤。

  1. 从 kafkaSource 读取 java POJO 的数据流(使用 Avro 模式)。
  2. 使用 MapFunction 包装器将 POJO 的 Datastream 转换为 DataStream RowData。
  3. 使用FlinkSink将RowData的DataStream发送到Iceberg。

一切正常,但我关心的是我将 POJO 的数据流转换为 RowData 的数据流的方式。我有一个复杂且相当大的 POJO 引用并将其转换为 RowData,我必须为每个引用使用 .setField() 方法并处理每个数据类型转换,如 StringData、Map、List 等。

我想在这里检查是否有更好的方法。

java apache-flink apache-iceberg
1个回答
0
投票

Apache Hudi 项目有一个

AvroToRowDataConverters
类,我们已在其他项目中使用它来执行类似的操作。请注意,
createRowConverter
方法需要
RowType
,您可以使用 Flink 的
AvroSchemaConverter
从 Avro 获取它,如下所示:

        DataType rowDataType =
                AvroSchemaConverter.convertToDataType(MyAvroGeneratedPOJO.SCHEMA$.toString());
        rowType = (RowType) rowDataType.getLogicalType();

如果您不想引入大量 Hudi 代码,应该可以从

AvroToRowDataConverters
克隆您需要的位。

我认为你也可以使用这种方法将POJO转换为二进制数据,然后反序列化回一行,但它会更慢。

© www.soinside.com 2019 - 2024. All rights reserved.