使用默认的通用 fork/join 池和 CompletableFuture 来进行长阻塞调用是不是不好的做法?

问题描述 投票:0回答:3

假设我有一个 CompletableFuture,它包装了一个阻塞调用,例如使用 JDBC 查询后端。在这种情况下,由于我没有将任何执行程序服务作为参数传递给 CompletableFuture.supplyAsync(),因此通过后端获取资源的实际阻塞工作应该由公共 Fork/Join 池中的线程完成。 让公共 FJpool 中的线程执行阻塞调用不是不好的做法吗?我在这里的优点是我的主线程不会阻塞,因为我将阻塞调用委托为异步运行。 here 检查 abt JDBC 调用是否阻塞。如果这个推论是正确的,为什么可以选择将默认的通用 FJpool 与 CompletableFuture 一起使用?

CompletableFuture<List<String>> fetchUnicorns = CompletableFuture.supplyAsync(() -> { return unicornService.getUnicorns(); }); fetchUnicorns.thenAccept(/**Do something with the result*/);
    
java asynchronous java-8 completable-future
3个回答
23
投票
您不应该使用阻塞调用(以这种方式)的原因是,假设非阻塞作业,公共池并行性已配置为利用现有的 CPU 核心。阻塞的线程将减少使用同一池的其他任务的并行性。

但是官方对此有一个解决方案:

class BlockingGetUnicorns implements ForkJoinPool.ManagedBlocker { List<String> unicorns; public boolean block() { unicorns = unicornService.getUnicorns(); return true; } public boolean isReleasable() { return false; } } CompletableFuture<List<String>> fetchUnicorns = CompletableFuture.supplyAsync(() -> { BlockingGetUnicorns getThem = new BlockingGetUnicorns(); try { ForkJoinPool.managedBlock(getThem); } catch (InterruptedException ex) { throw new AssertionError(); } return getThem.unicorns; });

ForkJoinPool.ManagedBlocker

 是潜在阻塞操作的抽象,允许 Fork/Join 池在识别到工作线程即将被阻塞时创建补偿线程。

应该很明显使用起来方便多了

CompletableFuture<List<String>> fetchUnicorns = CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(), Executors.newSingleThreadExecutor());

这里。在生产环境中,您将保留对执行器的引用,重用它并最终对其调用

shutDown

。对于执行器不被重用的用例,

CompletableFuture<List<String>> fetchUnicorns = CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(), r -> new Thread(r).start());

就足够了,这样,线程将在作业完成后自动处理。


1
投票
如果这个推论是正确的,为什么可以选择将默认的通用 FJpool 与 CompletableFuture 一起使用?

因为并非所有工作都是阻塞的。

您可以选择使用

CompletableFuture.supplyAsync(Supplier<U>, Executor)

 在自定义执行器上安排阻塞工作
    


0
投票
是的,这是不好的做法。我怀疑

ForkJoinPool

 成为合理默认值的原因是对于计算繁重的任务,它的性能与为每个任务创建 
Thread
 类似,但没有创建 
Thread
 的开销。

这是一个示例,它将运行的

CompletableFuture

 实例数量限制为您计算机上的核心数量(在我的计算机上需要 2 分钟):

public static void main(String... args) { var completableFutures = new ArrayList<CompletableFuture<Void>>(100); for (int i = 0; i < 100; i++) { int index = i; completableFutures.add( CompletableFuture.runAsync( () -> { try { Thread.sleep(10000); System.out.printf( "Thread: %s, Index: %d, Time:%d%n", Thread.currentThread().getName(), index, System.currentTimeMillis()); } catch (InterruptedException e) { throw new RuntimeException(e); } })); } completableFutures.forEach(CompletableFuture::join); }
自 Java 21 以来,现在有了 

虚拟线程。通过将 Executors.newVirtualThreadPerTaskExecutor()

 提供给 
CompletableFuture.supplyAsync(Supplier, Executor)
。下面的示例同时运行所有 
CompletableFuture
 实例(在我的机器上需要 10 秒):

public static void main(String... args) { try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (var i = 0; i < 100; i++) { var index = i; CompletableFuture.runAsync( () -> { try { Thread.sleep(10000); System.out.printf( "Thread: %s, Index: %d, Time:%d%n", Thread.currentThread().getName(), index, System.currentTimeMillis()); } catch (InterruptedException e) { throw new RuntimeException(e); } }, executor); } } }
    
© www.soinside.com 2019 - 2024. All rights reserved.