赞
踩
BlockingQueue
)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
add(e)
操作),会抛出IllegalArgumentException
异常。当队列空时,从队列里获取元素(执行element()
操作)会抛出NoSuchElementException
异常。offer(e)
操作),会返回元素是否插入成功,成功返回true
。如果是移除方法(执行poll()
操作或者peek()
操作),则是从队列里取出一个元素,如果没有则返回null
。CachedThreadPool
中使用的SynchronousQueue
中,就是使用的poll(time,unit)方法
从队列中取任务。NoSuchElementException
异常是由element()
方法抛出的,它将peek()
方法进行了封装。peek()
方法要么返回队列头部元素,要么返回null
。当peek()
方法返回null
时,element()
返回NoSuchElementException
异常。public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
ArrayBlockingQueue
是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。构造方法如下:public ArrayBlockingQueue(int capacity) {}
public ArrayBlockingQueue(int capacity, boolean fair) {}
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {}
fair
用于设置线程是否公平访问队列,默认值为false
,即指非公平地访问队列。capacity
用于设置ArrayBlockingQueue
的大小。如果capacity <= 0
, 会抛出IllegalArgumentException
异常。fair
的默认值为false
。ReentrantLock
实现:public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ReentrantLock
。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()
加锁,最后在finally语句块中通过lock.unlock()
解锁。LinkedBlockingQueue
是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。Integer.MAX_VALUE
,实际上被看做是无界阻塞队列。LinkedBlockingQueue
的构造方法如下:public LinkedBlockingQueue() {}
public LinkedBlockingQueue(int capacity) {}
public LinkedBlockingQueue(Collection<? extends E> c) {}
LinkedBlockingQueue
,capacity为默认值Integer.MAX_VALUE
。LinkedBlockingQueue
的容量,也可以在创建时就向LinkedBlockingQueue
中添加元素。LinkedBlockingQueue
有putLock
和takeLock
两个锁,都是可重入锁ReentrantLock
。当往队列中添加元素时,使用putLock
;从队列中获取元素时,使用takeLock
。都是先进行加锁(lock()
),再在finally方法中进行解锁(unlock()
)。PriorityBlockingQueue
是一个支持优先级的无界阻塞队列,默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue
时,指定构造参数Comparator
来进行排序。ScheduledFutureTask
中为了避免这种情况,除了依靠time进行优先级排序,还依靠sequenceNumber
进行优先级排序。PriorityBlockingQueue
的构造方法如下:public PriorityBlockingQueue() {}
public PriorityBlockingQueue(int initialCapacity){}
public PriorityBlockingQueue(Collection<? extends E> c) {}
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
DEFAULT_INITIAL_CAPACITY = 11
。size >= queue.length
时,会使用tryGrow()方法
进行扩容。while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
ReentrantLock
。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()
加锁,最后在finally语句块中通过lock.unlock()
解锁。DelayQueue
是一个支持延时获取元素的无界阻塞队列,队列使用 PriorityQueue
来实现。DelayQueue
的构造方法如下:public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {}
DelayQueue
的核心属性为PriorityQueue
的实例q
,用它可以按照某种优先级对元素排序。DelayQueue
使用无参构造函数创建对象时,是有默认大小的。这是由实例q
决定的,DEFAULT_INITIAL_CAPACITY = 11
。DelayQueue
中添加元素时,实际是向PriorityQueue
中添加元素,会调用PriorityQueue
的相应方法。当遇到size >= queue.length
时,会使用grow(size+1)方法
对队列进行扩容。ReentrantLock
。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()
加锁,最后在finally语句块中通过lock.unlock()
解锁。Delayed接口
?ScheduledThreadPoolExecutor
中的ScheduledFutureTask类
,实现了RunnableScheduledFuture接口
。RunnableScheduledFuture接口
继承了ScheduledFuture接口
,而ScheduledFuture接口
继承了Delayed接口
。ScheduledFutureTask类
需要实现 Delayed 接口
。ScheduledFutureTask类
有如下属性:ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns; // 任务下一次执行的具体时间
this.period = period; // 任务执行的时间间隔
this.sequenceNumber = sequencer.getAndIncrement(); // 任务在ScheduledThreadPoolExecutor中的序号
}
getDelay(unit)方法
,ScheduledFutureTask类
实现了该方法。虽然传入了时间unit,但是内部直接使用的NANOSECONDS
(纳秒)public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
this.time > other.time
返回1,小于返回-1,如果相等则继续比较sequenceNumber的大小。this.sequenceNumber > other.sequenceNumber
返回1,小于返回-1,相等的话,继续比较二者的延迟时间。this.getDelay > other.getDelay
返回1,小于返回-1,相等返回0。public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
siftUp方法
中,将需要较早执行的元素向队列头部移动。如果发现该元素的不是较早执行的(key.compareTo(e) >= 0
),直接退出循环。private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
...
if (key.compareTo(e) >= 0)
break;
...
}
...
}
DelayQueue
运用在以下应用场景:DelayQueue
保存缓存元素的有效期,使用一个线程循环查询 DelayQueue
,一旦能从 DelayQueue
中获取元素时,表示缓存有效期到了。DelayQueue
保存当天将会执行的任务和执行时间,一旦从 DelayQueue
中获取到任务就开始执行,比如 TimerQueue
就是使用 DelayQueue
实现的。SynchronousQueue
是一个不存储元素的阻塞队列,有如下特点:SynchronousQueue
可以看成是一个传球手,本身并不存储任何元素。只是负责把生产者线程处理的数据直接传递给消费者线程,非常适合于传递性场景。SynchronousQueue
的吞吐量高于 LinkedBlockingQueue
和 ArrayBlockingQueue
。SynchronousQueue
的构造函数如下:public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue
采用默认的非公平访问策略。当有元素可获取时,所有被阻塞的线程都有机会争抢获取元素的资格,可能会导致先阻塞的线程最后才获取到元素。transferer属性
赋值为TransferQueue
对象;否则,transferer属性
赋值为TransferStack
对象。transferer属性
是一个volatitle变量, SynchronousQueue
没有使用到可重入锁ReentrantQueue
。LinkedTransferQueue
是一个用链表实现的的无界阻塞 TransferQueue
队列。LinkedTransferQueue
多了 transfer()
和 tryTransfer()
方法。LinkedTransferQueue
的构造方法如下:public LinkedTransferQueue() {}
public LinkedTransferQueue(Collection<? extends E> c) {}
transfer()方法
将生产者产生的元素立刻传输给消费者。transfer()方法
将生产这产生的元素放到队列尾部,直到有消费者消费了该元素才返回。tryTransfer(E e)方法
,一种是带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法
。tryTransfer(e)方法
:如果有正在等待接收元素的消费者,则立即将生产者产生的元素传递给消费者;如果没有正在接收获取元素的消费者,则直接返回false
。tryTransfer(E e, long timeout, TimeUnit unit)方法
:与tryTransfer()方法
不同的是,如果没有正在等待接收元素的消费者,它会等待指定的时间再返回。如果超时还没有被消费者消费,则返回false
;如果在等待的时间内被消费者消费,则返回true
。LinkedBlockingDeque
是一个用链表实现的双向有界阻塞队列,可以从队列的两端插入和移除元素。LinkedBlockingDeque
可以运用在工作窃取模式中。LinkedBlockingQueue
一样,默认的长度和最大容量为Integer.MAX_VALUE
。LinkedBlockingDeque
的构造方法如下:public LinkedBlockingDeque() {}
public LinkedBlockingDeque(int capacity) {}
public LinkedBlockingDeque(Collection<? extends E> c) {}
LinkedBlockingDeque
的容量为默认值Integer.MAX_VALUE
。LinkedBlockingDeque
过度膨胀,可以使用待capacity参数的构造函数,设置容量。LinkedBlockingDeque
多了 addFirst
、addLast
、offerFirst
、offerLast
、peekFirst
、peekLast
等方法。以 First 单词结尾的方法,表示插入、获取、移除队列中的第一个元素(队列头部);以 Last 单词结尾方法,表示插入、获取、移除队列中的最后一个元素(队列尾部)。LinkedBlockingDeque
也保留了原始的add、remove等方法,但是默认的方向具有差异:有的默认操作队列头部,有的默认操作队列尾部。所以使用时,最好使用以Last和First结尾的方法,指明操作队列的方向。ReentrantLock
。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()
加锁,最后在finally语句块中通过lock.unlock()
解锁。ArrayBlockingQueue
、LinkedBlockingQueue
(经常被看做无界)、LinkedBlockingDeque
(经常被看做无界)DelayQueue
(可以指定大小,因为可以扩容)、PriorityBlockingQueue
(可以指定大小,因为可以扩容)SynchronousQueue
ArrayBlockingQueue
、SynchronousQueue
ArrayBlockingQueue
、PriorityBlockingQueue
、DelayQueue
、LinkedBlockingDeque
LinkedBlockingQueue
SynchronousQueue
(transfer属性是volatile变量)ArrayBlockingQueue
使用了 Condition
来实现,有两种状态:notFull
状态和notEmpty
状态。private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue(int capacity, boolean fair) { // 省略其他代码 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
notFull.await()方法
中,如果队列满会使用LockSupport.park()
方法阻塞队列。1. 自己如何实现阻塞队列?
2. 常见的阻塞队列?
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
、DelayQueue
、PriorityBlockingQueue
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。