当前位置:   article > 正文

Java 并发队列详解

java 并发队列

一,简介

1,并发队列两种实现

  • 以ConcurrentLinkedQueue为代表的高性能非阻塞队列
  • 以BlockingQueue接口为代表的阻塞队列

2,阻塞队列与非阻塞队列的区别

  • 当阻塞队列是空的时,从队列中获取元素的操作将会被阻塞,试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
  • 当阻塞队列是满时,往队列里添加元素的操作会被阻塞。试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。

3,类接口关系

二,ConcurrentLinkedQueue

ConcurerntLinkedQueue一个基于链表的非阻塞的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue是一个恰当的选择。此队列不允许使用null元素。

1,ConcurrentLinkedQueue数据结构

通过源码分析可知,ConcurrentLinkedQueue的数据结构与LinkedBlockingQueue的数据结构相同,都是使用的链表结构。ConcurrentLinkedQueue的数据结构如下:

 说明: ConcurrentLinkedQueue采用的链表结构,并且包含有一个头节点和一个尾结点。

2,ConcurrentLinkedQueue源码分析

类的继承关系

  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  2. implements Queue<E>, java.io.Serializable {}

说明: ConcurrentLinkedQueue继承了抽象类AbstractQueue,AbstractQueue定义了对队列的基本操作;同时实现了Queue接口,Queue定义了对队列的基本操作,同时,还实现了Serializable接口,表示可以被序列化。

类的内部类

  1. private static class Node<E> {
  2. // 元素
  3. volatile E item;
  4. // next域
  5. volatile Node<E> next;
  6. /**
  7. * Constructs a new node. Uses relaxed write because item can
  8. * only be seen after publication via casNext.
  9. */
  10. // 构造函数
  11. Node(E item) {
  12. // 设置item的值
  13. UNSAFE.putObject(this, itemOffset, item);
  14. }
  15. // 比较并替换item值
  16. boolean casItem(E cmp, E val) {
  17. return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  18. }
  19. void lazySetNext(Node<E> val) {
  20. // 设置next域的值,并不会保证修改对其他线程立即可见
  21. UNSAFE.putOrderedObject(this, nextOffset, val);
  22. }
  23. // 比较并替换next域的值
  24. boolean casNext(Node<E> cmp, Node<E> val) {
  25. return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  26. }
  27. // Unsafe mechanics
  28. // 反射机制
  29. private static final sun.misc.Unsafe UNSAFE;
  30. // item域的偏移量
  31. private static final long itemOffset;
  32. // next域的偏移量
  33. private static final long nextOffset;
  34. static {
  35. try {
  36. UNSAFE = sun.misc.Unsafe.getUnsafe();
  37. Class<?> k = Node.class;
  38. itemOffset = UNSAFE.objectFieldOffset
  39. (k.getDeclaredField("item"));
  40. nextOffset = UNSAFE.objectFieldOffset
  41. (k.getDeclaredField("next"));
  42. } catch (Exception e) {
  43. throw new Error(e);
  44. }
  45. }
  46. }

说明: Node类表示链表结点,用于存放元素,包含item域和next域,item域表示元素,next域表示下一个结点,其利用反射机制和CAS机制来更新item域和next域,保证原子性。

类的属性

  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  2. implements Queue<E>, java.io.Serializable {
  3. // 版本序列号
  4. private static final long serialVersionUID = 196745693267521676L;
  5. // 反射机制
  6. private static final sun.misc.Unsafe UNSAFE;
  7. // head域的偏移量
  8. private static final long headOffset;
  9. // tail域的偏移量
  10. private static final long tailOffset;
  11. static {
  12. try {
  13. UNSAFE = sun.misc.Unsafe.getUnsafe();
  14. Class<?> k = ConcurrentLinkedQueue.class;
  15. headOffset = UNSAFE.objectFieldOffset
  16. (k.getDeclaredField("head"));
  17. tailOffset = UNSAFE.objectFieldOffset
  18. (k.getDeclaredField("tail"));
  19. } catch (Exception e) {
  20. throw new Error(e);
  21. }
  22. }
  23. // 头节点
  24. private transient volatile Node<E> head;
  25. // 尾结点
  26. private transient volatile Node<E> tail;
  27. }

说明: 属性中包含了head域和tail域,表示链表的头节点和尾结点,同时,ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头节点和尾结点,保证原子性。

类的构造函数

  • ConcurrentLinkedQueue()型构造函数
  1. public ConcurrentLinkedQueue() {
  2. // 初始化头节点与尾结点
  3. head = tail = new Node<E>(null);
  4. }

