我必须通过调用返回List的rest API从某处转储数据。
我尝试过如下:
restApiMethodWhichReturns6000Records
.parallelStream().forEach(id ->{
anotherMethodWhichgetsSomeDataAndPostsToOtherRestCall(id);
});
public void anotherMethodWhichgetsSomeDataAndPostsToOtherRestCall(String id) {
sestApiToPostData(url,methodThatGetsListOfData(id));
}
parallelStream
有时会引起意外行为。它使用常见的ForkJoinPool
。因此,如果您在代码中的其他位置具有并行流,则对于长时间运行的任务,它可能具有阻塞性质。即使在同一个流中,如果某些任务需要耗费时间,也会阻止所有工作线程。
关于这个stackoverflow的一个很好的讨论。在这里,您可以看到一些分配任务特定ForkJoinPool的技巧。
首先确保您的REST服务是非阻塞的。
您可以做的另一件事是通过向JVM提供-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
来使用池大小。
如果API调用是阻塞的,即使您并行运行它们,您也可以并行执行几个调用。
我会尝试使用CompletableFuture
解决方案。
代码将是这样的:
List<CompletableFuture>> apiCallsFutures = restApiMethodWhichReturns6000Records
.stream()
.map(id -> CompletableFuture.supplyAsync(() -> getListOfData(id)) // Mapping the get list of data call to a Completable Future
.thenApply(listOfData -> callAPItoPOSTData(url, listOfData)) // when the get list call is complete, the post call can be performed
.collect(Collectors.toList());
CompletableFuture[] completableFutures = apiCallsFutures.toArray(new CompletableFuture[apiCallsFutures.size()]); // CompletableFuture.allOf accepts only arrays :(
CompletableFuture<Void> all = CompletableFuture.allOf(completableFutures); // Combine all the futures
all.get(); // perform calls
有关CompletableFutures的更多详细信息,请查看:https://www.baeldung.com/java-completablefuture