我正在研究 quarkus 3 的 Mutiny 框架,我想知道这些代码是否正确,或者有人可能比我更好。我想读取一个大的 base64 字符串并将它们解码为一个字符串以保存在文件中(将来可能是一个文件)。我想一次读取 4 个字节(因为编码是 Base64 ),但以最好的多线程方式。 这是正确的代码还是错误的?如何?谢谢!
@Inject
ManagedExecutor managedExecutor;
List<byte[]> resultDecoded = readBase64InStreamingFashionAndWriteTo(resultEncoded).collect().asList().await().indefinitely();
//trasformao byte
String resultMerge = resultDecoded.stream().map(b -> new String(b, StandardCharsets.UTF_8)).reduce("", (resultString, partialStr) -> resultString + partialStr);
logger.info("Result Decoded: " + resultMerge);
public Multi<byte[]> readBase64InStreamingFashionAndWriteTo(String base64Str) throws IOException {
logger.info("start thread read " + Thread.currentThread());
AtomicInteger i = new AtomicInteger(1);
return Uni.createFrom().item(base64Str)
.emitOn(managedExecutor) //work pool
.log()
.onItem().transform(String::getBytes)
.onItem().transform(ByteArrayInputStream::new)
.onItem().transformToMulti(this::readChunkAsync)// processo asincrono
.emitOn(managedExecutor)
.onItem().transform( str -> {
if (i.getAndAdd(1) <= 2) logger.info(Thread.currentThread() + "**read partial string**: " + str);
return str;})
.onItem().transform(DatatypeConverter::parseBase64Binary);
}
private Multi<String> readChunkAsync(ByteArrayInputStream in) {
return Multi.createFrom().emitter( em -> { //emetto multi di stringhe (stream di stringhe generate leggendo 4 byte alla volta dal base64 in input)
try {
AtomicInteger i = new AtomicInteger(1);
byte[] buf = new byte[4];//4 bytes
int nread = -1;
while ((nread = in.read(buf, 0, buf.length)) != -1) {
if (i.getAndAdd(1) <= 2) logger.info(Thread.currentThread() + "**emiting**: ");
em.emit(new String(buf, StandardCharsets.UTF_8));
}
}catch(Exception ex){
//
} finally {
em.complete();
}
}
);
}
这可行,但由于任何操作中都没有异步 I/O(请参阅
onItem().transform(...)
,因此命令式循环中的相同代码将比那些运算符链接更有效。