RDS Postgres 自动恢复破坏 Debezium 连接器

问题描述 投票:0回答:1

问题

我们正在运行 RDS Postgres (16.3) 并使用 Strimzi 为 K8S 中的多个 Postgres 表托管 Debezium 连接器。

我遇到的问题是,如果数据库遇到高负载,RDS 将自动执行数据库恢复,这需要大约 4 分钟。 当数据库恢复完成时,我的连接器的复制槽保持不活动状态,但 Debezium 认为一切正常。 手动更新以下命令将循环连接器 Pod,一切都会自行修复。

kubectl annotate strimzipodset listing-connector-connect strimzi.io/manual-rolling-update="true"

自动恢复是一种不幸的行为,我们将尽最大努力减轻这种情况的发生,但它是 RDS 内置的,我想找到一种方法来配置 Debezium 连接器以在以下情况下自行恢复:发生这种情况。

更多详情

据我所知,当 RDS 不堪重负时,我们会在数据库进入恢复模式之前看到如下所示的日志:

2024-10-01 18:28:13 UTC:10.144.2.119(49204):listing_kafka_connect@listing:[2549]:STATEMENT:  START_REPLICATION SLOT "addorupdatejob" LOGICAL 213/FCD7ED80 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
2024-10-01 18:28:13 UTC:10.144.2.119(49194):listing_kafka_connect@listing:[2548] terminating walsender process due to replication timeout

这似乎是复制槽超时并且 RDS 终止了它。 没什么大不了的,除了当数据库回来时,它不会自行修复(总是)。

似乎存在某种竞争条件。 有时,在我们拥有的 4 个复制槽中,当数据库返回时,其中 1、2 或 3 个将成功恢复。

当前配置

这是我的连接器的当前配置:

Spec:                                                                                                                                                            
  Auto Restart:                                                                                                                                                  
    Enabled:  true                                                                                                                                               
  Class:      io.debezium.connector.postgresql.PostgresConnector                                                                                                 
  Config:                                                                                                                                                        
    database.dbname:                                         ${env:DB_NAME}                                                                                      
    database.hostname:                                       ${env:DB_HOST}                                                                                      
    database.password:                                       ${env:CDC_PASSWORD}                                                                                 
    database.port:                                           ${env:DB_PORT}                                                                                      
    database.user:                                           ${env:CDC_USERNAME}                                                                                 
    decimal.handling.mode:                                   double                                                                                              
    errors.max.retries:                                      0                                                                                                   
    heartbeat.action.query:                                  update public.debezium_heartbeat set heartbeat_timestamp = now() where heartbeat = 'addorupdate-job'
    heartbeat.interval.ms:                                   300                                                                                                 
    plugin.name:                                             pgoutput                                                                                            
    poll.interval.ms:                                        50                                                                                                  
    Predicates:                                              IsHeartbeat                                                                                         
    predicates.IsHeartbeat.pattern:                          addorupdatejob.public.debezium_heartbeat                                                            
    predicates.IsHeartbeat.type:                             org.apache.kafka.connect.transforms.predicates.TopicNameMatches                                     
    publication.name:                                        dbz_publication                                                                                     
    slot.drop.on.stop:                                       false                                                                                               
    slot.max.retries:                                        1                                                                                                   
    slot.name:                                               addorupdatejob                                                                                      
    slot.retry.delay.ms:                                     30000                                                                                               
    table.include.list:                                      listing.add_or_update_job_status,public.debezium_heartbeat                                          
    topic.creation.default.partitions:                       6                                                                                                   
    topic.creation.default.replication.factor:               3                                                                                                   
    topic.prefix:                                            addorupdatejob                                                                                      
    Transforms:                                              filter,dropPrefix                                                                                   
    transforms.dropPrefix.regex:                             addorupdatejob.listing.add_or_update_job_status                                                     
    transforms.dropPrefix.replacement:                       omnichannel.listing.sys.add-or-update-job-status.v1                                                 
    transforms.dropPrefix.type:                              org.apache.kafka.connect.transforms.RegexRouter                                                     
    transforms.filter.predicate:                             IsHeartbeat                                                                                         
    transforms.filter.type:                                  org.apache.kafka.connect.transforms.Filter                                                          
    value.converter:                                         io.debezium.converters.BinaryDataConverter                                                          
    value.converter.delegate.converter.type:                 org.apache.kafka.connect.json.JsonConverter                                                         
    value.converter.delegate.converter.type.schemas.enable:  false                                                                                               

最初最大重试次数为 10 次,以确保在数据库恢复之前不会用完重试次数,但这根本不起作用,所以我试图将它们降低,希望出现更正式的“崩溃”或“断开连接”将导致恢复。

出于同样的原因,我也尝试将errors.max.retries设置为0。

我觉得这是一个非常常见的用例,我希望像 Debezium for Postgres 这样的产品足够成熟,能够从容地从类似的情况中恢复。 我确信我只是配置错误。

postgresql amazon-rds debezium
1个回答
-3
投票

不幸的是,我无法访问我已经使用超过 15 年的 Gmail 帐户。我真的很害怕丢失所有文件和文档,但这一切都要感谢 Facebook 上的 Steve,他将我推荐给 Jeffrey,一位专业黑客,帮助我重新获得了访问权限。都是他的功劳。您可以通过 [email protected] 联系 Jeffrey,或致电 +1 (254) 278-0902 给他发短信。他有能力提供帮助,同时确保您的隐私在此过程中保持完整。

© www.soinside.com 2019 - 2024. All rights reserved.