在反应式映射内操作局部变量并仅返回最终结果

问题描述 投票:0回答:1

我正在开发逻辑来计算摘要详细信息并在执行一些数据库操作后返回它。原来是这样的

@Override
    public Mono<ResponseEntity<?>> generateSummary(Locale local) throws Exception {
        Summary summary = new Summary();

        return test1Repository.findBySession("test").map(session -> {

            return test2Repository.findAllById(session.getListOfObject()).map(data -> {

                if(condition){
                    **Logic to update the summary object
                }
                if(condition){
                    **Logic to update the summary object
                }
                return summary;
            });
        }).map(result -> ResponseEntity.ok(result));
    }
public class Summary {
    private AtomicInteger disconnectCount;
    private AtomicInteger connectCount;
}

结果,我根据

session.getListOfObject()
计数获得了 Summary 对象的列表。像这样的东西

[
    {
        "disconnectCount": 1,
        "connectCount": 1,
    },
    {
        "disconnectCount": 1,
        "connectCount": 1,
    },
    {
        "disconnectCount": 1,
        "connectCount": 1,
    }
]

但我想在完成所有操作后返回一个 Summary 对象。如何在 Java 反应式环境中实现这种行为?也许我没有正确返回,但我不确定

java spring-boot mono spring-webflux reactive-programming
1个回答
0
投票
  1. map操作将输入元素一一变换,并返回与输入一样多的元素。在您的情况下,您想要执行 reduction 操作,即使用 reducereduceWith 等。归约操作会将输入通量中的元素合并为单个输出信号/元素,即 Mono
  2. 您从 Mono 对象映射通量(即
    myMono.map(value -> myFlux().map(values -> ...))
    。这将导致
    Mono<Flux<MyValue>>
    结果。要获得 Mono,您应该使用 flatMap,它将您的嵌套发布者“扁平化”到外部发布者(您可以通过查看“monad”)的概念来了解有关此类操作的更多信息。现在,

这是一个基于您的代码片段的简单示例:

public record Summary(int disconnectCount, int connectCount) {
    Summary() { this(0, 0); }
    Summary merge(Summary other) { 
        return Summary(disconnectCount + other.disconnectCount, 
                       connectCount + other.connectCount);
    }
}
@Override
    public Mono<ResponseEntity<?>> generateSummary(Locale local) throws Exception {
        Summary summary = new Summary();

        return test1Repository.findBySession("test")
            .flatMap(session -> {
                test2Repository.findAllById(session.getListOfObject())
                    .map(data -> data.isConnection() ? new Summary(0, 1) : new Summary(1, 0))
                    .reduce(Summary::merge);
            })
            .map(result -> ResponseEntity.ok(result));
    }
© www.soinside.com 2019 - 2024. All rights reserved.