有什么方法可以在 Apache Flink 中对 DataStream 进行聚合并存储聚合值吗?

问题描述 投票:0回答:0

我是 Flink 新手。我有这个用例 我有一个双精度数据流,我试图获取整个数据流的总和。 我使用过ReduceFunction和AggregateFunction。

案例一: 在Reduce函数中,输出是滚动Sum的数据流。为了获得最终的 Sum,我必须遍历outputStream,最后一个值将是我的total。 就我而言,我不想迭代整个数据流来获得最终的总和,也不想使用额外的数据流来存储最终的聚合值。

案例2: 我只能在 countWindow() 之后访问aggregate()方法,并且在countWindow()中我们必须传递大小。因为我不知道我的数据流的大小(用户将向我发送数据),所以我无法使用它。

下面是我的ReduceFunction的实现

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);

DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value -> "key").reduce(new ReduceFunction<Double>() {
  @Override
  public Double reduce(Double aDouble, Double t1) throws Exception {
    return aDouble+ t1;
  }
});

singleOutputStreamOperator.print();
DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
singleOutputStreamOperator.collectAsync(doubleCollector);
singleOutputStreamOperator.executeAndCollect("Aggregation");

Double result = null;
while( doubleCollector.getOutput().hasNext() ) {
  result = doubleCollector.getOutput().next();
  System.out.println("result = " + result);
}

输出如下:2.0、5.0、9.0、20.0、33.0。 我只想获取 33.0 作为我的聚合值并将其存储在变量中并将其提供给用户。

有没有更好的方法来解决我的用例?

aggregate apache-flink flink-streaming
© www.soinside.com 2019 - 2024. All rights reserved.