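I'm trying to combine station status data with weather updates in Flink SQL. The goal is:
So far I have successfully aggregated dock_status_update and joined the result with bike_station, but the subsequent join with weather_update doesn't give me the correct result.
Here is my query so far: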
SELECT
  a.station_id,
  b.name AS station_name,
  b.latitude,
  b.longitude,
  a.average_available_docks,
  a.window_start,
  a.window_end,
  w.`timestamp`,
  w.temperature,
  w.humidity
FROM (
  SELECT
    station_id, window_start, window_end,
    AVG(available_docks) AS average_available_docks
  FROM
    TABLE(TUMBLE(TABLE dock_status_update,
      DESCRIPTOR(`timestamp`),
      INTERVAL '1' MINUTES))
  GROUP BY
    station_id,
    window_start,
    window_end
) a
JOIN bike_stations b ON a.station_id = b.station_id
LEFT JOIN weather_update FOR SYSTEM_TIME AS OF a.window_end w
  ON CAST(w.latitude * 100 AS INT) = CAST(b.latitude * 100 AS INT)
  AND CAST(w.longitude * 100 AS INT) = CAST(b.longitude * 100 AS INT);
However, I get this error:
org.apache.flink.table.api.ValidationException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field
Any suggestions would be appreciated.
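Try using window_time instead of window_end. Hopefully that will work. window_time is essentially the same as window_end (it is one millisecond earlier), but it is a time attribute with a watermark defined on it, which means it can be used in subsequent time-based operations that require watermarks, such as this temporal join.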