我需要按顺序进行两个 http 调用,一个依赖于另一个,在另一种情况下进行两个并行调用并组合它们的结果。
对于案例 1:相互依赖的顺序调用,我正在使用“zipWhen”将我的结果组合在一起
对于案例 2:异步并行调用,我使用“zip”订阅两个 http 请求的发布者并合并结果。
在这两种情况下,我发现包含第一个响应的 ByteBuf 在进行第二次调用时自动丢失引用。我怀疑这与 responseSingle 有关,但我不确定为什么会这样。
我在下面包含了 Case1(顺序)的代码示例:
package io.spring.workshop.reactornetty.http;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import java.nio.charset.StandardCharsets;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class HttpCompressionTests {
@Test
public void httpCompressionTest() {
DisposableServer server =
HttpServer.create() // Prepares a HTTP server for configuration.
.port(0) // Configures the port number as zero, this will let the system pick up
// an ephemeral port when binding the server.
.handle((req, res) -> res.sendString(Mono.just("compressed response")))
.compress(true) // Enables compression.
.wiretap(true) // Applies a wire logger configuration.
.bindNow(); // Starts the server in a blocking fashion, and waits for it to finish initializing.
assertNotNull(server);
String response =
HttpClient.create() // Prepares a HTTP client for configuration.
.port(server.port()) // Obtains the server's port and provides it as a port to which this
// client should connect.
.compress(true) // Enables compression.
.wiretap(true) // Applies a wire logger configuration.
.get() // Specifies that GET method will be used.
.uri("/test") // Specifies the path.
.responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))) // Receives the response body.
// .aggregate()
// .asString()
.log("http-client")
.zipWhen(string1 ->
HttpClient.create() // Prepares a HTTP client for configuration.
.port(server.port()) // Obtains the server's port and provides it as a port to which this
// client should connect.
.compress(true) // Enables compression.
.wiretap(true) // Applies a wire logger configuration.
.get() // Specifies that GET method will be used.
.uri("/test") // Specifies the path.
.responseSingle((res, body) -> Mono.zip(Mono.just(res), body.defaultIfEmpty(Unpooled.EMPTY_BUFFER))), // Receives the response body.
(string1, string2) -> StandardCharsets.UTF_8.decode(string1.getT2().nioBuffer()).toString()
).block();
assertEquals("compressed response", response);
server.disposeNow(); // Stops the server and releases the resources.
}
}
任何帮助将不胜感激。谢谢!!
我尝试保留我的第一个响应 ByteBuf,这似乎可行,但我将不得不手动减少引用计数,我认为这可能会导致 Mem 泄漏。