[我正在使用Apache Flink尝试将JSON记录从Kafka获取到InfluxDB,然后在处理过程中将它们从一个JSON记录拆分为多个InfluxDB点。
我找到了flatMap
变换,感觉很符合目的。核心代码如下:
DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
@Override
public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
Iterator<Entry<String, JsonNode>> iterator = //...
while (iterator.hasNext()) {
// extract point from input
InfluxDBPoint point = //...
out.collect(point);
}
}
});
出于某种原因,我只能将收集到的这些点之一流式传输到数据库中。
即使我打印出所有映射的条目,似乎也可以正常工作:dataStream.print()
产生:
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3
我误解了flatMap
还是Influx连接器中可能存在一些错误?
问题实际上与以下事实有关:Influx中的系列(由其标签集和度量as seen here定义)只能有一个点每时间,因此即使我的字段有所不同,最后一点也覆盖了所有点具有相同时间值的先前点。