当前使用 Flink 事件时间,但有时流会空闲,在此期间我希望窗口关闭,以便数据得到输出,而不是等待另一个事件通过。我可以在流空闲时添加定期水印吗?我尝试使用AssignerWithPeriodicWatermarks,但现在已弃用,当前发出周期性水印的方法是什么?该文档是垃圾,无法让我了解如何实现它。这是我当前设置的来源:
```val SomeStream = env
.addSource(new FlinkKafkaConsumer011[String](Pattern.compile(config.getInputFaultStream), new SimpleStringSchema(), properties))
.map(x => JsonUtil.fromJson[SomeInput](x) match {
case Success(value) => value
case Failure(f) => null
})
.filter(x => x != null && x.timestamp != null && x.faultStatus != null && fixDate.isInstant(x.timestamp))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withIdleness(Duration.ofSeconds(300))
.withTimestampAssigner(new SerializableTimestampAssigner[SomeInput] {
override def extractTimestamp(element: SomeInput, recordTimestamp: Long): Long = fixDate.makeInstant(element.timestamp).toEpochMilli
}))
.name("ReadFaultEventInput")```
这种情况(所有分区都完全空闲)可能很难处理。您使用的
withIdleness
方法仅在至少一个分区上仍然有消息时才有效。
如果您可以安排保持活动消息,这可能是在缺乏真实事件流量的情况下保持水印前进的好方法。
否则,您可以实施水印策略,该策略使用处理时间计时器来推进水印,尽管缺少事件。这可能有点冒险,因为它无法区分中断和安静期。还有一些风险是,当活动恢复时,有些活动会迟到,因为它们落后于人为的高级水印。
这里是如何使用旧水印界面执行此操作的示例。恐怕我没有一个示例来展示如何使用较新的 WatermarkStrategy 界面执行此操作。也许理解其工作原理的最佳方法是阅读代码。您可以从https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java开始。