I configured my application to load beans conditionally, based on a property, like this:
    // Registered only when fleexi.kafka.override-concurrency.enabled is set.
    @Bean
    @ConditionalOnProperty(
        prefix = "fleexi.kafka.override-concurrency",
        value = {"enabled"}
    )
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
            ConsumerFactory<String, String> kafkaConsumerFactory,
            FleexiKafkaProperties fleexiKafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(fleexiKafkaProperties.getOverrideConcurrency().getListenerConcurrency());
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(kafkaConsumerFactory);
        return factory;
    }

    // Fallback when no other ConcurrentKafkaListenerContainerFactory bean exists.
    @Bean
    @ConditionalOnMissingBean({ConcurrentKafkaListenerContainerFactory.class})
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(kafkaConsumerFactory);
        return factory;
    }
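In the first case, with concurrency enabled, observability does not work: the traceId and spanId are missing from the logs.
Does concurrency break observability?
Edit: an example is here: https://github.com/claudiomerli/demo-issue-spring-kafka-3459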
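Your problem is here:
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryWithConcurrency(
The default containerFactory for @KafkaListener is the bean named kafkaListenerContainerFactory, so your listeners look up a factory under that name rather than the custom one.
Since your custom kafkaListenerContainerFactoryWithConcurrency is therefore never used, it is no surprise that no traces are logged.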
Try using only these configuration properties instead of those beans:
spring.kafka.consumer.group-id=TEST
spring.kafka.listener.concurrency=10
spring.kafka.listener.observation-enabled=true
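
To make the name-based lookup described above concrete, here is a toy model in plain Java. It is not Spring's implementation: ContainerFactoryLookup, FactorySettings, resolve and beansWhenOverrideEnabled are hypothetical names made up for illustration, and the values stored under the default name (concurrency 1, observation disabled) are assumptions about what a factory with default settings would look like. The concurrency of 10 simply mirrors the property shown above.

```java
import java.util.HashMap;
import java.util.Map;

public class ContainerFactoryLookup {

    // Bean name a listener falls back to when it does not name a container factory.
    static final String DEFAULT_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

    // Hypothetical stand-in for a container factory: only the two settings discussed here.
    static final class FactorySettings {
        final int concurrency;
        final boolean observationEnabled;

        FactorySettings(int concurrency, boolean observationEnabled) {
            this.concurrency = concurrency;
            this.observationEnabled = observationEnabled;
        }
    }

    // Models the beans present when the override property is enabled.
    static Map<String, FactorySettings> beansWhenOverrideEnabled() {
        Map<String, FactorySettings> beans = new HashMap<>();
        // The custom factory from the question, registered under its method name.
        beans.put("kafkaListenerContainerFactoryWithConcurrency", new FactorySettings(10, true));
        // Assumption: a factory with default settings ends up under the default name.
        beans.put(DEFAULT_FACTORY_BEAN_NAME, new FactorySettings(1, false));
        return beans;
    }

    // Resolves by explicit name if one is given, otherwise by the default bean name.
    static FactorySettings resolve(Map<String, FactorySettings> beans, String containerFactory) {
        String name = (containerFactory == null || containerFactory.isEmpty())
                ? DEFAULT_FACTORY_BEAN_NAME
                : containerFactory;
        FactorySettings settings = beans.get(name);
        if (settings == null) {
            throw new IllegalStateException("No container factory bean named '" + name + "'");
        }
        return settings;
    }

    public static void main(String[] args) {
        Map<String, FactorySettings> beans = beansWhenOverrideEnabled();
        FactorySettings byDefault = resolve(beans, "");
        FactorySettings byName = resolve(beans, "kafkaListenerContainerFactoryWithConcurrency");
        System.out.println("default lookup: observationEnabled=" + byDefault.observationEnabled);
        System.out.println("explicit lookup: observationEnabled=" + byName.observationEnabled);
    }
}
```

In a real application, the explicit lookup would correspond to setting containerFactory = "kafkaListenerContainerFactoryWithConcurrency" on @KafkaListener. That would presumably fail at startup whenever the override property is disabled, because the bean would not exist, which is one reason the properties-only configuration above is simpler.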