说明: 该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue,头节点与尾结点指向同一个结点,该结点的item域为null,next域也为null。

  • ConcurrentLinkedQueue(Collection<? extends E>)型构造函数
  1. public ConcurrentLinkedQueue(Collection<? extends E> c) {
  2. Node<E> h = null, t = null;
  3. for (E e : c) { // 遍历c集合
  4. // 保证元素不为空
  5. checkNotNull(e);
  6. // 新生一个结点
  7. Node<E> newNode = new Node<E>(e);
  8. if (h == null) // 头节点为null
  9. // 赋值头节点与尾结点
  10. h = t = newNode;
  11. else {
  12. // 直接头节点的next域
  13. t.lazySetNext(newNode);
  14. // 重新赋值头节点
  15. t = newNode;
  16. }
  17. }
  18. if (h == null) // 头节点为null
  19. // 新生头节点与尾结点
  20. h = t = new Node<E>(null);
  21. // 赋值头节点
  22. head = h;
  23. // 赋值尾结点
  24. tail = t;
  25. }

说明: 该构造函数用于创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。

核心函数分析

offer函数

  1. public boolean offer(E e) {
  2. // 元素不为null
  3. checkNotNull(e);
  4. // 新生一个结点
  5. final Node<E> newNode = new Node<E>(e);
  6. for (Node<E> t = tail, p = t;;) { // 无限循环
  7. // q为p结点的下一个结点
  8. Node<E> q = p.next;
  9. if (q == null) { // q结点为null
  10. // p is last node
  11. if (p.casNext(null, newNode)) { // 比较并进行替换p结点的next域
  12. // Successful CAS is the linearization point
  13. // for e to become an element of this queue,
  14. // and for newNode to become "live".
  15. if (p != t) // p不等于t结点,不一致 // hop two nodes at a time
  16. // 比较并替换尾结点
  17. casTail(t, newNode); // Failure is OK.
  18. // 返回
  19. return true;
  20. }
  21. // Lost CAS race to another thread; re-read next
  22. }
  23. else if (p == q) // p结点等于q结点
  24. // We have fallen off list. If tail is unchanged, it
  25. // will also be off-list, in which case we need to
  26. // jump to head, from which all live nodes are always
  27. // reachable. Else the new tail is a better bet.
  28. // 原来的尾结点与现在的尾结点是否相等,若相等,则p赋值为head,否则,赋值为现在的尾结点
  29. p = (t != (t = tail)) ? t : head;
  30. else
  31. // Check for tail updates after two hops.
  32. // 重新赋值p结点
  33. p = (p != t && t != (t = tail)) ? t : q;
  34. }
  35. }

说明: offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作,队列状态的变化(假设单线程添加元素,连续添加10、20两个元素)。

  • 若ConcurrentLinkedQueue的初始状态如上图所示,即队列为空。单线程添加元素,此时,添加元素10,则状态如下所示

  • 如上图所示,添加元素10后,tail没有变化,还是指向之前的结点,继续添加元素20,则状态如下所示

  • 如上图所示,添加元素20后,tail指向了最新添加的结点。

poll函数

  1. public E poll() {
  2. restartFromHead:
  3. for (;;) { // 无限循环
  4. for (Node<E> h = head, p = h, q;;) { // 保存头节点
  5. // item项
  6. E item = p.item;
  7. if (item != null && p.casItem(item, null)) { // item不为null并且比较并替换item成功
  8. // Successful CAS is the linearization point
  9. // for item to be removed from this queue.
  10. if (p != h) // p不等于h // hop two nodes at a time
  11. // 更新头节点
  12. updateHead(h, ((q = p.next) != null) ? q : p);
  13. // 返回item
  14. return item;
  15. }
  16. else if ((q = p.next) == null) { // q结点为null
  17. // 更新头节点
  18. updateHead(h, p);
  19. return null;
  20. }
  21. else if (p == q) // p等于q
  22. // 继续循环
  23. continue restartFromHead;
  24. else
  25. // p赋值为q
  26. p = q;
  27. }
  28. }
  29. }

说明: 此函数用于获取并移除此队列的头,如果此队列为空,则返回null。下面模拟poll函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,poll两次)。

  •  队列初始状态如上图所示,在poll操作后,队列的状态如下图所示

  • 如上图可知,poll操作后,head改变了,并且head所指向的结点的item变为了null。再进行一次poll操作,队列的状态如下图所示。

  • 如上图可知,poll操作后,head结点没有变化,只是指示的结点的item域变成了null。

remove函数

  1. public boolean remove(Object o) {
  2. // 元素为null,返回
  3. if (o == null) return false;
  4. Node<E> pred = null;
  5. for (Node<E> p = first(); p != null; p = succ(p)) { // 获取第一个存活的结点
  6. // 第一个存活结点的item值
  7. E item = p.item;
  8. if (item != null &&
  9. o.equals(item) &&
  10. p.casItem(item, null)) { // 找到item相等的结点,并且将该结点的item设置为null
  11. // p的后继结点
  12. Node<E> next = succ(p);
  13. if (pred != null && next != null) // pred不为null并且next不为null
  14. // 比较并替换next域
  15. pred.casNext(p, next);
  16. return true;
  17. }
  18. // pred赋值为p
  19. pred = p;
  20. }
  21. return false;
  22. }

