DelayQueue介绍

DelayQueue 是一个支持延时获取元素的阻塞队列

Posted by Sunfy on 2021-12-16
Words 1.3k and Reading Time 5 Minutes
Viewed Times
Viewed Times
Visitors In Total

DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。

它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:

1
2
3
4
5
6
public interface Delayed extends Comparable<Delayed> {
//getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
//如果返回 0 或者负数则代表任务已过期。
//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
long getDelay(TimeUnit unit);
}

一个使用优先级队列实现的无界阻塞队列

数据结构:PriorityQueue,与PriorityBlockingQueue类似,不过没有阻塞功能

锁:ReentrantLock

阻塞对象:Condition available

入队:不阻塞,无界队列,与优先级队列入队相同,available

出队:

  • 为空时阻塞
  • 检查堆顶元素过期时间
    • 小于等于0则出队
    • 大于0,说明没过期,则阻塞(判断leader线程是否为空(为了保证优先级))
      • 不为空(已有线程阻塞),直接阻塞
      • 为空,则将当前线程置为leader,并按照过期时间进行阻塞

应用场景

  • 商城订单超时关闭-订单业务,下单之后如果三十分钟内没有支付自动取消
  • 异步短信通知功能-饿了么订单通知,下单成功后60s之后给用户发送短信通知
  • 关闭空闲连接-服务器中,有很多客户端连接,空闲一段时间之后需要关闭
  • 缓存过期清除-缓存中的对象,超过了存货时间,需要从缓存中移除
  • 任务超时处理-在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等

DelayQueue使用

1
DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();

原理

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
//用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

入队put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
if (q.peek() == e) {
// 若入队的元素位于队列头部,说明当前元素延迟最小
// 将 leader 置空
leader = null;
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
}
return true;
} finally {
lock.unlock(); // 解锁,真正唤醒阻塞的线程
}
}

出队take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();// 取出堆顶元素
if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间
if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
return q.poll();

// 如果delay大于0 ,则下面要阻塞了
// 将first置为空方便gc
first = null;
// 如果前面有其它线程在等待,直接进入等待
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay时间后自动醒过来
// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
// 这里即使醒过来后也不一定能获取到元素
// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
// 条件锁的唤醒分成两步,先从Condition的队列里出队
// 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
// 解锁,真正唤醒阻塞的线程
lock.unlock();
}
}

Copyright 2021 sunfy.top ALL Rights Reserved

...

...

00:00
00:00