在消费者上反序列化自定义消息

问题描述 投票:0回答:1

我正在尝试 Spring Cloud Stream 和 Kafka,并在我认为是一个简单的问题上进行了尝试:消费者的简单消息反序列化。

我有以下 Spring 配置:

  cloud:
    stream:
      bindings:
        processData-in-0:
          destination: test.public.huge_test_table
          group: my-consumer-group
          content-type: application/json
          consumer:
            concurrency: 10
      kafka:
        binder:
          brokers: localhost:9092

我有这个 Kafka 主题,其中包含消息,其有效负载为以下格式(例如,我可以通过 Kafdrop 检查这些消息):

{
    "metadata": {
         // ...
    },
    "payload": {
        title: "Whatever"
        // ...
    },
    "op": "c"
}

然后我创建了我的 DTO:

data class Foo<T> (
    @JsonProperty("payload")
    val payload: T
    @JsonProperty("op")
    val op: String
)

data class Bar (
    @JsonProperty("title")
    val title: String,
    //...
)

为了消费这些消息,我尝试这样做:

@Configuration
class MyListener {

    private val logger = KotlinLogging.logger {}

    @Bean
    fun processData(): Consumer<Message<Foo<Bar>> {
        return Consumer { message ->
            logger.info { "Received message: ${message.payload}" }
        }
    }
}

但是会抛出类似的错误

 class [B cannot be cast to class com.example.dto.FooDTO ([B is in module java.base of loader 'bootstrap'; com.example.dto.FooDTO is in unnamed module of loader 'app')

检查消费者上的

message
对象,我可以看到它是一个
GenericMessage
类型,其中
payload
包含
ByteArray
(因此可能是错误中的
B
)。

但是,如果我使用

Consumer<Message<String>>
,我可以将
payload
视为预期的 JSON 字符串。

我的期望是,通过设置

Consumer<Message<Foo<Bar>>>
,它将在我的应用程序上下文中使用
ObjectMapper
来反序列化该 JSON,就像它在 Spring Web 中所做的那样......?

我还尝试配置一些自定义消息序列化器/反序列化器,但失败了。

我当然可以保留

Message<String>
来获取 JSON 负载,然后手动注入
ObjectMapper
并自行反序列化,但我希望以更流畅的方式配置它。

基本上以下方法有效,但我不想每次都注入

ObjectMapper
并手动反序列化它:

    @Bean
    fun processData(): Consumer<Message<String>> {
        return Consumer { message ->
            val payload = objectMapper.readValue<Foo<Bar>>(message.payload.toString())
            logger.info { "Received message: ${message.payload}" }
        }
    }

我还尝试提供一个

MessageConverter
bean,它基本上可以设置我的
ObjectMapper
虽然我认为这已经是默认的,但这是我看到的一些建议

    @Bean
    fun customMessageConverter(objectMapper: ObjectMapper): MessageConverter {
        val converter = MappingJackson2MessageConverter()
        converter.objectMapper = objectMapper
        return converter
    }

那么这可能吗?我错过了什么?

谢谢!

编辑

经过进一步调查,我意识到反序列化失败,因为有效负载 JSON 对我的 DTO 具有未知字段。 我确实有

spring.jackson.deserialization.fail-on-unknown-properties=false
(无论如何都是默认的),但只有当我用
@JsonIgnoreProperties(ignoreUnknown = true)
注释 DTO 时它才有效。试图理解为什么会这样。

spring-boot kotlin apache-kafka spring-cloud-stream-binder-kafka
1个回答
0
投票

进一步挖掘源代码后,我可以发现

JsonMessageConverter
使用的默认
spring-cloud-function
正在使用
JsonMapper
,这似乎没有考虑我的
ObjectMapper
属性。

为了解决这个问题,我所做的是注册一个自定义

MappingJackson2MessageConverter
,它使用我的应用程序
ObjectMapper
并负责反序列化泛型类型:

    @Bean
    fun customMessageConverter(objectMapper: ObjectMapper): MessageConverter {
        return object: MappingJackson2MessageConverter() {

            init {
                this.objectMapper = objectMapper
            }

            override fun convertFromInternal(message: Message<*>, targetClass: Class<*>, conversionHint: Any?): Any? {
                try {
                    val type = if (conversionHint is ParameterizedType) {
                        objectMapper.typeFactory.constructType(conversionHint)
                    } else {
                        objectMapper.constructType(targetClass)
                    }
                    return objectMapper.readValue(message.payload as ByteArray, type)
                } catch (e: IOException) {
                    throw MessageConversionException("couldnt convert", e)
                }
            }
        }
    }

不确定这是否是最好的方法,但它有效

© www.soinside.com 2019 - 2024. All rights reserved.