SynchronousQueue介绍

SynchronousQueue是一个没有数据缓冲的BlockingQueue

Posted by Sunfy on 2021-12-15
Words 975 and Reading Time 3 Minutes
Viewed Times
Viewed Times
Visitors In Total

SynchronousQueue

SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。

image-20220919224939343

如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

应用场景

SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

使用

1
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();

是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。

数据结构:链表(Node),在其内部类中维护了数据

  • 先消费(take),后生产(put)
    • 第一个线程Thread()是消费者访问,此时队列为空,则入队(创建Node节点并赋值)
    • 第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队
    • 第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作。直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队

锁:CAS+自旋(无锁)

阻塞:自旋了一定次数后调用LockSupport.park()

存取调用同一个方法:transfer()

  • put、offer为生产者,携带了数据e,为Data模式,设置到SNode或QNode属性中
  • take、poll为消费者,不携带数据,为Request模式,设置到SNode或QNode属性中

过程

  • 线程访问阻塞队列,先判断队尾节点或者栈顶节点的Node与当前入队模式是否相同
  • 相同则构造节点Node入队,并阻塞当前线程,元素e和线程赋值给Node属性
  • 不同则将元素e(不为null)返回给取数据的线程,队首或栈顶线程被唤醒,出队

公平模式:TransferQueue,队尾匹配(判断模式),队头出队,先进先出

非公平模式(默认策略):TransferStack,栈顶匹配,栈顶出栈,后进先出

应用场景:

  • SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务
  • SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

Copyright 2021 sunfy.top ALL Rights Reserved

...

...

00:00
00:00