如果这是微不足道的,请原谅我,但是我在文档之外没有太多使用 Flink 的经验。
假设有一个购买流。每次购买间隔 1 秒(或任意数量),并通过
orderID
关联。
事件列表示例:
[
{
"item": "bananas",
"cost": "9.99",
"orderId": 12345,
"timestamp": "2024-11-06T12:00:00Z"
},
{
"item": "apples",
"cost": "5.49",
"orderId": 12345,
"timestamp": "2024-11-06T12:00:01Z"
},
{
"item": "oranges",
"cost": "7.29",
"orderId": 12345,
"timestamp": "2024-11-06T12:00:02Z"
},
{
"item": "grapes",
"cost": "12.99",
"orderId": 12345,
"timestamp": "2024-11-06T12:00:03Z"
}
]
当一个事件到达时,我想启动一项聚合作业,并在一定的时间限制内等待同一 orderId 的后续事件。结果将是:
{
"total": "35.76",
"items": ["bananas", "apples", "oranges", "grapes"]
}
例如,香蕉到达,这会触发作业等待可能与相同订单 ID 匹配的任何其他商品。随着每个新项目的聚合,超时时间会延长。一旦超时达到 0,聚合事件就会被发送出去。
这是可以在高负载下轻松完成的事情吗?不想使用窗口的想法是,理想情况下,我希望等待时间最短,以便尽快发送聚合。
您描述的分组语义正是会话窗口所做的。
这可以通过 Flink SQL 中的查询来完成:
SELECT orderId, SUM(cost) AS total, ARRAY_AGG(item) AS items
FROM TABLE(
SESSION(DATA => TABLE orders,
TIMECOL => DESCRIPTOR(timestamp),
GAP => INTERVAL '1' SECOND))
GROUP BY orderId, window_start, window_end;