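The scenario: we need to configure a maxClient limit, and once it is reached we must close any connection beyond it. To do this, we add each connectionId to a list, and if the list grows beyond the maximum number of clients we close the TcpNioConnection. But when we close it, the following exception is thrown.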
java.nio.channels.ClosedChannelException: null
    at java.base/java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:222)
    at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.createConnectionForAcceptedChannel(TcpNioServerConnectionFactory.java:260)
    at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.doAccept(TcpNioServerConnectionFactory.java:229)
    at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.keyAcceptable(AbstractConnectionFactory.java:776)
    at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.handleKey(AbstractConnectionFactory.java:725)
    at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.processNioSelections(AbstractConnectionFactory.java:672)
    at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.doSelect(TcpNioServerConnectionFactory.java:194)
    at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.run(TcpNioServerConnectionFactory.java:155)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Here is the listener code:
    private final Logger log = LoggerFactory.getLogger(TcpConnectionApplicationListener.class);

    List<String> connectionIdList = new ArrayList<String>();

    @Override
    public void onApplicationEvent(TcpConnectionEvent event) {
        String connectionId = event.getConnectionId();
        String ipAddressPort = null;
        if (Objects.nonNull(connectionId)) {
            ipAddressPort = connectionId.substring(0, connectionId.lastIndexOf(":"));
            ipAddressPort = ipAddressPort.substring(0, ipAddressPort.lastIndexOf(":"));
        }
        log.info("ipAddressPort >>> " + ipAddressPort);
        if (event.getSource() instanceof TcpNioConnection) {
            log.info("inside connection factory >>> ");
            TcpNioConnection nioConnection = (TcpNioConnection) event.getSource();
            if (connectionIdList.size() > 1) {
                nioConnection.close();
                log.info("isClosed >>> " + nioConnection.getSocketInfo().isClosed());
                return;
            }
        }
        if (event instanceof TcpConnectionOpenEvent) {
            connectionIdList.add(event.getConnectionId());
            log.info("TcpConnectionOpenEvent connection id = {}", connectionIdList.size());
            log.info("TCP Open connection event with id={}", event.getConnectionId());
        } else if (event instanceof TcpConnectionCloseEvent) {
            connectionIdList.remove(event.getConnectionId());
            log.info("TcpConnectionCloseEvent connection id = {}", connectionIdList.size());
            log.info("TCP Close connection event with id={}", event.getConnectionId());
        }
    }
Here is the code for the integration flow:
        IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(port)
                        .soReceiveBufferSize(512)
                        .deserializer(new MessageDeserializer())
                        .serializer(new MessageSerializer())
                ))
                .log()
                .transform(Transformers.objectToString())
                .handle("outboundService", "processAndSendMessage")
                .get();
    }
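Could you let me know how to resolve this, or whether there is another way to configure a maximum number of clients in Tcp.nioServer so that only that many connections can exist?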
I would suggest throwing an exception instead of closing the connection. The logic in TcpNioServerConnectionFactory looks like this:
    private TcpNioConnection createTcpNioConnection(SocketChannel socketChannel) {
        try {
            TcpNioConnection connection = this.tcpNioConnectionSupport.createNewConnection(socketChannel, true,
                    isLookupHost(), getApplicationEventPublisher(), getComponentName());
            connection.setUsingDirectBuffers(this.usingDirectBuffers);
            TcpConnectionSupport wrappedConnection = wrapConnection(connection);
            if (!wrappedConnection.equals(connection)) {
                connection.setSenders(getSenders());
            }
            initializeConnection(wrappedConnection, socketChannel.socket());
            wrappedConnection.publishConnectionOpenEvent();
            return connection;
        }
        catch (Exception ex) {
            logger.error(ex, "Failed to establish new incoming connection");
            return null;
        }
    }
You could also do this with a custom TcpNioConnectionSupport instead of an event listener.
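To illustrate the "throw instead of close" idea, here is a minimal sketch of the bookkeeping in plain JDK code. It is not part of Spring Integration: the class name ConnectionLimiter and its methods register, release and openCount are hypothetical names chosen for this example. The assumption is that register would be called from a custom TcpNioConnectionSupport.createNewConnection, which, judging from the snippet above, runs inside the try block so that the exception ends up in the catch, and that release would be called when a TcpConnectionCloseEvent arrives. The methods are synchronized on the assumption that open and close notifications may arrive on different threads, which the plain ArrayList in the question does not guard against.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper (not a Spring Integration class): tracks the IDs of
 * open connections and rejects new ones once a fixed limit is reached.
 */
final class ConnectionLimiter {

    private final int maxClients;

    // Guarded by "this"; every access goes through a synchronized method.
    private final Set<String> openIds = new HashSet<>();

    ConnectionLimiter(int maxClients) {
        if (maxClients < 1) {
            throw new IllegalArgumentException("maxClients must be at least 1");
        }
        this.maxClients = maxClients;
    }

    /**
     * Records a new connection, or throws if the limit is already reached.
     * Throwing (rather than closing the connection here) leaves the cleanup
     * to whoever is creating the connection.
     */
    synchronized void register(String connectionId) {
        if (!openIds.contains(connectionId) && openIds.size() >= maxClients) {
            throw new IllegalStateException(
                    "Maximum number of clients (" + maxClients + ") reached; rejecting " + connectionId);
        }
        openIds.add(connectionId);
    }

    /** Forgets a connection; unknown IDs are ignored. */
    synchronized void release(String connectionId) {
        openIds.remove(connectionId);
    }

    /** Number of connections currently tracked. */
    synchronized int openCount() {
        return openIds.size();
    }

    public static void main(String[] args) {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        limiter.register("client-1");
        limiter.register("client-2");
        try {
            limiter.register("client-3");
        }
        catch (IllegalStateException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
        limiter.release("client-1");
        limiter.register("client-3");
        System.out.println("Open connections: " + limiter.openCount());
    }
}
```

Because the check and the add happen under the same lock, two connections arriving at the same time cannot both take the last free slot. How the rejected socket channel is cleaned up afterwards depends on the framework version, so that part is worth verifying against the TcpNioServerConnectionFactory source you are running.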