当前位置:   article > 正文

Java中的阻塞队列学习与总结体会_orderqueue.take

orderqueue.take

目录

 一、阻塞队列的基本介绍

      二、ArrayBlockingQueue基本分析

(一)源码分析

(二)Spring中的应用

(三)业务应用代码举例

三、LinkedBlockingQueue基本分析

(一)源码分析

(二)Spring中的应用

(三)业务应用代码举例

四、PriorityBlockingQueue

(一)源码分析

(二)业务应用代码举例

五、SynchronousQueue

(一)源码分析

(二)业务应用代码举例

参考文章链接等


干货分享,感谢您的阅读!

一、阻塞队列的基本介绍

阻塞队列(Blocking Queue)是 Java 并发编程中的一种数据结构,它具有特殊的特性,能够在队列满或队列空时进行阻塞等待或唤醒操作。

定义与特性

  • 阻塞队列是一种线程安全的队列数据结构,具有先进先出(FIFO)的特性。
  • 阻塞队列支持在队列为空时阻塞等待获取元素,或者在队列已满时阻塞等待添加元素的操作。
  • 阻塞队列提供了线程安全的添加、移除和检查操作,避免了手动实现线程同步的复杂性。

常见实现类

  • ArrayBlockingQueue:一个基于数组的有界阻塞队列,内部使用可重入锁实现线程安全。
  • LinkedBlockingQueue:一个基于链表的可选有界或无界阻塞队列,内部使用不同的锁实现。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列,内部使用可重入锁实现线程安全。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待对应的移除操作。

我们在对Java线程池ThreadPoolExecutor的理解分析中使用的阻塞队列总结重新展示如下:

 常用方法

  • put(E element):将指定元素添加到队列,如果队列已满,则阻塞等待。
  • take():从队列中获取并移除一个元素,如果队列为空,则阻塞等待。
  • offer(E element):尝试将指定元素添加到队列,如果队列已满,则返回 false。
  • poll():从队列中获取并移除一个元素,如果队列为空,则返回 null。
  • size():返回队列中的元素数量。

使用场景

  • 阻塞队列适用于生产者-消费者模式,其中生产者和消费者之间可以解耦并发操作。
  • 阻塞队列可以用于实现线程池、消息队列、任务调度等多线程编程场景。
  • 阻塞队列还可以用于解决不同线程之间的协作问题,例如同步点、线程间的数据交换等。
  • 通过使用阻塞队列,可以简化多线程编程中的线程同步和通信操作,提高代码的可读性和可维护性。

二、ArrayBlockingQueue基本分析

(一)源码分析

ArrayBlockingQueue 是基于数组的有界阻塞队列,它使用一个可重入锁来实现线程安全。以下是该类的关键方法和实现原理的概述:

构造方法

  • ArrayBlockingQueue(int capacity):创建一个指定容量的 ArrayBlockingQueue 实例。内部会创建一个数组用于存储元素,并初始化相关变量。

添加元素

  • put(E element):将指定元素添加到队列的末尾,如果队列已满,则阻塞等待。在添加元素之前会获取锁,并根据队列是否已满来决定是进行阻塞还是添加元素。
  • offer(E element):尝试将指定元素添加到队列的末尾,如果队列已满,则返回 false。在添加元素之前会获取锁,并根据队列是否已满来决定是否添加元素。

获取元素

  • take():从队列的头部获取并移除一个元素,如果队列为空,则阻塞等待。在获取元素之前会获取锁,并根据队列是否为空来决定是否阻塞等待。
  • poll():从队列的头部获取并移除一个元素,如果队列为空,则返回 null。在获取元素之前会获取锁,并根据队列是否为空来决定是否返回 null。

其他方法

  • size():返回队列中的元素数量。
  • remainingCapacity():返回队列剩余的容量。
  • peek():获取队列头部的元素,但不移除。

