使用基于地理空间邻近度的滚动窗口聚合来连接时态表

问题描述 投票:0回答:1

我正在尝试在 Flink SQL 中将车站状态数据与天气更新结合起来。目标是:

  • 计算 1 分钟窗口内每个站点的平均状态指标
  • 加入基于时间戳和地理邻近度的天气数据

这是我想要实现的目标的粗略图: Architecture diagram

到目前为止,我已经成功地对

dock_status_update
执行聚合并与
bike_station
连接,但随后与
weather_update
的连接我没有得到正确的结果。

这是我到目前为止的查询:

SELECT
    a.station_id,
    b.name AS station_name,
    b.latitude,
    b.longitude,
    a.average_available_docks,
    a.window_start,
    a.window_end,
    w.`timestamp`,
    w.temperature,
    w.humidity
FROM (
    SELECT 
        station_id, window_start, window_end, 
        AVG(available_docks) AS average_available_docks 
    FROM 
        TABLE(TUMBLE(TABLE dock_status_update, 
        DESCRIPTOR(`timestamp`), 
        INTERVAL '1' MINUTES))
    ) 
    GROUP BY 
    station_id, 
    window_start, 
    window_end
) a
JOIN bike_stations b ON a.station_id = b.station_id
LEFT JOIN weather_update FOR SYSTEM_TIME AS OF a.window_end w
    ON CAST(w.latitude * 100 AS INT) = CAST(b.latitude * 100 AS INT)
    AND CAST(w.longitude * 100 AS INT) = CAST(b.longitude * 100 AS INT);

但是,我收到此错误:

org.apache.flink.table.api.ValidationException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field

如有任何建议,我们将不胜感激。

apache-kafka apache-flink flink-streaming flink-sql
1个回答
0
投票

尝试使用

window_end
,而不是
window_time
。希望这会起作用。
window_time
window_end
基本相同,但定义了水印,这使得该字段可以用于后续需要水印的时间操作。

© www.soinside.com 2019 - 2024. All rights reserved.