我试图了解水印在 FlinkSQL 中的工作原理。我创建了下表:
CREATE TABLE currency_rates (
currency STRING,
conversion_rate STRING,
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'currency_rates1',
'key.format' = 'raw',
'value.format' = 'avro',
'properties.auto.offset.reset' = 'earliest',
'value.fields-include' = 'ALL',
'scan.watermark.idle-timeout' = '1000'
);
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'orders1',
'value.format' = 'avro',
'properties.auto.offset.reset' = 'latest',
'value.fields-include' = 'ALL'
);
为了测试这一点,我运行以下查询:
SELECT
orders.order_id,
orders.price,
orders.currency,
currency_rates.conversion_rate,
orders.order_time,
currency_rates.update_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
这些是我正在制作的插入物:
INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);
然后,我跑:
INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);
并且:
我不明白为什么会出现记录 ORD007,因为它插入时延迟了 5 分钟,所以我认为不应该包含它,因为它迟到了。
仅仅因为一条记录被认为是迟到并不意味着它会被忽略。只有当水印在处理较晚的记录之前实际前进得足够远时,该记录才不会影响查询的结果。
默认情况下,在 Flink SQL 中,水印每 200 毫秒(挂钟时间)前进一次。许多记录可以在水印之间进行处理,从而使许多可能迟到的记录不会被视为迟到。
为了避免这种不确定性,在开源 Flink SQL 上,您可以设置选项
'scan.watermark.emit.strategy'='on-event'
,但这会导致性能损失。