如何等待使用不同`ExecutorServices`创建的`Future`列表

问题描述 投票:0回答:5

好吧,所以我知道这里的第一个答案/评论将是“使用一个

ExecutorService
并使用
invokeAll
”。然而,我们有一个充分的理由(我不会让人们感到厌烦)将线程池分开。

所以我有一个线程池列表(

ExecutorServices
),我需要做的是使用
Callable
在每个线程池上调用不同的
submit
(没问题)。现在我有这个
Future
实例的集合,每个实例都在单独的
ExecutorService
上创建,我想等待所有这些实例完成(并且能够提供一个超时,在该超时时间内取消任何未完成的操作)。

是否有一个现有的类可以执行此操作(包装

Future
实例的列表并允许等待所有操作完成)?如果没有,我们将不胜感激关于有效机制的建议。

正在考虑调用

get
并为每个调用设置超时,但必须计算每次调用的总时间。

我看到了这篇文章等待任何未来完成,但这扩展了

Future
而不是包装它们的列表。

java concurrency executorservice future
5个回答
17
投票

根据路易斯的评论,我正在寻找的是Futures.successfulAsList

这让我可以等待所有任务完成,然后检查是否有任何失败的 future。

番石榴规则!


2
投票

我不认为 JDK 提供了直接的 API 可以让你做到这一点。然而,我认为创建一个简单的方法来执行此操作同样简单。您可能想看一下 AbstractExecutorService.invokeAll() 的实现,以了解这是可以完成的。

本质上,您可以对每个 future 调用 future.get(),根据每次等待结果的时间来减少等待时间,并在从方法返回之前取消所有未完成的 future。


1
投票

也许我没真正明白。然而,对我来说,这听起来仍然很简单

public <V> List<V> get(List<Future<V>> futures, long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
    List<V> result = new ArrayList<V>();
    long end = System.nanoTime() + unit.toNanos(timeout);
    for (Future<V> f: futures) {
        result.add(f.get(end - System.nanoTime(), TimeUnit.NANOSECONDS));
    }
    return result;
}

我这样说有错吗?

我认为您链接的问题要复杂得多,因为他们只想等待最快的,当然不知道哪个是最快的。


1
投票

这可能需要一些清理,但它应该可以解决您的问题。 (因时间和空间省略了一些封装):

public static <T> LatchWithWrappedCallables<T> wrapCallables(Collection<Callable<T>> callablesToWrap)
{
    CountDownLatch latch = new CountDownLatch(callablesToWrap.size());
    List<Callable<T>> wrapped = new ArrayList<Callable<T>>(callablesToWrap.size());
    for (Callable<T> currCallable : callablesToWrap)
    {
        wrapped.add(new CallableCountdownWrapper<T>(currCallable, latch));
    }

    LatchWithWrappedCallables<T> returnVal = new LatchWithWrappedCallables<T>();
    returnVal.latch = latch;
    returnVal.wrappedCallables = wrapped;
    return returnVal;
}

public static class LatchWithWrappedCallables<T>
{
    public CountDownLatch latch;
    public Collection<Callable<T>> wrappedCallables;
}

public static class CallableCountdownWrapper<T> implements Callable<T>
{
    private final Callable<T> wrapped;

    private final CountDownLatch latch;

    public CallableCountdownWrapper(Callable<T> wrapped, CountDownLatch latch)
    {
        this.wrapped = wrapped;
        this.latch = latch;
    }

    @Override
    public T call() throws Exception
    {
        try
        {
            return wrapped.call();
        }
        finally
        {
            latch.countDown();
        }
    }
}

然后你的代码会这样调用它:

Collection<Callable<String>> callablesToWrap = [Your callables that you need to wait for here];
LatchWithWrappedCallables<String> latchAndCallables = wrapCallables(callablesToWrap);

[Submit the wrapped callables to the executors here]

if(latchAndCallables.latch.await(timeToWaitInSec, TimeUnit.SECONDS))
{
    [Handling for timeout here]
}

0
投票

我怀疑您需要期货清单。这种解决方案浪费内存并且不必要地复杂。你所面临的是一个相对典型的多生产者-单消费者的情况。此类场景一般通过阻塞队列来解决。事实上,你有多个执行者并没有多大改变。

你所需要的只是阻塞出队,2边队列; Java 提供LinkedBlockingDeque。 LinkedBlockingDeque 确实是线程安全的集合,并针对高并发使用进行了优化。

创建一个 LinkedBlockingDeque 并将其引用分发给所有生产者和消费者。生产者将在一端推送消息,而消费者将在另一端拉取消息。 使用简单的

executor.execute(() -> { ...; add processed item to queue})
而不是
executor.submit()

您可以将 LinkedBlockingDeque 视为 Future 的变体,但对于多个值,需要多个 get()。

© www.soinside.com 2019 - 2024. All rights reserved.