如何将FlatMap映射到Apache Flink中的数据库?

问题描述 投票:1回答:1

[我正在使用Apache Flink尝试将JSON记录从Kafka获取到InfluxDB,然后在处理过程中将它们从一个JSON记录拆分为多个InfluxDB点。

我找到了flatMap变换,感觉很符合目的。核心代码如下:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

出于某种原因,我只能将收集到的这些点之一流式传输到数据库中。

即使我打印出所有映射的条目,似乎也可以正常工作:dataStream.print()产生:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

我误解了flatMap还是Influx连接器中可能存在一些错误?

java apache-flink flink-streaming
1个回答
0
投票

问题实际上与以下事实有关:Influx中的系列(由其标签集和度量as seen here定义)只能有一个点每时间,因此即使我的字段有所不同,最后一点也覆盖了所有点具有相同时间值的先前点。

© www.soinside.com 2019 - 2024. All rights reserved.