我想知道是否可以在 Apache Flink 滑动窗口中区分元素和偏移时间。让我更详细地解释一下。
我想从例如:13:00 到 13:59:59 中获取元素。不过,我还添加了一个偏移/幻灯片,因此我还有 12:30 的元素,这样我就可以根据前一小时的值计算内容,但我不想再次计算它们(它已经在前一个窗口期间计算过)。是否可以直接跳过前一小时元素的计算?
我的流定义:
var myStream= env.fromSource(source, watermarkStrategy, "data")
.keyBy(MyType::getKey)
.connect(otherStream)
.process(new EnrichWithMyTypeProcessFunction())
.assignTimestampsAndWatermarks(watermarkStrategyWithMyType)
.keyBy(elem -> elem.getKey())
.window(SlidingEventTimeWindows.of(Time.minutes(90), Time.minutes(60), Time.minutes(-30)))
.trigger(EventTimeTrigger.create())
.sideOutputLateData(lateDataTag)
.process(new CalculateFromHourFunction());
感谢您的任何提示!
我认为你可以使用 30 分钟的幻灯片来完成此操作,而不是
.trigger()
。在您的 CalculateFromHourFunction
中,您将看到 90 分钟窗口中的所有元素(在您的示例中为 12:30 到 13:59:59.999),因此您可以进行所需的任何计算,以告知 1 小时窗口的值从前 30 分钟开始。
这不是最有效的方法,因为每个窗口元素将被收集和处理两次,一次用于 30 分钟的预窗口,第二次用于 1 小时的实际窗口。您可以使用自定义
KeyedProcessFunction
做得更好,您可以在其中执行自己的窗口操作并使用状态来存储 30 分钟的窗口前结果,但这会复杂得多。