替代Kafka 3.6.2中的@EnableBinding和@StreamListener

问题描述 投票:0回答:1

我一直在致力于从 Spring 2.X 到 3.X 的迁移,我的旧代码在消费者中使用了 @EnableBinding(Sink::class) 和 @StreamListener(Sink.INPUT) 。由于两者都被弃用并从消费者中删除,当我使用消费者运行 Kafka 测试时,会发生以下错误:

org.opentest4j.AssertionFailedError: 
expected: 1L
 but was: 0L

我尝试将 @KafkaListener(topics=["kafka-test"], group-id="test-group") 放入 StreamListener 所在位置:但它导致了相同的错误。

我重写了 StreamListener 所在的消费者:

    fun consume(message: Message<*>) {

fun consume(): Consumer<Message<*>> = Consumer { message ->

但这也导致了错误:

Too many arguments for public open fun consume(): Consumer<Message<*>>

通过的论点是:

kafkaConsumer.consume(GenericMessage("""{"aV": 1, "dI": 3, "id": $alertId}""".toByteArray()))

将其重写为:


@KafkaListener(topics = ["test-topic"], groupId = "group-test")
    fun consume(message: GenericMessage<ByteArray>) {`

结果与预期相同 1L,但错误为 0L。

在迁移之前,仅删除 EnableBinding 和 StreamListener 但所有相同的旧 2.X 依赖项也会发生相同的错误。我想知道对于 kafka 消费者来说,是否有一些东西的功能与这两者相同?

当前迁移的 kafka 依赖项为 Spring-kafka 3.1.4、spring-cloud-stream 4.1.1、kafka-clients 6.2.1、spring-integration 6.1.6。

spring spring-boot spring-cloud spring-kafka spring-cloud-stream
1个回答
0
投票

而不是

kafkaConsumer.consume(GenericMessage(...))

你必须使用

kafkaConsumer.consume().accept(GenericMessage(...))
© www.soinside.com 2019 - 2024. All rights reserved.