我一直在致力于从 Spring 2.X 到 3.X 的迁移,我的旧代码在消费者中使用了 @EnableBinding(Sink::class) 和 @StreamListener(Sink.INPUT) 。由于两者都被弃用并从消费者中删除,当我使用消费者运行 Kafka 测试时,会发生以下错误:
org.opentest4j.AssertionFailedError:
expected: 1L
but was: 0L
我尝试将 @KafkaListener(topics=["kafka-test"], group-id="test-group") 放入 StreamListener 所在位置:但它导致了相同的错误。
我重写了 StreamListener 所在的消费者:
fun consume(message: Message<*>) {
到
fun consume(): Consumer<Message<*>> = Consumer { message ->
但这也导致了错误:
Too many arguments for public open fun consume(): Consumer<Message<*>>
通过的论点是:
kafkaConsumer.consume(GenericMessage("""{"aV": 1, "dI": 3, "id": $alertId}""".toByteArray()))
将其重写为:
@KafkaListener(topics = ["test-topic"], groupId = "group-test")
fun consume(message: GenericMessage<ByteArray>) {`
结果与预期相同 1L,但错误为 0L。
在迁移之前,仅删除 EnableBinding 和 StreamListener 但所有相同的旧 2.X 依赖项也会发生相同的错误。我想知道对于 kafka 消费者来说,是否有一些东西的功能与这两者相同?
当前迁移的 kafka 依赖项为 Spring-kafka 3.1.4、spring-cloud-stream 4.1.1、kafka-clients 6.2.1、spring-integration 6.1.6。
而不是
kafkaConsumer.consume(GenericMessage(...))
你必须使用
kafkaConsumer.consume().accept(GenericMessage(...))