在我的 API 中,我有一个调用外部 API 的服务。此外部 API 按请求收取费用。因此我想对外部 API 的请求进行排队。
这就是今天的样子:
@Component
public class OrderService {
private final WebClient webClient;
public OrderService(WebClient webClient) {
this.webClient = webClient;
}
public Mono<List<Order>> getOrders(List<String> orderNumbers) {
return webClient
.get()
.uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
.retrieve()
.bodyToFlux(Order.class)
.collectList();
}
}
我想要的是以某种方式排队10个订单号,并在队列中达到10个订单号后,发送批量请求,然后将响应返回给所有原始请求者。
假设
OrderService::getOrders
被调用 3 次,首先调用 4 个订单号,然后调用 4 个附加订单号,最后再次调用 4 个订单号。前两个请求会将总共 8 个订单号排队,无需调用外部 API。一旦第三个请求到达,队列中就有 12 个订单号。此时,我们将发送包含前 10 个订单号的请求。最后两个仍处于排队状态,因为外部 API 允许的最大订单数为 10。
我的API看到流量很大,所以排队和等待不是问题,他们不必等待很长时间,直到队列有10个项目。
我尝试阅读如何使用 Spring WebFlux 来实现这一目标,我的研究向我指出了 Sinks 和 Processor。然而,我很难理解这一切是如何联系在一起的。例如,我如何跟踪哪些请求请求了特定的订单号。
有更了解的人可以帮助我如何实现这一目标吗?
我会尝试使用countDownLatch,这允许您在获得正在使用您的服务的所有线程的计数后触发一个操作。
@Component
public class OrderService {
private static final String QUEUE_UP_COUNT = 10;
private final WebClient webClient;
private CountDownLatch countDownLatch = new CountDownLatch(QUEUE_UP_COUNT );
public OrderService(WebClient webClient) {
this.webClient = webClient;
}
public Mono<List<Order>> getOrders(List<String> orderNumbers) {
countDownLatch.countDown();
countDownLatch.wait();
List<Order> orders = webClient
.get()
.uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
.retrieve()
.bodyToFlux(Order.class)
.collectList();
// Here you need to create a new the countDownLatch since it doesn't allow to restart
countDownLatch = new CountDownLatch(QUEUE_UP_COUNT );
}
}
它可以类似于以下内容:
首先,为传入请求创建一个队列:
private Sinks.Many<Tuples2(String, Sinks.One<Order>)> queue = Sinks.many().unicast().onBackpressureBuffer()
并有以下方法来接受请求:
public Mono<List<Order>> getOrders(List<String> orderNumbers) {
return Flux.fromIterable(orderNumbers)
.flatMapSequential(::getOrder)
.collectList();
}
public Mono<Order> getOrder(String number) {
Sinks.One<Order> callback = Sinks.one();
// TODO handle FAIL_NON_SERIALIZED here by doing a park-retry
queue.tryEmitValue(Tuples.of(number, callback));
return callback.asMono();
}
运行此程序进行处理
public void run() {
queue
.bufferTimeout(10, Duration.ofMilliseconds(100))
.flatMap( list -> {
List numbers = list.stream().map(Tuples::getT1).collect(Collectors.toList());
request(numbers)
.doOnNext(orders -> {
for(int i = 0; i < orders.size(); i++) {
list.get(i).getT2().tryEmitValue(orders[i]);
}
})
.then()
})
.subscribe()
}
public Mono<List<Order>> request(List<String> orderNumbers) {
return webClient
.get()
.uri(builder -> builder.queryParam("orderNumbers", String.join(",", orderNumbers)).build())
.retrieve()
.bodyToFlux(Order.class)
.collectList();
}