我正在使用Flink来处理一些来自某个Data Source的JSON格式的数据。
现在,我的处理过程很简单:从JSON格式的数据中提取每个元素,并将它们打印成日志文件。
这是我的一段代码。
// create proper deserializer to deserializer the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();
假设从Pravega来的数据是这样的: {"name":"titi", "age":18}
正如我所说的,现在我只需要提取出 name
和 age
并打印出来。
那么我怎么才能做到这一点呢?
根据我的理解,我需要做一些自定义代码,在 ???
. 我可能需要创建一个自定义的POJO类,其中包含了 ObjectNode
. 但是我不知道怎么做。我看了Flink的官方文档,也试着在google上搜索如何为Flink创建一个自定义的POJO,但我还是搞不清楚。
能否请你给我举个例子?
你为什么不简单地使用一些更有意义的东西,而不是 JavaSerializer
? 也许是来自 此处.
然后,你可以用你想使用的字段创建一个POJO,并简单地反序列化JSON数据到你的POJO,而不是在你的POJO上。ObjectNode
此外,如果有一些特殊的原因,您需要拥有 ObjectNode
在反序列化上,你可以简单地做一些像:
//I assume You have created the class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public MyPojo map(final ObjectNode value) throws Exception {
mapper.readValue(value.asText(), MyPojo.class)
}
})