FlinkSQL 中的水印如何与延迟事件一起工作

问题描述 投票:0回答:1

我试图了解水印在 FlinkSQL 中的工作原理。我创建了下表:

  CREATE TABLE currency_rates (
  currency STRING,
  conversion_rate STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time, 
  PRIMARY KEY(currency) NOT ENFORCED
  ) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'currency_rates1',
  'key.format' = 'raw',
  'value.format' = 'avro',
  'properties.auto.offset.reset' = 'earliest',
  'value.fields-include' = 'ALL',
  'scan.watermark.idle-timeout' = '1000'
  );  


  CREATE TABLE orders (
  order_id    STRING,
  price       DECIMAL(32,2),
  currency    STRING,
  order_time  TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS 
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'orders1',
  'value.format' = 'avro',
  'properties.auto.offset.reset' = 'latest',
  'value.fields-include' = 'ALL'
  );
  

为了测试这一点,我运行以下查询:

  SELECT
  orders.order_id,
  orders.price,
  orders.currency,
  currency_rates.conversion_rate,
  orders.order_time,
  currency_rates.update_time
  FROM orders
  LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
  ON orders.currency = currency_rates.currency;

这些是我正在制作的插入物:

INSERT INTO currency_rates VALUES ('USD', 'value 1', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD001', 100.00, 'USD', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD002', 100.00, 'USD', CURRENT_TIMESTAMP);

显示如下结果: enter image description here

然后,我跑:

INSERT INTO currency_rates VALUES ('USD', 'value 2', CURRENT_TIMESTAMP);
INSERT INTO orders VALUES ('ORD007', 2000.00, 'USD', CURRENT_TIMESTAMP - interval '5' minutes);
INSERT INTO orders VALUES ('ORD003', 100.00, 'USD', CURRENT_TIMESTAMP);

并且:

enter image description here

我不明白为什么会出现记录 ORD007,因为它插入时延迟了 5 分钟,所以我认为不应该包含它,因为它迟到了。

apache-flink watermark flink-sql
1个回答
0
投票

仅仅因为一条记录被认为是迟到并不意味着它会被忽略。只有当水印在处理较晚的记录之前实际前进得足够远时,该记录才不会影响查询的结果。

默认情况下,在 Flink SQL 中,水印每 200 毫秒(挂钟时间)前进一次。许多记录可以在水印之间进行处理,从而使许多可能迟到的记录不会被视为迟到。

为了避免这种不确定性,在开源 Flink SQL 上,您可以设置选项

'scan.watermark.emit.strategy'='on-event'
,但这会导致性能损失。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.