Postgres 的 Debezium 连接器不适用于分区表

问题描述 投票:0回答:1

我使用 Debezium 连接器设置了 Postgres->kafka CDC。 Debezium CDC 能够针对常规 postgres 表发布对 Kafka 主题 public.some_topic 的所有更改。

但它不会将 Postgres 分区表(列表分区)的更改发布到 kafka。

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d 
'{
"name": "test-connector",
"config": 
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "plugin.name": "pgoutput",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "postgres",
  "database.dbname": "test",
  "database.server.name": "postgres",
  "table.include.list": "public.orders",
  "slot.name": "orders_slot",
  "publication.name": "orders_publication",
  "topic.prefix": "ordertest"
}
}'

我什至尝试使用“table.types”:“PARTITIONED TABLE,TABLE”选项,但它仍然不起作用

源连接器配置有什么问题以及如何修复?

postgresql apache-kafka confluent-platform debezium cdc
1个回答
0
投票

来自文档

分区 PostgreSQL 表

当 Debezium PostgreSQL 连接器捕获 a 中的更改时 分区表,默认行为是更改事件记录 路由到每个分区的不同主题。发出记录 从所有分区到一个主题,配置ByLogicalTableRouter SMT。因为分区表中的每个键都保证是 unique,配置 key.enforce.uniqueness=false 以便 SMT 执行 不添加键字段以确保键唯一。添加关键字段 是默认行为。

更多文档

分区 PostgreSQL 表

当 Debezium PostgreSQL 连接器捕获 a 中的更改时 分区表,默认行为是更改事件记录 路由到每个分区的不同主题。发出记录 从所有分区到一个主题,配置主题路由SMT。 因为分区表中的每个键都保证是唯一的, 配置 key.enforce.uniqueness=false 以便 SMT 不添加 键字段以确保键唯一。添加一个关键字段是 默认行为。

所以(未经测试!!)配置可能涉及这样的事情:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "test",
    "database.server.name": "postgres",
    "table.include.list": "public.orders",
    "slot.name": "orders_slot",
    "publication.name": "orders_publication",
    "topic.prefix": "ordertest",
    "transforms": "route",
    "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.route.topic.regex": "(.*)\\.(.*)\\.(.*)$",
    "transforms.route.topic.replacement": "$1.$2",
    "transforms.route.key.enforce.uniqueness": "false"
  }
}'
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.