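When I start my application against a local Kafka broker, everything works fine.
But when I try to connect to my hosted broker using SSL authentication, I get the following error: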
2023-03-22 12:07:29,309 ERROR[restartedMain] InputEventConsumer - error while consuming message
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
at reactor.kafka.receiver.internals.ConsumerFactory.createConsumer(ConsumerFactory.java:34)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$16(DefaultKafkaReceiver.java:132)
at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:81)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8642)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8815)
at reactor.core.publisher.Flux.subscribe(Flux.java:8608)
at reactor.core.publisher.Flux.subscribe(Flux.java:8532)
at reactor.core.publisher.Flux.subscribe(Flux.java:8502)
at messaging.inputEvent.InputEventConsumer.consumeMessage(InputEventConsumer.java:55)
at messaging.inputEvent.InputEventConsumerInitialiser.lambda$initialiseConsumers$0(InputEventConsumerInitialiser.java:42)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
at messaging.inputEvent.InputEventConsumerInitialiser.initialiseConsumers(InputEventConsumerInitialiser.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:344)
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:229)
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:166)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.boot.context.event.EventPublishingRunListener.ready(EventPublishingRunListener.java:114)
at org.springframework.boot.SpringApplicationRunListeners.lambda$ready$6(SpringApplicationRunListeners.java:82)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:120)
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:114)
at org.springframework.boot.SpringApplicationRunListeners.ready(SpringApplicationRunListeners.java:82)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)
at SalesTenderEventConsumerApplication.main(SalesTenderEventConsumerApplication.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.kafka.common.KafkaException: java.security.UnrecoverableKeyException: Cannot recover key
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:268)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:173)
at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:736)
... 43 common frames omitted
Caused by: java.security.UnrecoverableKeyException: Cannot recover key
at sun.security.provider.KeyProtector.recover(KeyProtector.java:315)
at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:143)
at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:57)
at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96)
at sun.security.provider.JavaKeyStore$DualFormatJKS.engineGetKey(JavaKeyStore.java:71)
at java.security.KeyStore.getKey(KeyStore.java:1023)
at sun.security.ssl.SunX509KeyManagerImpl.<init>(SunX509KeyManagerImpl.java:145)
at sun.security.ssl.KeyManagerFactoryImpl$SunX509.engineInit(KeyManagerFactoryImpl.java:70)
at javax.net.ssl.KeyManagerFactory.init(KeyManagerFactory.java:256)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:251)
... 51 common frames omitted
My consumer code:
@EventListener(ApplicationReadyEvent.class)
public void initialiseConsumers() {
    consumers = Arrays.asList(new InputEventConsumer[kafkaConfig.getConsumersCount()]);
    IntStream.range(0, kafkaConfig.getConsumersCount()).forEach(i -> {
        consumers.set(i, applicationContext.getBean(InputEventConsumer.class));
        consumers.get(i).setConsumerIndex(i);
        consumers.get(i).setInputEventReceiver(kafkaConfig.inputEventReceiver());
        consumers.get(i).consumeMessage();
    });
}

public Disposable consumeMessage() {
    log.info("starting consumption");
    return processKafkaRecord().subscribe(record -> log.info("successfully consumed event"),
            error -> log.error("error while consuming message", error));
}
The SSL properties I add are:
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, protocol);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslKeyPassword());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslTruststorePassword());
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslKeystorePassword());
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "");
sslConfigFilesLocations.getTruststorePath().ifPresent(path ->
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, path.toAbsolutePath().toString()));
sslConfigFilesLocations.getKeystorePath().ifPresent(path ->
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, path.toAbsolutePath().toString()));
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
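One detail in the properties above may be worth checking: SslConfigs.SSL_KEY_PASSWORD_CONFIG is put twice, and the later put with "" replaces the value set earlier. An UnrecoverableKeyException thrown from KeyStore.getKey usually indicates that the key password supplied does not match the one protecting the private key. The following is a minimal sketch for testing a key password against a keystore outside Kafka, using only the JDK. The class name KeyPasswordCheck, its method names, and the command-line argument order are hypothetical, and the JKS keystore type is an assumption based on the JavaKeyStore frames in the stack trace.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.UnrecoverableKeyException;
import java.util.Collections;
import javax.crypto.spec.SecretKeySpec;

public class KeyPasswordCheck {

    /** Returns false if any key entry in the store cannot be recovered with keyPassword. */
    static boolean canRecoverKeys(KeyStore store, char[] keyPassword) {
        try {
            for (String alias : Collections.list(store.aliases())) {
                if (store.isKeyEntry(alias)) {
                    store.getKey(alias, keyPassword); // throws if the password does not fit
                }
            }
            return true;
        } catch (UnrecoverableKeyException e) {
            return false;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a throwaway in-memory store with one key entry, for demonstration only. */
    static KeyStore demoStore(char[] keyPassword) {
        try {
            KeyStore store = KeyStore.getInstance("JCEKS"); // JCEKS accepts secret-key entries
            store.load(null, null);                         // start with an empty store
            store.setKeyEntry("demo", new SecretKeySpec(new byte[16], "AES"), keyPassword, null);
            return store;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 3) {
            // Hypothetical usage: <keystore path> <store password> <key password>
            KeyStore store = KeyStore.getInstance("JKS"); // assumption: JKS keystore
            try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
                store.load(in, args[1].toCharArray());
            }
            System.out.println("key recoverable: " + canRecoverKeys(store, args[2].toCharArray()));
        } else {
            // No file given: run against the in-memory demo store instead.
            KeyStore demo = demoStore("key-pass".toCharArray());
            System.out.println("right password: " + canRecoverKeys(demo, "key-pass".toCharArray()));
            System.out.println("wrong password: " + canRecoverKeys(demo, "wrong-pass".toCharArray()));
        }
    }
}
```

Running it against the real keystore once with the intended key password and once with an empty string would show whether the second put is what breaks key recovery. Passing passwords as command-line arguments is only for a quick local check.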
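Now, the thing is that I have another service (not WebFlux) with the same configuration values, and its consumer is able to consume from the hosted broker. So the problem should not be related to the broker or the network.
I have tried to find other solutions for this error, but I could not work out which one applies.
Also, in case it helps, I am using the following dependencies: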
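One way to verify that the two services really end up with the same effective settings is to compare the property maps each one passes to its consumer. The sketch below is only an illustration under that assumption: the class PropsDiff and the method differingKeys are hypothetical names, and the sample maps in main are made-up values, not taken from either service. It reports key names only, so passwords are not printed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

public class PropsDiff {

    /** Returns the sorted keys whose values differ, including keys present in only one map. */
    static Set<String> differingKeys(Map<String, ?> left, Map<String, ?> right) {
        Set<String> allKeys = new TreeSet<>(left.keySet());
        allKeys.addAll(right.keySet());
        Set<String> differing = new TreeSet<>();
        for (String key : allKeys) {
            if (!Objects.equals(left.get(key), right.get(key))) {
                differing.add(key);
            }
        }
        return differing;
    }

    public static void main(String[] args) {
        // Made-up sample values standing in for the two services' consumer properties.
        Map<String, Object> working = new HashMap<>();
        working.put("security.protocol", "SSL");
        working.put("ssl.key.password", "secret");

        Map<String, Object> failing = new HashMap<>();
        failing.put("security.protocol", "SSL");
        failing.put("ssl.key.password", "");

        System.out.println("differing keys: " + differingKeys(working, failing));
    }
}
```

Dumping the map right before the consumer is created in each service and feeding both dumps to such a comparison would confirm or rule out a configuration difference.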
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.projectlombok:lombok'
implementation "io.projectreactor.kafka:reactor-kafka:1.3.15"
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.projectreactor.addons:reactor-extra'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
implementation group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '2.8.8'
implementation group: 'com.azure', name: 'azure-security-keyvault-keys', version: '4.2.8'
implementation group: 'com.azure', name: 'azure-identity', version: '1.2.1'
testImplementation 'org.spockframework:spock-core:2.0-M4-groovy-3.0'
testImplementation 'org.codehaus.groovy:groovy-all:3.0.7'
testImplementation 'io.mockk:mockk:1.12.0'