ArrayBlockingQueue 使用了一个内部数组 items 来存储元素,同时使用两个指针 takeIndex 和 putIndex 来表示取出和放入元素的位置。通过这种方式,它能够保证元素的先进先出顺序。

在具体的实现中,ArrayBlockingQueue 使用了一个可重入锁 lock 来实现线程安全。不同的操作会在关键点上获取锁来确保线程安全性,例如在添加元素前获取锁、移除元素后释放锁等。

总体来说,ArrayBlockingQueue 是一个基于数组实现的有界阻塞队列,通过使用锁和条件变量来实现线程安全和阻塞等待的功能

(二)Spring中的应用

在 Spring 框架中,ArrayBlockingQueue 可以用于各种并发场景,以下是一些在 Spring 中使用 ArrayBlockingQueue 的应用代码举例:

异步任务处理

  1. @Component
  2. public class TaskProcessor {
  3. private final ExecutorService executorService;
  4. private final ArrayBlockingQueue<Runnable> taskQueue;
  5. public TaskProcessor() {
  6. taskQueue = new ArrayBlockingQueue<>(100);
  7. executorService = Executors.newFixedThreadPool(10);
  8. startProcessing();
  9. }
  10. public void submitTask(Runnable task) {
  11. try {
  12. // 将任务放入队列
  13. taskQueue.put(task);
  14. } catch (InterruptedException e) {
  15. Thread.currentThread().interrupt();
  16. }
  17. }
  18. private void startProcessing() {
  19. executorService.execute(() -> {
  20. while (true) {
  21. try {
  22. // 从队列中获取任务
  23. Runnable task = taskQueue.take();
  24. // 提交任务给线程池执行
  25. executorService.submit(task);
  26. } catch (InterruptedException e) {
  27. Thread.currentThread().interrupt();
  28. break;
  29. }
  30. }
  31. });
  32. }
  33. }

上述代码中,TaskProcessor 是一个异步任务处理器,它使用 ArrayBlockingQueue 来接收任务,并通过线程池来执行这些任务。通过将任务放入 ArrayBlockingQueue,可以实现任务的异步执行和流量控制。

事件发布与订阅

  1. @Component
  2. public class EventPublisher {
  3. private final ArrayBlockingQueue<Event> eventQueue;
  4. public EventPublisher() {
  5. eventQueue = new ArrayBlockingQueue<>(100);
  6. }
  7. public void publishEvent(Event event) {
  8. try {
  9. // 将事件放入队列
  10. eventQueue.put(event);
  11. } catch (InterruptedException e) {
  12. Thread.currentThread().interrupt();
  13. }
  14. }
  15. // 在适当的时机调用此方法来处理事件
  16. public void processEvents() {
  17. while (!eventQueue.isEmpty()) {
  18. // 从队列中获取事件
  19. Event event = eventQueue.poll();
  20. // 处理事件逻辑
  21. }
  22. }
  23. }

在上述代码中,EventPublisher 是一个事件发布者,它使用 ArrayBlockingQueue 来接收发布的事件。通过将事件放入 ArrayBlockingQueue,可以实现事件的异步处理和事件的批量处理。

