赞
踩
看了《Java并发编程的艺术》。通常只有ReentrantLock+Condition+数组。而关于迭代器的介绍很少,所在本篇补上。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
JDK7提供了7个阻塞队列。分别是
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,类图如下
如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。另外有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,另外notEmpty,notFull条件变量用来进行出入队的同步,也就是通知模式。属性如下:
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
-
- private static final long serialVersionUID = -817911632652898426L;
-
- /** The queued items */
- final Object[] items;
-
- /** items index for next take, poll, peek or remove *///记录下一个take、remove、peek的索引
- int takeIndex;
-
- /** items index for next put, offer, or add */ //记录下一个put、offer、add的索引
- int putIndex;
-
- /** Number of elements in the queue */ //队列中元素的个数
- int count;
-
- /** Main lock guarding all access */
- final ReentrantLock lock;
-
- /** Condition for waiting takes */
- private final Condition notEmpty;
-
- /** Condition for waiting puts */
- private final Condition notFull;

2.2 构造方法
- //只指定容量
- public ArrayBlockingQueue(int capacity) {
- this(capacity, false);
- }
- //指定容量和ReentrantLock是否公平
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- this.items = new Object[capacity];
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
- //将集合中的元素初始化队列的元素
- public ArrayBlockingQueue(int capacity, boolean fair,
- Collection<? extends E> c) {
- this(capacity, fair);
-
- final ReentrantLock lock = this.lock;
- lock.lock(); // Lock only for visibility, not mutual exclusion
- try {
- int i = 0;
- try {
- for (E e : c) {
- checkNotNull(e);
- items[i++] = e;
- }
- } catch (ArrayIndexOutOfBoundsException ex) {
- throw new IllegalArgumentException();
- }
- count = i;
- putIndex = (i == capacity) ? 0 : i;
- } finally {
- lock.unlock();
- }
- }

构造方法主要使用容量对items数组完成初始化,所以是有届队列,fair参数用来构造一个公平的或不公平的ReentrantLock,默认是Lock为非公平锁。
在队尾插入元素,如果队列满则返回false,否者入队返回true。
- public boolean offer(E e) {
- checkNotNull(e);//非空检查
- final ReentrantLock lock = this.lock; //获取独占锁
- lock.lock();//加锁
- try {
- if (count == items.length) //如果队列满则返回false
- return false;
- else {
- enqueue(e); //否者插入元素
- return true; //返回true
- }
- } finally {
- lock.unlock(); //释放锁
- }
- }
- private void enqueue(E x) {
- // assert lock.getHoldCount() == 1;
- // assert items[putIndex] == null;
- final Object[] items = this.items;
- items[putIndex] = x; //元素入队
- if (++putIndex == items.length) //计算下一个元素应该存放的下标
- putIndex = 0;
- count++; //计数
- notEmpty.signal(); //通知有新增数据
- }

这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。这个队列是使用循环数组实现,所以putindex需要判断是否满了。
另外入队后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。也就是之前说的通知模式,就是说生产者插入数据后,通知消费者当前队列可用。就是靠condition实现的。
与offer类似,在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == items.length)
- notFull.await();
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
1. ArrayBlockingQueue不允许元素为null
从队头获取并移除元素,队列为空,则返回null。
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return (count == 0) ? null : dequeue();
- } finally {
- lock.unlock();
- }
- }
-
- private E dequeue() {
- // assert lock.getHoldCount() == 1;
- // assert items[takeIndex] != null;
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- //获取元素值
- E x = (E) items[takeIndex];
- items[takeIndex] = null; //将要取出的元素指向null
- //队头指针计算,
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;//队列元素个数减一
- if (itrs != null)
- itrs.elementDequeued(); //itrs也出队
- notFull.signal();//发送信号激活notFull条件队列里面的线程
- return x;
- }

其他的相对好理解,itrs.elementDequeued这里后面说。
take与poll类似, 从队头获取元素,如果队列为空则阻塞直到队列有元素。和put方法相互对应
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await();
- return dequeue();
- } finally {
- lock.unlock();
- }
- }
书上的例子就是put,take介绍通知模式。需要注意的是如果队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,await才会返回。在看看《Java并发编程的艺术》对await实现原理:await()主要通过的 LockSupport.park(this);来实现.这里可以去看Condition
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }

