我在应用程序中遇到了这样一种情况:事件进入,处理事件的线程(信令线程)必须向另一个处于闲置状态的线程(工作线程)发出信号,以使其可以运行某些代码。工作线程完成后,应等待再次发出信号。当工作线程正在工作时,事件可能会到达。在这种情况下,它应该继续运行并立即工作。工作线程执行的一项操作足以应付任何数量的传入事件,因此无需为每个事件执行一次,而只需在每个事件之后尽快进行一次。正确行为示例:
event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work
4个事件,3个工作周期。不幸的是,但不可避免的要求是信令线程在处理事件时不能阻塞。目前,我已经使用BlockingQueue实现了此功能,即使内容没有意思甚至没有看,它也具有填满自身的毫无意义的副作用。我原本希望能够使用CountDownLatch或CyclicBarrier或类似工具来完成这项工作,但我一直找不到方法。这是我的实现:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
private static final class MyBarrier {
private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
void await() throws InterruptedException {
queue.take();
queue.clear();
}
void signal() {
queue.add(true);
}
}
private static Random random = new Random(0);
private static void sleepForMax(int maxMillis) {
sleep(random.nextInt(maxMillis));
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
MyBarrier myBarrier = new MyBarrier();
final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
singallingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
sleepForMax(1_000); // simulate period between events arriving
myBarrier.signal();
System.out.println("Signalling work to be done");
}
System.out.println("Thread interrupted");
});
final ExecutorService workingThread = Executors.newSingleThreadExecutor();
workingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Waiting for work");
myBarrier.await();
} catch (InterruptedException e) {
break;
}
System.out.println("Doing work...");
sleepForMax(3_000); // simulate work being done
System.out.println("Work done");
}
System.out.println("Thread interrupted");
});
sleep(10_000);
singallingThread.shutdownNow();
workingThread.shutdownNow();
}
}
什么是更好的方法?
我正在尝试使用java.util.concurrent.Phaser进行测试,该方法可能有效,但是我之前没有使用Phaser,所以不确定。
private static final class MyBarrier2 {
private Phaser phaser = new Phaser(1);
void await() throws InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.getPhase());
}
void signal() {
phaser.arrive();
}
}