Spring Kafka message conversion fails on Spring Boot 3.3


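We just upgraded our Spring Boot project from 3.2.4 to 3.3.0 and ran into a problem with the excellent spring-kafka library. Our Kafka listeners use ack-mode=record and expect the Kafka JSON string message to be converted to a POJO parameter. That stopped working as soon as we upgraded. While debugging, we noticed that if the listener takes a ConsumerRecord<>, the message arrives as a string, but where the deserialized POJO is expected we get null.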

Demo project download: https://uploadnow.io/f/r4XXkw9

Current stack trace when a message is published to the topic:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@2d270181]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:427) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] 
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:128) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 12 common frames omitted

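Project configuration

My best guess is that we made a mistake in the deserialization configuration and that it has only worked until now by luck, because we use the library in an atypical way. Our infrastructure has many Kafka brokers, and some applications need to publish to or consume from several of them, so we have to create a separate configuration for each broker.

Custom Kafka properties for one of the brokers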

@Component
class KafkaBrokerPropertiesConfiguration {

    @Primary
    @Bean("privateKafkaProperties")
    @ConfigurationProperties("spring.kafka.private")
    fun private() = KafkaProperties()
}

Configuration for one of the brokers

@Configuration
class PrivateKafkaConfiguration(
    @Qualifier("privateKafkaProperties")
    private val kafkaProperties: KafkaProperties
) {

    @Primary
    @Bean("privateKafkaTemplate")
    fun privateKafkaTemplate(
        @Qualifier("privateKafkaAdmin")
        kafkaAdmin: KafkaAdmin
    ): KafkaTemplate<String, String> =
        KafkaTemplate(privateProducerFactory()).apply {
            setKafkaAdmin(kafkaAdmin)
            setObservationEnabled(true)
        }

    @Primary
    @Bean("privateProducerFactory")
    fun privateProducerFactory(): ProducerFactory<String, String> = DefaultKafkaProducerFactory(
        kafkaProperties.buildProducerProperties(null)
    )

    @Bean("privateKafkaAdmin")
    fun privateKafkaAdmin(): KafkaAdmin = KafkaAdmin(kafkaProperties.buildAdminProperties(null))
        .apply { setClusterId("admin-private") }

    @Primary
    @Bean("privateConsumerFactory")
    fun privateConsumerFactory(): ConsumerFactory<String, String> = DefaultKafkaConsumerFactory(
        kafkaProperties.buildConsumerProperties(null)
    )

    @Primary
    @Bean("privateListenerContainerFactory")
    fun privateListenerContainerFactory(
        @Qualifier("privateKafkaTemplate")
        kafkaTemplate: KafkaTemplate<String, String>,
        objectMapper: ObjectMapper
    ): KafkaListenerContainerFactory<*> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        val jsonMessageConverter = JsonMessageConverter(objectMapper)
        containerFactory.consumerFactory = privateConsumerFactory()
        containerFactory.containerProperties.ackMode = kafkaProperties.listener.ackMode
        containerFactory.setBatchMessageConverter(BatchMessagingMessageConverter(jsonMessageConverter))
        containerFactory.setRecordMessageConverter(jsonMessageConverter)
        containerFactory.containerProperties.isObservationEnabled = true
        return containerFactory
    }
}

Topic listener using the container factory of one of the brokers

@Component
class TestListener {

    @KafkaListener(
        topics = ["test-topic"],
        containerFactory = "privateListenerContainerFactory"
    )
    fun consume(@Payload message: PojoMessage) {
        println(message)
    }
}

application.yaml

spring:
  application:
    name: demo

  config:
    import:
      - classpath:kafka-private-config.yaml

kafka-private-config.yaml

spring:
  kafka:
    private:
      bootstrap-servers: localhost:29091
      properties:
        security:
          protocol: PLAINTEXT
      producer:
        retries: 3
        acks: all
        properties:
          linger.ms: 50
          max.block.ms: 120000
      listener:
        concurrency: 1
        type: single
        ack-mode: record
      consumer:
        group-id: demo-consumer
        auto-offset-reset: latest
        enable-auto-commit: false
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        properties:
          max.poll.interval.ms: 3600000
          session.timeout.ms: 60000
          heartbeat.interval.ms: 3000
          allow.auto.create.topics: true
        max-poll-records: 5
      retry:
        topic:
          enabled: false

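What have I tried?

  • This code evaluates to false, so I changed the listener to return a value, just to force isConversionNeeded() to be true. It then appears to attempt the conversion, but that fails as well: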
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public java.lang.String com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@7718ecad]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.demo.PojoMessage] for GenericMessage [payload={name=sdasd asdasd, age=1}, headers={kafka_offset=3, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@4148e62f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=asdadas, kafka_receivedTopic=test-topic, kafka_receivedTimestamp=1718203444770, kafka_groupId=demo-consumer}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 16 common frames omitted
spring spring-boot spring-kafka
1 answer

Answered here: https://github.com/spring-projects/spring-kafka/discussions/3296#discussioncomment-9755724

Basically, for Kotlin projects there is a bug in the library, in the check for whether a parameter is a Kotlin class.
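
Until a spring-kafka version containing the fix is on the classpath, one possible stopgap is to sidestep the payload type inference and deserialize by hand. The sketch below relies only on the question's own observation that a ConsumerRecord<> parameter still receives the raw JSON string. Several things here are assumptions rather than part of the original project: the class name RawRecordTestListener is hypothetical, the fields of PojoMessage are guessed from the payload shown in the second stack trace (name, age), and the injected ObjectMapper is assumed to be the Spring Boot one with the Jackson Kotlin module registered, so that it can construct a data class.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Stand-in for the project's POJO; the field names are an assumption taken
// from the payload in the stack trace. Use the real class instead if it exists.
data class PojoMessage(val name: String, val age: Int)

// Hypothetical replacement for TestListener while the library bug is present.
@Component
class RawRecordTestListener(private val objectMapper: ObjectMapper) {

    @KafkaListener(
        topics = ["test-topic"],
        containerFactory = "privateListenerContainerFactory"
    )
    fun consume(record: ConsumerRecord<String, String>) {
        // Skip records without a value (for example tombstones).
        val raw = record.value() ?: return
        // Convert the JSON string manually instead of relying on the
        // listener adapter to infer PojoMessage from the method signature.
        val message = objectMapper.readValue(raw, PojoMessage::class.java)
        println(message)
    }
}
```

This keeps the StringDeserializer settings and the container factory from the question unchanged; once the library fix is available, the listener can presumably go back to taking @Payload message: PojoMessage directly.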
