避免 Flux<>Input 的函数方法中嵌套 Flux<Mono<>>

问题描述 投票:0回答:1
我正在尝试 Spring Cloud Kafka Reactive 和 Reactive Mongo。我对 Reactor 没有经验。我最终遇到了嵌套 Flux

> 的情况,如下所述

以下是方法和片段:

Kafka 消息通过 Spring Cloud Streams 绑定到函数方法。输入和输出是 Flux

public Function<Flux<Employee>, Flux<Message<Employee>> enrich() { return flux-> ... }
我有经典的 Spring MongoDB 存储库类和查询方法

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String>{ Mono<Employee> findOneById(String id); }
我需要从 Flux 传入的 Kafka 消息中提取 id,并且需要调用 MongoDB 存储库方法。提取 id 后,我总是会从数据库调用中得到嵌套的 Mono。尝试了多种方法。如果我使用 map 或 zipwith,我无法超越 Mono 或 Flux in Flux。

public Function<Flux<Employee>, Flux<Message<Employee>> enrich(){ return flux-> flux.map(msg -> MessageBuilder.withPayload(findByOneId(msg.getId())).build()) .... }
有没有办法使用 zipWith 或其他方式/方法,同时从 Flux 中提取 Id 并将 Id 传递给数据库调用,这样我就可以避免在 Flux 中使用 Mono?

spring-webflux reactive-programming project-reactor spring-cloud-function
1个回答
0
投票
flatMap 允许并发处理 Flux 中的多个元素, 使用 flatMap 可以防止嵌套 Flux 或 Mono 结构。

public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(EmployeeRepository employeeRepository) { return flux -> flux.flatMap(employee -> employeeRepository.findOneById(employee.getId()) .map(enrichedEmployee -> MessageBuilder.withPayload(enrichedEmployee).build()) ); }
    
© www.soinside.com 2019 - 2024. All rights reserved.