我试图弄清楚
java.util.concurrent.DelayQueue
在多线程环境中是如何工作的。我看到这个数据结构内部使用了ReentrantLock
,并且它是在此类的每个方法的开头获取的。我尝试使用不同的方法来实现生产者-消费者模式,从队列中检索元素的顶部 (poll()
/ take()
)。
public class DelayQueueTest {
static DelayQueue<Delayed> delayQueue = new DelayQueue<>();
public static final long executionTime = 2_000L;
public static long timeStart;
public static void go() throws InterruptedException {
timeStart = System.currentTimeMillis();
Thread consumer1 = new Thread(new Consumer());
Thread producer = new Thread(new Producer());
consumer1.start();
Thread.sleep(200);
producer.start();
producer.join();
consumer1.join();
}
static class Consumer implements Runnable {
static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;
@Override
public void run() {
while (System.currentTimeMillis() - timeStart < executionTime) {
System.out.println(String.format(
"Thread %s started taking at %d",
Thread.currentThread().getId(),
System.currentTimeMillis() - timeStart)
);
Delayed result;
try {
result = delayQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.print(String.format(
"Thread %s finished taking at %d... ",
Thread.currentThread().getId(),
System.currentTimeMillis() - timeStart)
);
if (result != null) System.out.println(String.format("with result %s", result.toString()));
else System.out.println();
System.out.format("Queue size: %d\n", delayQueue.size());
try {
Thread.sleep(500);
}
catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
}
static class Producer implements Runnable {
static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;
@Override
public void run() {
while (System.currentTimeMillis() - timeStart < executionTime) {
System.out.println(String.format(
"Thread %s started offering at %d. Queue size: %d",
Thread.currentThread().getId(),
System.currentTimeMillis() - timeStart,
delayQueue.size())
);
delayQueue.offer(new DelayObject(300));
System.out.println(String.format(
"Thread %s finished offering at %d. Queue size: %d",
Thread.currentThread().getId(),
System.currentTimeMillis() - timeStart,
delayQueue.size())
);
try {
Thread.sleep(300);
}
catch (InterruptedException ex) {
System.out.println(ex);
}
}
}
}
}
当我在
消费者线程上调用
take()
,并在200毫秒后在生产者线程上调用
offer()
时,问题出现了。我期望消费者线程在 take()
获取锁时阻塞整个数据结构,因此 offer()
不会向队列中添加元素(因为 offer()
也想获取锁,但它被 offer()
锁定)
)。所以,从全球范围来看,我预计会出现僵局。然而,这并没有发生。这是我的程序的输出:
Thread 15 started taking at 2
Thread 16 started offering at 214. Queue size: 0
Thread 16 finished offering at 217. Queue size: 1
Thread 16 started offering at 527. Queue size: 1
Thread 16 finished offering at 527. Queue size: 1
Thread 15 finished taking at 527... with result org.example.DelayQueueTest$DelayObject@7148b5a8
Queue size: 1
Thread 16 started offering at 828. Queue size: 1
Thread 16 finished offering at 828. Queue size: 2
Thread 15 started taking at 1035
Thread 15 finished taking at 1035... with result org.example.DelayQueueTest$DelayObject@21819116
Queue size: 1
Thread 16 started offering at 1143. Queue size: 1
Thread 16 finished offering at 1143. Queue size: 2
Thread 16 started offering at 1453. Queue size: 2
Thread 16 finished offering at 1453. Queue size: 3
Thread 15 started taking at 1548
Thread 15 finished taking at 1548... with result org.example.DelayQueueTest$DelayObject@6bf427cb
Queue size: 2
Thread 16 started offering at 1769. Queue size: 2
Thread 16 finished offering at 1769. Queue size: 3
你能解释一下,为什么我有这样的输出吗?为什么要加锁,而且已经加锁了,
offer()
却执行成功了?
此外,如果您可能想出这个
DelayQueue
类的潜在用例,我很乐意观察它们......
正如您正确地注意到的那样,
DelayQueue.take()
获取与 DelayQueue
关联的锁,这可以防止其他线程与 DelayQueue
交互。
但是,如果没有可用元素,
take()
会调用 available.await();
(请参阅DelayQueue.take()
第 242 行和 249 行的源)。
available
与 DelayQueue
锁相关联(source 第 103 和 129 行):
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
ReentrantLock.newCondition()
的文档
返回一个
实例以与此
Condition
实例一起使用。 当与内置监视器锁一起使用时,返回的Lock
实例支持与
Condition
监视器方法(
Object
、
wait
和notify
)相同的用法。notifyAll
- [...]
- 当条件等待方法被调用时,锁被释放,并且在它们返回之前,锁被重新获取,并且锁持有计数恢复到调用该方法时的值。
- [...]
这意味着在调用
available.await()
时,可重入锁被释放,这允许其他线程进入 DelayQueue
的锁定方法。