如何为Apache Flink创建一个自定义的POJO?

问题描述 投票:0回答:1

我正在使用Flink来处理一些来自某个Data Source的JSON格式的数据。

现在,我的处理过程很简单:从JSON格式的数据中提取每个元素,并将它们打印成日志文件。

这是我的一段代码。

// create proper deserializer to deserializer the JSON-format data into ObjectNode
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
// create connector to receive data from Pravega
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.???.print();

假设从Pravega来的数据是这样的: {"name":"titi", "age":18}

正如我所说的,现在我只需要提取出 nameage 并打印出来。

那么我怎么才能做到这一点呢?

根据我的理解,我需要做一些自定义代码,在 ???. 我可能需要创建一个自定义的POJO类,其中包含了 ObjectNode. 但是我不知道怎么做。我看了Flink的官方文档,也试着在google上搜索如何为Flink创建一个自定义的POJO,但我还是搞不清楚。

能否请你给我举个例子?

json apache-flink flink-streaming pojo
1个回答
0
投票

你为什么不简单地使用一些更有意义的东西,而不是 JavaSerializer? 也许是来自 此处.

然后,你可以用你想使用的字段创建一个POJO,并简单地反序列化JSON数据到你的POJO,而不是在你的POJO上。ObjectNode

此外,如果有一些特殊的原因,您需要拥有 ObjectNode 在反序列化上,你可以简单地做一些像:

//I assume You have created the class named MyPojo
dataStream.map(new MapFunction<ObjectNode, MyPojo>() {
            ObjectMapper mapper = new ObjectMapper();

            @Override
            public MyPojo map(final ObjectNode value) throws Exception {
                mapper.readValue(value.asText(), MyPojo.class)
            }
})
© www.soinside.com 2019 - 2024. All rights reserved.