Debezium KafkaConnector tailing an outbox_event table
Table structure:
id (uuid)
key (varchar)
topic (varchar)
payload (json)
emittedAt (timestamptz)
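For context, a minimal DDL sketch of this table; the primary key, NOT NULL constraints, and defaults are assumptions, only the column names and types come from the list above:

-- Sketch of public.outbox_event; constraints and defaults are assumed,
-- column names/types match the structure listed above.
CREATE TABLE public.outbox_event (
    id          uuid        PRIMARY KEY DEFAULT gen_random_uuid(),
    key         varchar     NOT NULL,
    topic       varchar     NOT NULL,
    payload     json        NOT NULL,
    "emittedAt" timestamptz NOT NULL DEFAULT now()
);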
Kafka connector configuration:
config:
database.dbname: ***
database.hostname: ***
database.password: ***
database.port: ***
database.user: ***
decimal.handling.mode: double
heartbeat.action.query: SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);
heartbeat.interval.ms: "300000"
key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: "false"
max.batch.size: "8192"
max.queue.size: "13172"
offset.flush.interval.ms: "15000"
offset.flush.timeout.ms: "60000"
plugin.name: pgoutput
publication.autocreate.mode: filtered
publication.name: dbz_publication_outbox
schema.include.list: public
signal.data.collection: public.dbz_signal
slot.drop.on.stop: "false"
slot.name: debezium_outbox
snapshot.mode: never
table.include.list: public.outbox_event
time.precision.mode: connect
tombstones.on.delete: "false"
topic.prefix: outbox
transforms: outbox
transforms.outbox.route.by.field: topic
transforms.outbox.route.topic.replacement: ${routedByValue}
transforms.outbox.table.expand.json.payload: "true"
transforms.outbox.table.field.event.key: key
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: "false"
Payload in the database:
{
"data": { "ids":[], "name":"name" }
}
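A row like this can be reproduced with an insert along these lines; the key and topic values are placeholders for illustration, only the JSON payload is taken from above:

-- Hypothetical test row; 'some-key' and 'some-topic' are placeholders.
INSERT INTO public.outbox_event (id, key, topic, payload, "emittedAt")
VALUES (
    gen_random_uuid(),
    'some-key',
    'some-topic',
    '{"data": {"ids": [], "name": "name"}}'::json,
    now()
);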
Payload in the Kafka message:
{
"data": {"name":"name" }
}
The ids array is missing from the message.
Any idea what might be going wrong?
I have also tried setting:
value.converter: org.apache.kafka.connect.storage.StringConverter
transforms.outbox.table.expand.json.payload: "true"
I have a similar problem, but the value comes through empty. Did you manage to solve this?