我有一个类型为DataStream[(String, somecaseclass)]
的Flink DataStream。我想分组在Tuple
的第一个字段String
并创建一个ListBuffer[somecaseclass]
。以下是我的尝试:
val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
.keyBy(0)
.fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}
但是这给了我每行的输出,这意味着如果有10个输入行,第一行输出行只是在第一行连接,第十行给出了十行的连接输出。但是,我只想要第十排。我检查了Flink DataStream
几乎所有的转换,但没有什么适合用例。
输入:
(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))
预期产量:
(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))
DataStream API认为DataStream
是无界的。这意味着DataStream
可能提供无限数量的记录。因此,在收到所有记录之后,不可能“仅仅”发出聚合结果(在您的情况下是完整的ListBuffer
),因为可能有更多记录需要聚合(添加到ListBuffer
)。原则上,DataStream
上的聚合永远不会产生最终结果,因为可能会有更多记录。由于这不太实用,Flink的DataStream API为每个传入记录生成一个新结果。
在无界流上计算聚合的常用方法是窗口。 Windows在流上定义有界部分,可以在其上计算聚合并发出最终结果。 Flink根据时间或记录计数提供内置窗口。例如,在1小时的翻滚窗口上的记录收集功能将收集在一小时内到达的所有记录。
请查看有关different window types的Flink文档以及如何使用它们。