当前位置:   article > 正文

java并发系列:阻塞队列(1) ArrayBlockingQueue_arrayblockingqueue满了

arrayblockingqueue满了

 一 阻塞队列

   看了《Java并发编程的艺术》。通常只有ReentrantLock+Condition+数组。而关于迭代器的介绍很少,所在本篇补上。

     阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
检查方法element()peek()不可用不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

二 ArrayBlockingQueue 类

2.1 属性

  ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,类图如下

如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。另外有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,另外notEmpty,notFull条件变量用来进行出入队的同步,也就是通知模式。属性如下:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. private static final long serialVersionUID = -817911632652898426L;
  4. /** The queued items */
  5. final Object[] items;
  6. /** items index for next take, poll, peek or remove *///记录下一个take、remove、peek的索引
  7. int takeIndex;
  8. /** items index for next put, offer, or add */ //记录下一个put、offer、add的索引
  9. int putIndex;
  10. /** Number of elements in the queue */ //队列中元素的个数
  11. int count;
  12. /** Main lock guarding all access */
  13. final ReentrantLock lock;
  14. /** Condition for waiting takes */
  15. private final Condition notEmpty;
  16. /** Condition for waiting puts */
  17. private final Condition notFull;

2.2 构造方法

  1. //只指定容量
  2. public ArrayBlockingQueue(int capacity) {
  3. this(capacity, false);
  4. }
  5. //指定容量和ReentrantLock是否公平
  6. public ArrayBlockingQueue(int capacity, boolean fair) {
  7. if (capacity <= 0)
  8. throw new IllegalArgumentException();
  9. this.items = new Object[capacity];
  10. lock = new ReentrantLock(fair);
  11. notEmpty = lock.newCondition();
  12. notFull = lock.newCondition();
  13. }
  14. //将集合中的元素初始化队列的元素
  15. public ArrayBlockingQueue(int capacity, boolean fair,
  16. Collection<? extends E> c) {
  17. this(capacity, fair);
  18. final ReentrantLock lock = this.lock;
  19. lock.lock(); // Lock only for visibility, not mutual exclusion
  20. try {
  21. int i = 0;
  22. try {
  23. for (E e : c) {
  24. checkNotNull(e);
  25. items[i++] = e;
  26. }
  27. } catch (ArrayIndexOutOfBoundsException ex) {
  28. throw new IllegalArgumentException();
  29. }
  30. count = i;
  31. putIndex = (i == capacity) ? 0 : i;
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
构造方法主要使用容量对items数组完成初始化,所以是有届队列,fair参数用来构造一个公平的或不公平的ReentrantLock,默认是Lock为非公平锁。

三 offer

在队尾插入元素,如果队列满则返回false,否者入队返回true。

  1. public boolean offer(E e) {
  2. checkNotNull(e);//非空检查
  3. final ReentrantLock lock = this.lock; //获取独占锁
  4. lock.lock();//加锁
  5. try {
  6. if (count == items.length) //如果队列满则返回false
  7. return false;
  8. else {
  9. enqueue(e); //否者插入元素
  10. return true; //返回true
  11. }
  12. } finally {
  13. lock.unlock(); //释放锁
  14. }
  15. }
  16. private void enqueue(E x) {
  17. // assert lock.getHoldCount() == 1;
  18. // assert items[putIndex] == null;
  19. final Object[] items = this.items;
  20. items[putIndex] = x; //元素入队
  21. if (++putIndex == items.length) //计算下一个元素应该存放的下标
  22. putIndex = 0;
  23. count++; //计数
  24. notEmpty.signal(); //通知有新增数据
  25. }

     这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。这个队列是使用循环数组实现,所以putindex需要判断是否满了。

     另外入队后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。也就是之前说的通知模式,就是说生产者插入数据后,通知消费者当前队列可用。就是靠condition实现的。

四 put

与offer类似,在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length)
  7. notFull.await();
  8. enqueue(e);
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
 1. ArrayBlockingQueue不允许元素为null 
2. ArrayBlockingQueue在队列已满时将会调用notFull的await()方法释放锁并处于阻塞状态,直到出队操作调用了notFull.signal方法激活该线程。
3. 一旦ArrayBlockingQueue不为满的状态,就将元素入队.
   不同的是lock.lockInterruptibly(); 要思考为啥不用lock?

五 poll

  从队头获取并移除元素,队列为空,则返回null。

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return (count == 0) ? null : dequeue();
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  10. private E dequeue() {
  11. // assert lock.getHoldCount() == 1;
  12. // assert items[takeIndex] != null;
  13. final Object[] items = this.items;
  14. @SuppressWarnings("unchecked")
  15. //获取元素值
  16. E x = (E) items[takeIndex];
  17. items[takeIndex] = null; //将要取出的元素指向null
  18. //队头指针计算,
  19. if (++takeIndex == items.length)
  20. takeIndex = 0;
  21. count--;//队列元素个数减一
  22. if (itrs != null)
  23. itrs.elementDequeued(); //itrs也出队
  24. notFull.signal();//发送信号激活notFull条件队列里面的线程
  25. return x;
  26. }

其他的相对好理解,itrs.elementDequeued这里后面说。

六 take

    take与poll类似, 从队头获取元素,如果队列为空则阻塞直到队列有元素。和put方法相互对应

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }

