当预期日志丢失时如何调试以下通量代码?

问题描述 投票:0回答:1

我有这样的代码逻辑,它使用 R-Socket 将消息从一台服务器发送到另一台服务器,整个代码是使用反应器(Flux/Mono)编写的。有时我注意到一些消息没有被传递,但是当尝试调试时,我发现在某个步骤之后日志丢失了

logger.info("starting publish to other servers");
                Mono<List<Map.Entry<String, GroupedFlux<String,OtherServers>>>> ipsOfOtherServer = otherServers
                .filter(//some filter condition//)
                .groupBy(OtherServers::getServerIp)
                .collectMap(GroupedFlux::key)
                .flatMapMany(map -> Flux.fromIterable(map.entrySet()))
                .collectList()
                .doOnSuccess(list -> {
                    if(list.isEmpty()) {
                        logger.info("list is empty");
                    }
                });

                 ipsOfOtherServer
                 .flatMapMany(Flux::fromIterable)
                 .concatMap(//logic to send message to other server//)
                 .block()

在调试时,我注意到第一个日志,但之后什么也没有。如果列表为空,我期待第二个日志,但也没有出现。知道发生了什么或如何调试。

java project-reactor flux reactor-netty rsocket
1个回答
0
投票

Reactor 链可能无法按预期执行,可能是由于异步行为或 block() 方法的使用方式问题。

在每个阶段添加详细的日志记录,并确定执行停止的位置以及第一个之后的日志丢失的原因。

例如,添加

.doOnError()
来捕获并记录任何错误。同样,添加
.doOnSubscribe()
.doOnNext()
.doOnComplete()
来监控执行流程。如果
otherServers.filter()
在到达
groupBy
之前产生空结果,则可能不会执行步骤(包括
collectMap
flatMapMany
doOnSuccess
)。

更新代码:

logger.info("Starting publish to other servers");
        Mono<List<Map.Entry<String, GroupedFlux<String, OtherServers>>>> ipsOfOtherServer = otherServers
            .filter(// some filter condition //)
            .doOnNext(server -> logger.info("Filtered server: " + server))
            .groupBy(OtherServers::getServerIp)
            .collectMap(GroupedFlux::key)
            .flatMapMany(map -> Flux.fromIterable(map.entrySet()))
            .collectList()
            .doOnSuccess(list -> {
                if (list.isEmpty()) {
                    logger.info("List is empty after grouping by server IP");
                } else {
                    logger.info("Grouped servers: " + list);
                }
            })
            .doOnError(error -> logger.error("Error occurred while collecting server IPs: ", error));

        ipsOfOtherServer
            .flatMapMany(Flux::fromIterable)
            .concatMap(// logic to send message to other server //)
            .doOnComplete(() -> logger.info("Completed sending messages to all servers"))
            .doOnError(error -> logger.error("Error occurred during message sending: ", error))
            .block();
© www.soinside.com 2019 - 2024. All rights reserved.