我正在寻找一种同步多个异步操作的方法。我想使用大小等于我的操作的 BlockingQueue,但是我可以等谁直到队列已满?
我正在寻找类似反向阻塞队列的东西。
我需要在最后收集每个线程的结果。 AsyncHandler 是固定的,它已经是一个 ThreadExecutor 底层,我无法启动新线程。
//3 Times
makeAsync(new AsyncHandler() {
onSuccess() {
..
queue.put(result)
}
onFailure() {
..
}
});
//Blocking till the Queue is full
List<Results> = queue.takeAll()
额外问题:当我的一个请求失败时,我需要一种方法来结束等待
你所描述的内容
//Blocking till the Queue is full
List<Results> results = queue.takeAll();
在语义上与“获取与队列容量一样多的项目”没有区别。如果您知道容量,您可以通过以下方式实现:
// preferably a constant which you also use to construct the bounded queue
int capacity;
…
List<Results> results = new ArrayList<>(capacity);
queue.drainTo(results, capacity);
while(result.size()<capacity)
queue.drainTo(results, capacity-result.size());
这将阻塞,直到它收到与
capacity
一样多的项目,如上所述,这与等待队列变满(大小等于其容量)然后获取所有项目相同。唯一的区别是队列变满的事件不能保证发生,例如如果您打算对 offer
项目进行异步操作,直到队列已满,则这种方式不起作用。
如果您不知道容量,那您就不走运了。甚至不能保证任意
BlockingQueue
是有界的,读取,它可能具有无限的容量。
另一方面,如果异步操作能够检测它们何时完成,它们可以简单地在本地收集列表中的项目,并在完成后将整个列表作为单个项目放入
BlockingQueue<List<Results>>
中。那么等待它的代码只需要一个 take
即可获取整个列表。
如果您使用的是 Java 8,请执行以下操作:
makeAsync
时,创建一个 CompletableFuture<Result>
实例并使其可供 AsyncHandler
使用,并让调用者也保留一个引用,例如在列表中。complete(result)
实例上调用 CompletableFuture
。completeExceptionally(exception)
实例上调用 CompletableFuture
。CompletableFuture.allOf(cfArray).join()
。不幸的是,这需要一个数组,而不是一个列表,所以你必须进行转换。如果任何一项任务完成时出现错误,则 join()
调用将引发异常。否则,您可以通过调用各个 CompletableFuture
实例的 get()
方法来收集结果。如果您没有 Java 8,您将不得不推出自己的机制。将
CountDownLatch
初始化为您要触发的异步任务的数量。让每个异步任务将其结果(或异常,或指示失败的其他方式)存储到线程安全的数据结构中,然后递减(“countDown”)锁存器。让调用者等待锁存器达到零,然后收集结果和错误。这并不是非常困难,但您必须确定一种方法来存储有效结果以及记录是否发生错误,并手动维护计数。
如果你可以修改methodAsync(),那么就像每次将一些元素放入队列后使用CountDownLatch并让主线程等待这样的CountDownLatch一样简单。
如果不幸的是你不能修改methodAsync(),那么只需包装队列并给它一个倒数锁存器,然后重写add()方法来倒数这个锁存器。 main方法就等它完成了。
话虽如此,你的程序结构看起来组织得不太好。
我为你的问题写了一个解决方案。我使用了一个带有“freeWorkers”的附加 BlockingQueue,其容量与实际工作线程池相同。我只是在开始下一个任务之前获取“freeWorker”,每个任务都会将“freeWorker”返回到“freeWorkes”队列。这是一个例子:
package poc.concurrecy;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
public class ThreadPoolQueueTest {
public static final String WHITE = "\033[0;39m";
public static final Random RANDOM = new Random();
public static void main(String[] args) throws InterruptedException {
int capacity = 5;
LinkedBlockingQueue<Integer> freeWorkers = new LinkedBlockingQueue<>(capacity);
for (int i = 1; i < capacity + 1; i++) freeWorkers.put(i);
System.out.println(WHITE + freeWorkers);
System.out.println(WHITE + "===========");
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(capacity);
ExecutorService executor =
new ThreadPoolExecutor(capacity, capacity,
0L, TimeUnit.MILLISECONDS,
workQueue);
try {
for (int i = 0; i < 10; i++) {
Integer freeWorker = freeWorkers.take();
String color = "\033[0;3" + freeWorker + "m";
System.out.println(WHITE + "take free worker " + freeWorker);
System.out.println(WHITE + freeWorkers);
executor.execute(() -> {
System.out.println(color + "run " + freeWorker);
delay(1 + RANDOM.nextInt(3));
System.out.println(color + "done " + freeWorker);
try {
System.out.println(color + "return free worker " + freeWorker);
freeWorkers.put(freeWorker);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println(WHITE + "put task " + freeWorker);
}
} finally {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
System.out.println(WHITE + "==========");
System.out.println(WHITE + freeWorkers);
}
public static void delay(int timeout) {
try {
SECONDS.sleep(timeout);
} catch (InterruptedException e) {
// do nothing
}
}
}
这是一个输出:
[1, 2, 3, 4, 5]
===========
take free worker 1
[2, 3, 4, 5]
put task 1
take free worker 2
[3, 4, 5]
put task 2
take free worker 3
[4, 5]
put task 3
take free worker 4
[5]
put task 4
take free worker 5
[]
put task 5
run 3
run 5
run 1
run 2
run 4
done 4
return free worker 4
take free worker 4
[]
put task 4
run 4
done 3
return free worker 3
take free worker 3
[]
put task 3
run 3
done 5
return free worker 5
done 1
return free worker 1
take free worker 5
[1]
put task 5
take free worker 1
run 5
[]
put task 1
run 1
done 3
return free worker 3
done 2
return free worker 2
take free worker 3
[2]
put task 3
run 3
done 5
return free worker 5
done 1
return free worker 1
done 4
return free worker 4
done 3
return free worker 3
==========
[2, 5, 1, 4, 3]
显然你可以简化这段代码并删除所有调试信息“System.out”