(三)业务应用代码举例

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. public class OrderQueue {
  3. private final ArrayBlockingQueue<Order> orderQueue;
  4. public OrderQueue(int capacity) {
  5. orderQueue = new ArrayBlockingQueue<>(capacity);
  6. }
  7. public void addOrder(Order order) {
  8. try {
  9. // 将订单放入队列
  10. orderQueue.put(order);
  11. } catch (InterruptedException e) {
  12. Thread.currentThread().interrupt();
  13. }
  14. }
  15. public Order processOrder() {
  16. try {
  17. // 从队列中获取并移除订单
  18. return orderQueue.take();
  19. } catch (InterruptedException e) {
  20. Thread.currentThread().interrupt();
  21. return null;
  22. }
  23. }
  24. public int getOrderCount() {
  25. // 获取队列中的订单数量
  26. return orderQueue.size();
  27. }
  28. }
  29. public class Order {
  30. private final String orderId;
  31. private final String customerName;
  32. public Order(String orderId, String customerName) {
  33. this.orderId = orderId;
  34. this.customerName = customerName;
  35. }
  36. public String getOrderId() {
  37. return orderId;
  38. }
  39. public String getCustomerName() {
  40. return customerName;
  41. }
  42. }
  43. public class OrderProcessor {
  44. private final OrderQueue orderQueue;
  45. public OrderProcessor() {
  46. orderQueue = new OrderQueue(100);
  47. }
  48. public void placeOrder(Order order) {
  49. // 将订单添加到订单队列
  50. orderQueue.addOrder(order);
  51. System.out.println("Order placed: " + order.getOrderId());
  52. }
  53. public void processOrders() {
  54. while (true) {
  55. // 从订单队列中获取订单进行处理
  56. Order order = orderQueue.processOrder();
  57. if (order != null) {
  58. System.out.println("Processing order: " + order.getOrderId() + " for customer: " + order.getCustomerName());
  59. // 处理订单逻辑
  60. }
  61. }
  62. }
  63. }
  64. public class Main {
  65. public static void main(String[] args) {
  66. OrderProcessor orderProcessor = new OrderProcessor();
  67. // 创建并提交一些订单
  68. orderProcessor.placeOrder(new Order("1001", "John Doe"));
  69. orderProcessor.placeOrder(new Order("1002", "Jane Smith"));
  70. orderProcessor.placeOrder(new Order("1003", "Alice Johnson"));
  71. // 处理订单
  72. orderProcessor.processOrders();
  73. }
  74. }

在上述代码中,我们有一个 Order 类表示订单对象。OrderQueue 类使用 ArrayBlockingQueue 来作为订单队列,它具有添加订单和处理订单的方法。OrderProcessor 类负责创建订单并将其放入订单队列,然后从订单队列中取出订单进行处理。

在主函数中,我们创建了一个 OrderProcessor 实例,然后创建一些订单并提交给 OrderProcessor 来处理。OrderProcessor 的 processOrders() 方法会不断地从订单队列中取出订单进行处理。

通过使用 ArrayBlockingQueue,我们可以实现订单的异步处理和流量控制,避免了线程之间的竞争和冲突,提高了系统的并发性能和稳定性。

三、LinkedBlockingQueue基本分析

(一)源码分析

LinkedBlockingQueue 是基于链表的可选有界或无界阻塞队列,它使用两个可重入锁和条件变量来实现线程安全。以下是该类的关键方法和实现原理的概述:

构造方法

  • LinkedBlockingQueue():创建一个无界的 LinkedBlockingQueue 实例。内部会创建一个链表用于存储元素,并初始化相关变量。
  • LinkedBlockingQueue(int capacity):创建一个指定容量的 LinkedBlockingQueue 实例。内部会根据指定容量来限制队列的大小。

添加元素

  • put(E element):将指定元素添加到队列的末尾,如果队列已满,则阻塞等待。在添加元素之前会获取锁,并根据队列是否已满来决定是进行阻塞还是添加元素。
  • offer(E element):尝试将指定元素添加到队列的末尾,如果队列已满,则返回 false。在添加元素之前会获取锁,并根据队列是否已满来决定是否添加元素。

获取元素

  • take():从队列的头部获取并移除一个元素,如果队列为空,则阻塞等待。在获取元素之前会获取锁,并根据队列是否为空来决定是否阻塞等待。
  • poll():从队列的头部获取并移除一个元素,如果队列为空,则返回 null。在获取元素之前会获取锁,并根据队列是否为空来决定是否返回 null。

其他方法

  • size():返回队列中的元素数量。
  • remainingCapacity():返回队列剩余的容量。
  • peek():获取队列头部的元素,但不移除。

LinkedBlockingQueue 使用了一个内部链表来存储元素,同时使用两个指针 head 和 tail 来表示链表的头尾节点。通过这种方式,它能够保证元素的先进先出顺序。

