Reactor - 动态更改 flatMap 并发度

问题描述 投票:0回答:1

是否可以在运行时更改 Reactor Flux flatMap 并发度?

flatMap API 允许我们在组装期间配置并发性。

我正在使用 flatMap 订阅 WebClient,WebClient 对外部服务进行 GET 或 REST 调用。在运行时,我可能会遇到诸如速率限制、服务不可用、内部服务器错误等问题,在这种情况下,我想将并发度设置为低至 1 以减少处理元素的数量。当事情恢复正常时,我想再次将并发性提高到最佳水平。

spring spring-boot project-reactor reactive-streams spring-webclient
1个回答
0
投票

你可以这样做:

flatMap(a-> {}, 1)

地点:

flatMap(a-> 函数: {}, **并发: 1)

你还可以设置不同的并发数作为变量:

flatMap(a-> {}, concurrency != null ? concurrency : Integer.MAX_VALUE)

默认情况下,如果未添加并发变量,则会重载并将默认并发变量设置为 Integer.MAX_VALUE。

我更喜欢使用布尔值,并且仅选择是否并发:

flatMap(a-> {}, isConcurrent ? Integer.MAX_VALUE : 1)

Reactor 文档的详细信息:

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

将此 Flux 发出的元素异步转换为 Publishers,然后通过合并将这些内部发布者扁平化为单个 Flux,从而允许它们交错。

该运算符有三个维度可以与 flatMapSequential 和 concatMap 进行比较:

  • 内部消息的生成和订阅:该算子正在急切地订阅其内部消息。

  • 展平值的排序:此运算符不一定保留原始顺序,因为内部元素在到达时会展平。

  • Interleaving:此运算符让来自不同内部的值交错(类似于合并内部序列)。

并发参数允许控制可以并行订阅和合并的发布者数量。反过来,该参数显示了向上游发出的第一个 Subscription.request(long) 的大小。

public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper,
                             int concurrency)

带有并发变量的flatMap

放弃支持:此运算符在数据信号触发取消或错误时丢弃内部排队等待反压的元素。

错误模式支持:该运算符支持在映射器函数中出现错误时恢复。映射器抛出的异常的行为就像将值映射到空发布者一样。如果映射器确实映射到标量发布者(一种无需订阅发布者即可立即解析值的优化,例如 Mono.fromCallable(Callable)),但所述发布者抛出异常,则可以以相同的方式恢复。

输入参数

V - 合并的输出序列类型

参数

mapper - 将输入序列转换为 N 个序列的函数 Publisher

并发 - 运行中的内部序列的最大数量

退货

新的助焊剂

© www.soinside.com 2019 - 2024. All rights reserved.