我正在使用 Spring Cloud Gateway 4 来实现代理。
它应该做的一件事是,在接受大型视频文件时,在将其发送到请求的目的地之前为其添加水印。
因为这是一个大文件,我不想把它完全加载到内存中。并且水印软件在不同的服务中,在非反应性 Spring Boot 3 应用程序中实现。
据我现在所知,我想将正文或完整请求发送到水印软件。然后我轮询水印作业是否完成,并在完成时检索新文件(作为输入流或 Flux)。然后链应该使用这个新的身体作为“那个”身体。
如果有任何提示(指向方法的指针,类似的反应过程),我将不胜感激,这些提示允许我将请求流式传输到其他地方,然后允许轮询循环。
虽然我确实使用 Spring Boot 3,但我通常不使用 Reactive(甚至不使用 WebClient),所以尝试尝试确实很尴尬。
尝试 1
尽量将遗体送到下一个服务处。 我希望它能上传,除了
block()
终止流程。我明白为什么会这样,但我不明白我怎么能等到上传完成而不阻塞。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
LOGGER.info("Uploading file for watermarking");
// Attempt 1
var destination = "http://localhost:8085/watermark?mark=test123";
var res = WebClient.create().post()
.uri(destination)
.body(exchange.getRequest().getBody(), DataBuffer.class)
.retrieve()
.bodyToMono(String.class);
res.block(); // Reactor understandably does not like this at all
return chain.filter(exchange);
}
尝试2
这里我尝试将其定义为身体通量的结果转化。但我不太确定这将如何与链中的轮询步骤联系起来。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
LOGGER.info("Uploading file for watermarking");
var client = WebClient.create().post().uri(destination);
exchange.getRequest().getBody()
.flatMap(b -> client.body(BodyInserters.fromProducer(b, DataBuffer.class)).exchange())
.subscribe();
return chain.filter(exchange);
}
更新
我确实看过
ModifyRequestBodyGatewayFilterFactory
,但不清楚如何将数据流式传输到其他服务,然后进行民意调查。下面的代码在大文件上中断,因为它试图完全加载 DataBuffer。
var cfg = new ModifyRequestBodyGatewayFilterFactory.Config()
.setRewriteFunction( DataBuffer.class, DataBuffer.class,
(exch, originalBody) -> Mono.just(originalBody) );
return new ModifyRequestBodyGatewayFilterFactory().apply(cfg)
.filter(exchange, chain);