我想通过在 SQL Server 上启用 CDC 功能来仅捕获使用 Debezium 插入的记录,但所有记录都带有状态“r”。当我使用像转换操作:“c”这样的条件时,不会检索到任何记录。检查SQL Server上的CDC记录,我可以看到插入状态是区分的,但由于它们都带有状态“r”,所以我无法区分插入的记录。我该如何克服这种情况?
连接器:
{
"name": "account-activity-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "127.0.0.1",
"database.port": "50516",
"database.user": "xx",
"database.password": "xxxx",
"database.names": "xxxdb",
"database.trustServerCertificate": true,
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "xxx",
"decimal.handling.mode": "double",
"schema.history.internal.kafka.topic": "schemahistory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"snapshot.mode": "initial",
"skipped.operations": "u,d,r",
"topic.prefix": "data-transfer",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"include.schema.changes": "false",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"op",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.unwrap.drop.tombstones":"false"
}
}
Debezium 记录:
{
"Id": "5E4AB241-8091-4FA7-AEF3-08DB6CC9CC08",
"TenantId": "212B4D87-C910-427B-A2EB-0E1CBBA5B43B",
.....
"__op": "r",
"__deleted": "false"
}
我只需要接收插入的记录即可正常工作,但所有记录都带有状态“r”,无论其实际状态如何。这让我很难区分哪些记录是插入。
当连接器将快照事件作为 READ 操作发出时,即 op =“r”,您将需要在代码中添加逻辑以决定哪些事件要考虑,哪些事件不考虑。就像你只能考虑“op”=“c”或“u”或“d”的事件