我目前正在开发一个TCP服务器/客户端应用程序,并且我已经成功地在服务器和客户端之间建立了通信。虽然对于小消息来说通信工作得很好,但在处理较大的消息(例如模型或对象)时,我遇到了一些序列化问题。
为了解决大消息的序列化问题,我实现了一种机制,不是一次性发送整个大消息
Mono.just(Unpooled.copiedBuffer(serializedBlock))
,而是将其分成多个较小的块。在接收端,客户端通过将这些块按正确的顺序重新组合在一起来重建原始大消息。
对于发送部分:
Flux<ByteBuf> chunkedFlux = Flux.range(0, (int) Math.ceil(SerializationUtils.serialize(blockMessage).length / (double) 2048))
.map(chunkNumber -> {
int startIdx = chunkNumber * 2048;
int endIdx = Math.min(startIdx + 2048, SerializationUtils.serialize(blockMessage).length);
byte[] chunk = new byte[endIdx - startIdx];
System.arraycopy(SerializationUtils.serialize(blockMessage), startIdx, chunk, 0, chunk.length);
return Unpooled.wrappedBuffer(chunk);
});
//Sent new mined block to broker
brokerConnectionStorage.getConnection()
.outbound().send(chunkedFlux)
.then()
.subscribe();
对于接收部分:
public class MessageProcessor implements Processor {
@Autowired
private StorageServices storage;
@Autowired
private List<Connection> connectionStorage;
//Pass the processor in handler
public void forwardMessage(NettyInbound inbound, NettyOutbound outbound) {
Flux<byte[]> fluxData = inbound.receive().retain()
.cast(ByteBuf.class)
.map(this::receiveChunk)
.doOnNext(map -> log.info("Map:{}", map.length));
Flux.from(fluxData)
.reduce(this::aggregate)
.subscribe(result -> System.out.println("Concatenated byte array: " + new String(result)));
}
和处理器接口
public interface Processor {
void process(Message message);
default byte[] receiveChunk(ByteBuf byteBuf) {
//byteBuf.retain();
byte[] chunk = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(chunk).release();
return chunk;
}
default Message mapToMessage(byte[] concatenateChunks) {
return (Message) SerializationUtils.deserialize(concatenateChunks);
}
default byte[] aggregate(byte[] arr1, byte[] arr2) {
byte[] result = new byte[arr1.length + arr2.length];
System.arraycopy(arr1, 0, result, 0, arr1.length);
System.arraycopy(arr2, 0, result, arr1.length, arr2.length);
return result;
}
}
对于序列化,我正在使用
import org.apache.commons.lang3.SerializationUtils;
我面临的问题是,我在
reduce()
之后没有看到任何处理的日志或数据
我尝试使用
collectList()
和ByteBufMono mono = inbound.receive().aggregate().retain();
但面临同样的问题。
我检查了消息的总大小,发送者和接收者的消息大小是相同的。