Socket 服务器多线程 Java NIO

问题描述 投票:0回答:1

我需要帮助来创建一个能够异步接收并处理请求的多线程 Java 套接字服务器。

我目前的做法是:创建了 Java NIO Socket 服务器和客户端,客户端会同时发送多个请求,以测试服务器端的异步机制;其中第二个请求在处理时会被 Thread.sleep 阻塞 30 秒。

预期结果:第三到第五个请求将被异步处理,无需等待第二个请求完成。

当前结果:在第二个请求线程睡眠完成之前,未处理第三到第五个请求。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded NIO server: one selector loop accepts connections and
 * handles every read on the thread that runs {@link #main(String[])}.
 */
public class NIOServer {
    // Shared by main() and handleAccept(); assigned when main() starts.
    private static Selector selector = null;

    /**
     * Binds a non-blocking server channel to localhost:8089 and runs the
     * selector loop forever. An {@link IOException} ends the loop and its
     * stack trace is printed.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        try {
            selector = Selector.open();

            // Listening channel: bind first, then switch to non-blocking mode
            // and register it for every operation it supports.
            ServerSocketChannel listener = ServerSocketChannel.open();
            ServerSocket listenerSocket = listener.socket();
            listenerSocket.bind(new InetSocketAddress("localhost", 8089));
            listener.configureBlocking(false);
            listener.register(selector, listener.validOps(), null);

            for (;;) {
                selector.select();
                Iterator<SelectionKey> ready
                        = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    dispatch(listener, ready.next());
                    // The selected-key set is not cleared automatically.
                    ready.remove();
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Routes one selected key: acceptable keys go to handleAccept, otherwise
     * readable keys go to handleRead. Both run on the calling thread.
     */
    private static void dispatch(ServerSocketChannel listener,
                                 SelectionKey key) throws IOException
    {
        if (key.isAcceptable()) {
            handleAccept(listener, key);
            return;
        }
        if (key.isReadable()) {
            handleRead(key);
        }
    }

    /**
     * Accepts a pending connection, makes it non-blocking and registers it
     * with the shared selector for read events.
     *
     * @param mySocket the listening channel to accept from
     * @param key      the selected key (not used by this method)
     * @throws IOException if accepting, configuring or registering fails
     */
    private static void
    handleAccept(ServerSocketChannel mySocket,
                 SelectionKey key) throws IOException
    {
        System.out.println("Connection Accepted..");

        SocketChannel accepted = mySocket.accept();
        accepted.configureBlocking(false);
        accepted.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Reads up to 1024 bytes from the key's channel and reacts to the
     * trimmed text. The message "Hello, server! Request 2" makes this method
     * sleep for 30 seconds; because the method is invoked directly from the
     * selector loop, no other key is handled until the sleep returns. The
     * message "Testing5" closes the client channel.
     *
     * @param key the readable key whose channel is a SocketChannel
     * @throws IOException if reading from or closing the channel fails
     */
    private static void handleRead(SelectionKey key)
            throws IOException
    {
        SocketChannel client = (SocketChannel)key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        client.read(buffer);

        // The whole backing array is decoded; trim() strips the unused
        // zero bytes together with surrounding whitespace.
        String data = new String(buffer.array()).trim();
        if (data.isEmpty()) {
            return;
        }

        System.out.println("Received message: " + data);
        System.out.println(new Date());

        boolean slowRequest
                = data.equalsIgnoreCase("Hello, server! Request 2");
        if (slowRequest) {
            try {
                Thread.sleep(30000);
                System.out.println("Sleep is executed");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        boolean lastRequest = data.equalsIgnoreCase("Testing5");
        if (lastRequest) {
            client.close();
            System.out.println("Connection closed...");
        }
    }
}

大家对如何实现这种机制有什么建议吗?谢谢!

java multithreading tcp nio
1个回答
0
投票

您遇到的问题是由于 handleRead 方法与 NIO 选择器循环运行在同一个线程上。当一个请求正在处理时,选择器循环在该请求完成之前无法处理其他就绪事件。这就是为什么您的第三到第五个请求会被阻塞,直到第二个请求完成休眠为止。

要解决此问题,您需要在单独的线程中处理请求。这将允许在处理特定请求时异步处理其他请求。您可以通过使用线程池在单独的线程中处理每个请求来实现这一点。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * NIO server that keeps its selector thread free of blocking work.
 *
 * <p>The selector thread performs only the non-blocking accept and read
 * calls. Each decoded message is then handed to a fixed-size worker pool, so
 * a slow request (the 30-second sleep below) does not delay other requests.
 * Reading on the selector thread, rather than submitting the key itself to
 * the pool, matters: a key left registered for OP_READ is reported as ready
 * again on every select() until its data has been drained, which would queue
 * duplicate tasks that read the same channel concurrently.
 */
public class NIOServer {
    private static Selector selector = null;
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    /**
     * Binds a non-blocking server channel to localhost:8089 and runs the
     * selector loop forever. An {@link IOException} from the selector or the
     * listening channel ends the loop; the worker pool is shut down on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        try {
            selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress("localhost", 8089));
            serverSocketChannel.configureBlocking(false);
            // OP_ACCEPT is the only operation a server socket channel supports.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();

                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    // Remove first so the key is cleared even if handling fails.
                    i.remove();

                    try {
                        if (key.isAcceptable()) {
                            handleAccept(serverSocketChannel, key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        }
                    } catch (CancelledKeyException ignored) {
                        // A worker closed this client's channel after the key
                        // was selected; there is nothing left to do for it.
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }

    /**
     * Accepts a pending connection, makes it non-blocking and registers it
     * with the shared selector for read events.
     *
     * @param mySocket the listening channel to accept from
     * @param key      the selected key (not used by this method)
     * @throws IOException if accepting, configuring or registering fails
     */
    private static void handleAccept(ServerSocketChannel mySocket, SelectionKey key) throws IOException {
        SocketChannel client = mySocket.accept();
        if (client == null) {
            // A non-blocking accept returns null when no connection is pending.
            return;
        }
        System.out.println("Connection Accepted..");

        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Performs one non-blocking read on the selector thread and submits the
     * decoded message to the worker pool. End-of-stream or a read failure
     * closes only this client; it does not stop the server loop.
     *
     * @param key the readable key whose channel is a SocketChannel
     * @throws IOException declared for compatibility; failures are handled here
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int bytesRead;
        try {
            bytesRead = client.read(buffer);
        } catch (IOException e) {
            e.printStackTrace();
            closeClient(client);
            return;
        }

        if (bytesRead < 0) {
            // Peer closed the connection. Without closing here the channel
            // would stay readable and the selector loop would spin.
            closeClient(client);
            return;
        }

        String data = decode(buffer, bytesRead);
        if (data.isEmpty()) {
            return;
        }

        // execute() rather than submit(): an unexpected runtime exception in
        // the task reaches the thread's uncaught-exception handler instead of
        // being hidden in a Future nobody inspects.
        threadPool.execute(() -> processMessage(client, data));
    }

    /**
     * Decodes the first {@code length} bytes of an array-backed buffer as
     * UTF-8 and trims surrounding whitespace.
     *
     * @param buffer heap buffer that was just filled by a read
     * @param length number of valid bytes at the start of the buffer
     * @return the trimmed text, possibly empty
     */
    static String decode(ByteBuffer buffer, int length) {
        return new String(buffer.array(), buffer.arrayOffset(), length, StandardCharsets.UTF_8).trim();
    }

    /**
     * Handles one message on a worker thread. "Hello, server! Request 2"
     * simulates slow work with a 30-second sleep; "Testing5" closes the
     * client connection.
     */
    private static void processMessage(SocketChannel client, String data) {
        System.out.println("Received message: " + data);
        System.out.println(new Date());

        if (data.equalsIgnoreCase("Hello, server! Request 2")) {
            try {
                Thread.sleep(30000);
                System.out.println("Sleep is executed");
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }

        if (data.equalsIgnoreCase("Testing5")) {
            closeClient(client);
        }
    }

    /** Closes a client channel, reporting but not propagating close failures. */
    private static void closeClient(SocketChannel client) {
        try {
            // Closing the channel also cancels its selection keys.
            client.close();
            System.out.println("Connection closed...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.