I have a Netty client whose connect and auto-reconnect logic is implemented as shown below. This code has worked well for years.
Bootstrap initialization
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(NUM_OF_WORKER_THREADS, new NamedThreadFactory(client.hostname + "-%d")))
        .channel(NioSocketChannel.class)
        .handler(new MyChannelInitializer(sslContext, client))
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
Actual connection logic
public void connect() throws Exception {
    try {
        // Blocks the calling thread until the connect attempt completes
        ChannelFuture cf = bootstrap.connect(hostname, port).sync().await();
    } catch (Exception e) {
        if (group != null && !group.isShutdown()) {
            logger.error("Shutting down Event loop group: {}, host: {}", group, hostname);
            group.shutdownGracefully();
        }
        throw new Exception("Connection failed to " + hostname, e);
    }
}
Reconnect
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    // Connect again
    connect();
}
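The reconnect trigger is hooked into the channelUnregistered callback. Recently we observed a strange problem: the connection ends up in the CLOSE_WAIT state and the thread stays stuck forever. Why did the connect timeout (3 seconds) not fire?
Here is the thread dump of the callback thread in question.
I suspect the root cause is that blocking methods (sync and await) are called on the callback handler thread. Is this a deadlock?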
"server-callback-worker-1" #214 prio=5 os_prio=0 cpu=1322.64ms elapsed=83237.37s tid=0x00007f1b0c005800 nid=0x1a9e89 in Object.wait() [0x00007f1b5c1fd000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(java.base/Native Method)
- waiting on <no object reference available>
at java.lang.Object.wait(java.base/Object.java:328)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254)
- waiting to re-lock in wait() <0x0000000461b0aba8> (a io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:131)
at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:30)
at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:405)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:119)
at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:30)
at com.example.Client.connect(Client.java:113)
at com.example.Client.reconnect(Client.java:141)
at com.example.handler.ClientHandler.channelUnregistered(ClientHandler.java:72)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:219)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:188)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:215)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:195)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:821)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(java.base/Thread.java:829)
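The suspected mechanism can be reproduced without Netty. The sketch below is only a plain-JDK analogy: a single-threaded ExecutorService stands in for one event loop thread, and the names EventLoopBlockingDemo and innerTaskStarves are made up for illustration. The outer task plays the role of channelUnregistered, and the inner task plays the role of work that must run on the same thread before the awaited future can complete. The dump shows the thread parked on a PendingRegistrationPromise, which would fit the new channel's registration being queued behind the blocked thread, but that is an inference from the trace, not something verified. A timed get() is used here so the demo terminates; an untimed wait would hang just like the dump.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopBlockingDemo {

    /**
     * Runs an outer task on a single-threaded executor. The outer task submits
     * an inner task to the SAME executor and waits for its result.
     * Returns true if the inner task could not finish while the outer task waited.
     */
    public static boolean innerTaskStarves() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = loop.submit(() -> {
                // Queued behind the currently running (outer) task
                Future<String> inner = loop.submit(() -> "registered");
                try {
                    // The only worker thread is busy right here, so this cannot succeed
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return outer.get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inner task starved: " + innerTaskStarves());
    }
}
```

With an untimed wait in place of the 500 ms timeout, the outer task would never return, and no timeout configured on the inner work could help because that work never starts.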
I tried refactoring the reconnect logic to use a dedicated executor instead of doing the work on the callback handler thread. Is this the right approach? Do we need to use the channel's EventLoop to schedule the reconnect?
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    ctx.channel().close();
    client.setConnected(false);
    scheduledExecutorService.schedule(this::reconnectAttempt, 3, TimeUnit.SECONDS);
}
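You must not call blocking methods such as await() or sync() on an event loop thread. In your case this leads to a deadlock. Instead of calling await, add a listener to the future returned by the bootstrap and put your error-handling code in that listener.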
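A minimal sketch of that listener-based approach follows, assuming Netty 4.1 and a Bootstrap that has already been configured with a group, channel type, and handler as in the question. The class name ReconnectingClient, the scheduleReconnect helper, and the choice to reconnect from the channel's closeFuture (rather than from channelUnregistered) are illustrative assumptions, not the asker's code. The 3-second delay mirrors the delay used in the question's refactoring.

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper class; not part of Netty or of the original code.
public class ReconnectingClient {
    static final long RECONNECT_DELAY_SECONDS = 3;

    private final Bootstrap bootstrap;
    private final String hostname;
    private final int port;

    public ReconnectingClient(Bootstrap bootstrap, String hostname, int port) {
        this.bootstrap = bootstrap;
        this.hostname = hostname;
        this.port = port;
    }

    /** Starts a connect attempt and returns immediately; the caller is never blocked. */
    public ChannelFuture connect() {
        ChannelFuture cf = bootstrap.connect(hostname, port);
        cf.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                // Connected: try again once this channel is eventually closed
                future.channel().closeFuture()
                        .addListener((ChannelFutureListener) closed -> scheduleReconnect());
            } else {
                // Connect failed (refused, timed out, ...): error handling goes here
                scheduleReconnect();
            }
        });
        return cf;
    }

    /** Schedules the next attempt on the bootstrap's own event loop group. */
    private void scheduleReconnect() {
        EventLoopGroup group = bootstrap.config().group();
        if (group.isShuttingDown()) {
            return; // the client is being stopped, do not retry
        }
        group.schedule(() -> { connect(); }, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
    }
}
```

Because connect() here only registers listeners, it should be safe to call from any thread, including an event loop thread. A dedicated ScheduledExecutorService, as in the refactoring above, would probably work as well, provided the scheduled task does not itself block on sync() or await(); scheduling on the event loop group simply avoids an extra thread pool. If the reconnect is driven from closeFuture like this, the channelUnregistered hook should not also trigger a reconnect, or two attempts would be started per disconnect.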