Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。
我想分享(即分割)我的通量,但 share() 似乎不会导致我的订阅被共享。为什么? 我有一个由昂贵的数据库调用发出的通量。我想将通量和过程分开......
我有以下四种方法。 Mono 撕裂( ); Mono doB(A a); Mono doC(B b); Mono 关闭(A a); 最简单的工作流程如下所示。 ...
如何捕获Reactor RetryExhaustedException?
我有代码抛出reactor.core.Exceptions$RetryExhaustedException,我想捕获该特定异常。 不过,这个RetryExhaustedException并不是一个公共类,它是一个包-
ClassCastException:保存实体时无法将 Long 转换为 Integer 类
最近越来越 java.lang.ClassCastException: 类 java.lang.Long 无法转换为类 java.lang.Integer (java.lang.Long 和 java.lang.Integer 位于 loader 'bootst... 的 java.base 模块中...
如何解决 Mono.fromFuture 方法调用不一致的问题?
我正在开发一个运行Webflux(版本5.3)的Spring Boot项目。 在 REST 端点中,两个不同的 AWS DynamoDB 账户有两个异步更新,并通过 Mono.when() 聚合在一起。那个...
如何对使用 TransactionTemplate.execute 的方法进行单元测试并验证在execute() 中执行的代码
如何对以下代码进行单元测试? 公共 Mono 创建(MyObject myobject) { 返回 Mono.fromCallable(() -> transactionTemplate.execute(status -> { 尝试 {
我有三个工作类型的课程,如下所示。 界面工作{ 布尔执行(); } 类 Service1 实现 Work{ @覆盖 布尔执行(){ //一些
我正在尝试制作几个反应式微服务: 制作人之一: @RestController @RequiredArgsConstructor 公共类事件控制器{ 私人最终水槽。Many水槽; @PostM...
拥有像下面这样的异步发布者,Project Reactor 有没有一种方法可以等待整个流处理完成? 当然,无需为未知的持续时间添加睡眠...
Project Reactor Mono.就在 flatMap 中
尝试搜索一些有关此用法的信息,但尚未找到明确的答案。在阻塞操作上使用 flatMap(非反应式)。当某些操作如
为什么在 Flux 发射完元素后,Reactor 项目中的 Schedulers.newParallel() 没有停止运行?
我有一个原始的字符串 Flux,并在 main() 方法中运行此代码。 包com.example; 导入reactor.core.publisher.Flux; 导入reactor.core.scheduler.Schedulers; 导入reactor.util.
对于 Spring Boot Web 客户端,记录不可解析响应的首选机制是什么?
这是我的设置。我的大部分代码都通过 webflux 使用 .bodyToMono 。但是,如果发生解析错误,Webclient 没有很好的方法来获取导致问题的原始正文。在这个例子中,
如何在Spring Cloud Gateway中获取请求体并addHeader
我打算将一个项目从Zuul迁移到Spring Cloud Gateway。 我有一个“校验和代码”,但我不知道如何迁移它。 在 zuul 代码中,我获取了 url 参数和 json 正文,然后我执行
我有一个http客户端和执行器,当所有工作完成后应该将其关闭。 我正在尝试按照此处针对 RxJava 1.x 描述的方式使用 Flux.using 方法: https://github.com/meddle0x53/learning-
使用ReactiveSecurityContextHolder设置后Spring WebFlux安全上下文为空
我正在使用 Spring Boot 和 Spring Security 开发反应式应用程序,特别是处理 Kafka 消费者中的身份验证。尽管使用
由于背压,下游消费者的 Kafka 主题分区暂停。最终只有一个分区被排空
在我的项目中,我使用 spring-kafka (v3.1.1) 和reactor-kafka (v1.3.22) 来消费来自特定主题的事件。我们称之为“主题a”。下游消费者处理每条消息...
Project Reactor 中 doOnNext 的“即发即忘”操作
我有一个 Flux 流。对于处理的每个元素,我希望触发一个异步/非阻塞操作。例如,从数据库更新返回 Mono 的方法。 我想要...
这是我在响应式流规范中找到的内容 调用 onSubscribe、onNext、onError 或 onComplete 必须正常返回,除非任何提供的参数为 null,在这种情况下它必须...
functionCatalog.lookup(“sendFluxToWeb|sendFluxToKafka”)的问题
我想要实现的目标: 以反应方式将字符串流发送到 webapi 并发送到 kafka。 显示一些代码: 导入 org.springframework.boot.SpringApplication; 导入 org.springframework...
我需要创建一个从外部缓存获取数据的方法。如果找到数据,它会进行一些处理,否则它应该进行不同的处理。 该方法应该返回 Mono 单声道