假设我有一个 CompletableFuture,它包装了一个阻塞调用,例如使用 JDBC 查询后端。在这种情况下,由于我没有将任何执行程序服务作为参数传递给 CompletableFuture.supplyAsync(),因此通过后端获取资源的实际阻塞工作应该由公共 Fork/Join 池中的线程完成。 让公共 FJpool 中的线程执行阻塞调用不是不好的做法吗?我在这里的优点是我的主线程不会阻塞,因为我将阻塞调用委托为异步运行。 here 检查 abt JDBC 调用是否阻塞。如果这个推论是正确的,为什么可以选择将默认的通用 FJpool 与 CompletableFuture 一起使用?
CompletableFuture<List<String>> fetchUnicorns =
CompletableFuture.supplyAsync(() -> {
return unicornService.getUnicorns();
});
fetchUnicorns.thenAccept(/**Do something with the result*/);
但是官方对此有一个解决方案:
class BlockingGetUnicorns implements ForkJoinPool.ManagedBlocker {
List<String> unicorns;
public boolean block() {
unicorns = unicornService.getUnicorns();
return true;
}
public boolean isReleasable() { return false; }
}
CompletableFuture<List<String>> fetchUnicorns =
CompletableFuture.supplyAsync(() -> {
BlockingGetUnicorns getThem = new BlockingGetUnicorns();
try {
ForkJoinPool.managedBlock(getThem);
} catch (InterruptedException ex) {
throw new AssertionError();
}
return getThem.unicorns;
});
ForkJoinPool.ManagedBlocker
是潜在阻塞操作的抽象,允许 Fork/Join 池在识别到工作线程即将被阻塞时创建补偿线程。应该很明显使用起来方便多了
CompletableFuture<List<String>> fetchUnicorns =
CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(),
Executors.newSingleThreadExecutor());
这里。在生产环境中,您将保留对执行器的引用,重用它并最终对其调用
shutDown
。对于执行器不被重用的用例,
CompletableFuture<List<String>> fetchUnicorns =
CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(),
r -> new Thread(r).start());
就足够了,这样,线程将在作业完成后自动处理。
如果这个推论是正确的,为什么可以选择将默认的通用 FJpool 与 CompletableFuture 一起使用?因为并非所有工作都是阻塞的。
您可以选择使用
CompletableFuture.supplyAsync(Supplier<U>, Executor)
ForkJoinPool
成为合理默认值的原因是对于计算繁重的任务,它的性能与为每个任务创建
Thread
类似,但没有创建
Thread
的开销。这是一个示例,它将运行的
CompletableFuture
实例数量限制为您计算机上的核心数量(在我的计算机上需要 2 分钟):
public static void main(String... args) {
var completableFutures = new ArrayList<CompletableFuture<Void>>(100);
for (int i = 0; i < 100; i++) {
int index = i;
completableFutures.add(
CompletableFuture.runAsync(
() -> {
try {
Thread.sleep(10000);
System.out.printf(
"Thread: %s, Index: %d, Time:%d%n",
Thread.currentThread().getName(), index, System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}));
}
completableFutures.forEach(CompletableFuture::join);
}
自 Java 21 以来,现在有了 虚拟线程。通过将 Executors.newVirtualThreadPerTaskExecutor()
提供给
CompletableFuture.supplyAsync(Supplier, Executor)
。下面的示例同时运行所有
CompletableFuture
实例(在我的机器上需要 10 秒):
public static void main(String... args) {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (var i = 0; i < 100; i++) {
var index = i;
CompletableFuture.runAsync(
() -> {
try {
Thread.sleep(10000);
System.out.printf(
"Thread: %s, Index: %d, Time:%d%n",
Thread.currentThread().getName(), index, System.currentTimeMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
executor);
}
}
}