Apache Flink:如何使用Java Map(或包含DTO的Map)流?]

问题描述 投票:0回答:1

我正在使用Flink,并且具有动态变化的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON模拟并将其转换为静态POJO,而必须依赖于Map。

我的第一个转换是使用GSON解析将JSON字符串流转换为Map对象流,然后将地图包装在名为Data的DTO中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);

Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

问题发生在此转换处理之后,我试图将结果流馈送到我的自定义Flink接收器中。接收器中未调用invoke函数。但是,如果我从包含DTO的此Map更改为没有Map的基本或常规DTO,接收器将起作用。

我的DTO看起来像这样:

public class FakeDTO {
    private String id;
    private LinkedTreeMap map; // com.google.gson.internal

    // getters and setters
    // constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

有专家建议我可以在这种情况下使用吗?

我正在使用Flink,并且具有动态变化的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON进行模拟并将其转换为静态POJO,并且我有...

java hashmap gson apache-flink flink-streaming
1个回答
0
投票

我能够解决此问题。在我的Flink日志中,我发现没有找到一个名为ReflectionSerializerFactory类的Kryo文件。我在maven中更新了Kryo版本,并为我的地图使用了Map类型,Flink文档说Flink支持。

© www.soinside.com 2019 - 2024. All rights reserved.