Netty客户端自动重连

问题描述 投票:0回答:1

我有一个 Netty 客户端,其连接&&自动重新连接按以下方式实现。该代码多年来一直运行良好。

引导程序初始化

Bootstrap bootstrap = new Bootstrap;

bootstrap.group(new NioEventLoopGroup(NUM_OF_WORKER_THREADS, new NamedThreadFactory(client.hostname+"-%d")))
bootstrap.channel(NioSocketChannel.class)
    .handler(new MyChannelInitializer(sslContext, client))
    .option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);

实际连接逻辑

public void connect() { 
    try {
        ChannelFuture cf = bootstrap.connect(hostname, port).sync().await();
    } catch (Exception e) {
        if (group != null && !group.isShutdown()) {
            logger.error("Shutting down Event loop group: {}, host: {}" , group, hostname);
            group.shutdownGracefully();
        }
        throw new Exception("Connection failed to " + hostname, e);
    }
}

重新连接

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    //Connect again
    connect();
}

重新连接触发器挂接到通道未注册回调中。最近观察到一个奇怪的问题,连接进入 close_wait 状态并且线程永远卡住。为什么连接超时(3秒)没有触发?

这里是具体回调处理的线程转储。

我怀疑这里的根本原因是由于在回调处理程序线程中调用阻塞方法(同步和等待)。有没有僵局?

"server-callback-worker-1" #214 prio=5 os_prio=0 cpu=1322.64ms elapsed=83237.37s tid=0x00007f1b0c005800 nid=0x1a9e89 in Object.wait()  [0x00007f1b5c1fd000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait([email protected]/Native Method)
    - waiting on <no object reference available>
    at java.lang.Object.wait([email protected]/Object.java:328)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
    - waiting to re-lock in wait() <0x0000000461b0aba8> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:405)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
    at com.example.Client.connect(Client.java:113)
    at com.example.Client.reconnect(Client.java:141)
    at com.example.handler.ClientHandler.channelUnregistered(ClientHandler.java:72)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:219)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:188)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run([email protected]/Thread.java:829)

我尝试使用“专用执行器”重构重新连接逻辑,而不是在回调处理程序线程中进行处理。这是正确的方法吗?我们需要使用通道EventLoop来安排重新连接吗 @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); ctx.channel().close(); client.setConnected(false); scheduledExecutorService.schedule(this::reconnectAttempt, 3, TimeUnit.SECONDS); }

	
netty netty4
1个回答
0
投票
await()

sync()
。这导致您的案例形成死锁。
不要调用await,而是向引导帽返回的承诺添加一个侦听器,其中包含错误处理代码。

© www.soinside.com 2019 - 2024. All rights reserved.