特定 GroupedFlux 上的延迟元素等

问题描述 投票:0回答:1

我有一个与此类似的代码:

@Test
void shouldSlowdownOnlyEvens() {
    final Flux<Integer> globalFlux = Flux.range(1, 20000)
            .groupBy(number -> number % 2 == 0)
            .flatMap(this::dispatch);

    StepVerifier.create(globalFlux)
            .expectNextCount(20000)
            .verifyComplete();
}

private Publisher<? extends Integer> dispatch(GroupedFlux<Boolean, Integer> groupedFlux) {
    return groupedFlux.key() ?
            // even flux
            groupedFlux
                    .delayElements(Duration.ofMillis(1000))
                    .doOnNext(System.out::println) :
            groupedFlux.doOnNext(System.out::println);
}

在真实代码中,“Flux.range(1, 20000)”是多个主题的Kafka消费者生成的Flux。 通过这段代码,我想减慢偶数 Flux 的速度,并保持奇数 Flux 以相同的频率传播。 像这样使用“delayElements”,会减慢整个 Flux 的速度。也有delaySequence,但是具有一次消耗一堆数字的味道。

前面的代码显示:

enter image description here

这意味着(它打印奇数直到 513,然后开始在偶数和奇数之间交替直到最后,花费一秒钟打印每对。有什么提示吗?

spring-webflux project-reactor backpressure
1个回答
0
投票

这里的问题源于您的

groupBy
悬挂,根据文档

这些组需要在下游被排出和消耗,groupBy才能正常工作。值得注意的是,当标准产生大量组时,如果下游没有适当地消耗这些组,则可能会导致挂起(例如,由于带有maxConcurrency 参数设置得太低)。

(强调我的)

查看

groupBy
代码,它配置
Queues.SMALL_BUFFER_SIZE
的预取,除非明确指定,否则默认为
256

由于您生成了两个组,一个立即耗尽,另一个在指定的延迟之前根本不耗尽,因此当您的序列达到 512 个元素时,

groupBy
“挂起”,直到下游订阅者不耗尽其组(
delayElements
)在延迟后开始排水。然而,它基本上使 groupBy 缓冲区始终保持满(256),这会影响其他组,因为
groupBy
不会从上游更多
request(n)
直到慢速组耗尽其元素。

解决这个问题很容易,但会产生不同的影响,具体取决于您的实际代码实际使用的数据大小。

使用

onBackpressureBuffer
将从上游
request(unbounded)
GroupedFlux
,因此该组将得到适当的排水和缓冲。该运算符有一些影响,即缓冲区可以无限制地增长,如果数据很大和/或存在大量数据突发而下游处理速度不够快,则会导致高内存使用率或 OOM 错误。

使用此运算符调整代码:

private Publisher<? extends Integer> dispatch(GroupedFlux<Boolean, Integer> groupedFlux) {
    return groupedFlux.key() ?
            // even flux
            groupedFlux
                    .onBackpressureBuffer()
                    .delayElements(Duration.ofMillis(1000))
                    .doOnNext(System.out::println) :
            groupedFlux.doOnNext(System.out::println);
}

delayElements
与此运算符一起使用特别危险,因为它有效地将流降频到固定速率。在您的情况下,您的速率为 1Hz(每秒 1 个元素),如果上行速率始终高于 1Hz,则该缓冲区将无限期增长,直到 OOM。
delaySequence
更好,因为它不会降低流的速率,它只是将其抵消到未来。您应该花一些时间考虑一下您真正想要使用哪一个。

如果您想要

delayElements
的行为,那么使用
onBackpressureBuffer
的重载之一也是一个好主意,它可以让您指定最大缓冲区大小(无界缓冲区是万恶之源)并决定策略如果此缓冲区溢出则使用。
您还可以使用一些
bufferXX
运算符来实现类似的操作,但具有更多控制权,例如
bufferWhile

© www.soinside.com 2019 - 2024. All rights reserved.