下面是我写的简单代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = new ListBuffer[Tuple3[String,Int,Int]]
val random = new Random()
for(x<- 0 to 4){
if(random.nextBoolean()){list.append(("INSERT",2,1))}else{
list.append(("UPDATE",2,1))
}
}
val data = env.fromElements(list).flatMap(_.toList)
val keyed=data.keyBy(0).sum(1)
keyed.print()
val reKeyed=keyed.keyBy(0).sum(2)
reKeyed.print()
env.execute()
dataStream re [keyed应该将keyed作为输入数据源。但是,打印的结果表明它们来自原始数据源。如果第二次调用KeyBy而不调用sum方法,打印的结果是正确的。所以有什么问题?
keyBy
,则第二次调用将覆盖第一个调用,因此这些元素可能最终位于与以前不同的TaskManager上。对于这种情况,您实际上是在说要使用DataStreamUtils.reinterpretAsKeyedStream
,它应该完全按照您所描述的那样工作,这意味着它不应更改先前键入的Datastream
的分区。