Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。
考虑我有大量整数,并且此方法模拟异步外部 api 数据检索,它可以为某些特定的未知输入返回空响应: 公共静态单声道 考虑到我有大量的整数,并且此方法模拟异步外部 api 数据检索,它可以为某些特定的未知输入返回空响应: public static Mono<String> getApiData(int i) { if (i == 3) return Mono.empty(); // i'm using 3 just as an example return Mono.just(String.valueOf(i * 2)); } 这些方法将根据结果执行 getApiData 输出: // when getApiData returns non empty mono public static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) { System.out.println(apiInput + " -> " + apiOutput); // lots of unrelated logic return Mono.just(true); } // when getApiData returns empty mono public static Mono<Boolean> logFailure(int apiInput) { System.out.println(apiInput + " -> failure"); // registering errors logs return Mono.just(false); } 用这个我想编写一个像这样的方法Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux),它对每个元素应用getApiData,并且在发生故障时停止。那么如果至少有一个元素达到updateDatabaseWithApiData,则返回Mono.just(true),否则返回Mono.just(false)。 所以我会得到这个输出: public static void main(String[] args) { Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); processFluxUntilFailure(flux).subscribe(value -> System.out.println("result " + value)); } 所需输出: 1 -> 2 2 -> 4 3 -> failure result true 因为我们已经处理了(至少 1)2 个成功的元素。 考虑到: 这是我真正问题的简化版本 我无法预测数据何时会为空 getApiData 我无法改变所描述的方法,只能processFluxUntilFailure 我试过这个: public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) { return flux.flatMap(apiInput -> getApiData(apiInput) .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput)) .switchIfEmpty(Mono.defer(() -> logFailure(apiInput))) ) .reduce((b1, b2) -> b1 || b2); } 这导致了 1 -> 2 2 -> 4 3 -> failure 4 -> 8 5 -> 10 result true 我怎样才能从这次尝试中获得我想要的输出?换句话说,如果满足某些异步条件,我如何“停止”flatMap? 如果有任何不那么冗长的方法,我很乐意: public static Mono<Boolean> processAndUpdate(Flux<Integer> flux) { return flux .flatMap(apiInput -> getApiData(apiInput) .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput)) .switchIfEmpty(Mono.defer(() -> logFailure(apiInput))) ).<Boolean>handle((b, sink) -> { if (b) sink.next(true); else sink.complete(); }) .defaultIfEmpty(false) .reduce((b1, b2) -> b1 || b2); } 给我这个输出: 1 -> 2 2 -> 4 3 -> failure result true 显然 flatMap 和 handle 并没有急于评估,将它们放在一起就成功了
使用 Flux.groupBy 排序数据,将 take(Duration.ofMillis) 替换为例如直到
考虑到下面的代码我使用 Flux groupBy 和 .take(Duration.ofMillis(10)),我每秒可以处理大约 50K 记录。在使用 Flux.just 在 localhost 上进行测试时,我可以将延迟值设置为 1 m...
在我的 API 中,我有一个调用外部 API 的服务。此外部 API 按请求收取费用。因此我想对外部 API 的请求进行排队。 这就是今天的样子: @Compon...
无法在reactor kafka中使用schemaRegistry
我正在使用reactor kafka设置kafka消费者。 Producer 与 kafka 模式注册表集成 @Value("${spring.kafka.schemaRegistryUrls}") 私有字符串
我有一个变化,所有的值同时出现。我想对这些值的块并行执行一项时间密集型任务。为什么这段代码在同一个线程上执行每个映射......
我有一个场景来处理客户号码列表并更新数据库中的详细信息 公共无效updateCustomerDetailsInDB(ListnumberList) for(字符串数字:数字列表){ if(!isExistBy(
我目前正在开发一个TCP服务器/客户端应用程序,并且我已经成功建立了服务器和客户端之间的通信。虽然对于小消息来说沟通效果很好,但我
我想知道在 spring webflux 中进行某种切换的最佳实践是什么。 我的意思是,假设我需要理解传入参数(例如请求参数),以经典的方式我会写...
JAVA PROJECT REACTOR CORE 中 TCP 连接的消息分块
我目前正在开发一个TCP服务器/客户端应用程序,并且我已经成功建立了服务器和客户端之间的通信。虽然对于小消息来说沟通效果很好,但我
Flux.doOnEach 和 Flux.doOnNext 与 Flux.publishOn 的不同行为
我试图理解使用 doOnEach 运算符和空 Flux 进行发布的后果。我有一个简单的测试: 私有静态无效incrementOnNext(AtomicInteger值,Signal符号...
我有一个代码,我想重构它以删除元组,但我不确定如何在其他方法中重用 validateInfractionId 和 validateInfractionVersion 方法中的列表...
我正在使用 Spring 数据生成一个 ReactiveCrudRepository,我正在调用 saveAll。 我有一个执行多个 API 请求的集成测试,最后一个更新 3 个实体,使用 saveAl ...
我正在研究 REST API,对于收集端点,希望 _embedded 数组异步填充。不幸的是,我似乎无法在拥有
Mono.just() 在我的用例中当发出的元素挂起时仍然阻塞
我有两个测试使用 sleep() 模拟需要时间处理的 api 调用,并测试 Mono.just() 是否使其成为非阻塞。 在我的第一个测试中,我直接发出了一个字符串,但让它阻塞了......
我正在使用 Reactive kafka 来消费事件。 问题:我向队列中推送了 7 个事件,但消费者只消费了其中的 5 个。 (仅在部署在服务器上时发生,在本地工作正常
我正在使用 Spring Cloud Gateway 4 来实现代理。 它应该做的一件事是,当接受一个大的视频文件时,在将它发送到请求的目的地之前,给它加水印。 作为...
处理返回 Flux 的 Spring Integration MessageHandler
我的消息处理程序返回一个通量,理想情况下,我希望 spring 集成来处理阻塞/订阅通量,以便我下面的第二个处理程序获得已解析的对象。但是当以下...
我想要达到的目标: 将尽可能多的 http 请求并行地从一个积极的 Flux 发送到一个非常可靠的第三方服务 背景: 第三方服务很靠谱。。。
Spring Application使用Reactor Kafka消费消息。 问题 1:是否有标准约定在应用程序关闭期间暂停消息消费并完成处理动态消息?
如何在Spring Cloud Gateway中对外部API进行身份验证,然后再将请求转发给它的命运?
Spring Cloud Gateway 似乎是在将请求转发到后端之前执行某种验证(特别是身份验证检查)的有效工具,但我找不到太多重点教程...