如何使用Flink的table API处理延迟事件?

问题描述 投票:0回答:1

看起来像Flink 的 Table API 目前会删除延迟事件。我见过一些利用 DataStreaming API 的示例,但我的整个 Flink 应用程序使用 Table API,因此我试图找到一种使用后者处理延迟事件的方法。我在 3 年前就发现了这个方法,但我不确定它是否还能用。

apache-flink flink-sql
1个回答
0
投票

我能够使用帖子中第二个链接的 Table API 将所有延迟事件转储到接收器:

INSERT INTO lateEventsTable 
    SELECT * 
    FROM sourcedEventsTable
    WHERE CURRENT_WATERMARK(eventTime) IS NOT NULL
        AND eventTime <= CURRENT_WATERMARK(eventTime)

如果您希望能够对延迟事件执行更多操作,则需要使用 DataStream API (AFAIK)。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.