等到阻塞队列已满

问题描述 投票:0回答:5

我正在寻找一种同步多个异步操作的方法。我想使用大小等于我的操作的 BlockingQueue,但是我可以等谁直到队列已满?

我正在寻找类似反向阻塞队列的东西。

我需要在最后收集每个线程的结果。 AsyncHandler 是固定的,它已经是一个 ThreadExecutor 底层,我无法启动新线程。

//3 Times
makeAsync(new AsyncHandler() {
   onSuccess() {
     ..
     queue.put(result)
   }

   onFailure() {
     ..
   }
});

//Blocking till the Queue is full
List<Results> = queue.takeAll()

额外问题:当我的一个请求失败时,我需要一种方法来结束等待

java multithreading synchronization blockingqueue
5个回答
2
投票

我从来没有需要做这类事情,但你可能会幸运地使用各种线程中的

CountDownLatch
CyclicBarrier


1
投票

你所描述的内容

//Blocking till the Queue is full
List<Results> results = queue.takeAll();

在语义上与“获取与队列容量一样多的项目”没有区别。如果您知道容量,您可以通过以下方式实现:

// preferably a constant which you also use to construct the bounded queue
int capacity;

List<Results> results = new ArrayList<>(capacity);
queue.drainTo(results, capacity);
while(result.size()<capacity)
    queue.drainTo(results, capacity-result.size());

这将阻塞,直到它收到与

capacity
一样多的项目,如上所述,这与等待队列变满(大小等于其容量)然后获取所有项目相同。唯一的区别是队列变满的事件不能保证发生,例如如果您打算对
offer
项目进行异步操作,直到队列已满,则这种方式不起作用。

如果您不知道容量,那您就不走运了。甚至不能保证任意

BlockingQueue
是有界的,读取,它可能具有无限的容量。

另一方面,如果异步操作能够检测它们何时完成,它们可以简单地在本地收集列表中的项目,并在完成后将整个列表作为单个项目放入

BlockingQueue<List<Results>>
中。那么等待它的代码只需要一个
take
即可获取整个列表。


1
投票

如果您使用的是 Java 8,请执行以下操作:

  • 每次调用
    makeAsync
    时,创建一个
    CompletableFuture<Result>
    实例并使其可供
    AsyncHandler
    使用,并让调用者也保留一个引用,例如在列表中。
  • 当异步任务正常完成时,让它在其
    complete(result)
    实例上调用
    CompletableFuture
  • 当异步任务完成但出现错误时,让它在其
    completeExceptionally(exception)
    实例上调用
    CompletableFuture
  • 启动所有异步任务后,让调用者调用
    CompletableFuture.allOf(cfArray).join()
    。不幸的是,这需要一个数组,而不是一个列表,所以你必须进行转换。如果任何一项任务完成时出现错误,则
    join()
    调用将引发异常。否则,您可以通过调用各个
    CompletableFuture
    实例的
    get()
    方法来收集结果。

如果您没有 Java 8,您将不得不推出自己的机制。将

CountDownLatch
初始化为您要触发的异步任务的数量。让每个异步任务将其结果(或异常,或指示失败的其他方式)存储到线程安全的数据结构中,然后递减(“countDown”)锁存器。让调用者等待锁存器达到零,然后收集结果和错误。这并不是非常困难,但您必须确定一种方法来存储有效结果以及记录是否发生错误,并手动维护计数。


0
投票

如果你可以修改methodAsync(),那么就像每次将一些元素放入队列后使用CountDownLatch并让主线程等待这样的CountDownLatch一样简单。

如果不幸的是你不能修改methodAsync(),那么只需包装队列并给它一个倒数锁存器,然后重写add()方法来倒数这个锁存器。 main方法就等它完成了。

话虽如此,你的程序结构看起来组织得不太好。


0
投票

我为你的问题写了一个解决方案。我使用了一个带有“freeWorkers”的附加 BlockingQueue,其容量与实际工作线程池相同。我只是在开始下一个任务之前获取“freeWorker”,每个任务都会将“freeWorker”返回到“freeWorkes”队列。这是一个例子:

package poc.concurrecy;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;

public class ThreadPoolQueueTest {

    public static final String WHITE = "\033[0;39m";
    public static final Random RANDOM = new Random();

    public static void main(String[] args) throws InterruptedException {
        int capacity = 5;
        LinkedBlockingQueue<Integer> freeWorkers = new LinkedBlockingQueue<>(capacity);
        for (int i = 1; i < capacity + 1; i++) freeWorkers.put(i);
        System.out.println(WHITE + freeWorkers);
        System.out.println(WHITE + "===========");
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(capacity);
        ExecutorService executor =
                new ThreadPoolExecutor(capacity, capacity,
                        0L, TimeUnit.MILLISECONDS,
                        workQueue);
        try {
            for (int i = 0; i < 10; i++) {
                Integer freeWorker = freeWorkers.take();
                String color = "\033[0;3" + freeWorker + "m";
                System.out.println(WHITE + "take free worker " + freeWorker);
                System.out.println(WHITE + freeWorkers);
                executor.execute(() -> {
                    System.out.println(color + "run " + freeWorker);
                    delay(1 + RANDOM.nextInt(3));
                    System.out.println(color + "done " + freeWorker);
                    try {
                        System.out.println(color + "return free worker " + freeWorker);
                        freeWorkers.put(freeWorker);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
                System.out.println(WHITE + "put task " + freeWorker);
            }

        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        System.out.println(WHITE + "==========");
        System.out.println(WHITE + freeWorkers);
    }

    public static void delay(int timeout) {
        try {
            SECONDS.sleep(timeout);
        } catch (InterruptedException e) {
            // do nothing
        }
    }
}

这是一个输出:

[1, 2, 3, 4, 5]
===========
take free worker 1

[2, 3, 4, 5]
put task 1
take free worker 2
[3, 4, 5]
put task 2
take free worker 3
[4, 5]
put task 3
take free worker 4
[5]
put task 4
take free worker 5
[]
put task 5
run 3
run 5
run 1
run 2
run 4
done 4
return free worker 4
take free worker 4
[]
put task 4
run 4
done 3
return free worker 3
take free worker 3
[]
put task 3
run 3
done 5
return free worker 5
done 1
return free worker 1
take free worker 5
[1]
put task 5
take free worker 1
run 5
[]
put task 1
run 1
done 3
return free worker 3
done 2
return free worker 2
take free worker 3
[2]
put task 3
run 3
done 5
return free worker 5
done 1
return free worker 1
done 4
return free worker 4
done 3
return free worker 3
==========
[2, 5, 1, 4, 3]

显然你可以简化这段代码并删除所有调试信息“System.out”

© www.soinside.com 2019 - 2024. All rights reserved.