滑动窗口中如何区分元素和偏移时间?

问题描述 投票:0回答:1

我想知道是否可以在 Apache Flink 滑动窗口中区分元素和偏移时间。让我更详细地解释一下。

我想从例如:13:00 到 13:59:59 中获取元素。不过,我还添加了一个偏移/幻灯片,因此我还有 12:30 的元素,这样我就可以根据前一小时的值计算内容,但我不想再次计算它们(它已经在前一个窗口期间计算过)。是否可以直接跳过前一小时元素的计算?

我的流定义:

var myStream= env.fromSource(source, watermarkStrategy, "data")
    .keyBy(MyType::getKey)
    .connect(otherStream)
    .process(new EnrichWithMyTypeProcessFunction())
    .assignTimestampsAndWatermarks(watermarkStrategyWithMyType)
    .keyBy(elem -> elem.getKey())
    .window(SlidingEventTimeWindows.of(Time.minutes(90), Time.minutes(60), Time.minutes(-30)))
    .trigger(EventTimeTrigger.create())
    .sideOutputLateData(lateDataTag)
    .process(new CalculateFromHourFunction());

感谢您的任何提示!

apache-flink flink-streaming sliding-window
1个回答
0
投票

我认为你可以使用 30 分钟的幻灯片来完成此操作,而不是

.trigger()
。在您的
CalculateFromHourFunction
中,您将看到 90 分钟窗口中的所有元素(在您的示例中为 12:30 到 13:59:59.999),因此您可以进行所需的任何计算,以告知 1 小时窗口的值从前 30 分钟开始。

这不是最有效的方法,因为每个窗口元素将被收集和处理两次,一次用于 30 分钟的预窗口,第二次用于 1 小时的实际窗口。您可以使用自定义

KeyedProcessFunction
做得更好,您可以在其中执行自己的窗口操作并使用状态来存储 30 分钟的窗口前结果,但这会复杂得多。

© www.soinside.com 2019 - 2024. All rights reserved.