Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。
线程 reactor-http-nio 长时间计算某些东西并消耗 CPU 和内存
我们有一个服务,我们通过 websocket 连接并从中接收消息。我们更新了 Spring Boot 3.0.6、reactor: 1.1.6、reactor-core 3.5.5 服务的依赖项。 连接后...
Reactor-netty:Theads reactor-http-nio 长时间计算一些东西并消耗 cpu 和内存
我们有一个服务,我们通过 websocket 连接并从中接收消息。 我们更新了 spring boot 3.0.6、reactor: 1.1.6、reactor-core 3.5.5 服务的依赖项。 还有一个公关...
Spring Boot 3 / reactor - 在 WebFilter 中访问跟踪
使用 spring-boot-starter-parent 3.0.6 和 spring-boot-starter-webflux 弹簧启动启动器安全 微米追踪 千分尺示踪桥otel 我也使用 Hooks.enableAutomaticContextPropa ...
缓冲流中的唯一值,直到它不再唯一 - 在 groupby 中使用缓存
我有一个卡夫卡话题的通量。所以我必须处理大量的无限热源。不过,为了简单起见,我将把它与整数通量结合起来。 我想把这个通量变成一个
我能以某种方式简化或改进这段代码吗?基本问题是我需要构建一个包含来自异步调用的多个值的 DTO。单声道和单声道 // getMovie 返回 Mono 返回
我正在实施死信逻辑,但在使用 webClient 调用外部服务时发生异常后,无法确认记录 订单消费者 。收到() .concatMa...
我正在处理一个应该调用下游服务的服务,并且根据该下游服务的响应,将调用我的下一个递归。 这是参考的示例代码...
Flux.sample(with Duration) 和 Flux.sampleTimeout() 有什么区别?
此方法的 2 个重载: public final Flux sampleTimeout(Function> throttlerFactory) 和 公开决赛 Flux
Mono 返回的错误没有被 onErrorResume 捕获
我正在使用 RxJava 2 通过 micronaut 框架进行反应式编程。我试图了解以下代码的问题: 有趣的 getItemDetails( itemRequest:请求到 ): 单声道 我正在使用 RxJava 2 通过 micronaut 框架进行反应式编程。我试图理解以下代码的问题: fun getItemDetails( itemRequest: RequestTo ): Mono<List<ResponseTo>> { return itemRequest.itemIdList.toFlux() .flatMap { findItemStatus(it) } .flatMap { response -> Mono.justOrEmpty(ResponseTo(response)) .onErrorResume { logger.error("Error occurred while mapping entity to response: $response", it) Mono.empty() } } .collectList() } RequestTo:由一个String列表组成。 这个想法是迭代这个列表,从数据库中获取每个项目的详细信息(通过返回数据库实体的findItemStatus()函数调用处理)并将其映射到响应对象。 我想这样工作,如果 findItemStatus() flatMap 操作返回的任何实体在将其映射到 ResponseTo 时导致异常,我希望在最终列表中跳过该项目但继续处理通量中的剩余项目.我不想将 onErrorResume 块保留在第二个 flatMap 之外,因为如果通量中的任何中间实体在映射时导致错误并且通量中的其余项目将不会被映射,这将关闭流。 我不确定我的代码有什么问题,但是当 Mono.justOrEmpty(ResponseTo(response)) 中抛出异常时,它不会在 onErrorResume 中被捕获。 我把测试用例放在这里: def "test getItemDetails() when there is an error mapping DB response to Response TO for any item"() { def itemEntity1 = new DiscontinuedItemsEntity("52782904", null, null, UUID.randomUUID()) def itemEntity2 = new DiscontinuedItemsEntity("52778904", null, null, UUID.randomUUID()) def requestBody = new DiscontinuedItemsTcinRequest(["52782904", "52782904"]) when: def response = itemsStatusService.getItemDetails(requestBody).block() def responseList = response.itemsList then: 1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity1) 1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity2) responseList.size() == 1 responseList.get(0).itemId == itemEntity1.itemId } 我需要这个测试用例通过,但是异常被抛出并且没有被处理,因为它没有被 onErrorResume 捕获
如何理解Reactor中的Flux.sampleTimeout方法?
此方法的 2 个重载: public final Flux sampleTimeout(Function> throttlerFactory) 和 公开决赛 Flux
Project Reactor:retryWhen() 的超时未按预期工作
下面的代码使用project reactor来模拟一个简单的交易,我们首先使用newTrx()来启动一个trx,然后调用getTrxResult()来获取trx的结果。 请注意,我们将重试 doGet() s ...
Flux with repeatWhen 使用 onCancel 永远不会终止
我想用我的代码做什么 我想构建一个无限的 Flux 来检查条件(功能标志),然后根据条件是否为真继续处理请求。 T...
我是反应式编程的新手,我正在通过 micronaut 框架和 kotlin 使用反应堆。我正在尝试了解响应式编程的优势以及我们如何使用 Map 和 F 实现它...
我是响应式编程的新手,想了解一些基础知识。 @测试 公共无效 testMonoThen() { Mono fromRunnable = Mono.fromRunnable(() -> log.info("
java中flux.subscribe如何实现wait/notify
我刚开始使用 java 进行函数式编程,遇到了一些困难。 我正在编写一种方法来建立与数据库的反应性会话并将通量对象返回给调用者。 来电...
如何用spring cloud gateway拦截websocket消息
我在处理 websocket 连接的聊天服务前面运行云网关服务。 我们有没有一些方便的方法来记录套接字客户端和聊天服务之间发生的交互...
我们正在尝试计算 Flux 上的不同事物。我们目前使用 AtomicIntegers 来进行计数,但是一旦 Flux 完成,我们如何才能获得这些值呢?我试过了。然后,.collectList().flatMap ...
我正在尝试使用 Flux 来处理数据库中的记录并将它们传递给 Kinesis 流——这非常有效,但如果我的 Kinesis 客户得到“Ba ...
public Mono saveAccount(Mono requestMono){ var reqMono = requestMono.map(AccountSaveRequest::convertEntity); 返回 reqMono.flatMap(req ->
我想在带有 WebClient 的 WebFlux 中的 Mono 上实现条件重放。情况如下: 我从外部服务调用一个铭文 api,它进行处理然后保存 ...