我使用 Debezium 连接器设置了 Postgres->kafka CDC。 Debezium CDC 能够针对常规 postgres 表发布对 Kafka 主题 public.some_topic 的所有更改。
但它不会将 Postgres 分区表(列表分区)的更改发布到 kafka。
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d
'{
"name": "test-connector",
"config":
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "test",
"database.server.name": "postgres",
"table.include.list": "public.orders",
"slot.name": "orders_slot",
"publication.name": "orders_publication",
"topic.prefix": "ordertest"
}
}'
我什至尝试使用“table.types”:“PARTITIONED TABLE,TABLE”选项,但它仍然不起作用
源连接器配置有什么问题以及如何修复?
来自文档:
分区 PostgreSQL 表
当 Debezium PostgreSQL 连接器捕获 a 中的更改时 分区表,默认行为是更改事件记录 路由到每个分区的不同主题。发出记录 从所有分区到一个主题,配置ByLogicalTableRouter SMT。因为分区表中的每个键都保证是 unique,配置 key.enforce.uniqueness=false 以便 SMT 执行 不添加键字段以确保键唯一。添加关键字段 是默认行为。
和更多文档:
分区 PostgreSQL 表
当 Debezium PostgreSQL 连接器捕获 a 中的更改时 分区表,默认行为是更改事件记录 路由到每个分区的不同主题。发出记录 从所有分区到一个主题,配置主题路由SMT。 因为分区表中的每个键都保证是唯一的, 配置 key.enforce.uniqueness=false 以便 SMT 不添加 键字段以确保键唯一。添加一个关键字段是 默认行为。
所以(未经测试!!)配置可能涉及这样的事情:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "test",
"database.server.name": "postgres",
"table.include.list": "public.orders",
"slot.name": "orders_slot",
"publication.name": "orders_publication",
"topic.prefix": "ordertest",
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "(.*)\\.(.*)\\.(.*)$",
"transforms.route.topic.replacement": "$1.$2",
"transforms.route.key.enforce.uniqueness": "false"
}
}'