当前位置:   article > 正文

Java高并发之阻塞队列(什么是阻塞队列、4对操作、7种阻塞队列、实现原理)_两个阻塞队列

两个阻塞队列
1. 阻塞队列概述
① 什么是阻塞队列
  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法
  1. 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满
  2. 支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空
  • 阻塞队列常用于生产者和消费者场景生产者向队列里添加元素的线程消费者从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器
② 两个附加操作的4种处理方式
  • 在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下:
方法/处理方式抛出异常返回特殊值一直阻塞超时退出
插入方法add(e)offer(e)put(e)offer(time, unit)
移除方法remove()poll()take()poll(time, unit)
检查方法element()peek()不可用不可用
  • 抛出异常: 当队列满时,如果再往队列里插入元素(再执行add(e)操作),会抛出IllegalArgumentException异常。当队列空时,从队列里获取元素(执行element()操作)会抛出NoSuchElementException异常。
  • 返回特殊值: 当往队列插入元素时(执行offer(e)操作),会返回元素是否插入成功,成功返回true。如果是移除方法(执行poll()操作或者peek()操作),则是从队列里取出一个元素,如果没有则返回null
  • 一直阻塞: 当队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者从队列里take元素,队列会阻塞住消费者线程,直到队列不为空
  • 超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。当队列为空时,队列会阻塞消费者线程一段时间,如果超过一定的时间,消费者线程会退出。
  • CachedThreadPool中使用的SynchronousQueue中,就是使用的poll(time,unit)方法从队列中取任务。
  • 注意:NoSuchElementException异常是由element()方法抛出的,它将peek()方法进行了封装。peek()方法要么返回队列头部元素,要么返回null。当peek()方法返回null时,element()返回NoSuchElementException异常。
public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
2. Java中的7种阻塞队列
① ArrayBlockingQueue
  • ArrayBlockingQueue 是一个用数组实现有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。构造方法如下:
public ArrayBlockingQueue(int capacity) {}
public ArrayBlockingQueue(int capacity, boolean fair) {}
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {}
  • 1
  • 2
  • 3
  1. 参数fair用于设置线程是否公平访问队列,默认值为false,即指非公平地访问队列。
  2. 参数capacity用于设置ArrayBlockingQueue大小。如果capacity <= 0, 会抛出IllegalArgumentException异常。
  • 什么叫公平访问队列?
  1. 所谓公平访问是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列
  2. 非公平是对先等待的线程是非公平的。当队列可用时,阻塞的线程都可以争夺访问队列的资格有可能先阻塞的线程最后才访问队列
  3. 为了保证公平性,通常会降低吞吐量。因此,参数fair的默认值为false
  • 访问者的公平性使用ReentrantLock实现:
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
② LinkedBlockingQueue
  • LinkedBlockingQueue是一个用链表实现有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。
  • 此队列的默认和最大容量为Integer.MAX_VALUE实际上被看做是无界阻塞队列
  • LinkedBlockingQueue的构造方法如下:
public LinkedBlockingQueue() {}
public LinkedBlockingQueue(int capacity) {}
public LinkedBlockingQueue(Collection<? extends E> c) {}
  • 1
  • 2
  • 3
  1. 无参构造函数创建的LinkedBlockingQueue,capacity为默认值Integer.MAX_VALUE
  2. 也可以指定LinkedBlockingQueue的容量,也可以在创建时就向LinkedBlockingQueue中添加元素。
  • LinkedBlockingQueueputLocktakeLock两个锁,都是可重入锁ReentrantLock。当往队列中添加元素时,使用putLock;从队列中获取元素时,使用takeLock。都是先进行加锁(lock()),再在finally方法中进行解锁(unlock())。
③ PriorityBlockingQueue
  • PriorityBlockingQueue是一个支持优先级无界阻塞队列,默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。
  • 注意: 不能保证同优先级元素的顺序,在ScheduledFutureTask中为了避免这种情况,除了依靠time进行优先级排序,还依靠sequenceNumber进行优先级排序。
  • PriorityBlockingQueue的构造方法如下:
public PriorityBlockingQueue() {}
public PriorityBlockingQueue(int initialCapacity){}
public PriorityBlockingQueue(Collection<? extends E> c) {}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
  • 1
  • 2
  • 3
  • 4
  1. 使用无参构造函数创建时,默认初始化大小DEFAULT_INITIAL_CAPACITY = 11
  2. 好奇?明明是一个无界阻塞队列,为何可以指定大小?原来使用size 记录中的元素个数,当size >= queue.length时,会使用tryGrow()方法进行扩容。
