我是 Flink 新手。我有这个用例 我有一个双精度数据流,我试图获取整个数据流的总和。 我使用过ReduceFunction和AggregateFunction。
案例一: 在Reduce函数中,输出是滚动Sum的数据流。为了获得最终的 Sum,我必须遍历outputStream,最后一个值将是我的total。 就我而言,我不想迭代整个数据流来获得最终的总和,也不想使用额外的数据流来存储最终的聚合值。
案例2: 我只能在 countWindow() 之后访问aggregate()方法,并且在countWindow()中我们必须传递大小。因为我不知道我的数据流的大小(用户将向我发送数据),所以我无法使用它。
下面是我的ReduceFunction的实现
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value -> "key").reduce(new ReduceFunction<Double>() {
@Override
public Double reduce(Double aDouble, Double t1) throws Exception {
return aDouble+ t1;
}
});
singleOutputStreamOperator.print();
DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
singleOutputStreamOperator.collectAsync(doubleCollector);
singleOutputStreamOperator.executeAndCollect("Aggregation");
Double result = null;
while( doubleCollector.getOutput().hasNext() ) {
result = doubleCollector.getOutput().next();
System.out.println("result = " + result);
}
输出如下:2.0、5.0、9.0、20.0、33.0。 我只想获取 33.0 作为我的聚合值并将其存储在变量中并将其提供给用户。
有没有更好的方法来解决我的用例?