尽管Flink有一些内置的工具来处理迟到的数据,比如允许迟到,但我想自己处理后期数据。例如,我想监视迟到的事件或只是将它们保存到数据库中。
我怎样才能做到这一点?
ProcessFunctions(ProcessFunction
,KeyedProcessFunction
等)通过TimerService
对象提供对记录和Context
的事件时间戳的访问。 TimerService
可以访问当前的水印。
您可以通过比较事件时间戳和水印来识别延迟记录。如果时间戳小于或等于水印,则事件延迟。
由您决定如何处理延迟事件。您可以标记它们,可以丢弃它们,通过侧输出发射它们,或者使用它们执行任何类型的计算。
通常在窗口运算符中使用延迟和水印。如果你正在使用窗口操作符,你可以像这样使用sideoutput:
val windowStream = eventStream.keyBy(output => output.rule)
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
.sideOutputLateData(lateOutputTag)
并从sideoutput获取后期元素,如下所示:
windowStream.getSideOutput(lateOutputTag).print()