while ((n = size) >= (cap = (array = queue).length))
     tryGrow(array, cap);
  • 1
  • 2
  • 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
④ DelayQueue
  • DelayQueue 是一个支持延时获取元素无界阻塞队列,队列使用 PriorityQueue 来实现。
  • DelayQueue 的构造方法如下:
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {}
  • 1
  • 2
  1. DelayQueue的核心属性为PriorityQueue的实例q,用它可以按照某种优先级对元素排序
  2. DelayQueue使用无参构造函数创建对象时,是有默认大小的。这是由实例q决定的,DEFAULT_INITIAL_CAPACITY = 11
  3. 当向DelayQueue中添加元素时,实际是向PriorityQueue中添加元素,会调用PriorityQueue的相应方法。当遇到size >= queue.length时,会使用grow(size+1)方法对队列进行扩容。
  • 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
  • 队列中的元素必须实现 Delayed 接口,并且实现 compareTo ()方法指定元素的顺序
  • 为什么要实现Delayed接口
  1. ScheduledThreadPoolExecutor中的ScheduledFutureTask类,实现了RunnableScheduledFuture接口
  2. RunnableScheduledFuture接口继承了ScheduledFuture接口,而ScheduledFuture接口继承了Delayed接口
  3. 所以ScheduledFutureTask类需要实现 Delayed 接口
  • ScheduledFutureTask类有如下属性:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns; // 任务下一次执行的具体时间
    this.period = period;  // 任务执行的时间间隔
    this.sequenceNumber = sequencer.getAndIncrement(); // 任务在ScheduledThreadPoolExecutor中的序号
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • Delayed 接口只有一个getDelay(unit)方法ScheduledFutureTask类实现了该方法。虽然传入了时间unit,但是内部直接使用的NANOSECONDS(纳秒)
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}
  • 1
  • 2
  • 3
  • compareTo()方法的代码如下:
  1. 比较二者是否为同一对象,如果是同一对象直接返回0.
  2. 接着比较time大小,如果this.time > other.time返回1,小于返回-1,如果相等则继续比较sequenceNumber的大小
  3. 如果this.sequenceNumber > other.sequenceNumber返回1,小于返回-1,相等的话,继续比较二者的延迟时间
  4. this.getDelay > other.getDelay返回1,小于返回-1,相等返回0。
public int compareTo(Delayed other) {
     if (other == this) // compare zero if same object
         return 0;
     if (other instanceof ScheduledFutureTask) {
         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
         long diff = time - x.time;
         if (diff < 0)
             return -1;
         else if (diff > 0)
             return 1;
         else if (sequenceNumber < x.sequenceNumber)
             return -1;
         else
             return 1;
     }
     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • siftUp方法中,将需要较早执行的元素向队列头部移动。如果发现该元素的不是较早执行的(key.compareTo(e) >= 0),直接退出循环。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        ...
        if (key.compareTo(e) >= 0)
            break;
        ...
    }
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 我们可以将 DelayQueue 运用在以下应用场景:
  1. 缓存系统的设计: 可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了
  2. 定时任务调度: 使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。
⑤ SynchronousQueue
  • SynchronousQueue 是一个不存储元素的阻塞队列,有如下特点:
  1. 每一个添加元素的操作必须等待另一个线程获取元素的操作,否则不能继续添加元素。反之亦然。
  2. SynchronousQueue 可以看成是一个传球手,本身并不存储任何元素。只是负责把生产者线程处理的数据直接传递给消费者线程,非常适合于传递性场景
  3. 支持公平访问队列,默认情况下采用非公平访问策略。
  4. SynchronousQueue吞吐量高于 LinkedBlockingQueueArrayBlockingQueue
  • SynchronousQueue 的构造函数如下:
public SynchronousQueue() {
	this(false);
}
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 使用无参构造函数,创建的 SynchronousQueue 采用默认的非公平访问策略。当有元素可获取时,所有被阻塞的线程都有机会争抢获取元素的资格,可能会导致先阻塞的线程最后才获取到元素。
  2. 也可以指定是否公平访问队列。如果是true,则transferer属性赋值为TransferQueue对象;否则,transferer属性赋值为TransferStack对象。
  • 注意: transferer属性是一个volatitle变量SynchronousQueue 没有使用到可重入锁ReentrantQueue