在具体的实现中,LinkedBlockingQueue 使用了两个可重入锁 takeLock 和 putLock 来实现线程安全。

不同的操作会在关键点上获取相应的锁来确保线程安全性,例如在添加元素前获取 putLock,在获取元素前获取 takeLock,并在添加或移除元素后释放相应的锁。

总体来说,LinkedBlockingQueue 是一个基于链表实现的可选有界或无界阻塞队列,通过使用锁和条件变量来实现线程安全和阻塞等待的功能。

(二)Spring中的应用

在 Spring 框架中,LinkedBlockingQueue 可以用于各种并发场景。以下是一些在 Spring 中使用 LinkedBlockingQueue 的应用代码分析(异步消息队列):

  1. @Component
  2. public class MessageQueue {
  3. private final LinkedBlockingQueue<Message> messageQueue;
  4. public MessageQueue() {
  5. messageQueue = new LinkedBlockingQueue<>();
  6. }
  7. public void addMessage(Message message) {
  8. try {
  9. messageQueue.put(message); // 将消息放入队列
  10. } catch (InterruptedException e) {
  11. Thread.currentThread().interrupt();
  12. }
  13. }
  14. public Message pollMessage() {
  15. return messageQueue.poll(); // 从队列中获取并移除消息
  16. }
  17. public int getMessageCount() {
  18. return messageQueue.size(); // 获取队列中的消息数量
  19. }
  20. }
  21. public class Message {
  22. private final String content;
  23. public Message(String content) {
  24. this.content = content;
  25. }
  26. public String getContent() {
  27. return content;
  28. }
  29. }
  30. public class MessageProcessor {
  31. private final MessageQueue messageQueue;
  32. public MessageProcessor() {
  33. messageQueue = new MessageQueue();
  34. }
  35. public void sendMessage(String content) {
  36. messageQueue.addMessage(new Message(content)); // 将消息添加到消息队列
  37. System.out.println("Message sent: " + content);
  38. }
  39. public void processMessages() {
  40. while (true) {
  41. Message message = messageQueue.pollMessage(); // 从消息队列中获取消息进行处理
  42. if (message != null) {
  43. System.out.println("Processing message: " + message.getContent());
  44. // 处理消息逻辑
  45. }
  46. }
  47. }
  48. }

在上述代码中,有一个 Message 类表示消息对象。MessageQueue 类使用 LinkedBlockingQueue 来作为消息队列,它具有添加消息和获取消息的方法。MessageProcessor 类负责创建消息并将其放入消息队列,然后从消息队列中取出消息进行处理。

在主函数中,我们创建了一个 MessageProcessor 实例,并使用 sendMessage() 方法向消息队列发送消息。MessageProcessor 的 processMessages() 方法会不断地从消息队列中取出消息进行处理。

通过使用 LinkedBlockingQueue,我们可以实现消息的异步处理和流量控制,确保消息的有序性和可靠性。

(三)业务应用代码举例

