在 Debezium 中,所有记录均以 op:“r”形式出现。我该如何克服这种情况?

问题描述 投票:0回答:1

我想通过在 SQL Server 上启用 CDC 功能来仅捕获使用 Debezium 插入的记录,但所有记录都带有状态“r”。当我使用像转换操作:“c”这样的条件时,不会检索到任何记录。检查SQL Server上的CDC记录,我可以看到插入状态是区分的,但由于它们都带有状态“r”,所以我无法区分插入的记录。我该如何克服这种情况?

连接器:

{
    "name": "account-activity-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.hostname": "127.0.0.1",
        "database.port": "50516",
        "database.user": "xx",
        "database.password": "xxxx",
        "database.names": "xxxdb",
        "database.trustServerCertificate": true,
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "xxx",
        "decimal.handling.mode": "double",
        "schema.history.internal.kafka.topic": "schemahistory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "snapshot.mode": "initial",
        "skipped.operations": "u,d,r",
        "topic.prefix": "data-transfer",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "include.schema.changes": "false",
        "transforms":"unwrap", 
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.fields":"op",
        "transforms.unwrap.delete.handling.mode":"rewrite",
        "transforms.unwrap.drop.tombstones":"false"
    }
}

Debezium 记录:

{
    "Id": "5E4AB241-8091-4FA7-AEF3-08DB6CC9CC08",
    "TenantId": "212B4D87-C910-427B-A2EB-0E1CBBA5B43B",
    .....
    "__op": "r",
    "__deleted": "false"
}

我只需要接收插入的记录即可正常工作,但所有记录都带有状态“r”,无论其实际状态如何。这让我很难区分哪些记录是插入。

sql-server debezium cdc
1个回答
0
投票

当连接器将快照事件作为 READ 操作发出时,即 op =“r”,您将需要在代码中添加逻辑以决定哪些事件要考虑,哪些事件不考虑。就像你只能考虑“op”=“c”或“u”或“d”的事件

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.