我有这样的代码逻辑,它使用 R-Socket 将消息从一台服务器发送到另一台服务器,整个代码是使用反应器(Flux/Mono)编写的。有时我注意到一些消息没有被传递,但是当尝试调试时,我发现在某个步骤之后日志丢失了
logger.info("starting publish to other servers");
Mono<List<Map.Entry<String, GroupedFlux<String,OtherServers>>>> ipsOfOtherServer = otherServers
.filter(//some filter condition//)
.groupBy(OtherServers::getServerIp)
.collectMap(GroupedFlux::key)
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.collectList()
.doOnSuccess(list -> {
if(list.isEmpty()) {
logger.info("list is empty");
}
});
ipsOfOtherServer
.flatMapMany(Flux::fromIterable)
.concatMap(//logic to send message to other server//)
.block()
在调试时,我注意到第一个日志,但之后什么也没有。如果列表为空,我期待第二个日志,但也没有出现。知道发生了什么或如何调试。
Reactor 链可能无法按预期执行,可能是由于异步行为或 block() 方法的使用方式问题。
在每个阶段添加详细的日志记录,并确定执行停止的位置以及第一个之后的日志丢失的原因。
例如,添加
.doOnError()
来捕获并记录任何错误。同样,添加 .doOnSubscribe()
、.doOnNext()
和 .doOnComplete()
来监控执行流程。如果 otherServers.filter()
在到达 groupBy
之前产生空结果,则可能不会执行步骤(包括 collectMap
、flatMapMany
和 doOnSuccess
)。
更新代码:
logger.info("Starting publish to other servers");
Mono<List<Map.Entry<String, GroupedFlux<String, OtherServers>>>> ipsOfOtherServer = otherServers
.filter(// some filter condition //)
.doOnNext(server -> logger.info("Filtered server: " + server))
.groupBy(OtherServers::getServerIp)
.collectMap(GroupedFlux::key)
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.collectList()
.doOnSuccess(list -> {
if (list.isEmpty()) {
logger.info("List is empty after grouping by server IP");
} else {
logger.info("Grouped servers: " + list);
}
})
.doOnError(error -> logger.error("Error occurred while collecting server IPs: ", error));
ipsOfOtherServer
.flatMapMany(Flux::fromIterable)
.concatMap(// logic to send message to other server //)
.doOnComplete(() -> logger.info("Completed sending messages to all servers"))
.doOnError(error -> logger.error("Error occurred during message sending: ", error))
.block();