假定我有一个events的数据流,并且我想将其广播到链接到另一个(丰富)地图操作符(map2)的(丰富)地图操作符(map1)。两张地图的平行度相同。我想要的是map1的每个并行实例的输出都转到map2的一个并行实例(即,两个地图之间没有广播)。到目前为止,这是我所做的,但是我不确定这在逻辑上是否正确。可以吗?
val trainedStream = events.broadcast.map(new Mapper1(...)).setParallelism(par)
trainedStream.startNewChain.map(new Mapper2(...)).setParallelism(par)
后续问题:两个链接的子任务/ map1和map2的并行实例的SubtaskIndex(从RuntimeContext.getIndexOfThisSubtask接收)是否相同?有没有办法检查这个?
代码在Scala中,但我猜Java也是如此
只要有可能,束缚都会自动在Flink中发生。因此,在您的示例中,只需使用
就足够了val trainedStream = events.broadcast.map(new Mapper1(...)).map(new Mapper2(...))
然后在env
上设置并行度。
顺便说一句,您确定要广播事件吗?默认情况下,并行处理Datastream
。广播事件非常很不寻常,因为事件将根据并行性进行多次处理。]
后续问题:两个链接的子任务/ map1和map2的并行实例的SubtaskIndex(从RuntimeContext.getIndexOfThisSubtask接收)是否相同?有没有办法检查这个?
子任务索引对于链接的运算符来说是相同的,因为它们驻留在同一任务中(因此它们甚至不能具有不同的索引)。如果您有任务
mapper1 -> mapper2
,则可以看到链接成功。