⑥ LinkedTransferQueue
  • LinkedTransferQueue 是一个用链表实现的的无界阻塞 TransferQueue 队列。
  • 相对于其他阻塞队列,LinkedTransferQueue 多了 transfer()tryTransfer()方法。
  • LinkedTransferQueue的构造方法如下:
public LinkedTransferQueue() {}
public LinkedTransferQueue(Collection<? extends E> c) {}
  • 1
  • 2
  • transfer()方法:
  1. 如果有正在等待接收元素的消费者,transfer()方法将生产者产生的元素立刻传输给消费者。
  2. 如果没有正在等待接收元素的消费者,transfer()方法将生产这产生的元素放到队列尾部直到有消费者消费了该元素才返回
  • tryTransfer()方法:
  1. 有两种形式,一种是不带时间参数的tryTransfer(E e)方法,一种是带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法
  2. 不带时间参数的tryTransfer(e)方法:如果有正在等待接收元素的消费者,则立即将生产者产生的元素传递给消费者;如果没有正在接收获取元素的消费者,则直接返回false
  3. 带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法:与tryTransfer()方法不同的是,如果没有正在等待接收元素的消费者,它会等待指定的时间再返回。如果超时还没有被消费者消费,则返回false;如果在等待的时间内被消费者消费,则返回true
⑦ LinkedBlockingDeque
  • LinkedBlockingDeque 是一个用链表实现双向有界阻塞队列,可以从队列的两端插入和移除元素。LinkedBlockingDeque可以运用在工作窃取模式中。
  • LinkedBlockingQueue一样,默认的长度和最大容量为Integer.MAX_VALUE
  • 双向队列由于存在两个出入口,可以减少多线程同时入队的竞争
  • LinkedBlockingDeque 的构造方法如下:
public LinkedBlockingDeque() {}
public LinkedBlockingDeque(int capacity) {}
public LinkedBlockingDeque(Collection<? extends E> c) {}  
  • 1
  • 2
  • 3
  1. 使用默认的无参构造函数,创建的LinkedBlockingDeque的容量为默认值Integer.MAX_VALUE
  2. 为了防止LinkedBlockingDeque过度膨胀,可以使用待capacity参数的构造函数,设置容量。
  • 相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirstaddLastofferFirstofferLastpeekFirstpeekLast 等方法。以 First 单词结尾的方法,表示插入、获取、移除队列中的第一个元素(队列头部);以 Last 单词结尾方法,表示插入、获取、移除队列中的最后一个元素(队列尾部)。
  • LinkedBlockingDeque 也保留了原始的add、remove等方法,但是默认的方向具有差异:有的默认操作队列头部,有的默认操作队列尾部。所以使用时,最好使用以Last和First结尾的方法,指明操作队列的方向
  • 只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
3. 阻塞队列的总结
  • 是否有界?
  1. 有界阻塞队列:ArrayBlockingQueueLinkedBlockingQueue(经常被看做无界)、LinkedBlockingDeque(经常被看做无界)
  2. 无界阻塞队列:DelayQueue(可以指定大小,因为可以扩容)、PriorityBlockingQueue(可以指定大小,因为可以扩容)
  3. 不存储元素的阻塞队列:SynchronousQueue
  • 是否可以公平访问队列?
    支持公平访问的队列有:ArrayBlockingQueueSynchronousQueue
  • 带有锁的队列
  1. 有一个可重入锁的队列:ArrayBlockingQueuePriorityBlockingQueueDelayQueueLinkedBlockingDeque
  2. 有两个可重入锁的队列:LinkedBlockingQueue
  3. 不带可重入锁的队列:SynchronousQueue(transfer属性是volatile变量)
4. JDK中阻塞队列的实现原理
  • JDK中的阻塞队列使用通知模式实现。
  • 所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了队列中的一个元素后,会通知生产者当前队列可用
  • ArrayBlockingQueue 使用了 Condition 来实现,有两种状态:notFull状态和notEmpty状态。
private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
    // 省略其他代码 
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == items.length)
              notFull.await();
          insert(e);
      } finally {
          lock.unlock();
      }
}
public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == 0)
              notEmpty.await();
          return extract();
} finally {
          lock.unlock();
      }
}
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • notFull.await()方法中,如果队列满会使用LockSupport.park()方法阻塞队列
5. 常见问题总结

1. 自己如何实现阻塞队列?

  • 问到了就说不会吧。。。。

2. 常见的阻塞队列?

  • 主要是 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueueDelayQueuePriorityBlockingQueue
  • 讲解他们实现、是否有界、特殊性质等
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/968450
推荐阅读
相关标签
  

闽ICP备14008679号