赞
踩
目录
干货分享,感谢您的阅读!
阻塞队列(Blocking Queue)是 Java 并发编程中的一种数据结构,它具有特殊的特性,能够在队列满或队列空时进行阻塞等待或唤醒操作。
定义与特性
常见实现类
我们在对Java线程池ThreadPoolExecutor的理解分析中使用的阻塞队列总结重新展示如下:
常用方法
使用场景
ArrayBlockingQueue 是基于数组的有界阻塞队列,它使用一个可重入锁来实现线程安全。以下是该类的关键方法和实现原理的概述:
构造方法
添加元素
获取元素
其他方法
ArrayBlockingQueue 使用了一个内部数组 items 来存储元素,同时使用两个指针 takeIndex 和 putIndex 来表示取出和放入元素的位置。通过这种方式,它能够保证元素的先进先出顺序。
在具体的实现中,ArrayBlockingQueue 使用了一个可重入锁 lock 来实现线程安全。不同的操作会在关键点上获取锁来确保线程安全性,例如在添加元素前获取锁、移除元素后释放锁等。
总体来说,ArrayBlockingQueue 是一个基于数组实现的有界阻塞队列,通过使用锁和条件变量来实现线程安全和阻塞等待的功能。
在 Spring 框架中,ArrayBlockingQueue 可以用于各种并发场景,以下是一些在 Spring 中使用 ArrayBlockingQueue 的应用代码举例:
异步任务处理
- @Component
- public class TaskProcessor {
- private final ExecutorService executorService;
- private final ArrayBlockingQueue<Runnable> taskQueue;
-
- public TaskProcessor() {
- taskQueue = new ArrayBlockingQueue<>(100);
- executorService = Executors.newFixedThreadPool(10);
- startProcessing();
- }
-
- public void submitTask(Runnable task) {
- try {
- // 将任务放入队列
- taskQueue.put(task);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private void startProcessing() {
- executorService.execute(() -> {
- while (true) {
- try {
- // 从队列中获取任务
- Runnable task = taskQueue.take();
- // 提交任务给线程池执行
- executorService.submit(task);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- });
- }
- }
上述代码中,TaskProcessor 是一个异步任务处理器,它使用 ArrayBlockingQueue 来接收任务,并通过线程池来执行这些任务。通过将任务放入 ArrayBlockingQueue,可以实现任务的异步执行和流量控制。
事件发布与订阅
- @Component
- public class EventPublisher {
- private final ArrayBlockingQueue<Event> eventQueue;
-
- public EventPublisher() {
- eventQueue = new ArrayBlockingQueue<>(100);
- }
-
- public void publishEvent(Event event) {
- try {
- // 将事件放入队列
- eventQueue.put(event);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- // 在适当的时机调用此方法来处理事件
- public void processEvents() {
- while (!eventQueue.isEmpty()) {
- // 从队列中获取事件
- Event event = eventQueue.poll();
- // 处理事件逻辑
- }
- }
- }
在上述代码中,EventPublisher 是一个事件发布者,它使用 ArrayBlockingQueue 来接收发布的事件。通过将事件放入 ArrayBlockingQueue,可以实现事件的异步处理和事件的批量处理。
- import java.util.concurrent.ArrayBlockingQueue;
-
- public class OrderQueue {
- private final ArrayBlockingQueue<Order> orderQueue;
-
- public OrderQueue(int capacity) {
- orderQueue = new ArrayBlockingQueue<>(capacity);
- }
-
- public void addOrder(Order order) {
- try {
- // 将订单放入队列
- orderQueue.put(order);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public Order processOrder() {
- try {
- // 从队列中获取并移除订单
- return orderQueue.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- }
-
- public int getOrderCount() {
- // 获取队列中的订单数量
- return orderQueue.size();
- }
- }
-
- public class Order {
- private final String orderId;
- private final String customerName;
-
- public Order(String orderId, String customerName) {
- this.orderId = orderId;
- this.customerName = customerName;
- }
-
- public String getOrderId() {
- return orderId;
- }
-
- public String getCustomerName() {
- return customerName;
- }
- }
-
- public class OrderProcessor {
- private final OrderQueue orderQueue;
-
- public OrderProcessor() {
- orderQueue = new OrderQueue(100);
- }
-
- public void placeOrder(Order order) {
- // 将订单添加到订单队列
- orderQueue.addOrder(order);
- System.out.println("Order placed: " + order.getOrderId());
- }
-
- public void processOrders() {
- while (true) {
- // 从订单队列中获取订单进行处理
- Order order = orderQueue.processOrder();
- if (order != null) {
- System.out.println("Processing order: " + order.getOrderId() + " for customer: " + order.getCustomerName());
- // 处理订单逻辑
- }
- }
- }
- }
-
- public class Main {
- public static void main(String[] args) {
- OrderProcessor orderProcessor = new OrderProcessor();
-
- // 创建并提交一些订单
- orderProcessor.placeOrder(new Order("1001", "John Doe"));
- orderProcessor.placeOrder(new Order("1002", "Jane Smith"));
- orderProcessor.placeOrder(new Order("1003", "Alice Johnson"));
-
- // 处理订单
- orderProcessor.processOrders();
- }
- }
在上述代码中,我们有一个 Order 类表示订单对象。OrderQueue 类使用 ArrayBlockingQueue 来作为订单队列,它具有添加订单和处理订单的方法。OrderProcessor 类负责创建订单并将其放入订单队列,然后从订单队列中取出订单进行处理。
在主函数中,我们创建了一个 OrderProcessor 实例,然后创建一些订单并提交给 OrderProcessor 来处理。OrderProcessor 的 processOrders() 方法会不断地从订单队列中取出订单进行处理。
通过使用 ArrayBlockingQueue,我们可以实现订单的异步处理和流量控制,避免了线程之间的竞争和冲突,提高了系统的并发性能和稳定性。
LinkedBlockingQueue 是基于链表的可选有界或无界阻塞队列,它使用两个可重入锁和条件变量来实现线程安全。以下是该类的关键方法和实现原理的概述:
构造方法
添加元素
获取元素
其他方法
LinkedBlockingQueue 使用了一个内部链表来存储元素,同时使用两个指针 head 和 tail 来表示链表的头尾节点。通过这种方式,它能够保证元素的先进先出顺序。
在具体的实现中,LinkedBlockingQueue 使用了两个可重入锁 takeLock 和 putLock 来实现线程安全。
不同的操作会在关键点上获取相应的锁来确保线程安全性,例如在添加元素前获取 putLock,在获取元素前获取 takeLock,并在添加或移除元素后释放相应的锁。
总体来说,LinkedBlockingQueue 是一个基于链表实现的可选有界或无界阻塞队列,通过使用锁和条件变量来实现线程安全和阻塞等待的功能。
在 Spring 框架中,LinkedBlockingQueue 可以用于各种并发场景。以下是一些在 Spring 中使用 LinkedBlockingQueue 的应用代码分析(异步消息队列):
- @Component
- public class MessageQueue {
- private final LinkedBlockingQueue<Message> messageQueue;
-
- public MessageQueue() {
- messageQueue = new LinkedBlockingQueue<>();
- }
-
- public void addMessage(Message message) {
- try {
- messageQueue.put(message); // 将消息放入队列
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public Message pollMessage() {
- return messageQueue.poll(); // 从队列中获取并移除消息
- }
-
- public int getMessageCount() {
- return messageQueue.size(); // 获取队列中的消息数量
- }
- }
-
- public class Message {
- private final String content;
-
- public Message(String content) {
- this.content = content;
- }
-
- public String getContent() {
- return content;
- }
- }
-
- public class MessageProcessor {
- private final MessageQueue messageQueue;
-
- public MessageProcessor() {
- messageQueue = new MessageQueue();
- }
-
- public void sendMessage(String content) {
- messageQueue.addMessage(new Message(content)); // 将消息添加到消息队列
- System.out.println("Message sent: " + content);
- }
-
- public void processMessages() {
- while (true) {
- Message message = messageQueue.pollMessage(); // 从消息队列中获取消息进行处理
- if (message != null) {
- System.out.println("Processing message: " + message.getContent());
- // 处理消息逻辑
- }
- }
- }
- }
在上述代码中,有一个 Message 类表示消息对象。MessageQueue 类使用 LinkedBlockingQueue 来作为消息队列,它具有添加消息和获取消息的方法。MessageProcessor 类负责创建消息并将其放入消息队列,然后从消息队列中取出消息进行处理。
在主函数中,我们创建了一个 MessageProcessor 实例,并使用 sendMessage() 方法向消息队列发送消息。MessageProcessor 的 processMessages() 方法会不断地从消息队列中取出消息进行处理。
通过使用 LinkedBlockingQueue,我们可以实现消息的异步处理和流量控制,确保消息的有序性和可靠性。
练习使用 LinkedBlockingQueue 来实现异步处理任务的场景。使用 LinkedBlockingQueue 的业务场景代码示例:
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.LinkedBlockingQueue;
-
- @Component
- public class TaskQueue {
- private final LinkedBlockingQueue<Task> taskQueue;
-
- public TaskQueue() {
- taskQueue = new LinkedBlockingQueue<>();
- }
-
- public void addTask(Task task) {
- try {
- taskQueue.put(task); // 将任务放入队列
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public Task takeTask() {
- try {
- return taskQueue.take(); // 从队列中获取并移除任务
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- }
-
- public int getTaskCount() {
- return taskQueue.size(); // 获取队列中的任务数量
- }
- }
-
- public class Task {
- private final String taskId;
- private final String taskName;
-
- public Task(String taskId, String taskName) {
- this.taskId = taskId;
- this.taskName = taskName;
- }
-
- public String getTaskId() {
- return taskId;
- }
-
- public String getTaskName() {
- return taskName;
- }
- }
-
- @Component
- public class TaskProcessor {
- private final TaskQueue taskQueue;
-
- public TaskProcessor(TaskQueue taskQueue) {
- this.taskQueue = taskQueue;
- }
-
- public void submitTask(String taskId, String taskName) {
- Task task = new Task(taskId, taskName);
- taskQueue.addTask(task); // 将任务添加到任务队列
- System.out.println("Task submitted: " + taskId);
- }
-
- public void processTasks() {
- while (true) {
- Task task = taskQueue.takeTask(); // 从任务队列中获取任务进行处理
- if (task != null) {
- System.out.println("Processing task: " + task.getTaskId() + ", " + task.getTaskName());
- // 处理任务逻辑
- }
- }
- }
- }
-
- public class Main {
- public static void main(String[] args) {
- // 创建 Spring 应用上下文
- ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
-
- // 从应用上下文中获取 TaskProcessor 实例
- TaskProcessor taskProcessor = context.getBean(TaskProcessor.class);
-
- // 提交一些任务
- taskProcessor.submitTask("1", "Task 1");
- taskProcessor.submitTask("2", "Task 2");
- taskProcessor.submitTask("3", "Task 3");
-
- // 处理任务
- taskProcessor.processTasks();
- }
- }
在上述代码中,Task 类表示一个任务对象。TaskQueue 类使用 LinkedBlockingQueue 来作为任务队列,它具有添加任务和获取任务的方法。TaskProcessor 类负责创建任务并将其放入任务队列,然后从任务队列中取出任务进行处理。
在主函数中,我们使用 Spring 框架的 ApplicationContext 来获取 TaskProcessor 实例,并使用 submitTask() 方法向任务队列提交任务。TaskProcessor 的 processTasks() 方法会不断地从任务队列中取出任务进行处理。
PriorityBlockingQueue 是一个可阻塞的无界优先级队列,它基于数组实现,可以存储具有可比较性的元素。一个无界的具有优先级的阻塞队列,使用跟PriorityQueue相同的顺序规则,默认顺序是自然顺序(从小到大)。若传入的对象,不支持比较将报错( ClassCastException)。不允许null。
底层使用的是基于数组的平衡二叉树堆实现(它的优先级的实现)。
公共方法使用单锁ReetrantLock保证线程的安全性。
构造方法
添加元素
获取元素
其他方法
PriorityBlockingQueue 内部使用了一个可重入锁和一个条件变量来实现线程安全和阻塞等待的功能。在添加元素和获取元素时,会获取锁来确保线程安全,并根据队列是否已满或为空来决定是否阻塞等待。
它的元素存储在数组中,并通过比较器来维护元素的优先级顺序。在插入元素时,会按照优先级的顺序将元素插入到合适的位置。在获取元素时,会返回优先级最高的元素,并将其从队列中移除。
总体来说,PriorityBlockingQueue 是一个基于数组实现的可阻塞的无界优先级队列。它可以在多线程环境下实现元素的有序存储和获取,并提供阻塞等待的能力。
假设我们有一个在线商城系统,需要按照订单的优先级进行处理。高优先级的订单需要被尽快处理,而低优先级的订单可以稍后处理。我们可以使用 PriorityBlockingQueue 来实现订单队列的管理。
首先,定义一个 Order 类表示订单对象,该类包含订单的编号和优先级:
- public class Order implements Comparable<Order> {
- private String orderId;
- private int priority;
-
- public Order(String orderId, int priority) {
- this.orderId = orderId;
- this.priority = priority;
- }
-
- public String getOrderId() {
- return orderId;
- }
-
- public int getPriority() {
- return priority;
- }
-
- @Override
- public int compareTo(Order other) {
- return Integer.compare(this.priority, other.priority);
- }
- }
然后,我们创建一个 OrderProcessor 类来处理订单:
- @Component
- public class OrderProcessor {
- private final PriorityBlockingQueue<Order> orderQueue;
-
- public OrderProcessor() {
- orderQueue = new PriorityBlockingQueue<>();
- }
-
- public void addOrder(Order order) {
- orderQueue.put(order); // 将订单放入队列
- System.out.println("Order added: " + order.getOrderId());
- }
-
- public void processOrders() {
- while (true) {
- try {
- Order order = orderQueue.take(); // 从队列中获取并移除订单
- System.out.println("Processing order: " + order.getOrderId());
- // 处理订单逻辑
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
在上述代码中,我们创建了一个 OrderProcessor 类,该类使用 PriorityBlockingQueue 作为订单队列。addOrder() 方法用于将订单放入队列,processOrders() 方法用于从队列中获取订单并处理。
在主函数中,我们创建了一个 OrderProcessor 实例,并使用 addOrder() 方法向订单队列添加订单。OrderProcessor 的 processOrders() 方法会不断地从订单队列中获取订单进行处理。
通过使用 PriorityBlockingQueue,我们可以实现按照订单优先级进行排序和处理的功能。高优先级的订单会被优先处理,从而提高订单处理的效率和响应性。
SynchronousQueue作为阻塞队列的时候,对于每一个take的线程会阻塞直到有一个put的线程放入元素为止,反之亦然。在SynchronousQueue内部没有任何存放元素的能力。所以类似peek操作或者迭代器操作也是无效的,元素只能通过put类操作或者take类操作才有效。通常队列的第一个元素是当前第一个等待的线程。如果没有线程阻塞在该队列则poll会返回null。从Collection的视角来看SynchronousQueue表现为一个空的集合。
SynchronousQueue相似于使用CSP和Ada算法(不知道怎么具体指什么算法),他非常适合做交换的工作,生产者的线程必须与消费者的线程同步以传递某些信息、事件或任务
SynchronousQueue支持支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果是公平锁的话可以保证当前第一个队首的线程是等待时间最长的线程,这时可以视SynchronousQueue为一个FIFO队列。
在实际业务应用中,SynchronousQueue 可以用于一些特定的场景,其中一个常见的应用是实现线程间的任务调度和处理。以下是一个使用 SynchronousQueue 的业务应用举例:
假设我们有一个任务调度器,其中有多个工作线程和一个任务队列,工作线程从任务队列中获取任务并进行处理。我们希望能够动态控制任务的执行顺序,确保高优先级的任务能够被尽快处理,而低优先级的任务则可以稍后处理。
在这种情况下,我们可以使用 SynchronousQueue 来实现任务队列。每个工作线程都会从 SynchronousQueue 中获取任务进行处理,当队列为空时,工作线程将被阻塞等待新的任务到达。
下面是一个简化的示例代码:
- import java.util.concurrent.SynchronousQueue;
-
- public class TaskScheduler {
- private final SynchronousQueue<Runnable> taskQueue;
-
- public TaskScheduler() {
- taskQueue = new SynchronousQueue<>();
- }
-
- public void scheduleTask(Runnable task) {
- try {
- taskQueue.put(task); // 将任务放入队列
- System.out.println("Task scheduled");
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public void startWorkers(int numWorkers) {
- for (int i = 0; i < numWorkers; i++) {
- Thread workerThread = new Thread(() -> {
- while (true) {
- try {
- Runnable task = taskQueue.take(); // 从队列中获取任务
- System.out.println("Task started");
- // 处理任务逻辑
- task.run();
- System.out.println("Task completed");
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- });
- workerThread.start();
- }
- }
- }
在上述代码中,我们创建了一个 TaskScheduler 类,它使用 SynchronousQueue 作为任务队列。scheduleTask() 方法用于将任务放入队列,startWorkers() 方法用于启动工作线程并从队列中获取任务进行处理。
在主函数中,我们创建了一个 TaskScheduler 实例,并使用 scheduleTask() 方法向任务队列中添加任务。startWorkers() 方法会启动指定数量的工作线程,并从任务队列中获取任务进行处理。
通过使用 SynchronousQueue,我们实现了动态的任务调度和处理机制。当有新的任务到达时,工作线程会立即从队列中获取任务并进行处理,确保高优先级的任务能够尽快执行。
这个示例展示了在业务应用中使用 SynchronousQueue 的一个场景。通过合理地使用 SynchronousQueue,我们能够实现任务的有序调度和处理,并提供灵活的控制机制。
ArrayBlockingQueue源码分析_程序员小潘的博客-CSDN博客
ArrayBlockingQueue 源码解析_arrayblockingqueue源码_章全蛋的博客-CSDN博客
阻塞队列之ArrayBlockingQueue源码解析 - 知乎
线程安全的队列-ArrayBlockingQueue源码分析_51CTO博客_arrayblockingqueue 线程安全
从源码全面解析ArrayBlockingQueue的来龙去脉_牛客网
java阻塞队列LinkedBlockingQueue源码分析_l;inkedblockqueue_lianchaozhao的博客-CSDN博客
Java 源码 - LinkedBlockingQueue 源码解析_Q.E.D.的博客-CSDN博客
LinkedBlockingQueue源码解析_wx613dbd09b1332的技术博客_51CTO博客
从源码解析LinkedBlockingQueue的来龙去脉_牛客网
https://www.cnblogs.com/chafry/p/16783035.html
【JAVA多线程】PriorityBlockingQueue源码分析_吕布辕门的博客-CSDN博客
阻塞队列 — PriorityBlockingQueue源码分析 - 知乎
JAVA并发(7)-并发队列PriorityBlockingQueue的源码分析_mob604756f1c0ca的技术博客_51CTO博客
源码分析-SynchronousQueue_synchronousqueue源码_demon7552003的博客-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。