继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。
- public static void park(Object blocker) {
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- UNSAFE.park(false, 0L);
- setBlocker(t, null);
- }
unsafe.park是个native方法,代码如下:
public native void park(boolean isAbsolute, long time);
park这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。
当然书上作者还介绍了JVM如何实现的park。不懂也就不贴了。看dump文件里面线程状态会有见到java.lang.Thread.State: WAITING (parking)
限于篇幅,peek,size不多说了。
peek 返回队列头元素但不移除该元素,队列为空,返回null
- public int size() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- return count;
- } finally {
- lock.unlock();
- }
- }
书上重点是介绍了基于condition的通知模式。但是对比源码会发现,put数据相对好理解。take数据dequeue里面的itrs出队。为啥加数据没有反而取数据会有?总觉得第一眼看上去ArrayBlockingQueue基于condition,必将平淡。还是有一种奇特的设计。我们从上面阅读类的源码来看,它是线程安全的。另一方面,ArrayBlockingQueue里面数组下标是循环利用的,可以理解为是条循环队列。 一开始迭代器是创建时固定位置,队列则可能在不断的出入队列,这样迭代器会受到严重影响(迭代器的位置不对),所以为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:
跟踪循环的次数。即 takeIndex为0的次数。
每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。
- private class Itr implements Iterator<E> {
- /** Index to look for new nextItem; NONE at end */ //主要指向下一个元素
- private int cursor;
-
- /** Element to be returned by next call to next(); null if none */ //next返回的下一个元素
- private E nextItem;
-
- /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ //nextItem的index
- private int nextIndex;
-
- /** Last element returned; null if none or not detached. */ //最后一个元素
- private E lastItem;
-
- /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ //最后一个元素的索引
- private int lastRet;
-
- /** Previous value of takeIndex, or DETACHED when detached */ //takeIndex的前一个位置
- private int prevTakeIndex;
-
- /** Previous value of iters.cycles */ //itrs监控前一个的循环数量cycles的值
- private int prevCycles;
-
- /** Special index value indicating "not available" or "undefined" */ //none模式,代表节点不存在或者没有
- private static final int NONE = -1;
-
- private static final int REMOVED = -2; //说明当前节点被其他线程调用remove模式删除了
-
- /** Special value for prevTakeIndex indicating "detached mode" */ //说明处于detached模式
- private static final int DETACHED = -3;

- Itr() {
- lastRet = NONE; //最后一个索引为NONE
- final ReentrantLock lock = ArrayBlockingQueue.this.lock; //获取外部类的锁。
- lock.lock(); //加锁
- try {
- if (count == 0) { //当队列里面实际是没有数据的
- cursor = NONE;
- nextIndex = NONE;
- prevTakeIndex = DETACHED;
- } else {
- final int takeIndex = ArrayBlockingQueue.this.takeIndex;
- prevTakeIndex = takeIndex;
- nextItem = itemAt(nextIndex = takeIndex);
- cursor = incCursor(takeIndex);
- if (itrs == null) {
- itrs = new Itrs(this);
- } else {
- itrs.register(this); // in this order
- itrs.doSomeSweeping(false); //清理无用的迭代器
- }
- prevCycles = itrs.cycles;
- }
- } finally {
- lock.unlock();
- }
- }

它的构造方法如上, count等于0的时候,就说明队列里面没有数据,那么创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。
而doSomeSweeping主要用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,继续找下一个。这就完成了对无效迭代器的清理了。下面看看它的主要代码:
- void doSomeSweeping(boolean tryHarder) {
- int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; //判断要尝试几次去清扫。
- Node o, p;
- final Node sweeper = this.sweeper;
- boolean passedGo; // to limit search to one full sweep
-
- if (sweeper == null) { //初始化o,p,以及passedGo
- o = null;
- p = head;
- passedGo = true;
- } else {
- o = sweeper;
- p = o.next;
- passedGo = false;
- }
-
- for (; probes > 0; probes--) { //循环次数。
- if (p == null) {
- if (passedGo)
- break;
- o = null;
- p = head;
- passedGo = true;
- }
- final Itr it = p.get();
- final Node next = p.next;
- if (it == null || it.isDetached()) { //这个iterator是null,或者已经处于detached模式了。需要被清理的迭代器
- // found a discarded/exhausted iterator
- probes = LONG_SWEEP_PROBES; // "try harder"
- // unlink p 清理
- p.clear();
- p.next = null;
- if (o == null) { //说明是第一个迭代器
- head = next;
- if (next == null) { //itrs里面是空的了。
- // We've run out of iterators to track; retire
- itrs = null;
- return;
- }
- }
- else
- o.next = next; //o指向前一个清扫过的p
- } else {
- o = p; //把p赋值给o,
- }
- p = next; //p往后面串一个。
- }
-
- this.sweeper = (p == null) ? null : o; //判断p,并给sweeper赋值。
- }

下面主要看负责管理Iterator的Itrs类。
- class Itrs {
-
- /**
- * Node in a linked list of weak iterator references.
- */
- private class Node extends WeakReference<Itr> {
- Node next; //指向下一个节点
-
- Node(Itr iterator, Node next) {
- super(iterator);
- this.next = next;
- }
- }
-
- /**记录循环的次数,当take下标到0的时候为一个循环 cycle+1 */
- int cycles = 0;
-
- /** Linked list of weak iterator references */ //头节点head
- private Node head;
-
- /** Used to expunge stale iterators *///用来去删除废弃的iterators。
- private Node sweeper = null;
- //尝试次数
- private static final int SHORT_SWEEP_PROBES = 4;
- private static final int LONG_SWEEP_PROBES = 16;

里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference).我们再来看看之前提到的take调用了dequeue里面的itrs.elementDequeued();
- /**
- * 当元素出队列的时候调用的方法这个出队列方法
- */
- void elementDequeued() {
- // 在队列为空的时候调用清空所有的迭代器;
- if (count == 0)
- queueIsEmpty();
- // 当拿元素进行循环的时候,清理所有过期的迭代器
- else if (takeIndex == 0)
- takeIndexWrapped();
- }
当count为0时候,调用queueIsEmpty:
- void queueIsEmpty() {
- // assert lock.getHoldCount() == 1;
- for (Node p = head; p != null; p = p.next) {
- Itr it = p.get();
- if (it != null) {
- p.clear();
- it.shutdown();
- }
- }
- head = null;
- itrs = null;
- }
而在queueIsEmpty 里面,则需要把itrs里面的所有node遍历,如果此时里面的某一个iterator不为null,调用shutdown方法,shutdown方法里面则是把Iterator里面的状态标志初始化:
- void shutdown() {
- // assert lock.getHoldCount() == 1;
- cursor = NONE;
- if (nextIndex >= 0)
- nextIndex = REMOVED;
- if (lastRet >= 0) {
- lastRet = REMOVED;
- lastItem = null;
- }
- prevTakeIndex = DETACHED;
- // Don't set nextItem to null because we must continue to be
- // able to return it on next().
- //
- // Caller will unlink from itrs when convenient.
- }
elementDequeued第一个分支结束了。再看看第二个分支条件:从外部类的takeIndex 判断是否为0,从而判断是否能够拿东西(或者循环了一圈回到原点),如果不能拿,则调用takeIndexWrapped 方法:
-
- /**
- * 因为takeIndex等于0了,意味着开始下一个循环了.
- * 然后通知所有的迭代器,删除无用的迭代器。
- */
- void takeIndexWrapped() {
- //循环了一次cycle加1
- cycles++;
- for (Node o = null, p = head; p != null;) {
- final Itr it = p.get();
- final Node next = p.next;
- //需要清理的条件,和清理代码
- if (it == null || it.takeIndexWrapped()) {
- p.clear();
- p.next = null;
- if (o == null)
- head = next;
- else
- o.next = next;
- } else {
- o = p;
- }
- p = next;
- }
- //没有迭代器了,就关掉迭代器的集合
- if (head == null) // no more iterators to track
- itrs = null;
- }

说到有界阻塞队列,很多都是只介绍了常见的方法,对于迭代器的介绍很少。需要在补充一下。
参考:http://ifeve.com/java-blocking-queue/
https://blog.csdn.net/anla_/article/details/78993297
https://blog.csdn.net/wx_vampire/article/details/79585794
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。