我有一个与此类似的代码:
@Test
void shouldSlowdownOnlyEvens() {
final Flux<Integer> globalFlux = Flux.range(1, 20000)
.groupBy(number -> number % 2 == 0)
.flatMap(this::dispatch);
StepVerifier.create(globalFlux)
.expectNextCount(20000)
.verifyComplete();
}
private Publisher<? extends Integer> dispatch(GroupedFlux<Boolean, Integer> groupedFlux) {
return groupedFlux.key() ?
// even flux
groupedFlux
.delayElements(Duration.ofMillis(1000))
.doOnNext(System.out::println) :
groupedFlux.doOnNext(System.out::println);
}
在真实代码中,“Flux.range(1, 20000)”是多个主题的Kafka消费者生成的Flux。 通过这段代码,我想减慢偶数 Flux 的速度,并保持奇数 Flux 以相同的频率传播。 像这样使用“delayElements”,会减慢整个 Flux 的速度。也有delaySequence,但是具有一次消耗一堆数字的味道。
前面的代码显示:
这意味着(它打印奇数直到 513,然后开始在偶数和奇数之间交替直到最后,花费一秒钟打印每对。有什么提示吗?
这里的问题源于您的
groupBy
悬挂,根据文档:
这些组需要在下游被排出和消耗,groupBy才能正常工作。值得注意的是,当标准产生大量组时,如果下游没有适当地消耗这些组,则可能会导致挂起(例如,由于带有maxConcurrency 参数设置得太低)。
(强调我的)
groupBy
代码,它配置Queues.SMALL_BUFFER_SIZE
的预取,除非明确指定,否则默认为256
。
由于您生成了两个组,一个立即耗尽,另一个在指定的延迟之前根本不耗尽,因此当您的序列达到 512 个元素时,
groupBy
“挂起”,直到下游订阅者不耗尽其组(delayElements
)在延迟后开始排水。然而,它基本上使 groupBy 缓冲区始终保持满(256),这会影响其他组,因为 groupBy
不会从上游更多 request(n)
直到慢速组耗尽其元素。
解决这个问题很容易,但会产生不同的影响,具体取决于您的实际代码实际使用的数据大小。
onBackpressureBuffer
将从上游 request(unbounded)
GroupedFlux
,因此该组将得到适当的排水和缓冲。该运算符有一些影响,即缓冲区可以无限制地增长,如果数据很大和/或存在大量数据突发而下游处理速度不够快,则会导致高内存使用率或 OOM 错误。
使用此运算符调整代码:
private Publisher<? extends Integer> dispatch(GroupedFlux<Boolean, Integer> groupedFlux) {
return groupedFlux.key() ?
// even flux
groupedFlux
.onBackpressureBuffer()
.delayElements(Duration.ofMillis(1000))
.doOnNext(System.out::println) :
groupedFlux.doOnNext(System.out::println);
}
delayElements
与此运算符一起使用特别危险,因为它有效地将流降频到固定速率。在您的情况下,您的速率为 1Hz(每秒 1 个元素),如果上行速率始终高于 1Hz,则该缓冲区将无限期增长,直到 OOM。 delaySequence
更好,因为它不会降低流的速率,它只是将其抵消到未来。您应该花一些时间考虑一下您真正想要使用哪一个。
如果您想要
delayElements
的行为,那么使用 onBackpressureBuffer
的重载之一也是一个好主意,它可以让您指定最大缓冲区大小(无界缓冲区是万恶之源)并决定策略如果此缓冲区溢出则使用。bufferXX
运算符来实现类似的操作,但具有更多控制权,例如 bufferWhile
。