Debezium / JDBC sink connector: handling operations with a column other than the PK (id)


I have set up CDC with Debezium as the source and the JDBC sink connector as the sink; the data lives in Postgres databases. The pipeline works as expected. We now have a use case involving pre-existing data on both databases whose primary-key ids differ. The plan is therefore to add an extra column on the sink database (record_id), populated from the source's id (PK), and use it as the basis for update and delete operations.

What should I add to the sink connector's configuration to achieve this? My current attempt is below; the `"pk.fields": "record_id"` setting is not working:

{
    "name": "test_table_sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "utanga_dev.public.test_table",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host.docker.internal:8081",
        "key.converter.schema.registry.url": "http://host.docker.internal:8081",
        "connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "auto.create": "false",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields":"record_id", // currently not working
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "table.name.format": "${topic}",
        "delete.enabled":true,
        "delete.handling.mode":"none",
        "delete.tombstone.handling.mode":"drop",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
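One way to see why `"pk.fields": "record_id"` fails here: after `ExtractNewRecordState` unwraps the Debezium envelope, the message value contains only the source table's columns, and with `pk.mode` set to `record_value` the sink resolves each PK field against that value. A minimal sketch, with plain Python dicts standing in for Connect structs and made-up field values:

```python
# Hypothetical unwrapped record value from the source table: it carries the
# source columns only, so the sink-side "record_id" column is absent.
value_after_unwrap = {"id": 42, "name": "alice"}

# With "pk.mode": "record_value", the sink resolves each name listed in
# "pk.fields" against the message value.
pk_fields = ["record_id"]
missing = [f for f in pk_fields if f not in value_after_unwrap]

print(missing)  # -> ['record_id']: the configured PK field cannot be resolved
```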
jdbc apache-kafka debezium change-data-capture
1 Answer

So I ended up transforming the record in the configuration: the key field id is renamed to record_id, and id is excluded from the value, so updates and deletes are keyed on record_id. Here is my config:

{
    "name": "test_table-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "utanga_dev.public.test_table",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host.docker.internal:8081",
        "key.converter.schema.registry.url": "http://host.docker.internal:8081",
        "connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "auto.create": "false",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "transforms": "unwrap, RenameKey, ExcludeId",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "table.name.format": "${topic}",
        "delete.enabled":true,
        "delete.handling​.mode":"none",
        "delete.tombstone.handling.mode":"drop",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", 
        "transforms.RenameKey.renames": "id:record_id",
        "transforms.ExcludeId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.ExcludeId.blacklist": "id"
    }
}
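The effect of the two `ReplaceField` transforms can be sketched with plain dicts standing in for Connect structs (the field names and values below are illustrative, not taken from the actual topic):

```python
def rename_key_fields(key, renames):
    # ReplaceField$Key with "renames": map old field names to new ones.
    return {renames.get(k, k): v for k, v in key.items()}

def exclude_value_fields(value, excluded):
    # ReplaceField$Value with "blacklist": drop the listed fields.
    if value is None:
        return None  # tombstones keep a null value, so deletes still pass through
    return {k: v for k, v in value.items() if k not in excluded}

# Illustrative record after ExtractNewRecordState has unwrapped the envelope
key = {"id": 42}
value = {"id": 42, "name": "alice"}

print(rename_key_fields(key, {"id": "record_id"}))  # {'record_id': 42}
print(exclude_value_fields(value, {"id"}))          # {'name': 'alice'}
```

With `"pk.mode": "record_key"`, the sink then upserts and deletes on `record_id`, which matches the renamed key field.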

However, I am still not sure whether this is the correct or an efficient approach. Any advice would be appreciated.
