我正在尝试 Spring Cloud Stream 和 Kafka,并在我认为是一个简单的问题上进行了尝试:消费者的简单消息反序列化。
我有以下 Spring 配置:
cloud:
stream:
bindings:
processData-in-0:
destination: test.public.huge_test_table
group: my-consumer-group
content-type: application/json
consumer:
concurrency: 10
kafka:
binder:
brokers: localhost:9092
我有这个 Kafka 主题,其中包含消息,其有效负载为以下格式(例如,我可以通过 Kafdrop 检查这些消息):
{
"metadata": {
// ...
},
"payload": {
title: "Whatever"
// ...
},
"op": "c"
}
然后我创建了我的 DTO:
data class Foo<T> (
@JsonProperty("payload")
val payload: T
@JsonProperty("op")
val op: String
)
data class Bar (
@JsonProperty("title")
val title: String,
//...
)
为了消费这些消息,我尝试这样做:
@Configuration
class MyListener {
private val logger = KotlinLogging.logger {}
@Bean
fun processData(): Consumer<Message<Foo<Bar>> {
return Consumer { message ->
logger.info { "Received message: ${message.payload}" }
}
}
}
但是会抛出类似的错误
class [B cannot be cast to class com.example.dto.FooDTO ([B is in module java.base of loader 'bootstrap'; com.example.dto.FooDTO is in unnamed module of loader 'app')
检查消费者上的
message
对象,我可以看到它是一个 GenericMessage
类型,其中 payload
包含 ByteArray
(因此可能是错误中的 B
)。
但是,如果我使用
Consumer<Message<String>>
,我可以将 payload
视为预期的 JSON 字符串。
我的期望是,通过设置
Consumer<Message<Foo<Bar>>>
,它将在我的应用程序上下文中使用 ObjectMapper
来反序列化该 JSON,就像它在 Spring Web 中所做的那样......?
我还尝试配置一些自定义消息序列化器/反序列化器,但失败了。
我当然可以保留
Message<String>
来获取 JSON 负载,然后手动注入 ObjectMapper
并自行反序列化,但我希望以更流畅的方式配置它。
基本上以下方法有效,但我不想每次都注入
ObjectMapper
并手动反序列化它:
@Bean
fun processData(): Consumer<Message<String>> {
return Consumer { message ->
val payload = objectMapper.readValue<Foo<Bar>>(message.payload.toString())
logger.info { "Received message: ${message.payload}" }
}
}
我还尝试提供一个
MessageConverter
bean,它基本上可以设置我的 ObjectMapper
虽然我认为这已经是默认的,但这是我看到的一些建议
@Bean
fun customMessageConverter(objectMapper: ObjectMapper): MessageConverter {
val converter = MappingJackson2MessageConverter()
converter.objectMapper = objectMapper
return converter
}
那么这可能吗?我错过了什么?
谢谢!
经过进一步调查,我意识到反序列化失败,因为有效负载 JSON 对我的 DTO 具有未知字段。 我确实有
spring.jackson.deserialization.fail-on-unknown-properties=false
(无论如何都是默认的),但只有当我用 @JsonIgnoreProperties(ignoreUnknown = true)
注释 DTO 时它才有效。试图理解为什么会这样。
进一步挖掘源代码后,我可以发现
JsonMessageConverter
使用的默认 spring-cloud-function
正在使用 JsonMapper
,这似乎没有考虑我的 ObjectMapper
属性。
为了解决这个问题,我所做的是注册一个自定义
MappingJackson2MessageConverter
,它使用我的应用程序 ObjectMapper
并负责反序列化泛型类型:
@Bean
fun customMessageConverter(objectMapper: ObjectMapper): MessageConverter {
return object: MappingJackson2MessageConverter() {
init {
this.objectMapper = objectMapper
}
override fun convertFromInternal(message: Message<*>, targetClass: Class<*>, conversionHint: Any?): Any? {
try {
val type = if (conversionHint is ParameterizedType) {
objectMapper.typeFactory.constructType(conversionHint)
} else {
objectMapper.constructType(targetClass)
}
return objectMapper.readValue(message.payload as ByteArray, type)
} catch (e: IOException) {
throw MessageConversionException("couldnt convert", e)
}
}
}
}
不确定这是否是最好的方法,但它有效