我正在使用 Kafka 和 Debezium Postgres 连接器。 我只想提取记录值而不提取其他详细信息,因此我使用
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
SMT。
虽然这适用于在插入和更新期间提取记录值,但我遇到了删除事件的问题。添加此 SMT 后,当从 Postgres 表中删除记录时,我不再接收事件。
在添加 ExtractNewRecordState SMT 之前,当删除一行时,我会收到一条逻辑删除消息,这正是我所期望的。
有没有办法在应用此转换后继续接收墓碑消息?
我尝试了一些 smt 但似乎不起作用。 这是配置:
{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"database.hostname": "xxx",
"database.password": "xx",
"database.port": "5432",
"database.server.name": "test-server",
"database.user": "xxxx",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"plugin.name": "wal2json",
"publication.name": "dbz_publication",
"slot.name": "debezium_slot",
"table.include.list": "myTable",
"tasks.max": "1",
"transforms": "unwrap,ExtractField,ExtractKey",
"transforms.ExtractField.field": "data",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractKey.field": "key",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "xxxx",
"value.converter.schemas.enable": "true"
}
终于找到了,文档中不是很明显https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
我使用的是“delete.handling.mode”属性,但我应该使用的是
"transforms.unwrap.delete.handling.mode"
属性,因为我使用的是“transforms.unwrap”smt 并将其设置为“none”以便能够接收逻辑删除消息.
doc中还有一条注释,要求为最新版本使用另一个属性。
此选项计划在未来版本中删除。在其 地方,使用delete.tombstone.handling.mode选项。