在Flink中,有什么方法可以对DataStream进行聚合并存储聚合值吗?

问题描述 投票:0回答:1

我是 Flink 新手。我有这个用例 我有一个双精度数据流,我正在尝试获取整个数据流的总和。

我使用过ReduceFunction和AggregateFunction。

案例一: 在Reduce函数中,输出是滚动Sum的数据流。为了获得最终的 Sum,我必须遍历outputStream,最后一个值将是我的total。 就我而言,我不想迭代整个数据流来获得最终的总和,也不想使用额外的数据流来存储最终的聚合值。

案例2: 我只能在 countWindow() 之后访问aggregate()方法,并且在countWindow()中我们必须传递大小。因为我不知道我的数据流的大小(用户将向我发送数据),所以我无法使用它。

下面是我的ReduceFunction的实现

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);

DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value -> "key").reduce(new ReduceFunction<Double>() {
  @Override
  public Double reduce(Double aDouble, Double t1) throws Exception {
    return aDouble+ t1;
  }
});

singleOutputStreamOperator.print();
DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
singleOutputStreamOperator.collectAsync(doubleCollector);
singleOutputStreamOperator.executeAndCollect("Aggregation");

Double result = null;
while( doubleCollector.getOutput().hasNext() ) {
  result = doubleCollector.getOutput().next();
  System.out.println("result = " + result);
}

输出如下:2.0、5.0、9.0、20.0、33.0、47.0。 我只想获取 47.0 作为我的聚合值并将其存储在变量中并将其提供给用户。

有没有更好的方法来解决我的用例?

aggregate apache-flink flink-streaming
1个回答
0
投票

一般来说,流应用程序并不总是产生最终结果——有些应用程序是要连续运行的。但有时你知道输入是有界的,你关心的只是最终结果。在这些情况下,以批处理模式运行应用程序是有意义的。

这是一个例子:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = 
          StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStreamSource<Double> dataStream =
          env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);

        dataStream.map(e -> new Tuple2<Integer, Double>(1, e))
                .returns(Types.TUPLE(Types.INT, Types.DOUBLE))
                .keyBy(t -> t.f0)
                .reduce((ReduceFunction<Tuple2<Integer, Double>>)
                  (t0, t1) -> new Tuple2<Integer, Double>(t0.f0, t0.f1 + t1.f1))
                .map(t -> t.f1)
                .print();

        env.execute();
    }
}

运行时,它只打印最终结果(47.0)。

有关运行时执行模式的更多信息,请参阅文档

© www.soinside.com 2019 - 2024. All rights reserved.