Which version of reactor-kafka works with Spring Boot 3.1.0-M2?


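Running Spring Boot 3.1.0-M2 with reactor-kafka 1.2.2.RELEASE produces the error below. Which reactor-kafka version is compatible with 3.1.0-M2?

I am trying to build a reactive application that consumes from Kafka and writes to an MS SQL database using R2DBC.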

2023-04-03T08:28:19.934-06:00 ERROR 22144 --- [event-tracker-1] reactor.core.scheduler.Schedulers        : KafkaScheduler worker in group main failed with an uncaught exception

java.lang.NoSuchMethodError: 'void org.apache.kafka.clients.consumer.Consumer.close(long, java.util.concurrent.TimeUnit)'
at reactor.kafka.receiver.internals.DefaultKafkaReceiver$CloseEvent.run(DefaultKafkaReceiver.java:685) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:401) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:335) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.5.4.jar:3.5.4]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

From pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0-M2</version>
    <relativePath/>
</parent>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-mssql</artifactId>
        <version>1.0.0.RELEASE</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>
</dependencies>

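Below is part of the consuming service. I receive a record, check whether it is valid, check whether it already exists in the database, and finally save it. I tried a few onError configurations, but the exception above seems to take precedence, so the error is never caught.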

@Transactional
Flux<EventInterface> consumeEventDTO() {
    return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .map(ConsumerRecord::value)
            .flatMap(this::exists)
            .flatMap(this::save)
            .onErrorComplete(UncategorizedR2dbcException.class)
            .doOnNext(event -> {
                log.info("successfully consumed {}={}", EventInterface.class.getSimpleName(), event);
            })
            .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
}

Tags: java, spring-boot, apache-kafka, project-reactor

1 Answer

reactor-kafka:1.2.2.RELEASE is several years old.

spring-kafka:3.0.5 should be used with reactor-kafka:1.3.17, and it also brings in version 3.3.2 of the Kafka clients and Kafka Streams libraries.
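
As a rough sketch of what that version alignment could look like in the question's pom.xml, only the reactor-kafka entry changes. The version 1.3.17 is the one named in this answer for spring-kafka:3.0.5; whether it is also the best match for the Kafka client version managed by the 3.1.0-M2 parent is an assumption to verify against your resolved dependency tree.

```xml
<!-- Sketch: the reactor-kafka dependency from the question, with only the version changed. -->
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <!-- 1.3.17 is the version named in this answer; confirm it matches the
         kafka-clients version that your Spring Boot parent resolves. -->
    <version>1.3.17</version>
</dependency>
<!-- kafka-clients and kafka-streams stay without an explicit <version>,
     as in the question, so the Spring Boot parent keeps managing them. -->
```

Leaving kafka-clients unpinned keeps the Spring Boot parent in charge of the client version, so reactor-kafka is the only artifact that has to be matched by hand.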

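The error you are seeing comes from a kafka-clients version conflict: reactor-kafka:1.2.2.RELEASE calls Consumer.close(long, TimeUnit), and the NoSuchMethodError shows that this method does not exist in the kafka-clients version on your classpath.

Also, Spring Boot parent versions such as 3.1.0-M2 are not "stable"; they are pre-release milestones.

As of this post, the latest stable version can be found on the documentation page.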
