它看起来像Flink 的 Table API 目前会删除延迟事件。我见过一些利用 DataStreaming API 的示例,但我的整个 Flink 应用程序使用 Table API,因此我试图找到一种使用后者处理延迟事件的方法。我在 3 年前就发现了这个方法,但我不确定它是否还能用。
我能够使用帖子中第二个链接的 Table API 将所有延迟事件转储到接收器:
INSERT INTO lateEventsTable
SELECT *
FROM sourcedEventsTable
WHERE CURRENT_WATERMARK(eventTime) IS NOT NULL
AND eventTime <= CURRENT_WATERMARK(eventTime)
如果您希望能够对延迟事件执行更多操作,则需要使用 DataStream API (AFAIK)。