I am using WebClient and a custom BodyExtractor class for my spring-boot application:
WebClient webClient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION_XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
});
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
return body.map(dataBuffer -> {
try {
JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream());
} catch(Exception e){
return null;
}
}).next();
}
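The code above works for small payloads but not for large ones. I think this is because I only read a single Flux value with next(), and I'm not sure how to combine and read all of the DataBuffers.
I'm new to Reactor, so I don't know many Flux/Mono tricks.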
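A minimal, Spring-free sketch of the symptom, assuming the large XML body is delivered as several chunks. The ChunkedBody class, its methods and the 8-byte chunk size are hypothetical stand-ins for the Flux<DataBuffer>: reading only the first chunk (what next() does) yields a truncated document, while joining every chunk in order yields the whole payload.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a response body that arrives in several buffers.
class ChunkedBody {

    // Splits a payload into fixed-size chunks, roughly as a network layer might.
    static List<byte[]> split(byte[] payload, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < payload.length; i += chunkSize) {
            int end = Math.min(payload.length, i + chunkSize);
            byte[] chunk = new byte[end - i];
            System.arraycopy(payload, i, chunk, 0, chunk.length);
            chunks.add(chunk);
        }
        return chunks;
    }

    // Analogue of body.map(...).next(): only the first chunk is ever looked at.
    static String readFirstOnly(List<byte[]> chunks) {
        return new String(chunks.get(0), StandardCharsets.UTF_8);
    }

    // Joins every chunk, in order, before decoding.
    static String readAll(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

With a payload longer than one chunk, readFirstOnly returns only a fragment of the XML, which an unmarshaller would reject or misread.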
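This is actually not as complicated as the other answers suggest.
The only way to stream the data without buffering all of it in memory is to use a pipe, as @jin-kwon suggested. However, this can be done quite simply with Spring's BodyExtractors and DataBufferUtils utility classes.
Example: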
private InputStream readAsInputStream(String url) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION_XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....
Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t -> {
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
})
.doFinally(s -> {
try(osPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
});
DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());
return isPipe;
}
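Why the pipe needs a second thread: a PipedInputStream and its PipedOutputStream must be used from different threads, because a write blocks once the pipe's buffer is full until something reads. In the answer above, the writes happen on whichever thread Reactor delivers the buffers on, while the caller reads from the returned stream. A stdlib-only sketch of that arrangement, with a plain Thread standing in for the Reactor side (the PipeDemo class and the deliberately small 16-byte buffer are illustrative assumptions):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

class PipeDemo {

    // Writes the chunks into a pipe on a separate thread and returns the reading end.
    static InputStream stream(List<byte[]> chunks) {
        try {
            PipedOutputStream osPipe = new PipedOutputStream();
            // Small buffer on purpose, so the writer really has to wait for the reader.
            PipedInputStream isPipe = new PipedInputStream(osPipe, 16);
            Thread writer = new Thread(() -> {
                try (osPipe) { // closing the writing end signals EOF to the reader
                    for (byte[] chunk : chunks) {
                        osPipe.write(chunk); // blocks while the pipe buffer is full
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            return isPipe;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The calling thread consumes the stream while the writer thread fills it.
    static String readAll(List<byte[]> chunks) {
        try (InputStream in = stream(chunks)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the same thread both wrote and read, the first write that fills the buffer would block forever, which is also why the doOnError/doFinally closing logic in the answer matters.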
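If you don't care about checking the response code or throwing an exception for failure status codes, you can skip the block() call and the intermediate ClientResponse variable and use
exchange().flatMapMany(r -> r.body(BodyExtractors.toDataBuffers()))
instead (flatMapMany rather than flatMap, because body(BodyExtractors.toDataBuffers()) returns a Flux).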
A slightly modified version of Bk Santiago's answer, using reduce() instead of collect(). It is very similar, but doesn't need an extra class:
Java:
body.<InputStream>reduce(new InputStream() {
public int read() { return -1; }
}, (s, d) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with the single InputStream */);
Or in Kotlin:
body.reduce(object : InputStream() {
override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
.flatMap { inputStream -> /* do something with single InputStream */ }
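The benefit of this approach over collect() is simply that you don't need a separate class to collect things into.
I created a new empty InputStream(), but if that syntax is confusing, you can replace it with ByteArrayInputStream("".toByteArray()) instead, which uses an empty ByteArrayInputStream as the initial value (in Java: new ByteArrayInputStream(new byte[0])).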
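A stdlib sketch of the same fold, using java.util.stream.Stream.reduce in place of Flux.reduce (the ReduceDemo class is hypothetical). The empty seed is what lets a body with no chunks still produce a usable, empty InputStream; a seedless reduce gives you nothing in that case (an empty Mono for Flux, an empty Optional for Stream).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

class ReduceDemo {

    // Folds the parts into one stream, starting from an empty one, mirroring
    // body.reduce(emptyStream, (s, d) -> new SequenceInputStream(s, d.asInputStream())).
    static InputStream concat(List<InputStream> parts) {
        InputStream empty = new ByteArrayInputStream(new byte[0]);
        return parts.stream().reduce(empty, (s, d) -> new SequenceInputStream(s, d));
    }

    static String readAll(List<InputStream> parts) {
        try (InputStream in = concat(parts)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```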
Here is yet another variant of the other answers. It is still not memory-friendly.
// WARNING: NOT-MEMORY-FRIENDLY!
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
return response.bodyToFlux(DataBuffer.class)
.map(b -> b.asInputStream(true))
.reduce(SequenceInputStream::new);
}
static void doSome(WebClient.ResponseSpec response) {
asStream(response)
.doOnNext(stream -> {
// do something with the stream
// close the stream!!!
})
.block();
}
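The reduce(SequenceInputStream::new) fold above builds a left-nested chain, SequenceInputStream(SequenceInputStream(a, b), c) and so on, so reads of the earliest data pass through one wrapper per chunk. Since reduce only emits once the Flux completes, all chunks are held at that point anyway, and the Enumeration constructor of SequenceInputStream gives a flat alternative. A stdlib-only comparison (FlatConcat is a hypothetical name):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

class FlatConcat {

    // Left-nested chain, as produced by reduce(SequenceInputStream::new).
    static InputStream nested(List<InputStream> parts) {
        return parts.stream()
                .reduce(SequenceInputStream::new)
                .orElseGet(InputStream::nullInputStream); // no parts: empty stream
    }

    // One SequenceInputStream walking an Enumeration over all parts.
    static InputStream flat(List<InputStream> parts) {
        return new SequenceInputStream(Collections.enumeration(parts));
    }

    static String readAll(InputStream stream) {
        try (InputStream in = stream) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both produce the same bytes; the flat form simply avoids the growing chain of wrappers. Neither is memory-friendly, because every chunk is still buffered before reading starts.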
I was able to make it work by using Flux#collect and SequenceInputStream:
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
return body.collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
.map(InputStreamCollector::getInputStream)
.map(inputStream -> {
try {
JAXBContext jc = JAXBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(inputStream);
} catch(Exception e){
// map() must not return null in Reactor, so propagate the error instead
throw Exceptions.propagate(e);
}
});
}
InputStreamCollector.java
public class InputStreamCollector {
private InputStream is;
public void collectInputStream(InputStream is) {
if (this.is == null) this.is = is;
else this.is = new SequenceInputStream(this.is, is);
}
public InputStream getInputStream() {
return this.is;
}
}
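To exercise the collector pattern outside Reactor: java.util.stream.Stream.collect(supplier, accumulator, combiner) takes the same supplier and accumulator shape as Flux#collect. The sketch below is a self-contained variant of InputStreamCollector; CollectorDemo and the empty-stream fallback are additions for illustration, not part of the answer. It only wraps in a SequenceInputStream once there is a previous stream to append to.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

class CollectorDemo {

    static class InputStreamCollector {
        private InputStream is;

        // The first stream is kept as-is; later ones are appended behind it.
        void collectInputStream(InputStream next) {
            is = (is == null) ? next : new SequenceInputStream(is, next);
        }

        // An empty body leaves 'is' null, so hand back an empty stream instead.
        InputStream getInputStream() {
            return is != null ? is : InputStream.nullInputStream();
        }
    }

    static String collectAll(List<byte[]> chunks) {
        InputStreamCollector collector = chunks.stream()
                .map(ByteArrayInputStream::new)
                .collect(InputStreamCollector::new,
                        InputStreamCollector::collectInputStream,
                        (a, b) -> a.collectInputStream(b.getInputStream())); // only used by parallel streams
        try (InputStream in = collector.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```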
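There is a cleaner way to do this by using the underlying reactor-netty HttpClient directly instead of WebClient. The composition hierarchy looks like this:
WebClient -uses-> HttpClient -uses-> TcpClient
It's easier to show the code than to explain it: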
HttpClient.create()
.get()
.uri(url) // the URL to fetch
.responseContent() // ByteBufFlux
.aggregate() // ByteBufMono
.asInputStream() // Mono<InputStream>
.block() // We got an InputStream, yay!
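However, as I already pointed out, using an InputStream is a blocking operation, which defeats the purpose of using a non-blocking HTTP client, not to mention aggregating the entire response in memory. See this for a comparison of Java NIO and IO.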
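You can use a pipe:
// assumes static imports of Mono.using, Mono.just, DataBufferUtils.write and DataBufferUtils.releaseConsumer
static <R> Mono<R> pipeAndApply(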
final Publisher<DataBuffer> source, final Executor executor,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> {
executor.execute(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer()));
return just(function.apply(p.source()));
},
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
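A stdlib-only sketch of the same pipe idea without Reactor, assuming the body is already available as in-memory byte-array chunks (NioPipeDemo, its parameters and readAll are hypothetical): the executor writes into pipe.sink() and closes it to signal end-of-stream, while the calling thread applies the function to pipe.source().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;

class NioPipeDemo {

    // Writer on the executor, reader (function) on the calling thread.
    static <R> R pipeAndApply(List<byte[]> chunks, Executor executor,
                              Function<? super ReadableByteChannel, ? extends R> function) {
        try {
            Pipe pipe = Pipe.open();
            executor.execute(() -> {
                try (Pipe.SinkChannel sink = pipe.sink()) { // closing the sink signals EOF
                    for (byte[] chunk : chunks) {
                        ByteBuffer buf = ByteBuffer.wrap(chunk);
                        while (buf.hasRemaining()) {
                            sink.write(buf); // blocks while the pipe is full
                        }
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            try (Pipe.SourceChannel source = pipe.source()) {
                return function.apply(source);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Example function: drains the channel into a String.
    static String readAll(ReadableByteChannel channel) {
        try {
            InputStream in = Channels.newInputStream(channel);
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```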
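Or, using CompletableFuture:
// assumes static imports of Mono.using, Mono.fromFuture, CompletableFuture.supplyAsync, DataBufferUtils.write and DataBufferUtils.releaseConsumer
static <R> Mono<R> pipeAndApply(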
final Publisher<DataBuffer> source,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
.doFirst(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer())),
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
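The CompletableFuture variant swaps the roles: the reading function runs asynchronously via supplyAsync (on the common ForkJoinPool by default), and the writing is driven from the calling side. A stdlib sketch of that ordering, again with hypothetical names and in-memory chunks standing in for the DataBuffer publisher:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FuturePipeDemo {

    static <R> R pipeAndApply(List<byte[]> chunks,
                              Function<? super ReadableByteChannel, ? extends R> function) {
        try {
            Pipe pipe = Pipe.open();
            // Start the reader first, asynchronously, as supplyAsync does in the answer.
            CompletableFuture<R> result = CompletableFuture.supplyAsync(() -> function.apply(pipe.source()));
            // Then write on the calling thread; closing the sink signals EOF to the reader.
            try (Pipe.SinkChannel sink = pipe.sink()) {
                for (byte[] chunk : chunks) {
                    ByteBuffer buf = ByteBuffer.wrap(chunk);
                    while (buf.hasRemaining()) {
                        sink.write(buf);
                    }
                }
            }
            try {
                return result.join();
            } finally {
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Example function: drains the channel into a String.
    static String readAll(ReadableByteChannel channel) {
        try {
            InputStream in = Channels.newInputStream(channel);
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Starting the reader before writing matters: with a blocking pipe, a writer that runs first with nobody reading would stall as soon as the pipe's buffer fills.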