使用 Spring Boot 3.2.6 和 Spring Cloud 2023.0.2 的 Kafka Producer 配置中存在 `bootstrap.servers` 问题

问题描述 投票:0回答:1

升级到较新版本的 Spring Boot 和 Spring Cloud 依赖项后,我在 Spring Boot 应用程序中遇到 Kafka 生产者配置问题。

环境

  • Spring Boot:3.2.6
  • Spring Cloud 依赖项:2023.0.2
  • Spring Cloud 配置服务器

问题描述

在带有 Spring Cloud 依赖项 2021.0.8 的 Spring Boot 2.7.8 中,通过方法

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
创建
DefaultKafkaProducerFactory
bean 的
kafkaProducerFactory
类已正确填充来自 Spring Config Server 的属性,包括
bootstrap.servers
属性。

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

升级到 Spring Boot 3.2.6 和 Spring Cloud Dependency 2023.0.2 后,我注意到通过方法

DefaultKafkaProducerFactory
创建的
kafkaProducerFactory
的 bean 配置逻辑发生了变化。具体来说,现在使用添加
 this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
,它将 Spring Config 服务器值设置的
bootstrap.servers
属性覆盖为
localhost:9092

    @Bean
    @ConditionalOnMissingBean({ProducerFactory.class})
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(KafkaConnectionDetails connectionDetails, ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
        Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable());
        this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(properties);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }

        customizers.orderedStream().forEach((customizer) -> {
            customizer.customize(factory);
        });
        return factory;
    }

KAFKA_BOOTSTRAP_SERVERS
环境变量设置正确,并且此配置应该从 Spring Config Server 获取。我看到
Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable());
使用 bootstrap.servers -> my.eastus2.azure.confluence.cloud:9092 正确填充了属性。然而,下一步将使用 localhost:9092 覆盖它

配置片段

这是我的配置片段,它在旧版本中有效,但在新版本中失败:

# application.yml
spring:
  # To overwrite properties loaded from App Config, include below line in application.yml
  config:
    import: optional:configserver:https://my.config.server.com/v1/config/
   producer:
     key-serializer: org.apache.kafka.common.serialization.StringSerializer
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      ### Auth ###
      sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientId='${my_client_id}' \
        clientSecret='${my_client_secret}' \
        scope='' \
        extension_logicalCluster='${my_kafka_cluster_id}' \
        extension_identityPoolId='${my_identity_pool}';"

      ### Schema Registry ###
      bearer.auth.client.id: ${my_client_id}
      bearer.auth.client.secret: ${my_client_secret}
      bearer.auth.identity.pool.id: ${my_identity_pool:pool-xyz}

      kekcipherkms.provider.identity.client.id: ${my_client_id}
      kekcipherkms.provider.identity.client.secret: ${my_client_secret}
      kekcipherkms.provider.identity.scope: my.scope.w

    ### Producer ###
    producer:
      properties:
        topic: ${topic}

观察到的行为

  • Spring Boot 2.7.8 + Spring Cloud 2021.0.8:
    DefaultKafkaProducerFactory
    正确使用配置服务器中的
    bootstrap.servers
    值。
  • Spring Boot 3.2.6 + Spring Cloud 2023.0.2:
    bootstrap.servers
    值由
    localhost:9092
    方法覆盖为
    applyKafkaConnectionDetailsForProducer

预期行为

DefaultKafkaProducerFactory
应使用 Spring Config Server 中的
bootstrap.servers
属性,就像在以前的版本中一样。

问题

如何配置我的 Spring Boot 应用程序以在较新版本的 Spring Boot 和 Spring Cloud 中正确使用 Spring Config Server 中的

bootstrap.servers
值?我是否缺少一些东西来确保 KafkaConnectionDetails
bootstrap.servers
= 我的 Spring Config Servers 值而不是默认的 localhost:9092 ?

其他背景

如果有人遇到类似的问题或对新版本中引入的可能影响此行为的更改有见解,我们将不胜感激您的帮助。

java spring-boot spring-kafka spring-cloud-config
1个回答
0
投票

通过在 Stack Overflow 上进行更多搜索并从其他类似帖子中获得一些帮助找到了我的答案:

为什么 springboot 没有在 application.yml 中拾取 apache kafka bootstrap-servers? Gary Russell 的回答

Spring Boot 3.1.X 及以上版本的 Kafka 客户端连接问题 Artem Bilan 的回答

现有的 Spring Config Server 提供了一个

spring.kafka.properties.bootstrap.servers
值,但从 Spring Boot 3.1.0 开始,它需要是
spring.kafka.bootstrap-servers

因此,在我可以提交票证让公司更新 Spring 配置服务器之前,我的解决方案是执行以下操作:

# application.yml
spring:
  kafka:
    bootstrap-servers: ${spring.kafka.properties.bootstrap.servers}
© www.soinside.com 2019 - 2024. All rights reserved.