我在使用套接字、Inflater 和 Deflater 的 java 项目中遇到了一个奇怪的问题,我想我只需要另一双眼睛来关注它。
我看到套接字服务器上的读取只有 2 个标头字节 789c,它没有完成 inflate 操作。然后我在开始时再次收到带有标头字节的完整有效负载,当我尝试使用此有效负载继续进行膨胀操作时,我收到错误:
2024-04-07T08:12:47.850-07:00 INFO 14688 --- [compressed-socket-demo-server] [ main] demo.server.ServerApplication : Started ServerApplication in 0.417 seconds (process running for 0.745)
2024-04-07T08:12:53.581-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : readCount: 2
2024-04-07T08:12:53.583-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : readOutputStream byte array: 789c
2024-04-07T08:12:53.583-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : inflater not finished and doesn't needs input, calling inflater.inflate
2024-04-07T08:12:53.583-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : readCount: 13
2024-04-07T08:12:53.583-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : readOutputStream byte array: 789cf348cdc9c95730000009e60245
2024-04-07T08:12:53.583-07:00 DEBUG 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener : inflater not finished and doesn't needs input, calling inflater.inflate
2024-04-07T08:12:53.584-07:00 ERROR 14688 --- [compressed-socket-demo-server] [Pool-1-worker-2] demo.server.MessageListener :
MessageListener error
java.util.zip.DataFormatException: invalid stored block lengths
at java.base/java.util.zip.Inflater.inflateBytesBytes(Native Method) ~[na:na]
at java.base/java.util.zip.Inflater.inflate(Inflater.java:376) ~[na:na]
at java.base/java.util.zip.Inflater.inflate(Inflater.java:470) ~[na:na]
at demo.server.MessageListener.listen(MessageListener.java:72) ~[classes/:na]
at demo.server.SeverSocketListener.lambda$start$0(SeverSocketListener.java:42) ~[classes/:na]
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1556) ~[na:na]
at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:507) ~[na:na]
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073) ~[na:na]
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035) ~[na:na]
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187) ~[na:na]
这是服务器的相关部分:
public void listen() {
try {
InputStream inputStream = clientSocket.getInputStream();
byte[] readBuff = new byte[1024];
ByteArrayOutputStream readOutputStream = new ByteArrayOutputStream();
Inflater inflater = new Inflater();
byte[] inflateBuff = new byte[1024];
int readCount;
int inflateCount = 0;
while ((readCount = inputStream.read(readBuff)) != -1) {
log.debug("readCount: {}", readCount);
readOutputStream.write(readBuff, 0, readCount);
log.debug("readOutputStream byte array: {}", hexFormat.formatHex(readOutputStream.toByteArray()));
inflater.setInput(readOutputStream.toByteArray());
while (inflater.getRemaining() > 0 && !inflater.finished() && !inflater.needsInput()) {
log.debug("inflater not finished and doesn't needs input, calling inflater.inflate");
inflateCount += inflater.inflate(inflateBuff);
}
if (inflater.finished()) {
log.debug("inflater finished, handling message");
handleMessage(Arrays.copyOfRange(inflateBuff, 0, inflateCount));
inflater.reset();
inflateCount = 0;
readOutputStream.close();
readOutputStream = new ByteArrayOutputStream();
}
}
} catch (IOException | DataFormatException e) {
log.error("MessageListener error", e);
throw new RuntimeException(e);
}
}
还有一些客户:
@Override
public void run(ApplicationArguments args) throws Exception {
try (Socket socket = new Socket("localhost", 4943)) {
OutputStream socketOutputStream = socket.getOutputStream();
for (int i = 0; i < 1000; i++) {
StringBuilder message = new StringBuilder("Hello ").append(i);
writeMessage(socketOutputStream, message.toString().getBytes(StandardCharsets.UTF_8));
}
}
}
private void writeMessage(OutputStream socketOutputStream, byte[] message) {
log.debug("Writing {}", message);
try {
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(socketOutputStream);
deflaterOutputStream.write(message);
deflaterOutputStream.finish();
deflaterOutputStream.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
我已将带有服务器和客户端的完整 Maven 多模块项目推送到此处的公共 github 存储库:https://github.com/gadams00/compressed-socket-demo。
我确信我做错了什么,但我就是不知道是什么。
我按照评论中的建议使用 InflaterInputStream 做了更多测试。 InflaterInputStream 似乎没有任何方法来处理剩余的输入。
一些额外的背景: java 套接字服务器是与第三方套接字客户端集成的一部分,该客户端发送使用 zlib 压缩的消息。在发送另一条消息之前,它不会等待套接字上的响应。 在 java 套接字服务器中,我使用包裹在客户端套接字输入流周围的 InflaterInputStream。当 InflaterInputStream 填充其内部缓冲区时,它包含多个压缩消息。 InflaterInputStream 似乎没有提供任何方法来处理剩余的输入。一旦包含的 Inflater 完成,任何后续读取都将返回 -1。我是否缺少某种方法来完成这项工作,或者我是否需要自己直接使用 Inflater 以便我可以处理包含多个压缩消息的输入?
在我的用例中,客户端连接到服务器套接字,在服务器向客户端发送一些启动信息的情况下进行握手,然后客户端不断发送压缩的有效负载,而无需在有效负载之间等待来自服务器的任何类型的响应。这会导致服务器中读取的套接字输入流包含潜在的多个压缩的有效负载。我最初想以某种方式控制客户端的发送,以便服务器端的读取只包含一个有效负载,但我认为这就是 tcp 流的工作方式。此外,这是为了与不太可能更改协议的第三方集成。
InflaterInputStream 不是针对此用例构建的,它根本不适合 InputStream,因为 InputStream 读取字节,接口中没有任何内容来指示一个压缩字节数组的结尾和下一个压缩字节数组的开始。 InflaterInputStream 旨在对单个紧缩的字节流进行充气,一旦紧缩器完成,InflaterInputStream 就会进入 EOF 状态,并且所有后续读取都将返回 -1,并且无法在不打开新的情况下重置 InflaterInputStream。问题在于,在第一个压缩字节数组结束后,InflaterInputStream 中的内部缓冲区可能已经从套接字读取了更多数据,而无法恢复该数据并将其提供给下一个 inflate。
我的解决方案是从服务器端的套接字输入流中读取并自己管理 Inflater,特别是处理剩余字节并适当地调用 setInput。我还必须小心处理在下一次读取中继续的读取缓冲区末尾的部分有效负载。这解决了我的问题。