我必须设计一个具有 3 个主线程池的服务器,以便读取数据、处理数据并将结果输出到客户端。我这样编码,但它总是注意到这种错误:
java.io.StreamCorruptedException:无效的流标头:00000000 在 java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958) 在 java.base/java.io.ObjectInputStream.(ObjectInputStream.java:392) 在 demo.NioServer.read(NioServer.java:99) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) 在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1136) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 在 java.base/java.lang.Thread.run(Thread.java:840)
我不知道为什么,这是我的代码:
package demo;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class NioServer {
private ServerSocketChannel server;
private Selector selector;
private ByteBuffer data;
private ByteArrayInputStream bais;
private ByteArrayOutputStream baos;
private ObjectInputStream in;
private ObjectOutputStream out;
private final ExecutorService input = Executors.newCachedThreadPool();
private final ExecutorService processor = Executors.newFixedThreadPool(5);
private final ExecutorService output = Executors.newCachedThreadPool();
private final BlockingQueue<SelectionKey> keyManager = new LinkedBlockingQueue<>(5);
private final BlockingQueue<SelectionKey> destination = new LinkedBlockingQueue<>(5);
private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>(5);
private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(5);
public NioServer() throws IOException {
server = null;
selector = null;
}
private void init() throws IOException {
server = ServerSocketChannel.open();
selector = Selector.open();
server.socket().bind(new InetSocketAddress("127.0.0.1", 4999));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() throws IOException {
init();
try {
while (true) {
selector.select();
for(SelectionKey key : selector.selectedKeys()) {
if(key.isAcceptable()) {
accept(key);
} else if(key.isReadable()) {
keyManager.put(key);
input.submit(this::read);
processor.submit(this::process);
output.submit(this::write);
}
}
selector.selectedKeys().clear();
}
} catch(IOException | InterruptedException e) {
e.printStackTrace();
} finally {
for(SelectionKey key : selector.selectedKeys()) {
key.channel().close();
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
private void read() {
try {
SelectionKey key = keyManager.take();
SocketChannel client = (SocketChannel)key.channel();
data = ByteBuffer.allocate(1024);
int numRead = -1;
try {
numRead = client.read(data);
} catch (IOException e) {
key.cancel();
client.close();
return;
}
if(numRead == -1) {
client.close();
key.cancel();
return;
}
data.flip();
bais = new ByteArrayInputStream(data.array());
in = new ObjectInputStream(bais);
requests.put((Request)in.readObject());
destination.put(key);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private void process() {
try {
Request request = requests.take();
responses.put(new Response(request.getInfo() + " enrolled"));
} catch(InterruptedException e) {
System.out.println(e.toString());
}
}
private void write() {
try {
Response response = responses.take();
SelectionKey key = destination.take();
SocketChannel client = (SocketChannel)key.channel();
baos = new ByteArrayOutputStream();
out = new ObjectOutputStream(baos);
out.writeObject(response);
out.flush();
client.write(ByteBuffer.wrap(baos.toByteArray()));
client.close();
} catch (InterruptedException | IOException e) {
System.out.println(e.toString());
}
}
}
问题出在这一行:in = new ObjectInputStream(bais); 我尝试修复这个问题,并意识到如果我定义大小为 n 的固定线程池,在给出正确结果 n 次后,就会发生这个错误。有时,错误出现得更早。
有人可以向我解释为什么会发生这种情况吗?如果可以的话,你能给我建议一些解决方案吗? 非常感谢!
哦,孩子,我们从哪里开始。
我希望您这样做是出于学习目的,因为创建正确的 NIO 服务器非常困难。所以最好利用现有的。尝试 netti/jetty,它是众所周知的库。
现在让我们剖析代码中的问题:
ByteBuffer data
变量(无论如何你都做错了,你以非线程安全的方式访问它)。client.read(data)
阅读整封消息。它可以读取其中的一部分。可以读两本。它可以读取第一条消息的第二部分和第二条消息的第一部分。SelectionKey.OP_READ
,并在完成后将其恢复。否则两个线程可能会从同一通道读取数据。out.writeObject(response)
,你需要等到频道准备好。这是我第一眼看到的,可能还有更多。
有关问题的完整列表,您应该在 CodeReview
中发布代码如果您对工作示例感兴趣,您可以尝试查看 netty 源代码,但它非常复杂。
我有一个带有简单版本的NIO连接器的宠物项目,但它远非完美。例如,它有上面列表中的第三个问题。