我熟悉函数式编程语言,通常使用Scala和Javascript。我正在研究Java8项目并且不确定我应该如何运行项目的列表/流,并使用自定义线程池并行地为每个项目执行一些副作用,并返回一个对象听取完成是可能的(无论是成功还是失败)。
目前我有以下代码,它似乎工作(我使用Play框架Promise实现作为返回)但它似乎并不理想,因为ForkJoinPool并不意味着首先用于IO密集型计算。
public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
ForkJoinTask<F.Promise<Void>> result = pool
.submit(() -> {
try {
items.parallel().forEach(performSingleItemBackup);
return F.Promise.<Void>pure(null);
} catch (Exception e) {
return F.Promise.<Void>throwing(e);
}
});
try {
return result.get();
} catch (Exception e) {
throw new RuntimeException("Unable to get result", e);
}
}
有人可以给我一个更加惯用的上述功能吗?理想情况下,不使用ForkJoinPool,使用更标准的返回类型和最新的Java8 API?不确定我应该在CompletableFuture,CompletionStage,ForkJoinTask之间使用什么......
一个规范的解决方案
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
请注意,ForkJoin池和并行流之间的交互是一个您不应该依赖的未指定的实现细节。相比之下,CompletableFuture
提供了一个专门的API来提供Executor
。它甚至不必是一个ForkJoinPool
:
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
在任何一种情况下,您都应该显式关闭执行程序,而不是依赖于自动清理。
如果您需要F.Promise<Void>
结果,您可以使用
public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
}
但请注意,这与原始代码一样,仅在操作完成时返回,而返回CompletableFuture
的方法允许操作异步运行,直到调用者调用join
或get
。
要返回一个真正的异步Promise
,你必须包装整个操作,例如
public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
return F.Promise.pure(stream).flatMap(items -> {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
});
}
但最好决定使用一个API,而不是在两个不同的API之间来回跳转。