Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。
将反应式 Mono 上下文传播到 Caffeine AsyncCache
在我们的反应式应用程序(Spring WebFlux、Reactor 项目)中,我们在反应式上下文中携带重要的日志信息。问题是当我们使用 Caffeine AsyncCache 时,上下文是......
如何从 WebFlux 中的 Mono<List<T>> 中提取内容并将其传递到调用链中
我希望能够从 Mono> 中提取 List,将其传递给下游服务进行处理(或者可能从读取(RequestParams params)返回...
我目前正在尝试了解Redis的一些基本实现。我知道 Redis 是单线程的,并且我已经偶然发现了以下问题:Redis 是单线程的,...
我有一个水槽,上面有一个订户。在压力测试期间,我们意识到队列已满并且接收器发出 FAIL_OVERFLOW 信号。继续测试后...
我需要在反应式管道中有一个条件循环。只要条件为真,它就应该继续循环,一旦条件变为假,它就应该停止循环并返回它所去的任何值...
如何在Java Reactor中创建Tuple2、Tuple3等?
创建此问题和答案是为了帮助其他人快速查找如何创建Reactor Tuple,例如reactor.util.function.Tuple2 我在
如何使用Java中的projectreactor将错误翻转为无错误?
我有 Mono 的实例 它可以具有价值,也可以是错误。现在我需要将其转换为 Mono,其中错误和无错误翻转 单核细胞增多症 .doMagic( $ -> { ...
我需要实现以下行为: 发出 REST 发布请求 如果响应返回状态为 429 Too much requests,则最多重试 3 次,延迟 1 秒 如果第三次重试失败或出现任何情况
如何在使用反应式数据源的 WebFlux 上编写自定义验证器
在 Spring MVC 中,我有一个 @UniqueEmail 自定义休眠验证器(用于在注册时检查电子邮件的唯一性),如下所示: 公共类 UniqueEmailValidator 实现 ConstraintValidat...
flatMap 中的单声道超时 - 在“flatMap”中的 20000 毫秒内没有观察到任何项目或终端信号(并且未配置后备)
我正在使用 Spring WebFlux 发出 POST 请求,但收到以下错误: java.util.concurrent.TimeoutException: 没有观察到任何项目或 'flatMap 中 20000ms 内的终端信号...
我有一个场景,第二个单声道依赖于第一个单声道,而 ThridMono 调用依赖于第二个单声道输出。 我编写的代码如下。 首先Mono.flatMap{ val secondaryMono = callWebservice(
我有一个自定义的 Kafka 库,出于商业原因我不得不使用它。我的用例是从 Kafka 读取每条记录并将其保存到存储库。我的存储库的条目是我编写的回调...
Spring Boot 响应式编程中 subscribeOn(Schedulers.boundedElastic()) 的意外线程行为
我在 Spring Boot 中使用 getAPI 进行反应式编程。在应用 subscribeOn(Schedulers.boundedElastic()) 时,我希望线程是有界弹性的。然而,我却没有这么做,而是……
我之前将实体保存到表中,如下所示: insertTableOne(foo).then(insertTableTwo(foo)).then(Mono.just(foo)); 团队成员建议使用 .and() 来并行插入: 插入表格O...
是否可以在运行时更改Reactor Flux flatMap并发度? flatMap API 允许我们在组装期间配置并发性。 我正在使用 flatMap 订阅
Kotlin 协程 + opentelemetry 失去上下文并获得 NPE
我试图了解如何使用 spring webflux 和 opentelemetry 与 kotlin 协程一起使用。 当我尝试在协程中执行一些日志时,我在 kotlin.coroutines.jvm.internal 上收到 NPE。
我有一个 Spring Boot 服务,它使用 WebClient 调用另一个外部服务。外部服务的平均延迟约为 15 ms(通过 Datadog 测量)。但是当外部API是c...
我有这样的代码逻辑,它使用 R-Socket 将消息从一台服务器发送到另一台服务器,整个代码是使用反应器(Flux/Mono)编写的。有时我注意到有些消息不是
我有点困惑。 问题在于通过重试创建反应式调用链。它进入步骤 3,而不保存步骤 2 中重试的值。如果我删除 onErrorContinue,它将重试成功保存...
如何使 Reactor TestPublisher 在第一个订阅时返回错误,但为下一个订阅发出值以测试重试?
我想编写一个单元测试,以确保某些重试机制到位,因此被测试的代码可以在发布者第一次订阅时发出错误时处理。 假设我