文章目录
- 类图结构及概要
- 核心方法
- offer方法
- take方法
- poll方法
- size方法
- 总结
DelayQueue 并发队列是 一个无界阻塞延迟队 列 ,队列中的每个元素都有个过期时 间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。
类图结构及概要
DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步 。另外,队列里面的元素要实现 Delayed 接口,由于每个元素都有一个过期时间 ,所以要实现获知当前元素还剩下多少时 间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口。
条件变量 available 与 lock 锁是对应的,其目的是为了实现线程问同步 。其中 leader 变量的使用基于 Leader-Follower 模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的 take 方法变为 leader 线程后,它会调用条件变量 available.awaitNanos(delay) 等待 delay 时间,但是其他线程 (follwer 线程) 则 会调用 available.await()进行无限等待 。 leader 线程延迟时间过期后,会退出 take 方法 , 并通过调用available.signal()方法唤醒一个 follwer 线程 ,被唤醒的 follwer 线程被选举为新的 leader 线程 。
核心方法
offer方法
插入元素到队列,如果插入元素为 null 则抛 出 NullPointerException 异常 , 否则由于是无界队列 , 所以一直返回 true。插入元素要实现 Delayed 接口。
``
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
``
如上代码首先获取独占锁,然后添加元素到优先级队列,由于 q 是优先级队列,所以添加元素后,调用 q.peek() 方法返回的并不一定是当前添加的元素。如果代码( 2 )判断结果为 true,则说明当前元素 e 是最先将过期的 , 那么重置 leader 线程为 null , 这时候激活 avaliab\e 变量条件队列里面的一个线程,告诉它队列里面有元素了。
take方法
获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。
``
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取但不移除队首元素( 1)
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
``
如上代码首先获取独占锁 lock 。 假设线程 A 第一次调用 队列的 take() 方法时队列为空 ,则 执行代码(1)后 first==null ,所以会执行代码 (2) 把当前线程放入 available 的条件队列里阻塞等待。
当有另 外一个线程 B 执行 offer ( item )方法并且添加元素到队列时 , 假设此时没有其他线程执行入队操作 ,则线程 B 添加 的元素是队首元素 , 那么执行 qpeek() 。
e 这时候就会重置 leader 线程为 null ,并且激活条件变量的条件队列里面的一个线程。此时线程 A 就会被激活。
线程 A 被撤活并循环后重新获取 队首元素,这时候 first 就是线程 B 新增的元素,可知这时候 first 不为 null , 则调用 first. getDelay(TimeUnit.NANOSECONDS) 方法查看该元素还剩余多 少时间就要过期,如 果 de lay<=O 则说明 己经过期,那么直接出队返回。否则查看 leader 是否为 null ,不为 null 则说明其他线程也在执行 take ,则把该线程放入条件队列。如果这时候 leader 为 null,则 :izt取当前线程 A 为 leader 线程 , 然后执行代码(5 ) 等待 delay时间(这期间该线程会释放锁,所以其他线程可 以 offer 添加元素,也可以 take 阻塞自己) ,剩余过期时间到后,线程 A 会重新竞争得到锁,然后重置 leader 线程为 null , 重新进入循环,这时候就会发现队头的元素己经过期了,则会直接返回队头元素 。
在返回前会执行 finally 块里面的代码( 7 ),代码 (7 )执行结果为 true 则说明 当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的singa l 方法,激活条件队列里面的等待线程。
poll方法
获取并移除队头过期元素,如果没有过期元素则返回 null 。首先获取独占锁,然后获取队头元素,如果队头元素为 null 或者还没过期则返回 null ,否则返回队头元素。
size方法
计算队列元素个数,包含过期的和没有过期的。首先获取独占锁,然后调用优先级队列 的 size 方法。
总结
DelayQueue 队列,其内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步 。另 外队列里面的 元素要实现 Delayed 接口,其中一个是获取当前元素到过期时间剩余时间的接 口,在出队时判断元素是否过期 了,一个是元素之间比较的接口 ,因为这是一个有优先级的队列 。