project-reactor 相关问题

Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。

Java Reactor + Caffeine 缓存 + Mockito = 严格存根参数不匹配

我在 Spring-boot 项目中使用 Java Reactor,我需要在其中一个中间步骤中缓存我的数据。它在 Spring Boot 3、Java 17、Junit5 上...... 我的缓存服务如下所示: @Comp...

回答 1 投票 0

Blockhound 在网络客户端上进行网络调用时出错

我们正在使用reactor框架并使用WebClient进行外部网络调用。我们使用 blackhound 来确保我们的代码是非阻塞的。然而,blockhound 抛出异常

回答 1 投票 0

如何在Java中对Reactor Flux中的元素进行缓冲和分组

给定无限通量的对象,其中每个对象都有一个 ID,我如何使用 Flux 创建按 ID 属性分组的更新缓冲列表(保留最后发出的值)? 谢谢 例子 哦...

回答 3 投票 0

Project Reactor 中 doOnComplete() 和 subscribe() 第三个参数的区别

我目前正在使用 Project Reactor,我遇到了一个场景,我不确定是否使用 doOnComplete() 还是 subscribe() 的第三个参数。两者似乎都是在酒吧时被称为...

回答 1 投票 0

switchIfEmpty 在之前的 flatMap 返回 Mono.error() 时执行

我有一个带有类似代码的 Spring Gateway 过滤器: 公共 Mono 示例{ 返回 redisTemplate.opsForValue() .get(缓存键) .flatMap(this::throwMono) .

回答 1 投票 0

subscribe()方法是否阻塞?

我从 Project Reactor 开始,我无法理解为什么这段代码无限地打印“Hello”并且从不返回 Disposable 对象。 Flux Flux = Flux.generate(sink -&... 我从 Project Reactor 开始,我无法理解为什么这段代码无限地打印“Hello”并且从不返回 Disposable 对象。 Flux<Object> flux = Flux.generate(sink -> sink.next("Hello")); Disposable disposable = flux.subscribe(System.out::println); disposable.dispose(); System.out.println("This doesn't print"); 我认为,当调用 subscribe() 方法时,它必须立即返回 Disposable 对象,如果我愿意,我可以使用该对象取消订阅。我知道这个 subscribe 方法中的代码在同一个线程中运行,如果我在 delayElements 调用之前替换 subscribe 方法,那么下面的代码将起作用,因为它在单独的守护线程中运行,所以可以解释为什么吗?它是否停止在 subscribe 方法并且不返回 Disposable 以及是否有任何方法可以通过调用 subscribe 方法来管理订阅?是否可以类比delayElements方法,在单独的线程中执行,并且调用subscribe方法的结果立即返回Disposable? 我找不到这个问题的具体答案。在我看到的所有示例中,要么是有限数据流,要么使用了delayElements方法。 是的,subscribe()在这个特定场景中会阻塞。 Reactor 是并发无关的,这意味着默认情况下它不会对您强制执行任何线程/异步性,并在调用线程(在本例中为主线程)上执行管道。 您可以使用 subscribeOn 或 publishOn 显式更改此设置,也可以使用 delayElements 等某些运算符隐式更改。 Flux<Object> flux = Flux.generate(sink -> sink.next("Hello")).publishOn(Schedulers.parallel()); Disposable disposable = flux.subscribe(System.out::println); disposable.dispose(); System.out.println("This doesn't print");

回答 1 投票 0

反应堆中 vavr Either 的意外返回类型

使用 vavr 的 Either 有两种简单的方法。 公共要么 testEither(int s) { 如果(s==0) return Either.left("错误"); 返回 Either.right(s);...

回答 2 投票 0

如何在spring-webflux WebFilter中正确使用slf4j MDC

我参考了博客文章《使用 Reactor Context 和 MDC 进行上下文日志记录》,但我不知道如何在 WebFilter 中访问 Reactor 上下文。 @成分 公共类 RequestIdFilter 实现 WebF...

回答 6 投票 0

如何检查 Mono<Void> 是否真的被调用了?

我是反应式编程的新手(并且我读过很多类似的问题)。在我当前的宠物项目中,我遇到了以下问题(我正在为任务中的场景提供伪代码......

回答 1 投票 0

在异步上下文中调用 subscribe 是否正确?

我将加密密钥存储在 Redis 数据库中。我想实现一个后备机制来生成密钥,除非它们已经存在。 让我困惑的是我提出的解决方案是我打电话给

回答 1 投票 0

Kafka消费者偏移提交问题

对于使用 Reactor Kafka 的 Spring WebFlux,我为我的消费者提供了以下代码: 公共EventConsumer(KafkaReceiverinputEventReceiver, 消息助手

回答 1 投票 0

如何定时执行任务并响应外部信号?

想象一个重复执行的任务,每次执行之间最多有 10 秒的延迟。还有一个外部信号会导致任务立即执行。 (外部

回答 2 投票 0

如何拆分事件在每个组上应用不同的地图,然后将它们合并回来

我有一个项目列表,其中有 2 种类型的 id。我需要将它们分成 2 组,然后为一组调用另一种方法,该方法将丰富 id 并提供不同的 id,而对于另一组...

回答 1 投票 0

Spring Reactor:以非阻塞方式添加延迟

关于如何在方法中添加延迟但以非阻塞方式的小问题。 模拟长进程的一种非常流行的方法是使用 Thread.sleep(); 然而,对于 Reactor 项目来说,这是一个块......

回答 1 投票 0

Mono.block() 抛出 InspirationTargetException

我正在执行一个返回 Mono 的 WebClient get 调用,该调用成功了。但我试图通过在 Mono 上执行 block() 来获取字符串。结果,我得到了异常抛出。 这是我的代码

回答 1 投票 0

WebClient 未成功调用“POST”操作

我正在玩Spring的WebClient。 REST 端点的主要实现(在 DemoPOJORouter 和 DemoPOJOHandler 中)似乎可以工作。此外,DemoClientRouter 中的 http.Get 端点和

回答 1 投票 0

ReactDeserializationException发生时如何移动到下一个偏移量reactor-kafka接收器?

当将 spring-webflux 与reactor-kafka接收器一起使用时,当发生 RecordDeserializationException 时,如何手动移动/提交偏移量?从 RecordDeserializationException 我可以获取分区并......

回答 1 投票 0

反应式变换函数的行为不符合预期

当我提取各个步骤并单独执行它们时,我创建的反应器变换函数不会给出相同的结果。 我有包含价格类型列表的 DTO 记录。 公众反...

回答 1 投票 0

如何泛化Reactor函数?

我有以下静态函数来在调用另一个通量之前记录一些信息。 公共静态 Flux> logBefore(Consumer&...

回答 1 投票 0

Spring Data R2DBC - 未考虑背压?

此线程是 Github 问题的延续:https://github.com/spring-projects/spring-data-r2dbc/issues/194 语境: 你好, 我只是尝试了一个非常简单的示例,基于两个反应性存储库......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.