我正在使用Flink,并且具有动态变化的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON模拟并将其转换为静态POJO,而必须依赖于Map。
我的第一个转换是使用GSON解析将JSON字符串流转换为Map对象流,然后将地图包装在名为Data的DTO中。
(inside the first map transformation) LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class); Data data = new Data(map); // Data has getters, setters for the map and implements Serializable
问题发生在此转换处理之后,我试图将结果流馈送到我的自定义Flink接收器中。接收器中未调用invoke函数。但是,如果我从包含DTO的此Map更改为没有Map的基本或常规DTO,接收器将起作用。
我的DTO看起来像这样:
public class FakeDTO { private String id; private LinkedTreeMap map; // com.google.gson.internal // getters and setters // constructors, empty and with fields
我尝试了以下两种解决方案:
env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; env.getConfig().disableGenericTypes();
有专家建议我可以在这种情况下使用吗?
我正在使用Flink,并且具有动态变化的字段和嵌套字段的JSON字符串流到达我的系统。因此,我无法将此传入的JSON进行模拟并将其转换为静态POJO,并且我有...
我能够解决此问题。在我的Flink日志中,我发现没有找到一个名为ReflectionSerializerFactory类的Kryo文件。我在maven中更新了Kryo版本,并为我的地图使用了Map类型,Flink文档说Flink支持。