使用
Spring Cloud Stream Binder for Kinesis
版本 4.0.0
升级批处理处理器后,我们遇到以下异常。请注意,到目前为止我们一直在使用活页夹版本 2.2.0
。我们还使用 Spring Cloud
版本 2023.0.0
和 Spring Boot
版本 3.2.2
。
java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class [B (org.springframework.messaging.support.GenericMessage is in unnamed module of loader org.springframework.boot.loader.launch.LaunchedClassLoader @60addb54; [B is in module java.base of loader 'bootstrap')
我们的消费者bean定义如下:
@Bean
public Consumer<Message<List<byte[]>>> batchConsumer(
BatchItemProcessor batchItemProcessor) {
return batchItemProcessor::consume;
}
我们的属性定义如下:
spring:
cloud:
stream:
bindings:
batchItemProcessor-in-0:
consumer:
back-off-initial-interval: 1000
back-off-max-interval: 10000
back-off-multiplier: 2.0
batch-mode: true
max-attempts: 5
use-native-decoding: true
content-type: application/octet-stream
destination: batchItems
kinesis:
binder:
headers:
- traceparent
kpl-kcl-enabled: true
bindings:
batchItemProcessor-in-0:
consumer:
checkpoint-mode: batch
listener-mode: batch
与较旧的
2.2.0
版本的活页夹相比,是否有任何更改会导致我们的代码因此错误而失败?我们是否缺少任何必需的属性,或者我们的消费者代码是否指定错误以便以批处理模式处理消息?
KclMessageDrivenChannelAdapter
始终根据 Kinesis 记录和嵌入标头的反序列化结果返回 List<Message<Object>>
。显然,Spring Cloud Function 之间发生了一些变化。
要解决此问题,您必须将
Consumer
更改为:
Consumer<Message<List<Message<byte[]>>>>