练习使用 LinkedBlockingQueue 来实现异步处理任务的场景。使用 LinkedBlockingQueue 的业务场景代码示例:

  1. import org.springframework.stereotype.Component;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. @Component
  4. public class TaskQueue {
  5. private final LinkedBlockingQueue<Task> taskQueue;
  6. public TaskQueue() {
  7. taskQueue = new LinkedBlockingQueue<>();
  8. }
  9. public void addTask(Task task) {
  10. try {
  11. taskQueue.put(task); // 将任务放入队列
  12. } catch (InterruptedException e) {
  13. Thread.currentThread().interrupt();
  14. }
  15. }
  16. public Task takeTask() {
  17. try {
  18. return taskQueue.take(); // 从队列中获取并移除任务
  19. } catch (InterruptedException e) {
  20. Thread.currentThread().interrupt();
  21. return null;
  22. }
  23. }
  24. public int getTaskCount() {
  25. return taskQueue.size(); // 获取队列中的任务数量
  26. }
  27. }
  28. public class Task {
  29. private final String taskId;
  30. private final String taskName;
  31. public Task(String taskId, String taskName) {
  32. this.taskId = taskId;
  33. this.taskName = taskName;
  34. }
  35. public String getTaskId() {
  36. return taskId;
  37. }
  38. public String getTaskName() {
  39. return taskName;
  40. }
  41. }
  42. @Component
  43. public class TaskProcessor {
  44. private final TaskQueue taskQueue;
  45. public TaskProcessor(TaskQueue taskQueue) {
  46. this.taskQueue = taskQueue;
  47. }
  48. public void submitTask(String taskId, String taskName) {
  49. Task task = new Task(taskId, taskName);
  50. taskQueue.addTask(task); // 将任务添加到任务队列
  51. System.out.println("Task submitted: " + taskId);
  52. }
  53. public void processTasks() {
  54. while (true) {
  55. Task task = taskQueue.takeTask(); // 从任务队列中获取任务进行处理
  56. if (task != null) {
  57. System.out.println("Processing task: " + task.getTaskId() + ", " + task.getTaskName());
  58. // 处理任务逻辑
  59. }
  60. }
  61. }
  62. }
  63. public class Main {
  64. public static void main(String[] args) {
  65. // 创建 Spring 应用上下文
  66. ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
  67. // 从应用上下文中获取 TaskProcessor 实例
  68. TaskProcessor taskProcessor = context.getBean(TaskProcessor.class);
  69. // 提交一些任务
  70. taskProcessor.submitTask("1", "Task 1");
  71. taskProcessor.submitTask("2", "Task 2");
  72. taskProcessor.submitTask("3", "Task 3");
  73. // 处理任务
  74. taskProcessor.processTasks();
  75. }
  76. }

在上述代码中,Task 类表示一个任务对象。TaskQueue 类使用 LinkedBlockingQueue 来作为任务队列,它具有添加任务和获取任务的方法。TaskProcessor 类负责创建任务并将其放入任务队列,然后从任务队列中取出任务进行处理。

在主函数中,我们使用 Spring 框架的 ApplicationContext 来获取 TaskProcessor 实例,并使用 submitTask() 方法向任务队列提交任务。TaskProcessor 的 processTasks() 方法会不断地从任务队列中取出任务进行处理。

四、PriorityBlockingQueue

(一)源码分析

PriorityBlockingQueue 是一个可阻塞的无界优先级队列,它基于数组实现,可以存储具有可比较性的元素。一个无界的具有优先级的阻塞队列,使用跟PriorityQueue相同的顺序规则,默认顺序是自然顺序(从小到大)。若传入的对象,不支持比较将报错( ClassCastException)。不允许null
底层使用的是基于数组的平衡二叉树堆实现(它的优先级的实现)。
公共方法使用单锁ReetrantLock保证线程的安全性

构造方法

  • PriorityBlockingQueue():创建一个空的 PriorityBlockingQueue 实例。内部会初始化数组和相关变量。
  • PriorityBlockingQueue(int initialCapacity):创建一个具有指定初始容量的 PriorityBlockingQueue 实例。

添加元素

  • put(E element):将指定元素添加到队列中。如果队列已满,则阻塞等待。在添加元素之前会获取锁,并根据队列是否已满来决定是进行阻塞还是添加元素。

获取元素

  • take():获取并移除队列中的第一个元素。如果队列为空,则阻塞等待。在获取元素之前会获取锁,并根据队列是否为空来决定是否阻塞等待。

其他方法

  • size():返回队列中的元素数量。
  • peek():获取队列中的第一个元素,但不移除。

PriorityBlockingQueue 内部使用了一个可重入锁和一个条件变量来实现线程安全和阻塞等待的功能。在添加元素和获取元素时,会获取锁来确保线程安全,并根据队列是否已满或为空来决定是否阻塞等待。

