Netty TCP 服务器在自定义异常时保持连接打开

问题描述 投票:0回答:1

我使用项目reactor netty编写了一个TCP服务器,它从客户端接收字节数组请求消息,对其进行处理,然后将字节数组响应消息返回给客户端。服务器/客户端之间的连接大部分都是长期存在的,除非发生某种致命错误。

我有一个用例,如果我的服务器在处理客户端请求时抛出某些异常,我需要保持连接打开,但不将任何数据发送回客户端。目前,当抛出这些异常时,Netty 会自动关闭连接。我尝试在自定义处理程序中添加异常捕获(...),但它似乎永远不会到达它。将 ChannelOption.AUTO_CLOSE 标志设置为 false 也不起作用,因为这似乎只适用于 write() 期间抛出的异常。在我的用例中,我们永远不会将任何数据写回客户端。

下面是我为确保激发 exceptionCaught() 方法所做的解决方法,以便我可以适当地处理异常并保持连接打开:

    DisposableServer someTcpServer = tcpServer
            .host("12.123.456.789")
            .port(12345)
            .wiretap(true)
            .doOnBind(server -> log.info("Starting listener..."))
            .doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.AUTO_CLOSE,false)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .doOnConnection(connection -> {
                InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
                log.info("Client has connected. Host: {}, Port: {}",
                            socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
            })
            .doOnChannelInit((observer, channel, remoteAddress) ->
                    channel.pipeline()
                            .addFirst(new TcpServerHandler())
            )
            .handle((inbound, outbound) ->
                    inbound
                            .receive()
                            .asByteArray()
                            .flatMap(req -> processRequest(req))
                            .flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp))
                            //any exceptions thrown during the above processRequest()...
                            .onErrorResume(throwable -> {
                                //...will get handled here
                                inbound.withConnection(connection -> connection.channel().pipeline().fireExceptionCaught(throwable));
                                return Mono.empty();
                            })
            ).bindNow();
    someTcpServer.onDispose().block();
}

然后在我的自定义 TCPServerHandler 类中,我处理 exceptionCaught(...) 中的自定义异常,如下所示。

@Slf4j
public class TcpServerHandler extends ChannelDuplexHandler {

    private final AtomicLong startTime = new AtomicLong(0L);
    private final AtomicLong endTime = new AtomicLong(0L);

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof BusinessLogicException1 ||
                cause instanceof BusinessLogicException2) {     
            endTime.set(System.nanoTime());
            log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis()))
            //for these specific exceptions, keep the connection open
            ctx.fireChannelActive();
        } else {
            //When an exception that wasn't one of the above^ was thrown
            //I had super.exceptionCaught(...) here and this was causing my
            //exceptionCaught(...) method to be called twice, so I removed the
            //call to super.exceptionCaught(...) and just don't do anything.
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        startTime.set(System.nanoTime());
        ctx.fireChannelRead(msg);
    }
}

这似乎有效,即使没有显式调用 super.exceptionCaught(...),我也可以看到,当抛出除我指定的异常之外的任何异常时,Netty 都会正确关闭连接。只是想知道这是否是正确的方法,或者是否有更好的方法来实现这一目标,因为我对 Netty 还很陌生。

java tcp netty project-reactor reactor-netty
1个回答
0
投票

根据我在reactor-netty中的经验,每次发送和接收时,连接将关闭,下次打开新连接 对于这个问题,我尝试使用 Netty 本身而不是使用 Reactor Netty,并且它在不关闭连接的情况下工作正常

© www.soinside.com 2019 - 2024. All rights reserved.