我是 Flink 的新手,尝试使用 Flink 和 Kafka 作为数据输入,并使用 Iceberg 来存储数据。 这是我已完成的步骤。
一切正常,但我关心的是我将 POJO 的数据流转换为 RowData 的数据流的方式。我有一个复杂且相当大的 POJO 引用并将其转换为 RowData,我必须为每个引用使用 .setField() 方法并处理每个数据类型转换,如 StringData、Map、List 等。
我想在这里检查是否有更好的方法。
Apache Hudi 项目有一个
AvroToRowDataConverters
类,我们已在其他项目中使用它来执行类似的操作。请注意, createRowConverter
方法需要 RowType
,您可以使用 Flink 的 AvroSchemaConverter
从 Avro 获取它,如下所示:
DataType rowDataType =
AvroSchemaConverter.convertToDataType(MyAvroGeneratedPOJO.SCHEMA$.toString());
rowType = (RowType) rowDataType.getLogicalType();
如果您不想引入大量 Hudi 代码,应该可以从
AvroToRowDataConverters
克隆您需要的位。
我认为你也可以使用这种方法将POJO转换为二进制数据,然后反序列化回一行,但它会更慢。