升级到 Spring Cloud Stream Binder for Kinesis 4.0.0 后遇到批处理错误

问题描述 投票:0回答:1

使用

Spring Cloud Stream Binder for Kinesis
版本
4.0.0
升级批处理处理器后,我们遇到以下异常。请注意,到目前为止我们一直在使用活页夹版本
2.2.0
。我们还使用
Spring Cloud
版本
2023.0.0
Spring Boot
版本
3.2.2

java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class [B (org.springframework.messaging.support.GenericMessage is in unnamed module of loader org.springframework.boot.loader.launch.LaunchedClassLoader @60addb54; [B is in module java.base of loader 'bootstrap')

我们的消费者bean定义如下:

@Bean
public Consumer<Message<List<byte[]>>> batchConsumer(
    BatchItemProcessor batchItemProcessor) {
  return batchItemProcessor::consume;
}

我们的属性定义如下:

spring:
  cloud:
    stream:
      bindings:
        batchItemProcessor-in-0:
          consumer:
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            batch-mode: true
            max-attempts: 5
        use-native-decoding: true
          content-type: application/octet-stream
          destination: batchItems
      kinesis:
        binder:
          headers:
            - traceparent
          kpl-kcl-enabled: true
        bindings:
          batchItemProcessor-in-0:
            consumer:
              checkpoint-mode: batch
              listener-mode: batch

与较旧的

2.2.0
版本的活页夹相比,是否有任何更改会导致我们的代码因此错误而失败?我们是否缺少任何必需的属性,或者我们的消费者代码是否指定错误以便以批处理模式处理消息?

spring-cloud-stream spring-cloud-stream-binder spring-cloud-stream-binder-kinesis
1个回答
0
投票

KclMessageDrivenChannelAdapter
始终根据 Kinesis 记录和嵌入标头的反序列化结果返回
List<Message<Object>>
。显然,Spring Cloud Function 之间发生了一些变化。

要解决此问题,您必须将

Consumer
更改为:

Consumer<Message<List<Message<byte[]>>>>
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.