我需要帮助来创建一个可以异步处理和处理请求的多线程 java 套接字服务器。
我现在所做的是我创建了java NIO Socket服务器和客户端,客户端将同时发送请求,以测试服务器上的异步机制,第二个请求线程将被线程睡眠中断。
预期结果:第三到第五个请求将被异步处理,无需等待第二个请求完成。
当前结果:在第二个请求线程睡眠完成之前,未处理第三到第五个请求。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
private static Selector selector = null;
public static void main(String[] args)
{
try {
selector = Selector.open();
// We have to set connection host,port and
// non-blocking mode
ServerSocketChannel serverSocketChannel
= ServerSocketChannel.open();
ServerSocket serverSocket
= serverSocketChannel.socket();
serverSocket.bind(
new InetSocketAddress("localhost", 8089));
serverSocketChannel.configureBlocking(false);
int ops = serverSocketChannel.validOps();
serverSocketChannel.register(selector, ops,
null);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys
= selector.selectedKeys();
Iterator<SelectionKey> i
= selectedKeys.iterator();
while (i.hasNext()) {
SelectionKey key = i.next();
if (key.isAcceptable()) {
// New client has been accepted
handleAccept(serverSocketChannel,
key);
}
else if (key.isReadable()) {
// We can run non-blocking operation
// READ on our client
handleRead(key);
}
i.remove();
}
}
}
catch (IOException e) {
e.printStackTrace();
}
}
private static void
handleAccept(ServerSocketChannel mySocket,
SelectionKey key) throws IOException
{
System.out.println("Connection Accepted..");
// Accept the connection and set non-blocking mode
SocketChannel client = mySocket.accept();
client.configureBlocking(false);
// Register that client is reading this channel
client.register(selector, SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key)
throws IOException
{
// System.out.println("Reading client's message.");
// create a ServerSocketChannel to read the request
SocketChannel client = (SocketChannel)key.channel();
// Create buffer to read data
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer);
// Parse data from buffer to String
String data = new String(buffer.array()).trim();
if (data.length() > 0) {
System.out.println("Received message: " + data);
System.out.println(new Date());
if (data.equalsIgnoreCase(
"Hello, server! Request 2")) {
try {
Thread.sleep(30000);
System.out.println("Sleep is executed");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (data.equalsIgnoreCase(
"Testing5")) {
client.close();
System.out.println("Connection closed...");
}
}
}
}
你们对如何处理这个机制有什么建议吗? 谢谢你
您遇到的问题是由于 handleRead 方法是在管理 NIO 选择器的同一线程上执行的。当一个请求正在处理时,选择器在当前请求完成之前无法处理其他请求。这就是为什么您的第三个到第五个请求会被阻止,直到第二个请求完成休眠为止。
要解决此问题,您需要在单独的线程中处理请求。这将允许在处理特定请求时异步处理其他请求。您可以通过使用线程池在单独的线程中处理每个请求来实现这一点。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NIOServer {
private static Selector selector = null;
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
try {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress("localhost", 8089));
serverSocketChannel.configureBlocking(false);
int ops = serverSocketChannel.validOps();
serverSocketChannel.register(selector, ops, null);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> i = selectedKeys.iterator();
while (i.hasNext()) {
SelectionKey key = i.next();
if (key.isAcceptable()) {
handleAccept(serverSocketChannel, key);
} else if (key.isReadable()) {
// Submit the task to the thread pool for asynchronous processing
threadPool.submit(() -> {
try {
handleRead(key);
} catch (IOException e) {
e.printStackTrace();
}
});
}
i.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdown();
}
}
}
private static void handleAccept(ServerSocketChannel mySocket, SelectionKey key) throws IOException {
System.out.println("Connection Accepted..");
SocketChannel client = mySocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer);
String data = new String(buffer.array()).trim();
if (data.length() > 0) {
System.out.println("Received message: " + data);
System.out.println(new Date());
if (data.equalsIgnoreCase("Hello, server! Request 2")) {
try {
Thread.sleep(30000);
System.out.println("Sleep is executed");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (data.equalsIgnoreCase("Testing5")) {
client.close();
System.out.println("Connection closed...");
}
}
}
}