赞
踩
大家好,我是栗筝i,这篇文章是我的 “栗筝i 的 Java 技术栈” 专栏的第 028 篇文章,在 “栗筝i 的 Java 技术栈” 这个专栏中我会持续为大家更新 Java 技术相关全套技术栈内容。专栏的主要目标是已经有一定 Java 开发经验,并希望进一步完善自己对整个 Java 技术体系来充实自己的技术栈的同学。与此同时,本专栏的所有文章,也都会准备充足的代码示例和完善的知识点梳理,因此也十分适合零基础的小白和要准备工作面试的同学学习。当然,我也会在必要的时候进行相关技术深度的技术解读,相信即使是拥有多年 Java 开发经验的从业者和大佬们也会有所收获并找到乐趣。
–
在多线程编程中,如何有效地进行线程间通信和协调是一个关键问题。Java 并发包中的阻塞队列集合(BlockingQueue)为开发者提供了强大的工具,能够简化线程同步与数据共享的复杂性。阻塞队列不仅能够在生产者和消费者之间进行线程安全的数据传递,还通过自动的阻塞和唤醒机制,帮助我们轻松实现高效的生产者-消费者模型。本篇文章将详细介绍 Java 中几种常用的阻塞队列集合,分析它们的特点、应用场景及实现原理,帮助您更好地理解并掌握这些并发工具。
Java 中的阻塞队列(BlockingQueue)是一种在多线程环境下用于线程安全的数据结构,它不仅提供了典型的队列操作(如插入和移除),还可以在队列为空或满时自动阻塞操作线程,直到队列状态允许操作的继续。阻塞队列通过阻塞和等待机制有效地协调生产者和消费者线程之间的操作,确保数据一致性和线程安全。
以下是其主要功能和应用场景:
线程间通信:阻塞队列在生产者-消费者模型中扮演了关键角色,它允许生产者线程和消费者线程之间进行线程安全的数据传递。具体表现为:
生产者线程:生产者线程将数据放入队列中。如果队列已满,生产者线程将被阻塞,直到队列有空闲空间。
消费者线程:消费者线程从队列中取出数据。如果队列为空,消费者线程将被阻塞,直到有数据可供消费。
流量控制:通过阻塞机制,阻塞队列可以有效地控制生产者和消费者的工作节奏,避免过载和资源浪费。当队列达到容量上限时,阻塞队列会自动阻止进一步的插入操作,直到有空间可用,从而避免过载和资源浪费。
简化并发编程:阻塞队列封装了复杂的同步机制,简化了多线程环境下的数据共享和线程协调,使得开发者可以专注于业务逻辑,而不必担心线程安全问题。
BlockingQueue
是 Java 并发包(java.util.concurrent
)中的一个接口,继承自 Queue
接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。
BlockingQueue
阻塞队列在 Java 中的主要实现有三个:
ArrayBlockingQueue
: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。LinkedBlockingQueue
: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。SynchronousQueue
: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。阻塞队列和非阻塞队列的区别:
操作方式:
线程安全性:
ArrayBlockingQueue
和 LinkedBlockingQueue
中,生产者线程会在队列满时进入等待状态,直到消费者线程移除元素,释放出空间。适用场景:
我们这里以 ArrayBlockingQueue
为例,来看 Java 对于阻塞队列的具体实现。
ArrayBlockingQueue
类使用一个数组 items
存储队列元素,并通过 takeIndex
和 putIndex
字段来跟踪下一个取出和放入元素的索引,同时用 count
记录当前队列中的元素数量。为了确保线程安全,ArrayBlockingQueue
使用 ReentrantLock
作为主要锁,配合 Condition
对象 notEmpty
和 notFull
分别用于管理线程在队列为空或已满时的等待和通知。itrs
字段用于维护当前活动的迭代器的状态,允许队列操作在进行元素添加或移除时保持迭代器的一致性。如果没有活动的迭代器,itrs
将为 null
。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 省略其他方法和实现细节 ... /** 队列中的元素 */ @SuppressWarnings("serial") // 有条件序列化 final Object[] items; /** 下一个 take、poll、peek 或 remove 操作的元素索引 */ int takeIndex; /** 下一个 put、offer 或 add 操作的元素索引 */ int putIndex; /** 队列中的元素数量 */ int count; /* * 并发控制使用经典的双条件算法, * 这是任何教科书中都可以找到的。 */ /** 保护所有访问的主要锁 */ final ReentrantLock lock; /** 用于等待取元素的条件 */ @SuppressWarnings("serial") // 实现 Condition 的类可能是可序列化的。 private final Condition notEmpty; /** 用于等待放入元素的条件 */ @SuppressWarnings("serial") // 实现 Condition 的类可能是可序列化的。 private final Condition notFull; /** * 当前活动迭代器的共享状态,如果没有已知的迭代器,则为 null。 * 允许队列操作更新迭代器状态。 */ transient Itrs itrs; // 省略其他方法和实现细节 ... }
关于 Condition
: Condition
是 Java 并发库中的一个接口,它提供了一种线程间通信机制,使得线程能够在特定条件下等待和通知。Condition
的主要作用是为线程提供等待和通知机制,以便协调和管理线程的执行顺序。
Condition
的主要功能:
Condition
上调用 await()
方法进入等待状态,直到其他线程发出通知(即 signal()
或 signalAll()
);await()
方法使当前线程释放持有的锁,并进入等待队列,直到条件满足或被中断。此时线程会自动重新获取锁。Condition
的 signal()
方法,线程可以通知一个等待在该 Condition
上的线程,使其从等待状态中恢复并重新获取锁;使用 signalAll()
方法可以通知所有在该 Condition
上等待的线程,唤醒它们。await(long time, TimeUnit unit)
方法允许线程在等待时指定超时时间。如果超时,线程会自动从等待状态恢复,并且不需要手动调用 signal()
。Condition
和直接调用线程的方式相比:Condition
提供了更高级、更灵活的线程协调机制,能够与 ReentrantLock
配合使用,适合处理复杂的并发控制和条件等待需求。它支持多条件和超时等待等高级功能。
ArrayBlockingQueue
通过 ReentrantLock
和 Condition
实现线程安全的元素插入操作。offer(E e)
尝试将元素 e
插入队列,如果队列未满则成功插入,否则返回 false
。put(E e)
在队列已满时阻塞线程直到有空间,并插入元素。offer(E e, long timeout, TimeUnit unit)
在指定超时时间内尝试插入元素,超时后返回 false
。enqueue(E e)
将元素插入队列的尾部,更新索引,增加元素计数,并通知等待取元素的线程。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 省略其他方法和实现细节 ... /** * 尝试将指定元素插入队列的尾部,如果队列容量未满则立即成功插入, * 插入成功时返回 {@code true},如果队列已满则返回 {@code false}。 * 这个方法通常比 {@link #add} 方法更可取,因为后者在插入失败时会抛出异常。 * * @throws NullPointerException 如果指定的元素为 null */ public boolean offer(E e) { Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常 final ReentrantLock lock = this.lock; // 获取锁对象 lock.lock(); // 获取锁 try { if (count == items.length) // 如果队列已满 return false; // 插入失败,返回 false else { enqueue(e); // 调用 enqueue 方法插入元素 return true; // 插入成功,返回 true } } finally { lock.unlock(); // 释放锁 } } /** * 将指定元素插入队列的尾部,如果队列已满则阻塞当前线程直到有空间可用。 * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常 final ReentrantLock lock = this.lock; // 获取锁对象 lock.lockInterruptibly(); // 获取锁,允许中断 try { while (count == items.length) // 如果队列已满 notFull.await(); // 等待直到队列有空间 enqueue(e); // 调用 enqueue 方法插入元素 } finally { lock.unlock(); // 释放锁 } } /** * 将指定元素插入队列的尾部,如果队列已满,则在指定的等待时间内等待空间可用。 * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常 long nanos = unit.toNanos(timeout); // 将超时时间转换为纳秒 final ReentrantLock lock = this.lock; // 获取锁对象 lock.lockInterruptibly(); // 获取锁,允许中断 try { while (count == items.length) { // 如果队列已满 if (nanos <= 0L) // 如果超时时间已过 return false; // 返回 false 表示插入失败 nanos = notFull.awaitNanos(nanos); // 等待指定的时间,直到队列有空间 } enqueue(e); // 调用 enqueue 方法插入元素 return true; // 插入成功,返回 true } finally { lock.unlock(); // 释放锁 } } /** * 内部方法,将指定元素插入队列的尾部。 */ private void enqueue(E e) { // assert lock.isHeldByCurrentThread(); // 确保当前线程持有锁 // assert lock.getHoldCount() == 1; // 确保锁的持有计数为 1 // assert items[putIndex] == null; // 确保插入位置为空 final Object[] items = this.items; // 获取存储元素的数组 items[putIndex] = e; // 将元素放入数组的指定位置 if (++putIndex == items.length) putIndex = 0; // 更新插入索引,若超出数组长度则重置为 0 count++; // 增加队列中的元素数量 notEmpty.signal(); // 唤醒等待取元素的线程 } // 省略其他方法和实现细节 ... }
ArrayBlockingQueue
提供了多种获取元素的方法:poll()
尝试移除并返回队列头部的元素,如果队列为空则返回 null
;take()
移除并返回队列头部的元素,如果队列为空则阻塞当前线程直到有元素可用;poll(long timeout, TimeUnit unit)
在指定的超时时间内尝试移除并返回头部元素,若超时则返回 null
;peek()
返回队列头部的元素但不移除它,如果队列为空则返回 null
。这些方法通过 ReentrantLock
和 Condition
实现线程安全的队列操作,确保在并发环境下对队列的正确访问和管理
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 省略其他方法和实现细节 ... /** * 从队列中移除并返回头部元素,如果队列为空则返回 null。 * * @return 队列头部元素;如果队列为空则返回 null。 */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } /** * 从队列中移除并返回头部元素,如果队列为空则阻塞当前线程,直到有元素可用。 * * @return 队列头部元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); // 队列为空时,等待有元素可用 return dequeue(); } finally { lock.unlock(); } } /** * 尝试从队列中移除并返回头部元素,在指定的超时时间内等待元素变得可用。 * * @param timeout 等待的最大时间。 * @param unit 时间单位。 * @return 队列头部元素;如果在超时时间内未获得元素则返回 null。 * @throws InterruptedException 如果线程在等待时被中断。 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); // 将超时时间转换为纳秒 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0L) return null; // 超时返回 null nanos = notEmpty.awaitNanos(nanos); // 等待指定的时间 } return dequeue(); } finally { lock.unlock(); } } /** * 返回队列头部的元素,但不移除它。如果队列为空,则返回 null。 * * @return 队列头部元素;如果队列为空则返回 null。 */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // 返回队列头部的元素;如果队列为空则返回 null } finally { lock.unlock(); } } /** * 从当前的取元素位置提取元素,更新位置,并发出信号。 * 仅在持有锁的情况下调用。 * * @return 提取的元素。 */ private E dequeue() { // assert lock.isHeldByCurrentThread(); // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; // 获取并强制转换元素 items[takeIndex] = null; // 清除元素位置 if (++takeIndex == items.length) takeIndex = 0; // 更新取出位置索引 count--; // 减少队列中的元素数量 if (itrs != null) itrs.elementDequeued(); // 更新迭代器状态(如果存在) notFull.signal(); // 通知生产者线程队列有空间 return e; } // 省略其他方法和实现细节 ... }
ArrayBlockingQueue
和 LinkedBlockingQueue
是 Java 中的两种阻塞队列,它们在实现和使用场景上有显著的区别:
数据结构:
ArrayBlockingQueue
:使用固定大小的数组实现;内部是一个环形缓冲区(Circular Buffer),容量在初始化时设定,不可动态调整。
LinkedBlockingQueue
:使用链表实现;内部是一个双端链表,容量可以在初始化时设定,默认最大容量为 Integer.MAX_VALUE
,并且可以动态扩展。
存储特性:
ArrayBlockingQueue
:内存占用固定,因为容量在创建时就已经确定;适合容量已知且稳定的场景。
LinkedBlockingQueue
:内存占用根据实际存储的元素数量动态变化;适合容量不确定或需要动态调整的场景。
性能:
ArrayBlockingQueue
:插入和删除操作较为高效,因为是基于数组的索引操作;更好的缓存局部性,适合固定容量的高并发场景。
LinkedBlockingQueue
:插入和删除操作涉及链表节点的操作,可能稍慢;适合动态容量变化的场景,内存使用灵活。
使用场景:
ArrayBlockingQueue
:用于容量固定的情况,如固定大小的线程池任务队列;内存占用稳定,性能预测性强。
LinkedBlockingQueue
:用于需要大容量或容量可变的情况,如任务缓存队列;内存使用灵活,适应变化的工作负载。
总结:ArrayBlockingQueue
是基于固定大小数组的阻塞队列,适合固定容量的应用场景;而 LinkedBlockingQueue
是基于链表的阻塞队列,适合容量不确定的场景。二者在性能、内存占用和适用场景上各有特点。
SynchronousQueue
是 Java 中的一种特殊的阻塞队列,其实现和行为与传统的阻塞队列有显著不同。以下是 SynchronousQueue
的大致实现和特点:
数据结构:SynchronousQueue
不使用任何内部存储结构来保存元素。即,它不持有任何实际的元素。每个插入操作必须等待一个线程来执行移除操作,反之亦然。即,插入和移除操作是直接配对的。
操作特性:
插入操作 (put()
、offer()
):当调用 put()
或 offer()
方法插入元素时,当前线程会被阻塞,直到另一个线程调用 take()
或 poll()
方法从队列中移除该元素。这种机制确保了每个插入操作都有一个对应的移除操作。
移除操作 (take()
、poll()
):当调用 take()
或 poll()
方法移除元素时,当前线程会被阻塞,直到另一个线程调用 put()
或 offer()
方法将元素插入队列。这种机制确保了每个移除操作都有一个对应的插入操作。
线程交互:SynchronousQueue
实际上可以被视为一 “零容量” 队列,因为它不存储任何元素。插入和移除操作是完全同步的,必须在操作之间进行配对。
使用场景:常用于需要直接交换任务和线程的场景,如线程池的工作队列。线程池中的工作线程可以直接从队列中获取任务,而不需要额外的存储空间。
内部实现:SynchronousQueue
使用一组条件变量来实现线程间的配对机制。使用 ReentrantLock
和 Condition
对象来管理线程的等待和通知。
总的来说,SynchronousQueue
是一种特殊的阻塞队列,不存储元素,所有的插入操作都需要有相应的移除操作配对。它用于需要直接交换数据或任务的场景,如线程池的工作队列,其实现基于条件变量来管理线程的同步操作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。