我目前正在开发一个TCP服务器/客户端应用程序,并且我已经成功地在服务器和客户端之间建立了通信。虽然对于小消息来说通信工作得很好,但在处理较大的消息(例如模型或对象)时,我遇到了一些序列化问题。
为了解决大消息的序列化问题,我实现了一种机制,不是一次性发送整个大消息
Mono.just(Unpooled.copiedBuffer(serializedBlock))
,而是将其分成多个较小的块。在接收端,客户端通过将这些块按正确的顺序重新组合在一起来重建原始大消息。
对于发送部分:
Flux<ByteBuf> chunkedFlux = Flux.range(0, (int) Math.ceil(SerializationUtils.serialize(blockMessage).length / (double) 2048))
.map(chunkNumber -> {
int startIdx = chunkNumber * 2048;
int endIdx = Math.min(startIdx + 2048, SerializationUtils.serialize(blockMessage).length);
byte[] chunk = new byte[endIdx - startIdx];
System.arraycopy(SerializationUtils.serialize(blockMessage), startIdx, chunk, 0, chunk.length);
return Unpooled.wrappedBuffer(chunk);
});
//Sent new mined block to broker
brokerConnectionStorage.getConnection()
.outbound().send(chunkedFlux)
.then()
.subscribe();
对于接收部分:
public class MessageProcessor implements Processor {
@Autowired
private StorageServices storage;
@Autowired
private List<Connection> connectionStorage;
//Pass the processor in handler
public void forwardMessage(NettyInbound inbound, NettyOutbound outbound) {
Flux<byte[]> fluxData = inbound.receive().retain()
.cast(ByteBuf.class)
.map(this::receiveChunk)
.doOnNext(map -> log.info("Map:{}", map.length));
Flux.from(fluxData)
.reduce(this::aggregate)
.subscribe(result -> System.out.println("Concatenated byte array: " + new String(result)));
}
}
和处理器接口
public interface Processor {
void process(Message message);
default byte[] receiveChunk(ByteBuf byteBuf) {
//byteBuf.retain();
byte[] chunk = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(chunk).release();
return chunk;
}
default Message mapToMessage(byte[] concatenateChunks) {
return (Message) SerializationUtils.deserialize(concatenateChunks);
}
default byte[] aggregate(byte[] arr1, byte[] arr2) {
byte[] result = new byte[arr1.length + arr2.length];
System.arraycopy(arr1, 0, result, 0, arr1.length);
System.arraycopy(arr2, 0, result, arr1.length, arr2.length);
return result;
}
}
对于序列化,我使用
import org.apache.commons.lang3.SerializationUtils;
。
我面临的问题是,我在
reduce()
之后没有看到任何日志或数据处理。
我尝试使用
collectList()
和ByteBufMono mono = inbound.receive().aggregate().retain();
但面临同样的问题。
我检查了消息的总大小,发送者和接收者的消息大小是相同的。
我已经做了更多的测试并发送小消息(例如交易),这部分工作正常,但是当涉及到更大的数据(例如块)时,就不起作用了。
第一个屏幕来自发送块消息和 10 笔交易的发送者。
List<Block> blocks = storage.getPseudoBlocks();
blocks.forEach(block -> {
List<PseudoTransaction> transactions = new ArrayList<>();
block.getTransactions().forEach(tx -> {
transactions.add(storage.getMempoolTransactionByKey(tx.split("_")[0]));
});
Flux<ByteBuf> chunks = Mono.just(block)
.doOnNext(b -> log.info("Sending block:{}", b.getHash()))
.flux()
.flatMap(blockToFlux ->{
return Flux.range(0, (int) Math.ceil(SerializationUtils.serialize(block).length / (double) 2048))
.map(chunkNumber -> {
int startIdx = chunkNumber * 2048;
int endIdx = Math.min(startIdx + 2048, SerializationUtils.serialize(block).length);
byte[] chunk = new byte[endIdx - startIdx];
System.arraycopy(SerializationUtils.serialize(block), startIdx, chunk, 0, chunk.length);
return Unpooled.wrappedBuffer(chunk);
});
})
.doOnNext(chunk -> log.info("Chunk:{}", chunk));
brokerConnection.outbound().sendObject(
Flux.fromIterable(transactions).delayElements(Duration.ofMillis(150))
.doOnNext(t -> log.info("Sending transactions:{}", t.getPseudoHash()))
.map(transaction -> Unpooled.copiedBuffer(SerializationUtils.serialize(transaction)))
.mergeWith(chunks))
.then()
.subscribe();
});
在接收方,我收到 12 条消息,其中 2 条用于区块,10 条用于交易。
inbound.receive().retain()
.doOnNext(this::receiveChunk)
.doOnNext(l->log.info("Data:{}", l))
.collectList().then().subscribe(result -> System.out.println("Result: "+ result));
所以所有的块数据(块)都是同时到来的,这意味着我可以以某种方式按窗口时间对它们进行分组并反序列化块?