DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:
1 2 3 4 5 6
| public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
|
一个使用优先级队列实现的无界阻塞队列
数据结构:PriorityQueue,与PriorityBlockingQueue类似,不过没有阻塞功能
锁:ReentrantLock
阻塞对象:Condition available
入队:不阻塞,无界队列,与优先级队列入队相同,available
出队:
- 为空时阻塞
- 检查堆顶元素过期时间
- 小于等于0则出队
- 大于0,说明没过期,则阻塞(判断leader线程是否为空(为了保证优先级))
- 不为空(已有线程阻塞),直接阻塞
- 为空,则将当前线程置为leader,并按照过期时间进行阻塞
应用场景
- 商城订单超时关闭-订单业务,下单之后如果三十分钟内没有支付自动取消
- 异步短信通知功能-饿了么订单通知,下单成功后60s之后给用户发送短信通知
- 关闭空闲连接-服务器中,有很多客户端连接,空闲一段时间之后需要关闭
- 缓存过期清除-缓存中的对象,超过了存货时间,需要从缓存中移除
- 任务超时处理-在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等
DelayQueue使用
1
| DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();
|
原理
数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
|
入队put方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
|
出队take方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
Copyright 2021 sunfy.top ALL Rights Reserved