当我受到阻塞库的限制时,将阻塞代码转换为反应式代码

问题描述 投票:0回答:1

我有一个自定义的 Kafka 库,出于业务原因我不得不使用它。我的用例是从 Kafka 读取每条记录并将其保存到存储库。我的存储库的条目是我编写的回调,它实现了类似的功能

interface Listener<T> {
    Response apply(T val);
}

图书馆带走了我的听众,迭代来自 Kafka 的

ConsumerRecords<T>
,然后我就走了。我想看看是否可以使用 Spring Data 中的反应式存储库来保存所有内容。类似的东西

Response apply(T val) {

    myRepo.save(val)
       .subscribe()

...
    return response;

}

到目前为止,我只尝试对 Repository.save 的返回值调用 subscribe,但我没有办法处理所有这些 Publishers

但我立即遇到了问题:

  1. 我正在为
    ConsumerRecords
    内的每次迭代创建一个Mono,但我永远不会丢弃其中任何一个。我实际上无法判断何时处理每个 Mono,所以这听起来只会造成内存泄漏
  2. 我知道这个库以
    ConsumerRecords
    集合开头,所以看起来这自然应该是一个 Flux,但由于我的条目是回调,我不知道如何实现这一点

理想的情况是有某种方法来声明 Flux 并对其进行设置,以便将传入的每个有效负载添加到 Flux 中,然后调用 MyRepository.saveAll(Flux vals) 并返回一个我可以稍后处理的 Flux,但是如果我了解 Reactor 指南,我不能只是在现有的 Flux 中添加一些东西。

这只是一个无望的事业吗?如果我只获取从调用

MyRepository.save(T)
返回的 Mono 实例,我什么时候调用它的 dispose ?

apache-kafka spring-webflux project-reactor spring-data-elasticsearch
1个回答
0
投票

要将基于回调/侦听器的代码转换为响应式(但不完全,请参阅下面的警告部分),Reactor 提供了 Flux.create()Flux.push() 静态方法。

它们几乎是等效的,除了push仅处理单线程数据源。

它们如何适合您的情况

有了这个,您可以:

  1. 创建接收到的 Kafka 消息的 Flux:

    Flux<R> receiveRecords(KafkaConsumer<R> datasource) {
        return Flux.push(sink -> {
            Listener<R> reactiveBridge = record -> sink.next(record);
            datasource.addListener(reactiveBridge);
            sink.onDispose(datasource.removeListener(reactiveBridge));
        });
    }
    
  2. 有了这个,您可以返回到响应式范例,并将创建的通量flatMap添加到您的存储库中:

    Flux<R> savedRecords = receiveRecords(datasource)
        .flatMap(record -> reactiveRepo.save(record))
    

警告

反应式范式的主要优点之一是能够管理背压,即使上游数据流的速度适应下游处理器的能力。

注意:在这种模式下,您将失去这种优势。背压变得难以处理,因为您的数据源推动没有限制。 createpush 都有一个变体,允许您自定义背压下的通量行为,但它仅限于使通量失败、删除一些接收到的值或无限制地缓冲接收到的值(这可以导致内存溢出错误)。

因此,如果您的 kafka 消费者非常快(很可能是这种情况),而您的数据库插入速度很慢,您可能会遇到在不丢失数据的情况下难以处理的问题。

你可以尝试使用批量来篡改这个问题:

Flux<Long> savedBatchSizes = receiveRecords(datasource)
    .buffer(100, Duration.ofSeconds(2))
    .flatMap(batch -> reactiveRepo.saveAll(batch).count())

但是这里没有强有力的保证。

© www.soinside.com 2019 - 2024. All rights reserved.