说明: 此函数用于从队列中移除指定元素的单个实例(如果存在)。其中,会调用到first函数和succ函数,first函数的源码如下

  1. Node<E> first() {
  2. restartFromHead:
  3. for (;;) { // 无限循环,确保成功
  4. for (Node<E> h = head, p = h, q;;) {
  5. // p结点的item域是否为null
  6. boolean hasItem = (p.item != null);
  7. if (hasItem || (q = p.next) == null) { // item不为null或者next域为null
  8. // 更新头节点
  9. updateHead(h, p);
  10. // 返回结点
  11. return hasItem ? p : null;
  12. }
  13. else if (p == q) // p等于q
  14. // 继续从头节点开始
  15. continue restartFromHead;
  16. else
  17. // p赋值为q
  18. p = q;
  19. }
  20. }
  21. }

说明: first函数用于找到链表中第一个存活的结点。succ函数源码如下

  1. final Node<E> succ(Node<E> p) {
  2. // p结点的next域
  3. Node<E> next = p.next;
  4. // 如果next域为自身,则返回头节点,否则,返回next
  5. return (p == next) ? head : next;
  6. }

说明: succ用于获取结点的下一个结点。如果结点的next域指向自身,则返回head头节点,否则,返回next结点。下面模拟remove函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,执行remove(10)、remove(20)操作)。

  • 如上图所示,为ConcurrentLinkedQueue的初始状态,remove(10)后的状态如下图所示

  • 如上图所示,当执行remove(10)后,head指向了head结点之前指向的结点的下一个结点,并且head结点的item域置为null。继续执行remove(20),状态如下图所示

  • 如上图所示,执行remove(20)后,head与tail指向同一个结点,item域为null。

size函数

  1. public int size() {
  2. // 计数
  3. int count = 0;
  4. for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个存活的结点开始往后遍历
  5. if (p.item != null) // 结点的item域不为null
  6. // Collection.size() spec says to max out
  7. if (++count == Integer.MAX_VALUE) // 增加计数,若达到最大值,则跳出循环
  8. break;
  9. // 返回大小
  10. return count;
  11. }

说明: 此函数用于返回ConcurrenLinkedQueue的大小,从第一个存活的结点(first)开始,往后遍历链表,当结点的item域不为null时,增加计数,之后返回大小。

3,ConcurrentLinkedQueue示例

