WebClient如何刷新部分接收数据?

问题描述 投票:0回答:1

我想实现部分刷新收到的内容。例如,我有处理程序:

return client
    .post()
    .body(BodyInserters.fromDataBuffers(
        request.body(BodyExtractors.toDataBuffers())))
    .exchange()
    .....

如果收到一定数量的数据库,如何强制刷新?

spring-boot spring-webflux reactor-netty
1个回答
0
投票

首先,对此有一个警告:

  • 默认情况下(即如果您不手动刷新),只要通道准备就绪并且刷新策略认为方便,Netty就会缓冲字节并不时刷新它们。这针对性能进行了优化。
  • 如果您希望手动刷新,它不能保证对方将以相同的方式接收这些字节组;中间人可能会一路缓冲事物。这可能无法实现您的目标:手动刷新通常不是关于性能优化,而是协议语义。
  • 使用手动刷新策略仅在将其与协议语义(如消息分隔符)配对时才有用,以便对方知道如何拆分消息(这就是Spring WebFlux为SSE和application/streaming+json所做的事情。

现在要实现这一点,Reactor为几个windowXYZ运营商提供了不同的策略。 Flux.window(int)基于元素的数量,基于持续时间的windowTimeout(Duration),以及其他。在这种情况下,您可能想要使用windowUntil(Predicate)

让我们尝试实现在缓冲一定数量的数据时刷新的东西。

Flux<DataBuffer> buffers = //...;

int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
    if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
        return false;
    }
    currentSize.set(0);
    return true;
});

WebClient.create()
        .post()
        .body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
        .retrieve();

请注意,如果您在无限数据流上运行,则实现存在缺陷:在达到配额或源完成之前,这不会刷新。因此,这可能会使数据保留的时间超过必要的时间。

© www.soinside.com 2019 - 2024. All rights reserved.