Apache Flink:如何将自定义逻辑应用于延迟事件?

问题描述 投票:1回答:2

尽管Flink有一些内置的工具来处理迟到的数据,比如允许迟到,但我想自己处理后期数据。例如,我想监视迟到的事件或只是将它们保存到数据库中。

我怎样才能做到这一点?

streaming apache-flink flink-streaming
2个回答
0
投票

ProcessFunctions(ProcessFunctionKeyedProcessFunction等)通过TimerService对象提供对记录和Context的事件时间戳的访问。 TimerService可以访问当前的水印。

您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。

由您决定如何处理延迟事件。您可以标记它们,可以丢弃它们,通过侧输出发射它们,或者使用它们执行任何类型的计算。


0
投票

通常在窗口运算符中使用延迟和水印。如果你正在使用窗口操作符,你可以像这样使用sideoutput:

val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)

并从sideoutput获取后期元素,如下所示:

windowStream.getSideOutput(lateOutputTag).print()
© www.soinside.com 2019 - 2024. All rights reserved.