升级到较新版本的 Spring Boot 和 Spring Cloud 依赖项后,我在 Spring Boot 应用程序中遇到 Kafka 生产者配置问题。
在带有 Spring Cloud 依赖项 2021.0.8 的 Spring Boot 2.7.8 中,通过方法
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
创建 DefaultKafkaProducerFactory
bean 的 kafkaProducerFactory
类已正确填充来自 Spring Config Server 的属性,包括 bootstrap.servers
属性。
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
升级到 Spring Boot 3.2.6 和 Spring Cloud Dependency 2023.0.2 后,我注意到通过方法
DefaultKafkaProducerFactory
创建的 kafkaProducerFactory
的 bean 配置逻辑发生了变化。具体来说,现在使用添加 this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
,它将 Spring Config 服务器值设置的 bootstrap.servers
属性覆盖为 localhost:9092
。
@Bean
@ConditionalOnMissingBean({ProducerFactory.class})
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(KafkaConnectionDetails connectionDetails, ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable());
this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(properties);
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> {
customizer.customize(factory);
});
return factory;
}
KAFKA_BOOTSTRAP_SERVERS
环境变量设置正确,并且此配置应该从 Spring Config Server 获取。我看到 Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable());
使用 bootstrap.servers -> my.eastus2.azure.confluence.cloud:9092 正确填充了属性。然而,下一步将使用 localhost:9092 覆盖它
这是我的配置片段,它在旧版本中有效,但在新版本中失败:
# application.yml
spring:
# To overwrite properties loaded from App Config, include below line in application.yml
config:
import: optional:configserver:https://my.config.server.com/v1/config/
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
### Auth ###
sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='${my_client_id}' \
clientSecret='${my_client_secret}' \
scope='' \
extension_logicalCluster='${my_kafka_cluster_id}' \
extension_identityPoolId='${my_identity_pool}';"
### Schema Registry ###
bearer.auth.client.id: ${my_client_id}
bearer.auth.client.secret: ${my_client_secret}
bearer.auth.identity.pool.id: ${my_identity_pool:pool-xyz}
kekcipherkms.provider.identity.client.id: ${my_client_id}
kekcipherkms.provider.identity.client.secret: ${my_client_secret}
kekcipherkms.provider.identity.scope: my.scope.w
### Producer ###
producer:
properties:
topic: ${topic}
DefaultKafkaProducerFactory
正确使用配置服务器中的 bootstrap.servers
值。bootstrap.servers
值由 localhost:9092
方法覆盖为 applyKafkaConnectionDetailsForProducer
。DefaultKafkaProducerFactory
应使用 Spring Config Server 中的 bootstrap.servers
属性,就像在以前的版本中一样。
如何配置我的 Spring Boot 应用程序以在较新版本的 Spring Boot 和 Spring Cloud 中正确使用 Spring Config Server 中的
bootstrap.servers
值?我是否缺少一些东西来确保 KafkaConnectionDetails bootstrap.servers
= 我的 Spring Config Servers 值而不是默认的 localhost:9092 ?
如果有人遇到类似的问题或对新版本中引入的可能影响此行为的更改有见解,我们将不胜感激您的帮助。
通过在 Stack Overflow 上进行更多搜索并从其他类似帖子中获得一些帮助找到了我的答案:
为什么 springboot 没有在 application.yml 中拾取 apache kafka bootstrap-servers? Gary Russell 的回答
Spring Boot 3.1.X 及以上版本的 Kafka 客户端连接问题 Artem Bilan 的回答
现有的 Spring Config Server 提供了一个
spring.kafka.properties.bootstrap.servers
值,但从 Spring Boot 3.1.0 开始,它需要是 spring.kafka.bootstrap-servers
因此,在我可以提交票证让公司更新 Spring 配置服务器之前,我的解决方案是执行以下操作:
# application.yml
spring:
kafka:
bootstrap-servers: ${spring.kafka.properties.bootstrap.servers}