书上的例子就是put,take介绍通知模式。需要注意的是如果队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,await才会返回。在看看《Java并发编程的艺术》对await实现原理:await()主要通过的 LockSupport.park(this);来实现.这里可以去看Condition

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

继续进入源码,发现调用setBlocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。

  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker);
  4. UNSAFE.park(false, 0L);
  5. setBlocker(t, null);
  6. }

   unsafe.park是个native方法,代码如下:

public native void park(boolean isAbsolute, long time);
 park这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。
      与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,然后再执行的park。
      线程被中断时。
      如果参数中的time不是零,等待了指定的毫秒数时。
      发生异常现象时。这些异常事先无法确定。

当然书上作者还介绍了JVM如何实现的park。不懂也就不贴了。看dump文件里面线程状态会有见到java.lang.Thread.State: WAITING (parking)

限于篇幅,peek,size不多说了。

peek 返回队列头元素但不移除该元素,队列为空,返回null

  1. public int size() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return count;
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

七 迭代器

书上重点是介绍了基于condition的通知模式。但是对比源码会发现,put数据相对好理解。take数据dequeue里面的itrs出队。为啥加数据没有反而取数据会有?总觉得第一眼看上去ArrayBlockingQueue基于condition,必将平淡。还是有一种奇特的设计。我们从上面阅读类的源码来看,它是线程安全的。另一方面,ArrayBlockingQueue里面数组下标是循环利用的,可以理解为是条循环队列。 一开始迭代器是创建时固定位置,队列则可能在不断的出入队列,这样迭代器会受到严重影响(迭代器的位置不对),所以为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:

        跟踪循环的次数。即 takeIndex为0的次数。
        每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。

  1. private class Itr implements Iterator<E> {
  2. /** Index to look for new nextItem; NONE at end */ //主要指向下一个元素
  3. private int cursor;
  4. /** Element to be returned by next call to next(); null if none */ //next返回的下一个元素
  5. private E nextItem;
  6. /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ //nextItem的index
  7. private int nextIndex;
  8. /** Last element returned; null if none or not detached. */ //最后一个元素
  9. private E lastItem;
  10. /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ //最后一个元素的索引
  11. private int lastRet;
  12. /** Previous value of takeIndex, or DETACHED when detached */ //takeIndex的前一个位置
  13. private int prevTakeIndex;
  14. /** Previous value of iters.cycles */ //itrs监控前一个的循环数量cycles的值
  15. private int prevCycles;
  16. /** Special index value indicating "not available" or "undefined" */ //none模式,代表节点不存在或者没有
  17. private static final int NONE = -1;
  18. private static final int REMOVED = -2; //说明当前节点被其他线程调用remove模式删除了
  19. /** Special value for prevTakeIndex indicating "detached mode" */ //说明处于detached模式
  20. private static final int DETACHED = -3
  1. Itr() {
  2. lastRet = NONE; //最后一个索引为NONE
  3. final ReentrantLock lock = ArrayBlockingQueue.this.lock; //获取外部类的锁。
  4. lock.lock(); //加锁
  5. try {
  6. if (count == 0) { //当队列里面实际是没有数据的
  7. cursor = NONE;
  8. nextIndex = NONE;
  9. prevTakeIndex = DETACHED;
  10. } else {
  11. final int takeIndex = ArrayBlockingQueue.this.takeIndex;
  12. prevTakeIndex = takeIndex;
  13. nextItem = itemAt(nextIndex = takeIndex);
  14. cursor = incCursor(takeIndex);
  15. if (itrs == null) {
  16. itrs = new Itrs(this);
  17. } else {
  18. itrs.register(this); // in this order
  19. itrs.doSomeSweeping(false); //清理无用的迭代器
  20. }
  21. prevCycles = itrs.cycles;
  22. }
  23. } finally {
  24. lock.unlock();
  25. }
  26. }
它的构造方法如上, count等于0的时候,就说明队列里面没有数据,那么创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。 

  而doSomeSweeping主要用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,继续找下一个。这就完成了对无效迭代器的清理了。下面看看它的主要代码:

  1. void doSomeSweeping(boolean tryHarder) {
  2. int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; //判断要尝试几次去清扫。
  3. Node o, p;
  4. final Node sweeper = this.sweeper;
  5. boolean passedGo; // to limit search to one full sweep
  6. if (sweeper == null) { //初始化o,p,以及passedGo
  7. o = null;
  8. p = head;
  9. passedGo = true;
  10. } else {
  11. o = sweeper;
  12. p = o.next;
  13. passedGo = false;
  14. }
  15. for (; probes > 0; probes--) { //循环次数。
  16. if (p == null) {
  17. if (passedGo)
  18. break;
  19. o = null;
  20. p = head;
  21. passedGo = true;
  22. }
  23. final Itr it = p.get();
  24. final Node next = p.next;
  25. if (it == null || it.isDetached()) { //这个iterator是null,或者已经处于detached模式了。需要被清理的迭代器
  26. // found a discarded/exhausted iterator
  27. probes = LONG_SWEEP_PROBES; // "try harder"
  28. // unlink p 清理
  29. p.clear();
  30. p.next = null;
  31. if (o == null) { //说明是第一个迭代器
  32. head = next;
  33. if (next == null) { //itrs里面是空的了。
  34. // We've run out of iterators to track; retire
  35. itrs = null;
  36. return;
  37. }
  38. }
  39. else
  40. o.next = next; //o指向前一个清扫过的p
  41. } else {
  42. o = p; //把p赋值给o,
  43. }
  44. p = next; //p往后面串一个。
  45. }
  46. this.sweeper = (p == null) ? null : o; //判断p,并给sweeper赋值。
  47. }

下面主要看负责管理Iterator的Itrs类。

  1. class Itrs {
  2. /**
  3. * Node in a linked list of weak iterator references.
  4. */
  5. private class Node extends WeakReference<Itr> {
  6. Node next; //指向下一个节点
  7. Node(Itr iterator, Node next) {
  8. super(iterator);
  9. this.next = next;
  10. }
  11. }
  12. /**记录循环的次数,当take下标到0的时候为一个循环 cycle+1 */
  13. int cycles = 0;
  14. /** Linked list of weak iterator references */ //头节点head
  15. private Node head;
  16. /** Used to expunge stale iterators *///用来去删除废弃的iterators。
  17. private Node sweeper = null;
  18. //尝试次数
  19. private static final int SHORT_SWEEP_PROBES = 4;
  20. private static final int LONG_SWEEP_PROBES = 16;

里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference).我们再来看看之前提到的take调用了dequeue里面的itrs.elementDequeued();

  1.     /** 
  2.      * 当元素出队列的时候调用的方法这个出队列方法 
  3.      */  
  4.     void elementDequeued() {  
  5.         // 在队列为空的时候调用清空所有的迭代器;  
  6.         if (count == 0)  
  7.             queueIsEmpty();  
  8.         // 当拿元素进行循环的时候,清理所有过期的迭代器  
  9.         else if (takeIndex == 0)  
  10.             takeIndexWrapped();  
  11.     }  

当count为0时候,调用queueIsEmpty:

  1. void queueIsEmpty() {
  2. // assert lock.getHoldCount() == 1;
  3. for (Node p = head; p != null; p = p.next) {
  4. Itr it = p.get();
  5. if (it != null) {
  6. p.clear();
  7. it.shutdown();
  8. }
  9. }
  10. head = null;
  11. itrs = null;
  12. }

而在queueIsEmpty 里面,则需要把itrs里面的所有node遍历,如果此时里面的某一个iterator不为null,调用shutdown方法,shutdown方法里面则是把Iterator里面的状态标志初始化:

  1. void shutdown() {
  2. // assert lock.getHoldCount() == 1;
  3. cursor = NONE;
  4. if (nextIndex >= 0)
  5. nextIndex = REMOVED;
  6. if (lastRet >= 0) {
  7. lastRet = REMOVED;
  8. lastItem = null;
  9. }
  10. prevTakeIndex = DETACHED;
  11. // Don't set nextItem to null because we must continue to be
  12. // able to return it on next().
  13. //
  14. // Caller will unlink from itrs when convenient.
  15. }

elementDequeued第一个分支结束了。再看看第二个分支条件:从外部类的takeIndex 判断是否为0,从而判断是否能够拿东西(或者循环了一圈回到原点),如果不能拿,则调用takeIndexWrapped 方法:

  1.   
  2.     /** 
  3.  * 因为takeIndex等于0了,意味着开始下一个循环了. 
  4.  * 然后通知所有的迭代器,删除无用的迭代器。 
  5.  */  
  6. void takeIndexWrapped() {  
  7.     //循环了一次cycle加1  
  8.     cycles++;  
  9.     for (Node o = null, p = head; p != null;) {  
  10.         final Itr it = p.get();  
  11.         final Node next = p.next;  
  12.         //需要清理的条件,和清理代码  
  13.         if (it == null || it.takeIndexWrapped()) {  
  14.             p.clear();  
  15.             p.next = null;  
  16.             if (o == null)  
  17.                 head = next;  
  18.             else  
  19.                 o.next = next;  
  20.         } else {  
  21.             o = p;  
  22.         }  
  23.         p = next;  
  24.     }  
  25.     //没有迭代器了,就关掉迭代器的集合  
  26.     if (head == null)   // no more iterators to track  
  27.         itrs = null;  
  28. }  

说到有界阻塞队列,很多都是只介绍了常见的方法,对于迭代器的介绍很少。需要在补充一下。


参考:http://ifeve.com/java-blocking-queue/

https://blog.csdn.net/anla_/article/details/78993297

https://blog.csdn.net/wx_vampire/article/details/79585794

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/976152
推荐阅读
相关标签
  

闽ICP备14008679号