我正在尝试使用Flink进行从CSV文件加载的(已排序)时间戳事件的基本聚合。
我告诉Flink使用事件时间:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
然后我在KeyedStream上使用时间窗口
val distances = signals
.assignAscendingTimestamps(_.ts)
.map(s => (s.mmsi, s.ts, getPortDistance(s)))
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.sum(2).print()
问题是将窗口更改为10分钟实际上会在经过一段时间后打印结果!
我的理解是,通过明确告诉Flink使用时间戳字段作为事件时间,操作将不依赖于机器上的实时。我在这里错过了什么吗?
首先,您必须了解水印以及如何生成水印。
什么是水印?
通常,水印是一种声明,通过流中的那一点,到达某个时间戳的所有事件都应该到达。一旦水印到达操作员,操作员就可以将其内部事件时钟提前到水印的值。有关详细信息,请查看official documents。
如何生成水印?
因为你调用assignAscendingTimestamps函数,这意味着你的水印是(最新收到的元素的时间戳 - 1)。因此,您将获得一个上升水印,无法检索无序元素。
怎么解决这个?
定义您自己的水印时间戳记分配器。您可以查看“assignAscendingTimestamps”的详细实现,并尝试编写自己的。