当前位置:   article > 正文

多线程-生产者-消费者模型_多线程生产者与消费者模型

多线程生产者与消费者模型

一、前言

        生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

        生产者消费者模型需要抓住“三个主体,三个要点“,三个主体是指:生产者消费者缓冲区。生产者往缓冲区放数据,消费者从缓冲区取数据。

三个要点是指:

1.缓冲区有固定大小
2.缓冲区满时,生产者不能再往缓冲区放数据(产品),而是被阻塞,直到缓冲区不是满的
3.缓冲区为空时,消费者不能再从缓冲区取数据,而是被阻塞,直到缓冲区不是空的。

        数据(产品)往往是先生产出来的先被消费。所以缓冲区一般用有界队列实现,又由于生产者、消费者在特定情况下需要被阻塞,所以更具体一点,缓冲区一般用有界阻塞队列来实现。
本篇用三种方式实现生产者-消费者模型:wait/notify + 队列、Lock/Condition + 队列、有界阻塞队列。

二、wait/notify + 队列

        实现生产者-消费者模型,主要是实现两个核心方法:往缓冲区中放元素、从缓冲区中取元素。
以下是缓冲区的代码实现,是生产者-消费者模型的核心。

ProducerConsumerQueue缓冲区
  1. package cn.java.threadmodel.producerconsumer;
  2. import java.util.LinkedList;
  3. import java.util.Queue;
  4. /**
  5. * @author 小石潭记
  6. * @date 2021/12/18 9:58
  7. * @Description: wait/notify机制实现生产者-消费者模型
  8. */
  9. public class ProducerConsumerQueue<E> {
  10. /**
  11. * 队列最大容量
  12. */
  13. private final static int QUEUE_MAX_SIZE = 3;
  14. /**
  15. * 存放元素的队列
  16. */
  17. private Queue<E> queue;
  18. public ProducerConsumerQueue() {
  19. queue = new LinkedList<>();
  20. }
  21. /**
  22. * 向队列中添加元素
  23. *
  24. * @param e
  25. * @return
  26. */
  27. public synchronized boolean put(E e) {
  28. // 如果队列是已满,则阻塞当前线程
  29. while (queue.size() == QUEUE_MAX_SIZE) {
  30. try {
  31. wait();
  32. } catch (InterruptedException e1) {
  33. e1.printStackTrace();
  34. }
  35. }
  36. // 队列未满,放入元素,并且通知消费线程
  37. queue.offer(e);
  38. System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
  39. notify();
  40. return true;
  41. }
  42. /**
  43. * 从队列中获取元素
  44. * @return
  45. */
  46. public synchronized E get() {
  47. // 如果队列是空的,则阻塞当前线程
  48. while (queue.isEmpty()) {
  49. try {
  50. wait();
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. // 队列非空,取出元素,并通知生产者线程
  56. E e = queue.poll();
  57. System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
  58. notify();
  59. return e;
  60. }
  61. }
Producer生产者
  1. package cn.java.threadmodel.producerconsumer;
  2. /**
  3. * @author 小石潭记
  4. * @date 2021/12/18 10:00
  5. * @Description: 生产者线程
  6. */
  7. public class Producer implements Runnable {
  8. private ProducerConsumerQueue<Integer> queue;
  9. public Producer(ProducerConsumerQueue<Integer> queue) {
  10. this.queue = queue;
  11. }
  12. @Override
  13. public void run() {
  14. for (int i = 0; i < 10; i++) {
  15. queue.put(i);
  16. }
  17. }
  18. }
Consumer消费者
  1. package cn.java.threadmodel.producerconsumer;
  2. /**
  3. * @author 小石潭记
  4. * @date 2021/12/18 10:00
  5. * @Description: 消费者线程
  6. */
  7. public class Consumer implements Runnable {
  8. private ProducerConsumerQueue<Integer> queue;
  9. public Consumer(ProducerConsumerQueue<Integer> queue) {
  10. this.queue = queue;
  11. }
  12. @Override
  13. public void run() {
  14. for (int i = 0; i < 10; i++) {
  15. queue.get();
  16. }
  17. }
  18. }
ProducerConsumerDemo测试类
  1. package cn.java.threadmodel.producerconsumer;
  2. import java.util.Random;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. /**
  6. * @author 小石潭记
  7. * @date 2021/12/18 10:01
  8. * @Description: 测试生产消费者模型
  9. */
  10. public class ProducerConsumerDemo {
  11. private final static ExecutorService service = Executors.newCachedThreadPool();
  12. public static void main(String[] args) throws InterruptedException {
  13. Random random = new Random();
  14. // 生产者-消费者模型缓冲区
  15. ProducerConsumerQueue<Integer> queue = new ProducerConsumerQueue<>();
  16. Producer producer = new Producer(queue);
  17. Consumer consumer = new Consumer(queue);
  18. for (int i = 0; i < 3; i++) {
  19. // 休眠0-50毫秒,增加随机性
  20. Thread.sleep(random.nextInt(50));
  21. service.submit(producer);
  22. }
  23. for (int i = 0; i < 3; i++) {
  24. // 休眠0-50毫秒,增加随机性
  25. Thread.sleep(random.nextInt(50));
  26. service.submit(consumer);
  27. }
  28. // 关闭线程池
  29. service.shutdown();
  30. }
  31. }

从上图的测试结果得知:

  • 由于队列的最大长度是3(QUEUE_MAX_SIZE),所以缓冲区元素不会超过3,说明缓冲区满时,生产者确实被阻塞了
  • 缓冲区元素个数最小为0,不会出现负数,说明缓冲区为空时,消费者被阻塞了

 这就是生产者-消费者模型基于wait/notify+队列的基本实现。

三、Lock/Condition + 队列

核心部分缓冲区的实现代码实现如下:

  1. package cn.java.threadmodel.producerconsumer.lockcondition;
  2. import java.util.LinkedList;
  3. import java.util.Queue;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. /**
  8. * @author 小石潭记
  9. * @date 2021/12/18 10:11
  10. * @Description: Lock/Condition实现生产者-消费者模型
  11. */
  12. public class ProducerConsumerQueue<E> {
  13. /**
  14. * 队列最大容量
  15. */
  16. private final static int QUEUE_MAX_SIZE = 3;
  17. /**
  18. * 存放元素的队列
  19. */
  20. private Queue<E> queue;
  21. private final Lock lock = new ReentrantLock();
  22. private final Condition producerCondition = lock.newCondition();
  23. private final Condition consumerCondition = lock.newCondition();
  24. public ProducerConsumerQueue() {
  25. queue = new LinkedList<>();
  26. }
  27. /**
  28. * 向队列中添加元素
  29. * @param e
  30. * @return
  31. */
  32. public boolean put(E e) {
  33. final Lock lock = this.lock;
  34. lock.lock();
  35. try {
  36. while (queue.size() == QUEUE_MAX_SIZE) {
  37. // 队列已满
  38. try {
  39. producerCondition.await();
  40. } catch (InterruptedException e1) {
  41. e1.printStackTrace();
  42. }
  43. }
  44. queue.offer(e);
  45. System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
  46. consumerCondition.signal();
  47. } finally {
  48. lock.unlock();
  49. }
  50. return true;
  51. }
  52. /**
  53. * 从队列中取出元素
  54. * @return
  55. */
  56. public E get() {
  57. final Lock lock = this.lock;
  58. lock.lock();
  59. try {
  60. while (queue.isEmpty()) {
  61. // 队列为空
  62. try {
  63. consumerCondition.await();
  64. } catch (InterruptedException e1) {
  65. e1.printStackTrace();
  66. }
  67. }
  68. E e = queue.poll();
  69. System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
  70. producerCondition.signal();
  71. return e;
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. }

生产者线程、消费者线程、测试代码更是和wait/notify方式一致。

四、有界阻塞队列

同样,缓冲区的实现也是其核心部分,不过阻塞队列已经提供了相应的阻塞API,所以不需要额外编写阻塞部分的代码。

  1. package cn.java.threadmodel.producerconsumer.queue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. /**
  5. * @author 小石潭记
  6. * @date 2021/12/18 10:23
  7. * @Description: * 阻塞队列实现生产者-消费者模型
  8. * * 对应的阻塞方法是put()/take()
  9. */
  10. public class ProducerConsumerQueue<E> {
  11. /**
  12. * 队列最大容量
  13. */
  14. private final static int QUEUE_MAX_SIZE = 3;
  15. /**
  16. * 存放元素的队列
  17. */
  18. private BlockingQueue<E> queue;
  19. public ProducerConsumerQueue() {
  20. queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
  21. }
  22. /**
  23. * 向队列中添加元素
  24. * @param e
  25. * @return
  26. */
  27. public boolean put(E e) {
  28. try {
  29. queue.put(e);
  30. System.out.println(Thread.currentThread().getName() + " -> 生产元素,元素个数为:" + queue.size());
  31. } catch (InterruptedException e1) {
  32. e1.printStackTrace();
  33. }
  34. return true;
  35. }
  36. /**
  37. * 从队列中取出元素
  38. * @return
  39. */
  40. public E get() {
  41. try {
  42. E e = queue.take();
  43. System.out.println(Thread.currentThread().getName() + " -> 消费元素,元素个数为:" + queue.size());
  44. return e;
  45. } catch (InterruptedException e1) {
  46. e1.printStackTrace();
  47. }
  48. return null;
  49. }
  50. }

生产者线程、消费者线程、测试代码也和前面两种一模一样。

五、总结

通过三种方式实现生产者-消费者模型,可以看出使用阻塞队列的方式最简单,也更安全。其实看看阻塞队列的源码,会发现其内部的实现和这里的前两种差不多,只是JDK提供的阻塞队列健壮性更好。

说完了三种实现方式,再来说说为什么要使用生产者-消费者模式,消费者直接调用生产者不好吗?
回顾文章开始的那张图,试想一下,如果没有生产者-消费者模式会怎样,大概会变成如下这样

 可以看到,三个生产者,三个消费者就会产生 3 * 3 = 9条调用关系(箭头方法代表数据走向),还有一点就是消费者也有可能还是生产者,生产者也有可能还是消费者,一旦生产者、消费者的数量多了之后就会形成复杂的调用网。所以生产者-消费者模型的最大好处就是解耦。
其次如果生产者和消费者的速度上有较大的差异,就一定会存在一方总是在等待另一方的情况。比如快递小哥如果每一个快递都必须直接送到用户手上,如果某个用户一直联系不上,或者说过了很久才取快递,那么快递小哥就只能一直等待。所以就出现了快递站,快递小哥只需要把快递放在指定位置,用户去指定位置取就行了。所以生产者-消费者模型的第二个好处就是平衡生产能力和消费能力的差异。

六、参考

三种方式实现生产者-消费者模型

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/661597
推荐阅读
相关标签
  

闽ICP备14008679号