This question涵盖了如何使用Flink SQL对无序流进行排序,但是我宁愿使用DataStream API。 One solution是通过使用ProcessFunction来执行此操作的,该函数使用PriorityQueue缓冲事件,直到水印指示它们不再无序为止,但是在RocksDB状态后端中的执行效果很差(问题是对PriorityQueue的每次访问将需要ser / de)。无论使用哪个状态后端,如何都能有效地做到这一点?
一种更好的方法(Flink的SQL和CEP库在内部完成的工作或多或少是在MapState中缓存乱序流,如下所示:
如果要分别对每个键进行排序,请先对流进行键设置。否则,对于全局排序,请通过常量对流进行键控,以便可以使用KeyedProcessFunction来实现排序。
在该处理函数的open
方法中,实例化一个MapState对象,其中的键是时间戳,值是都具有相同时间戳的流元素的列表。
在onElement
方法中:
[onTimer
被调用时,该时间戳记在映射中的条目已准备好作为已排序流的一部分被释放-因为当前水印现在指示所有较早的事件都应该已经被处理。向下游发送事件后,请不要忘记清除地图中的条目。