使用线程池处理时无法读取对象

问题描述 投票:0回答:1

我必须设计一个具有 3 个主线程池的服务器,以便读取数据、处理数据并将结果输出到客户端。我这样编码,但它总是注意到这种错误:

java.io.StreamCorruptedException:无效的流标头:00000000 在 java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958) 在 java.base/java.io.ObjectInputStream.(ObjectInputStream.java:392) 在 demo.NioServer.read(NioServer.java:99) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) 在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1136) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 在 java.base/java.lang.Thread.run(Thread.java:840)

我不知道为什么,这是我的代码:

package demo;


import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class NioServer {
    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer data;
    private ByteArrayInputStream bais;
    private ByteArrayOutputStream baos;
    private ObjectInputStream in;
    private ObjectOutputStream out;
    private final ExecutorService input = Executors.newCachedThreadPool();
    private final ExecutorService processor = Executors.newFixedThreadPool(5);
    private final ExecutorService output = Executors.newCachedThreadPool();
    private final BlockingQueue<SelectionKey> keyManager = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<SelectionKey> destination = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(5);
    public NioServer() throws IOException {
        server = null;
        selector = null;
    }

    private void init() throws IOException {
        server = ServerSocketChannel.open();
        selector = Selector.open();
        server.socket().bind(new InetSocketAddress("127.0.0.1", 4999));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void run() throws IOException {
        init();
        try {
            while (true) {
                selector.select();
                for(SelectionKey key : selector.selectedKeys()) {
                    if(key.isAcceptable()) {
                        accept(key);
                    } else if(key.isReadable()) {
                        keyManager.put(key);
                        input.submit(this::read);
                        processor.submit(this::process);
                        output.submit(this::write);
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch(IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            for(SelectionKey key : selector.selectedKeys()) {
                key.channel().close();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read() {
        try {
            SelectionKey key = keyManager.take();
            SocketChannel client = (SocketChannel)key.channel();
            data = ByteBuffer.allocate(1024);
            int numRead = -1;
            try {
                numRead = client.read(data);
            } catch (IOException e) {
                key.cancel();
                client.close();
                return;
            }
            if(numRead == -1) {
                client.close();
                key.cancel();
                return;
            }
            data.flip();
            bais = new ByteArrayInputStream(data.array());
            in = new ObjectInputStream(bais);
            requests.put((Request)in.readObject());
            destination.put(key);
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    private void process() {
        try {
            Request request = requests.take();
            responses.put(new Response(request.getInfo() + " enrolled"));
        } catch(InterruptedException e) {
            System.out.println(e.toString());
        }
    }

    private void write() {
        try {
            Response response = responses.take();
            SelectionKey key = destination.take();
            SocketChannel client = (SocketChannel)key.channel();
            baos = new ByteArrayOutputStream();
            out = new ObjectOutputStream(baos);
            out.writeObject(response);
            out.flush();
            client.write(ByteBuffer.wrap(baos.toByteArray()));
            client.close();
        } catch (InterruptedException | IOException e) {
            System.out.println(e.toString());
        }
    }

}




问题出在这一行:in = new ObjectInputStream(bais); 我尝试修复这个问题,并意识到如果我定义大小为 n 的固定线程池,在给出正确结果 n 次后,就会发生这个错误。有时,错误出现得更早。

有人可以向我解释为什么会发生这种情况吗?如果可以的话,你能给我建议一些解决方案吗? 非常感谢!

java multithreading threadpool selector
1个回答
0
投票

哦,孩子,我们从哪里开始。

我希望您这样做是出于学习目的,因为创建正确的 NIO 服务器非常困难。所以最好利用现有的。尝试 netti/jetty,它是众所周知的库。

现在让我们剖析代码中的问题:

  1. 不要共享
    ByteBuffer data
    变量(无论如何你都做错了,你以非线程安全的方式访问它)。
  2. TCP是流协议。无法保证
    client.read(data)
    阅读整封消息。它可以读取其中的一部分。可以读两本。它可以读取第一条消息的第二部分和第二条消息的第一部分。
  3. 您需要在开始阅读之前清除
    SelectionKey.OP_READ
    ,并在完成后将其恢复。否则两个线程可能会从同一通道读取数据。
  4. 你不能只是
    out.writeObject(response)
    ,你需要等到频道准备好。

这是我第一眼看到的,可能还有更多。

有关问题的完整列表,您应该在 CodeReview

中发布代码

如果您对工作示例感兴趣,您可以尝试查看 netty 源代码,但它非常复杂。

我有一个带有简单版本的NIO连接器的宠物项目,但它远非完美。例如,它有上面列表中的第三个问题。

© www.soinside.com 2019 - 2024. All rights reserved.