我是 Flink 新手。我有这个用例 我有一个双精度数据流,我正在尝试获取整个数据流的总和。
我使用过ReduceFunction和AggregateFunction。
案例一: 在Reduce函数中,输出是滚动Sum的数据流。为了获得最终的 Sum,我必须遍历outputStream,最后一个值将是我的total。 就我而言,我不想迭代整个数据流来获得最终的总和,也不想使用额外的数据流来存储最终的聚合值。
案例2: 我只能在 countWindow() 之后访问aggregate()方法,并且在countWindow()中我们必须传递大小。因为我不知道我的数据流的大小(用户将向我发送数据),所以我无法使用它。
下面是我的ReduceFunction的实现
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Double> dataStream = env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
DataStream<Double> singleOutputStreamOperator = dataStream.keyBy(value -> "key").reduce(new ReduceFunction<Double>() {
@Override
public Double reduce(Double aDouble, Double t1) throws Exception {
return aDouble+ t1;
}
});
singleOutputStreamOperator.print();
DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
singleOutputStreamOperator.collectAsync(doubleCollector);
singleOutputStreamOperator.executeAndCollect("Aggregation");
Double result = null;
while( doubleCollector.getOutput().hasNext() ) {
result = doubleCollector.getOutput().next();
System.out.println("result = " + result);
}
输出如下:2.0、5.0、9.0、20.0、33.0、47.0。 我只想获取 47.0 作为我的聚合值并将其存储在变量中并将其提供给用户。
有没有更好的方法来解决我的用例?
一般来说,流应用程序并不总是产生最终结果——有些应用程序是要连续运行的。但有时你知道输入是有界的,你关心的只是最终结果。在这些情况下,以批处理模式运行应用程序是有意义的。
这是一个例子:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<Double> dataStream =
env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
dataStream.map(e -> new Tuple2<Integer, Double>(1, e))
.returns(Types.TUPLE(Types.INT, Types.DOUBLE))
.keyBy(t -> t.f0)
.reduce((ReduceFunction<Tuple2<Integer, Double>>)
(t0, t1) -> new Tuple2<Integer, Double>(t0.f0, t0.f1 + t1.f1))
.map(t -> t.f1)
.print();
env.execute();
}
}
运行时,它只打印最终结果(47.0)。
有关运行时执行模式的更多信息,请参阅文档。