如何在 Debezium 连接器中接收墓碑消息?

问题描述 投票:0回答:1

我正在使用 Kafka 和 Debezium Postgres 连接器。 我只想提取记录值而不提取其他详细信息,因此我使用

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
SMT。

虽然这适用于在插入和更新期间提取记录值,但我遇到了删除事件的问题。添加此 SMT 后,当从 Postgres 表中删除记录时,我不再接收事件。

在添加 ExtractNewRecordState SMT 之前,当删除一行时,我会收到一条逻辑删除消息,这正是我所期望的。

有没有办法在应用此转换后继续接收墓碑消息?

我尝试了一些 smt 但似乎不起作用。 这是配置:

{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"database.hostname": "xxx",
"database.password": "xx",
"database.port": "5432",
"database.server.name": "test-server",
"database.user": "xxxx",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"plugin.name": "wal2json",
"publication.name": "dbz_publication",
"slot.name": "debezium_slot",
"table.include.list": "myTable",
"tasks.max": "1",
"transforms": "unwrap,ExtractField,ExtractKey",
"transforms.ExtractField.field": "data",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractKey.field": "key",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "xxxx",
"value.converter.schemas.enable": "true"
}
postgresql apache-kafka apache-kafka-connect debezium
1个回答
0
投票

终于找到了,文档中不是很明显https://debezium.io/documentation/reference/stable/transformations/event-flattening.html

我使用的是“delete.handling.mode”属性,但我应该使用的是

"transforms.unwrap.delete.handling.mode"
属性,因为我使用的是“transforms.unwrap”smt 并将其设置为“none”以便能够接收逻辑删除消息.

doc中还有一条注释,要求为最新版本使用另一个属性。

此选项计划在未来版本中删除。在其 地方,使用delete.tombstone.handling.mode选项。

© www.soinside.com 2019 - 2024. All rights reserved.