在我的 SpringBoot Java 项目中,我使用的是 kafka,特别是 ReactiveKafka。我正在更新依赖项,特别是这些:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
沿着堆栈跟踪,根本原因是:
Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule
是否存在依赖版本不兼容的问题?
我使用依赖分析器检查了 intellij,但使用 kafka 的所有依赖项都使用相同的版本
这是我在配置类中的代码:
@Configuration
@AllArgsConstructor
public class KafkaProducerConfiguration {
private KafkaProperties properties;
@Autowired
public KafkaProducerConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
public ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate() {
Map<String, Object> producerProps = properties.buildProducerProperties();
return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(producerProps));
}
}
我在 application.yml 中的属性如下所示:
spring:
...
kafka:
bootstrap-servers: my-bootstrap-server
ssl:
protocol: TLSv1.2
properties:
security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="my_user" password="my_password";
ssl.enabled.protocols: TLSv1.2
ssl.endpoint.identification.algorithm: HTTPS
producer:
client-id: my-client
通过在溢出时查看此错误,我尝试添加
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
在返回之前,但我仍然收到错误。
我还尝试将 ContextClassLoader 设置为 null,什么也没有。
尝试将其添加到配置中:
@Configuration
公共类 KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
对于制作人:
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
PS:我正在使用外部对象而不是字符串