它的元素存储在数组中,并通过比较器来维护元素的优先级顺序。在插入元素时,会按照优先级的顺序将元素插入到合适的位置。在获取元素时,会返回优先级最高的元素,并将其从队列中移除。

总体来说,PriorityBlockingQueue 是一个基于数组实现的可阻塞的无界优先级队列。它可以在多线程环境下实现元素的有序存储和获取,并提供阻塞等待的能力

(二)业务应用代码举例

假设我们有一个在线商城系统,需要按照订单的优先级进行处理。高优先级的订单需要被尽快处理,而低优先级的订单可以稍后处理。我们可以使用 PriorityBlockingQueue 来实现订单队列的管理。

首先,定义一个 Order 类表示订单对象,该类包含订单的编号和优先级:

  1. public class Order implements Comparable<Order> {
  2. private String orderId;
  3. private int priority;
  4. public Order(String orderId, int priority) {
  5. this.orderId = orderId;
  6. this.priority = priority;
  7. }
  8. public String getOrderId() {
  9. return orderId;
  10. }
  11. public int getPriority() {
  12. return priority;
  13. }
  14. @Override
  15. public int compareTo(Order other) {
  16. return Integer.compare(this.priority, other.priority);
  17. }
  18. }

然后,我们创建一个 OrderProcessor 类来处理订单:

  1. @Component
  2. public class OrderProcessor {
  3. private final PriorityBlockingQueue<Order> orderQueue;
  4. public OrderProcessor() {
  5. orderQueue = new PriorityBlockingQueue<>();
  6. }
  7. public void addOrder(Order order) {
  8. orderQueue.put(order); // 将订单放入队列
  9. System.out.println("Order added: " + order.getOrderId());
  10. }
  11. public void processOrders() {
  12. while (true) {
  13. try {
  14. Order order = orderQueue.take(); // 从队列中获取并移除订单
  15. System.out.println("Processing order: " + order.getOrderId());
  16. // 处理订单逻辑
  17. } catch (InterruptedException e) {
  18. Thread.currentThread().interrupt();
  19. }
  20. }
  21. }
  22. }

在上述代码中,我们创建了一个 OrderProcessor 类,该类使用 PriorityBlockingQueue 作为订单队列。addOrder() 方法用于将订单放入队列,processOrders() 方法用于从队列中获取订单并处理。

在主函数中,我们创建了一个 OrderProcessor 实例,并使用 addOrder() 方法向订单队列添加订单。OrderProcessor 的 processOrders() 方法会不断地从订单队列中获取订单进行处理。

通过使用 PriorityBlockingQueue,我们可以实现按照订单优先级进行排序和处理的功能。高优先级的订单会被优先处理,从而提高订单处理的效率和响应性。

五、SynchronousQueue

(一)源码分析

SynchronousQueue作为阻塞队列的时候,对于每一个take的线程会阻塞直到有一个put的线程放入元素为止,反之亦然。在SynchronousQueue内部没有任何存放元素的能力。所以类似peek操作或者迭代器操作也是无效的,元素只能通过put类操作或者take类操作才有效。通常队列的第一个元素是当前第一个等待的线程。如果没有线程阻塞在该队列则poll会返回null。从Collection的视角来看SynchronousQueue表现为一个空的集合

SynchronousQueue相似于使用CSP和Ada算法(不知道怎么具体指什么算法),他非常适合做交换的工作,生产者的线程必须与消费者的线程同步以传递某些信息、事件或任务

SynchronousQueue支持支持生产者和消费者等待的公平性策略。默认情况下,不能保证生产消费的顺序。如果是公平锁的话可以保证当前第一个队首的线程是等待时间最长的线程,这时可以视SynchronousQueue为一个FIFO队列

(二)业务应用代码举例

在实际业务应用中,SynchronousQueue 可以用于一些特定的场景,其中一个常见的应用是实现线程间的任务调度和处理。以下是一个使用 SynchronousQueue 的业务应用举例:

