赞
踩
我们假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓存池中,生产者将生产的资源存进缓存池中,消费者从缓存池 中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式。
该模式能够简化开发过程,一方面消除了生产者与消费者类之间的代码依赖性,另方面将生产数据的过程与使用数据的过程解耦简单化负载。
我们⾃⼰coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是⽣产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒⽣产者;当缓冲池满了,我们需要阻塞⽣产者,唤醒消费者,这些个等待-唤醒逻辑都需要⾃⼰实现。
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
BlockingQueue是Java
util.concurrent包下重要的数据结构,区别于普通的队列,BlockingQueue提供了线程安全的队列访问⽅式,并发包下很多⾼级同步类的实现都是基于BlockingQueue实现的。
BlockingQueue⼀般⽤于⽣产者-消费者模式,⽣产者是往队列⾥添加元素的线程,消费者是从队列⾥拿元素的线程。BlockingQueue就是存放元素的容器。
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | - | - |
注意之处:
由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。
public ArrayBlockingQueue(int capacity, boolean fair){
//..省略代码
}
可以初始化队列⼤⼩, 且⼀旦初始化不能改变。构造⽅法中的fair表示控制对象的内部锁是否采⽤公平锁,默认是⾮公平锁。
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注⼊其中的元素必须实现
DelayQueue是⼀个没有⼤⼩限制的队列,因此往队列中插⼊数据的操作(⽣产者)永远不会被阻塞,⽽只
基于优先级的⽆界阻塞队列(优先级的判断通过构造函数传⼊的Compator对象来决定),内部控制线程同步
这个队列⽐较特殊,没有任何内部容量,甚⾄连⼀个队列的容量都没有。并且每个put 必须等待⼀个 take,反之亦然。
需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。
以下⽅法的返回值,可以帮助理解这个队列:
注意:
PriorityBlockingQueue不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没有可消费的数据时,阻塞数据的消费者。因此使⽤的时候要特别注意,⽣产者⽣产数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有的可⽤堆内存空间。对于使⽤默认⼤⼩的LinkedBlockingQueue也是⼀样的。
阻塞队列的原理很简单,利⽤了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。
⾸先是构造器,除了初始化队列的⼤⼩和是否是公平锁之外,还对同⼀个锁(lock)初始化了两个监视器,分别是notEmpty和notFull。这两个监视器的作⽤⽬前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是⼀个⽣产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。
//数据元素数组 final Object[] items; //下⼀个待取出元素索引 int takeIndex; //下⼀个待添加元素索引 int putIndex; //元素个数 int count; //内部锁 final ReentrantLock lock; //消费者监视器 private final Condition notEmpty; //⽣产者监视器 private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { //..省略其他代码 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
put操作的源码
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 1.⾃旋拿锁 lock.lockInterruptibly(); try { // 2.判断队列是否满了 while (count == items.length) // 2.1如果满了,阻塞该线程,并标记为notFull线程, // 等待notFull的唤醒,唤醒之后继续执⾏while循环。 notFull.await(); // 3.如果没有满,则进⼊队列 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 4 唤醒⼀个等待的线程 notEmpty.signal(); }
总结put的流程:
take操作的源码
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
take操作和put操作的流程是类似的,总结⼀下take操作的流程:
注意:
public class Test1 { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test1 t = new Test1(); Producer p = t.new Producer(); Consumer c = t.new Consumer(); p.start(); c.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume(){ while (true) { try { queue.take(); System.out.println("从队列取走一个元素,队里剩余:"+queue.size()); }catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce(){ while(true){ try{ queue.put(1); System.out.println("向队列中插入一个元素,队列剩余空间:"+queue.size()); }catch (Exception e){ e.printStackTrace(); } } } } }
下面是输出片段:
入一个元素,队列剩余空间:5 向队列中插入一个元素,队列剩余空间:1 向队列中插入一个元素,队列剩余空间:2 向队列中插入一个元素,队列剩余空间:2 向队列中插入一个元素,队列剩余空间:3 从队列取走一个元素,队里剩余:1 从队列取走一个元素,队里剩余:3 向队列中插入一个元素,队列剩余空间:4 向队列中插入一个元素,队列剩余空间:3 向队列中插入一个元素,队列剩余空间:4 从队列取走一个元素,队里剩余:2 从队列取走一个元素,队里剩余:4 向队列中插入一个元素,队列剩余空间:5 向队列中插入一个元素,队列剩余空间:4 向队列中插入一个元素,队列剩余空间:5 向队列中插入一个元素,队列剩余空间:6 向队列中插入一个元素,队列剩余空间:7
注意,这个例⼦中的输出结果看起来可能有问题,⽐如有⼏⾏在插⼊⼀个元素之后,队列的剩余空间不变。这是由于System.out.println语句没有锁。考虑到这样的情况:线程1在执⾏完put/take操作后⽴即失去CPU时间⽚,然后切换到线程2执⾏put/take操作,执⾏完毕后回到线程1的System.out.println语句并输出,发现这个
时候阻塞队列的size已经被线程2改变了,所以这个时候输出的size并不是当时线程1执⾏完put/take操作之后阻塞队列的size,但可以确保的是size不会超过10个。实际上使⽤阻塞队列是没有问题的。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Java中的线程池就是使⽤阻塞队列实现的,我们在了解阻塞队列之后,⽆论是使⽤Exectors类中已经提供的线程池,还是⾃⼰通过ThreadPoolExecutor实现线程池,都会更加得⼼应⼿,想要了解线程池的同学,可以看第⼗⼆章:线程池原理。
注:上⾯提到了⽣产者-消费者模式,⼤家可以参考⽣产者-消费者模型,可以更好的理解阻塞队列。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。