> 的情况,如下所述
Kafka 消息通过 Spring Cloud Streams 绑定到函数方法。输入和输出是 Flux
public Function<Flux<Employee>, Flux<Message<Employee>> enrich() {
return flux-> ...
}
我有经典的 Spring MongoDB 存储库类和查询方法
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String>{
Mono<Employee> findOneById(String id);
}
我需要从 Flux 传入的 Kafka 消息中提取 id,并且需要调用 MongoDB 存储库方法。提取 id 后,我总是会从数据库调用中得到嵌套的 Mono。尝试了多种方法。如果我使用 map 或 zipwith,我无法超越 Mono 或 Flux in Flux。
public Function<Flux<Employee>, Flux<Message<Employee>> enrich(){
return flux-> flux.map(msg -> MessageBuilder.withPayload(findByOneId(msg.getId())).build()) ....
}
有没有办法使用 zipWith 或其他方式/方法,同时从 Flux 中提取 Id 并将 Id 传递给数据库调用,这样我就可以避免在 Flux 中使用 Mono?
public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(EmployeeRepository employeeRepository) {
return flux -> flux.flatMap(employee ->
employeeRepository.findOneById(employee.getId())
.map(enrichedEmployee -> MessageBuilder.withPayload(enrichedEmployee).build())
);
}