下面通过一个示例来了解ConcurrentLinkedQueue的使用

  1. import java.util.concurrent.ConcurrentLinkedQueue;
  2. class PutThread extends Thread {
  3. private ConcurrentLinkedQueue<Integer> clq;
  4. public PutThread(ConcurrentLinkedQueue<Integer> clq) {
  5. this.clq = clq;
  6. }
  7. public void run() {
  8. for (int i = 0; i < 10; i++) {
  9. try {
  10. System.out.println("add " + i);
  11. clq.add(i);
  12. Thread.sleep(100);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }
  18. }
  19. class GetThread extends Thread {
  20. private ConcurrentLinkedQueue<Integer> clq;
  21. public GetThread(ConcurrentLinkedQueue<Integer> clq) {
  22. this.clq = clq;
  23. }
  24. public void run() {
  25. for (int i = 0; i < 10; i++) {
  26. try {
  27. System.out.println("poll " + clq.poll());
  28. Thread.sleep(100);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. public class ConcurrentLinkedQueueDemo {
  36. public static void main(String[] args) {
  37. ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
  38. PutThread p1 = new PutThread(clq);
  39. GetThread g1 = new GetThread(clq);
  40. p1.start();
  41. g1.start();
  42. }
  43. }

运行结果(某一次):

add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8

说明: GetThread线程不会因为ConcurrentLinkedQueue队列为空而等待,而是直接返回null,所以当实现队列不空时,等待时,则需要用户自己实现等待逻辑。

4,HOPS(延迟更新的策略)的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。

  • head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的原因是这个,从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

5,ConcurrentLinkedQueue适合的场景

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。整体的思路都是比较严谨的,这个也是使用了无锁造成的,我们自己使用无锁的条件的话,这个队列是个不错的参考。

三,BlockingQueue

1,BlockingQueue和BlockingDeque

BlockingQueue

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。 负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

BlockingQueue 的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove()poll()take()poll(timeout, timeunit)
检查element()peek()

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。 可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注: 基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

BlockingDeque

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。

BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。 deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:

BlockingDeque 的方法

一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。 一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

抛异常特定值阻塞超时
插入addFirst(o)offerFirst(o)putFirst(o)offerFirst(o, timeout, timeunit)
移除removeFirst(o)pollFirst(o)takeFirst(o)pollFirst(timeout, timeunit)
检查getFirst(o)peekFirst(o)
抛异常特定值阻塞超时
插入addLast(o)offerLast(o)putLast(o)offerLast(o, timeout, timeunit)
移除removeLast(o)pollLast(o)takeLast(o)pollLast(timeout, timeunit)
检查getLast(o)peekLast(o)

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

BlockingDeque 与BlockingQueue关系

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。

以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:

BlockingQueueBlockingDeque
add()addLast()
offer() x 2offerLast() x 2
put()putLast()
remove()removeFirst()
poll() x 2pollFirst()
take()takeFirst()
element()getFirst()
peek()peekFirst()

BlockingQueue 的例子

这里是一个 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 实现。 首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。

  1. public class BlockingQueueExample {
  2. public static void main(String[] args) throws Exception {
  3. BlockingQueue queue = new ArrayBlockingQueue(1024);
  4. Producer producer = new Producer(queue);
  5. Consumer consumer = new Consumer(queue);
  6. new Thread(producer).start();
  7. new Thread(consumer).start();
  8. Thread.sleep(4000);
  9. }
  10. }

以下是 Producer 类。注意它在每次 put() 调用时是如何休眠一秒钟的。这将导致 Consumer 在等待队列中对象的时候发生阻塞。

  1. public class Producer implements Runnable{
  2. protected BlockingQueue queue = null;
  3. public Producer(BlockingQueue queue) {
  4. this.queue = queue;
  5. }
  6. public void run() {
  7. try {
  8. queue.put("1");
  9. Thread.sleep(1000);
  10. queue.put("2");
  11. Thread.sleep(1000);
  12. queue.put("3");
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }

以下是 Consumer 类。它只是把对象从队列中抽取出来,然后将它们打印到 System.out。

  1. public class Consumer implements Runnable{
  2. protected BlockingQueue queue = null;
  3. public Consumer(BlockingQueue queue) {
  4. this.queue = queue;
  5. }
  6. public void run() {
  7. try {
  8. System.out.println(queue.take());
  9. System.out.println(queue.take());
  10. System.out.println(queue.take());
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

2,数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注: 因为它是基于数组实现的,也就具有数组的特性: 一旦初始化,大小就无法修改)。 ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:

  1. BlockingQueue queue = new ArrayBlockingQueue(1024);
  2. queue.put("1");
  3. Object object = queue.take();

以下是使用了 Java 泛型的一个 BlockingQueue 示例。注意其中是如何对 String 元素放入和提取的:

  1. BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
  2. queue.put("1");
  3. String string = queue.take();

3,延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。

DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:

  1. public interface Delayed extends Comparable<Delayed< {
  2. public long getDelay(TimeUnit timeUnit);
  3. }

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。

传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它表明了将要延迟的时间段。TimeUnit 枚举将会取以下值:

  • DAYS
  • HOURS
  • INUTES
  • SECONDS
  • MILLISECONDS
  • MICROSECONDS
  • NANOSECONDS

正如你所看到的,Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。 以下是使用 DelayQueue 的例子:

  1. public class DelayQueueExample {
  2. public static void main(String[] args) {
  3. DelayQueue queue = new DelayQueue();
  4. Delayed element1 = new DelayedElement();
  5. queue.put(element1);
  6. Delayed element2 = queue.take();
  7. }
  8. }

DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在 java.util.concurrent 包里。你需要自行创建你自己的 Delayed 接口的实现以使用 DelayQueue 类。

4,链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是 LinkedBlockingQueue 的初始化和使用示例代码:

  1. BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
  2. BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
  3. bounded.put("Value");
  4. String value = bounded.take();

5,具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。 所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。 注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。 以下是使用 PriorityBlockingQueue 的示例:

  1. BlockingQueue queue = new PriorityBlockingQueue();
  2. //String implements java.lang.Comparable
  3. queue.put("Value");
  4. String value = queue.take();

6,同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。 据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

7,BlockingDeque 的例子

既然 BlockingDeque 是一个接口,那么你想要使用它的话就得使用它的众多的实现类的其中一个。java.util.concurrent 包提供了以下 BlockingDeque 接口的实现类: LinkedBlockingDeque。

以下是如何使用 BlockingDeque 方法的一个简短代码示例:

  1. BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
  2. deque.addFirst("1");
  3. deque.addLast("2");
  4. String two = deque.takeLast();
  5. String one = deque.takeFirst();

8,链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。

deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。

以下是 LinkedBlockingDeque 实例化以及使用的示例:

  1. BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
  2. deque.addFirst("1");
  3. deque.addLast("2");
  4. String two = deque.takeLast();
  5. String one = deque.takeFirst();

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/139625
推荐阅读
相关标签
  

闽ICP备14008679号