假设我们有一个任务调度器,其中有多个工作线程和一个任务队列,工作线程从任务队列中获取任务并进行处理。我们希望能够动态控制任务的执行顺序,确保高优先级的任务能够被尽快处理,而低优先级的任务则可以稍后处理。

在这种情况下,我们可以使用 SynchronousQueue 来实现任务队列。每个工作线程都会从 SynchronousQueue 中获取任务进行处理,当队列为空时,工作线程将被阻塞等待新的任务到达。

下面是一个简化的示例代码:

  1. import java.util.concurrent.SynchronousQueue;
  2. public class TaskScheduler {
  3. private final SynchronousQueue<Runnable> taskQueue;
  4. public TaskScheduler() {
  5. taskQueue = new SynchronousQueue<>();
  6. }
  7. public void scheduleTask(Runnable task) {
  8. try {
  9. taskQueue.put(task); // 将任务放入队列
  10. System.out.println("Task scheduled");
  11. } catch (InterruptedException e) {
  12. Thread.currentThread().interrupt();
  13. }
  14. }
  15. public void startWorkers(int numWorkers) {
  16. for (int i = 0; i < numWorkers; i++) {
  17. Thread workerThread = new Thread(() -> {
  18. while (true) {
  19. try {
  20. Runnable task = taskQueue.take(); // 从队列中获取任务
  21. System.out.println("Task started");
  22. // 处理任务逻辑
  23. task.run();
  24. System.out.println("Task completed");
  25. } catch (InterruptedException e) {
  26. Thread.currentThread().interrupt();
  27. }
  28. }
  29. });
  30. workerThread.start();
  31. }
  32. }
  33. }

在上述代码中,我们创建了一个 TaskScheduler 类,它使用 SynchronousQueue 作为任务队列。scheduleTask() 方法用于将任务放入队列,startWorkers() 方法用于启动工作线程并从队列中获取任务进行处理。

在主函数中,我们创建了一个 TaskScheduler 实例,并使用 scheduleTask() 方法向任务队列中添加任务。startWorkers() 方法会启动指定数量的工作线程,并从任务队列中获取任务进行处理。

通过使用 SynchronousQueue,我们实现了动态的任务调度和处理机制。当有新的任务到达时,工作线程会立即从队列中获取任务并进行处理,确保高优先级的任务能够尽快执行。

这个示例展示了在业务应用中使用 SynchronousQueue 的一个场景。通过合理地使用 SynchronousQueue,我们能够实现任务的有序调度和处理,并提供灵活的控制机制。

参考文章链接等

ArrayBlockingQueue源码分析_程序员小潘的博客-CSDN博客

ArrayBlockingQueue 源码解析_arrayblockingqueue源码_章全蛋的博客-CSDN博客

阻塞队列之ArrayBlockingQueue源码解析 - 知乎

线程安全的队列-ArrayBlockingQueue源码分析_51CTO博客_arrayblockingqueue 线程安全

从源码全面解析ArrayBlockingQueue的来龙去脉_牛客网

java阻塞队列LinkedBlockingQueue源码分析_l;inkedblockqueue_lianchaozhao的博客-CSDN博客

Java 源码 - LinkedBlockingQueue 源码解析_Q.E.D.的博客-CSDN博客

LinkedBlockingQueue源码解析_wx613dbd09b1332的技术博客_51CTO博客

从源码解析LinkedBlockingQueue的来龙去脉_牛客网

https://www.cnblogs.com/chafry/p/16783035.html

【JAVA多线程】PriorityBlockingQueue源码分析_吕布辕门的博客-CSDN博客

阻塞队列 — PriorityBlockingQueue源码分析 - 知乎

JAVA并发(7)-并发队列PriorityBlockingQueue的源码分析_mob604756f1c0ca的技术博客_51CTO博客

源码分析-SynchronousQueue_synchronousqueue源码_demon7552003的博客-CSDN博客

SynchronousQueue 源码分析 (基于Java 8) - 简书

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/948956
推荐阅读
相关标签
  

闽ICP备14008679号