timeWindow没有考虑事件时间

问题描述 投票:0回答:1

我正在尝试使用Flink进行从CSV文件加载的(已排序)时间戳事件的基本聚合。

我告诉Flink使用事件时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

然后我在KeyedStream上使用时间窗口

val distances = signals
  .assignAscendingTimestamps(_.ts)
  .map(s => (s.mmsi, s.ts, getPortDistance(s)))
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .sum(2).print()

问题是将窗口更改为10分钟实际上会在经过一段时间后打印结果!

我的理解是,通过明确告诉Flink使用时间戳字段作为事件时间,操作将不依赖于机器上的实时。我在这里错过了什么吗?

apache-flink flink-streaming
1个回答
0
投票

首先,您必须了解水印以及如何生成水印。

什么是水印?

通常,水印是一种声明,通过流中的那一点,到达某个时间戳的所有事件都应该到达。一旦水印到达操作员,操作员就可以将其内部事件时钟提前到水印的值。有关详细信息,请查看official documents

如何生成水印?

因为你调用assignAscendingTimestamps函数,这意味着你的水印是(最新收到的元素的时间戳 - 1)。因此,您将获得一个上升水印,无法检索无序元素。

怎么解决这个?

定义您自己的水印时间戳记分配器。您可以查看“assignAscendingTimestamps”的详细实现,并尝试编写自己的。

© www.soinside.com 2019 - 2024. All rights reserved.