我有一个自定义的 Kafka 库,出于业务原因我不得不使用它。我的用例是从 Kafka 读取每条记录并将其保存到存储库。我的存储库的条目是我编写的回调,它实现了类似的功能
interface Listener<T> {
Response apply(T val);
}
图书馆带走了我的听众,迭代来自 Kafka 的
ConsumerRecords<T>
,然后我就走了。我想看看是否可以使用 Spring Data 中的反应式存储库来保存所有内容。类似的东西
Response apply(T val) {
myRepo.save(val)
.subscribe()
...
return response;
}
到目前为止,我只尝试对 Repository.save 的返回值调用 subscribe,但我没有办法处理所有这些 Publishers
但我立即遇到了问题:
ConsumerRecords
内的每次迭代创建一个Mono,但我永远不会丢弃其中任何一个。我实际上无法判断何时处理每个 Mono,所以这听起来只会造成内存泄漏ConsumerRecords
集合开头,所以看起来这自然应该是一个 Flux,但由于我的条目是回调,我不知道如何实现这一点理想的情况是有某种方法来声明 Flux 并对其进行设置,以便将传入的每个有效负载添加到 Flux 中,然后调用 MyRepository.saveAll(Flux vals) 并返回一个我可以稍后处理的 Flux,但是如果我了解 Reactor 指南,我不能只是在现有的 Flux 中添加一些东西。
这只是一个无望的事业吗?如果我只获取从调用
MyRepository.save(T)
返回的 Mono 实例,我什么时候调用它的 dispose ?
要将基于回调/侦听器的代码转换为响应式(但不完全,请参阅下面的警告部分),Reactor 提供了 Flux.create() 和 Flux.push() 静态方法。
它们几乎是等效的,除了push仅处理单线程数据源。
有了这个,您可以:
创建接收到的 Kafka 消息的 Flux:
Flux<R> receiveRecords(KafkaConsumer<R> datasource) {
return Flux.push(sink -> {
Listener<R> reactiveBridge = record -> sink.next(record);
datasource.addListener(reactiveBridge);
sink.onDispose(datasource.removeListener(reactiveBridge));
});
}
有了这个,您可以返回到响应式范例,并将创建的通量flatMap添加到您的存储库中:
Flux<R> savedRecords = receiveRecords(datasource)
.flatMap(record -> reactiveRepo.save(record))
反应式范式的主要优点之一是能够管理背压,即使上游数据流的速度适应下游处理器的能力。
注意:在这种模式下,您将失去这种优势。背压变得难以处理,因为您的数据源推动没有限制。 create 和 push 都有一个变体,允许您自定义背压下的通量行为,但它仅限于使通量失败、删除一些接收到的值或无限制地缓冲接收到的值(这可以导致内存溢出错误)。
因此,如果您的 kafka 消费者非常快(很可能是这种情况),而您的数据库插入速度很慢,您可能会遇到在不丢失数据的情况下难以处理的问题。
你可以尝试使用批量来篡改这个问题:
Flux<Long> savedBatchSizes = receiveRecords(datasource)
.buffer(100, Duration.ofSeconds(2))
.flatMap(batch -> reactiveRepo.saveAll(batch).count())
但是这里没有强有力的保证。