我使用项目reactor netty编写了一个TCP服务器,它从客户端接收字节数组请求消息,对其进行处理,然后将字节数组响应消息返回给客户端。服务器/客户端之间的连接大部分都是长期存在的,除非发生某种致命错误。
我有一个用例,如果我的服务器在处理客户端请求时抛出某些异常,我需要保持连接打开,但不将任何数据发送回客户端。目前,当抛出这些异常时,Netty 会自动关闭连接。我尝试在自定义处理程序中添加异常捕获(...),但它似乎永远不会到达它。将 ChannelOption.AUTO_CLOSE 标志设置为 false 也不起作用,因为这似乎只适用于 write() 期间抛出的异常。在我的用例中,我们永远不会将任何数据写回客户端。
下面是我为确保激发 exceptionCaught() 方法所做的解决方法,以便我可以适当地处理异常并保持连接打开:
DisposableServer someTcpServer = tcpServer
.host("12.123.456.789")
.port(12345)
.wiretap(true)
.doOnBind(server -> log.info("Starting listener..."))
.doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.AUTO_CLOSE,false)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.doOnConnection(connection -> {
InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
log.info("Client has connected. Host: {}, Port: {}",
socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
})
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new TcpServerHandler())
)
.handle((inbound, outbound) ->
inbound
.receive()
.asByteArray()
.flatMap(req -> processRequest(req))
.flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp))
//any exceptions thrown during the above processRequest()...
.onErrorResume(throwable -> {
//...will get handled here
inbound.withConnection(connection -> connection.channel().pipeline().fireExceptionCaught(throwable));
return Mono.empty();
})
).bindNow();
someTcpServer.onDispose().block();
}
然后在我的自定义 TCPServerHandler 类中,我处理 exceptionCaught(...) 中的自定义异常,如下所示。
@Slf4j
public class TcpServerHandler extends ChannelDuplexHandler {
private final AtomicLong startTime = new AtomicLong(0L);
private final AtomicLong endTime = new AtomicLong(0L);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof BusinessLogicException1 ||
cause instanceof BusinessLogicException2) {
endTime.set(System.nanoTime());
log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis()))
//for these specific exceptions, keep the connection open
ctx.fireChannelActive();
} else {
//When an exception that wasn't one of the above^ was thrown
//I had super.exceptionCaught(...) here and this was causing my
//exceptionCaught(...) method to be called twice, so I removed the
//call to super.exceptionCaught(...) and just don't do anything.
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
startTime.set(System.nanoTime());
ctx.fireChannelRead(msg);
}
}
这似乎有效,即使没有显式调用 super.exceptionCaught(...),我也可以看到,当抛出除我指定的异常之外的任何异常时,Netty 都会正确关闭连接。只是想知道这是否是正确的方法,或者是否有更好的方法来实现这一目标,因为我对 Netty 还很陌生。
根据我在reactor-netty中的经验,每次发送和接收时,连接将关闭,下次打开新连接 对于这个问题,我尝试使用 Netty 本身而不是使用 Reactor Netty,并且它在不关闭连接的情况下工作正常