我有如下需求,我们从关系型数据库中读取特定表的CDC insertupdate,并将这些作为事件导入到Kafka主题中。
--------...
|---------------------|------------------|------------------|
| Timestamp | ID | Column |
|---------------------|------------------|------------------|
| 10:00 | 1 | A |
|---------------------|------------------|------------------|
| 10:01 | 2 | B |
|---------------------|------------------|------------------|
| 10:01 | 1 | C |
|---------------------|------------------|------------------|
If the only consumer of this data is ksqlDB then you may not need to de-dup as ksqlDB will correctly handle multiple updates to the same key if you import the topic as a TABLE in ksql, i.e. rather than doing:
Do:
|---------------------|------------------|------------------|
| Timestamp | ID | Column |
|---------------------|------------------|------------------|
| 10:01 | 2 | B |
|---------------------|------------------|------------------|
| 10:01 | 1 | C |
|---------------------|------------------|------------------|
At present, when ksqlDB processes such a change log it will output all
在管道的最后,我们希望每天消费一次这些事件,避免同一ID的重复。
例如:目标-主题
CREATE STREAM FOO (... columns ...) WITH (...);
CREATE TABLE FOO (... columns ...) WITH (...);
我有以下要求,我们从关系型数据库中读取特定表的CDC insertupdate,并将这些作为事件导入到Kafka主题中,例如jdbc-source-topic中的一些重复记录,这取决于你如何配置 cache.max.bytes.buffering
.
您可以通过使用24小时窗口和上一秒的时间来避免发出重复的信号。抑制支持. 在此之前,如果你想按照你的建议删除重复的内容。你也可以通过编写你自己的Kafka Streams应用程序来将表具体化为状态存储,并使用suppress api来删除重复的内容,从而获得一些工作。
然而,值得指出的是,从语义上看,重复的内容不会引起任何问题。将 changelog 物化成表的结果,无论是否有重复,都是正确的。 因此,正如我在一开始所说的,删除重复的内容可能根本没有必要。