我想实现部分刷新收到的内容。例如,我有处理程序:
return client
.post()
.body(BodyInserters.fromDataBuffers(
request.body(BodyExtractors.toDataBuffers())))
.exchange()
.....
如果收到一定数量的数据库,如何强制刷新?
首先,对此有一个警告:
application/streaming+json
所做的事情。现在要实现这一点,Reactor为几个windowXYZ
运营商提供了不同的策略。 Flux.window(int)
基于元素的数量,基于持续时间的windowTimeout(Duration)
,以及其他。在这种情况下,您可能想要使用windowUntil(Predicate)
。
让我们尝试实现在缓冲一定数量的数据时刷新的东西。
Flux<DataBuffer> buffers = //...;
int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
return false;
}
currentSize.set(0);
return true;
});
WebClient.create()
.post()
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
.retrieve();
请注意,如果您在无限数据流上运行,则实现存在缺陷:在达到配额或源完成之前,这不会刷新。因此,这可能会使数据保留的时间超过必要的时间。