我们正在运行 RDS Postgres (16.3) 并使用 Strimzi 为 K8S 中的多个 Postgres 表托管 Debezium 连接器。
我遇到的问题是,如果数据库遇到高负载,RDS 将自动执行数据库恢复,这需要大约 4 分钟。 当数据库恢复完成时,我的连接器的复制槽保持不活动状态,但 Debezium 认为一切正常。 手动更新以下命令将循环连接器 Pod,一切都会自行修复。
kubectl annotate strimzipodset listing-connector-connect strimzi.io/manual-rolling-update="true"
自动恢复是一种不幸的行为,我们将尽最大努力减轻这种情况的发生,但它是 RDS 内置的,我想找到一种方法来配置 Debezium 连接器以在以下情况下自行恢复:发生这种情况。
据我所知,当 RDS 不堪重负时,我们会在数据库进入恢复模式之前看到如下所示的日志:
2024-10-01 18:28:13 UTC:10.144.2.119(49204):listing_kafka_connect@listing:[2549]:STATEMENT: START_REPLICATION SLOT "addorupdatejob" LOGICAL 213/FCD7ED80 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
2024-10-01 18:28:13 UTC:10.144.2.119(49194):listing_kafka_connect@listing:[2548] terminating walsender process due to replication timeout
这似乎是复制槽超时并且 RDS 终止了它。 没什么大不了的,除了当数据库回来时,它不会自行修复(总是)。
似乎存在某种竞争条件。 有时,在我们拥有的 4 个复制槽中,当数据库返回时,其中 1、2 或 3 个将成功恢复。
这是我的连接器的当前配置:
Spec:
Auto Restart:
Enabled: true
Class: io.debezium.connector.postgresql.PostgresConnector
Config:
database.dbname: ${env:DB_NAME}
database.hostname: ${env:DB_HOST}
database.password: ${env:CDC_PASSWORD}
database.port: ${env:DB_PORT}
database.user: ${env:CDC_USERNAME}
decimal.handling.mode: double
errors.max.retries: 0
heartbeat.action.query: update public.debezium_heartbeat set heartbeat_timestamp = now() where heartbeat = 'addorupdate-job'
heartbeat.interval.ms: 300
plugin.name: pgoutput
poll.interval.ms: 50
Predicates: IsHeartbeat
predicates.IsHeartbeat.pattern: addorupdatejob.public.debezium_heartbeat
predicates.IsHeartbeat.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
publication.name: dbz_publication
slot.drop.on.stop: false
slot.max.retries: 1
slot.name: addorupdatejob
slot.retry.delay.ms: 30000
table.include.list: listing.add_or_update_job_status,public.debezium_heartbeat
topic.creation.default.partitions: 6
topic.creation.default.replication.factor: 3
topic.prefix: addorupdatejob
Transforms: filter,dropPrefix
transforms.dropPrefix.regex: addorupdatejob.listing.add_or_update_job_status
transforms.dropPrefix.replacement: omnichannel.listing.sys.add-or-update-job-status.v1
transforms.dropPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.filter.predicate: IsHeartbeat
transforms.filter.type: org.apache.kafka.connect.transforms.Filter
value.converter: io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable: false
最初最大重试次数为 10 次,以确保在数据库恢复之前不会用完重试次数,但这根本不起作用,所以我试图将它们降低,希望出现更正式的“崩溃”或“断开连接”将导致恢复。
出于同样的原因,我也尝试将errors.max.retries设置为0。
我觉得这是一个非常常见的用例,我希望像 Debezium for Postgres 这样的产品足够成熟,能够从容地从类似的情况中恢复。 我确信我只是配置错误。
不幸的是,我无法访问我已经使用超过 15 年的 Gmail 帐户。我真的很害怕丢失所有文件和文档,但这一切都要感谢 Facebook 上的 Steve,他将我推荐给 Jeffrey,一位专业黑客,帮助我重新获得了访问权限。都是他的功劳。您可以通过 [email protected] 联系 Jeffrey,或致电 +1 (254) 278-0902 给他发短信。他有能力提供帮助,同时确保您的隐私在此过程中保持完整。