在Java8中并行运行IO计算

问题描述 投票:11回答:1

我熟悉函数式编程语言,通常使用Scala和Javascript。我正在研究Java8项目并且不确定我应该如何运行项目的列表/流,并使用自定义线程池并行地为每个项目执行一些副作用,并返回一个对象听取完成是可能的(无论是成功还是失败)。

目前我有以下代码,它似乎工作(我使用Play框架Promise实现作为返回)但它似乎并不理想,因为ForkJoinPool并不意味着首先用于IO密集型计算。

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    ForkJoinTask<F.Promise<Void>> result = pool
            .submit(() -> {
                try {
                    items.parallel().forEach(performSingleItemBackup);
                    return F.Promise.<Void>pure(null);
                } catch (Exception e) {
                    return F.Promise.<Void>throwing(e);
                }
            });

    try {
        return result.get();
    } catch (Exception e) {
        throw new RuntimeException("Unable to get result", e);
    }
}

有人可以给我一个更加惯用的上述功能吗?理想情况下,不使用ForkJoinPool,使用更标准的返回类型和最新的Java8 API?不确定我应该在CompletableFuture,CompletionStage,ForkJoinTask之间使用什么......

java java-8
1个回答
12
投票

一个规范的解决方案

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}

请注意,ForkJoin池和并行流之间的交互是一个您不应该依赖的未指定的实现细节。相比之下,CompletableFuture提供了一个专门的API来提供Executor。它甚至不必是一个ForkJoinPool

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}

在任何一种情况下,您都应该显式关闭执行程序,而不是依赖于自动清理。

如果您需要F.Promise<Void>结果,您可以使用

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new))
            .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
            .join();
    } finally {
        pool.shutdown();
    }
}

但请注意,这与原始代码一样,仅在操作完成时返回,而返回CompletableFuture的方法允许操作异步运行,直到调用者调用joinget

要返回一个真正的异步Promise,你必须包装整个操作,例如

public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
    return F.Promise.pure(stream).flatMap(items -> {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new))
                .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
                .join();
        } finally {
            pool.shutdown();
        }
    });
}

但最好决定使用一个API,而不是在两个不同的API之间来回跳转。

© www.soinside.com 2019 - 2024. All rights reserved.