Kafka批量消费无重复记录

问题描述 投票:0回答:1

我有如下需求,我们从关系型数据库中读取特定表的CDC insertupdate,并将这些作为事件导入到Kafka主题中。

--------...

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:00        |         1        |         A        |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

If the only consumer of this data is ksqlDB then you may not need to de-dup as ksqlDB will correctly handle multiple updates to the same key if you import the topic as a TABLE in ksql, i.e. rather than doing:

Do:

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

At present, when ksqlDB processes such a change log it will output all

在管道的最后,我们希望每天消费一次这些事件,避免同一ID的重复。

例如:目标-主题
apache-kafka stream ksqldb
1个回答
0
投票

CREATE STREAM FOO (... columns ...) WITH (...);

CREATE TABLE FOO (... columns ...) WITH (...);

我有以下要求,我们从关系型数据库中读取特定表的CDC insertupdate,并将这些作为事件导入到Kafka主题中,例如jdbc-source-topic中的一些重复记录,这取决于你如何配置 cache.max.bytes.buffering.

您可以通过使用24小时窗口和上一秒的时间来避免发出重复的信号。抑制支持. 在此之前,如果你想按照你的建议删除重复的内容。你也可以通过编写你自己的Kafka Streams应用程序来将表具体化为状态存储,并使用suppress api来删除重复的内容,从而获得一些工作。

然而,值得指出的是,从语义上看,重复的内容不会引起任何问题。将 changelog 物化成表的结果,无论是否有重复,都是正确的。 因此,正如我在一开始所说的,删除重复的内容可能根本没有必要。

© www.soinside.com 2019 - 2024. All rights reserved.