如何使用Flink对乱序事件时间流进行排序

问题描述 投票:0回答:1

This question涵盖了如何使用Flink SQL对无序流进行排序,但是我宁愿使用DataStream API。 One solution是通过使用ProcessFunction来执行此操作的,该函数使用PriorityQueue缓冲事件,直到水印指示它们不再无序为止,但是在RocksDB状态后端中的执行效果很差(问题是对PriorityQueue的每次访问将需要ser / de)。无论使用哪个状态后端,如何都能有效地做到这一点?

apache-flink flink-streaming
1个回答
0
投票

一种更好的方法(Flink的SQL和CEP库在内部完成的工作或多或少是在MapState中缓存乱序流,如下所示:

如果要分别对每个键进行排序,请先对流进行键设置。否则,对于全局排序,请通过常量对流进行键控,以便可以使用KeyedProcessFunction来实现排序。

在该处理函数的open方法中,实例化一个MapState对象,其中的键是时间戳,值是都具有相同时间戳的流元素的列表。

onElement方法中:

  • 如果事件迟到,则将其丢弃或发送到侧面输出
  • 否则,将事件附加到对应于其时间戳的地图条目上]
  • 为该事件的时间戳注册事件时间计时器
  • [onTimer被调用时,该时间戳记在映射中的条目已准备好作为已排序流的一部分被释放-因为当前水印现在指示所有较早的事件都应该已经被处理。向下游发送事件后,请不要忘记清除地图中的条目。

© www.soinside.com 2019 - 2024. All rights reserved.