当前位置:   article > 正文

java 生产者消费者_Java多线程:线程间通信—生产者消费者模型

java 生产者消费者_Java多线程:线程间通信—生产者消费者模型

45601e1f415ed234860fb5a04e714d21.png

dd5822dd52822005fc222c9781ffb419.png

一、背景 && 定义

多线程环境下,只要有并发问题,就要保证数据的安全性,一般指的是通过 synchronized 来进行同步。

另一个问题是, 多个线程之间如何协作呢

我们看一个仓库出货问题(更具体一些,快餐店直接放好炸货的架子,不过每次只放一份)

  1. 假设仓库中只能存放一件商品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走进行消费;
  2. 如果仓库中没有商品,那么生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止;
  3. 如果仓库中放有产品,消费者可快速取走并消费,否则停止消费并等待,直到仓库中再次放入产品为止。

这其实就是一个线程同步问题。 生产者和消费者共享同一个资源,并且生产者和消费者之间互相依赖,互为条件。

如果一个快餐店:

先点单,餐出来之后再收钱。这种模式叫BIO-阻塞IO模式。

如果一个快餐店:

先收钱,收完钱消费者在旁边等。这种就是生产者-消费者模式。

这类问题里,同步的候只有 synchronized 是不够的,因为他虽然能解决资源的共享问题,实现资源的同步更新,但是无法 在不同线程之间进行消息传递 (通信)。

所以只有我们之前所说的 加锁排队 是不够的,还要有 通知

定义:

生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

为了解决双方能力不等而等待的问题,引入对应的解决方案。生产者消费者模型是一种并发协作模型。

二、解决方式介绍

2.1 管程法

  1. 生产者 :负责生产数据的模块(模块可能是方法、对象、线程、进程);
  2. 消费者 :负责处理数据的模块(模块可能是方法、对象、线程、进程);
  3. 缓冲区 :消费者不能直接使用生产者的数据,它们之间有个“缓冲区”(缓冲区一般是队列)。

生产者和消费者都是通过缓冲区进行数据的 放 和 那 。

这样的话,一来可以避免旱的旱死,涝的涝死的问题:不管哪一方过快或者过慢,缓冲区始终有一部分数据;二来能够达到生产者和消费者的解耦,不再直接通信,从而提高效率。

因为容器相当于一个输送商品的管道,所以成为 管程法

2.2 信号灯法

采用类似红灯绿灯的模式,决定车走还是人走。

  • 管程法使用容器的状态来控制,数据在容器中;
  • 而信号灯法只是用信号来给生产者和消费者提醒,他们的交互数据并不由信号灯来保管。

2.3 Object类

jdk 里面 Object 类老早就有提供解决线程间通信的问题的方法:

  1. wait() :表示线程一直等待,直到其他线程通知(也就是调用了notify或者notifyAll方法),与sleep不同,会释放锁;
  2. wait(long timeout) :指定时间;
  3. notify() :唤醒一个处于等待状态的线程;
  4. notifyAll() :唤醒同一个对象上所有调用 wait() 方法的线程,优先级别高的线程优先调度。

这几个方法都是在 同步方法或者同步代码块 中使用,否则会抛出异常。

(很多面试题问 Java 的 Object 类有哪些方法,都是希望得到关于这块的答案,引导多线程)

三、管程法实现

管程法实现的四个角色:

  1. 生产者和消费者都是多线程;
  2. 中间的缓冲区应该是一个容器,并且需要的是一个 并发容器 ,java.util.concurrent包里面已经提供了;
  3. 资源,也就是各个角色来回交换的商品。

利用 Object 类的几个方法,来实现管程法,以下是代码示例:

  1. /**
  2. * 协作模型:生产者消费者模型实现:管程法
  3. */
  4. public class Cooperation1 {
  5. public static void main(String[] args) {
  6. Container container = new Container();
  7. new Producer(container).start();
  8. new Consumer(container).start();
  9. }
  10. }
  11. /**
  12. * 生产者
  13. */
  14. class Producer extends Thread{
  15. Container container;
  16. public Producer(Container container){
  17. this.container = container;
  18. }
  19. @Override
  20. public void run() {
  21. //生产过程
  22. for (int i=0; i<10; i++){
  23. System.out.println("生产第 " + i + " 个馒头");
  24. container.push(new Hamburger(i));
  25. }
  26. }
  27. }
  28. /**
  29. * 消费者
  30. */
  31. class Consumer extends Thread{
  32. Container container;
  33. public Consumer(Container container){
  34. this.container = container;
  35. }
  36. @Override
  37. public void run() {
  38. //消费过程
  39. for (int i=0; i<10; i++){
  40. System.out.println("消费第 " + container.pop().id + " 个馒头");
  41. }
  42. }
  43. }
  44. /**
  45. * 缓冲区,操作商品,并和生产者、消费者交互
  46. */
  47. class Container{
  48. Hamburger[] food = new Hamburger[10];
  49. private int count = 0;
  50. //存储:生产
  51. public synchronized void push(Hamburger hamburger){
  52. if (count == food.length){
  53. try {
  54. this.wait();//阻塞,但是等待消费者通知后会解除
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. food[count++] = hamburger;
  60. this.notifyAll();//说明存在数据了,通知消费者消费
  61. }
  62. //获取:消费
  63. public synchronized Hamburger pop(){
  64. if (count ==0 ){
  65. try {
  66. this.wait();//阻塞,直到生产者通知后会解除
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. Hamburger ans = food[--count];
  72. this.notifyAll();//存在空余空间了,通知生产者生产
  73. return ans;
  74. }
  75. }
  76. /**
  77. * 商品
  78. */
  79. class Hamburger{
  80. int id;
  81. public Hamburger(int id) {
  82. this.id = id;
  83. }
  84. }

其中的核心有这么几点:

  1. 容器相当于一个栈,是后进先出的;
  2. 容器的两个方法对于资源的操作,一个和生产者交互,一个和消费者交互,除了 synchronized 修饰,因为两个方法是互斥的,所以利用 wait 和 notify 方法使他们完成阻塞和解除阻塞;
  3. 生产者和容器交互,添加数据;
  4. 消费者和容器交互,删除数据。

前面关于 线程的阻塞问题,生命周期里的阻塞 ,完整的可能情况,就包含这里的阻塞情况:

f16ce4d1f4d20b8b891e590c608be0fd.png

四、信号灯法实现

和上一种通过容器的容量让线程之间互相通知的方法不同,信号灯法没有用数据缓存的方式,而是用 信号灯来指示双方 ,对方是否已经准备好了要和你通信。

下面是一个 电视直播和观众的代码示例,通过信号灯,通知演员和观众直播,确保演员在演的时候,让观众来看。

  1. /**
  2. * 协作模型:生产者消费者实现:信号灯法
  3. */
  4. public class Cooperation2 {
  5. public static void main(String[] args) {
  6. TV tv = new TV();
  7. new Actor(tv).start();
  8. new Fans(tv).start();
  9. }
  10. }
  11. /**
  12. * 生产者:演员
  13. */
  14. class Actor extends Thread{
  15. TV tv;
  16. public Actor(TV tv){
  17. this.tv = tv;
  18. }
  19. @Override
  20. public void run() {
  21. for (int i=0; i<10; i++){
  22. if (i%2 == 0){
  23. this.tv.play("节目 " + i);
  24. }else{
  25. this.tv.play("广告 " + i);
  26. }
  27. }
  28. }
  29. }
  30. /**
  31. * 消费者:观众
  32. */
  33. class Fans extends Thread{
  34. TV tv;
  35. public Fans(TV tv){
  36. this.tv = tv;
  37. }
  38. @Override
  39. public void run() {
  40. for (int i=0; i<10; i++){
  41. tv.watch();
  42. }
  43. }
  44. }
  45. /**
  46. * 共同资源:电视直播
  47. */
  48. class TV{
  49. String voice;
  50. //信号灯,如果为真则演员准备,观众等待
  51. //如果为假,则观众就位,演员等待
  52. boolean flag = true;
  53. //表演方法:针对生产者
  54. public synchronized void play(String voice){
  55. //演员等待
  56. if (!flag){
  57. try {
  58. this.wait();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. this.voice = voice;
  64. System.out.println("表演 "+voice +" ing");
  65. //唤醒观众
  66. this.notifyAll();
  67. this.flag = !flag;
  68. }
  69. //观看方法:针对消费者
  70. public synchronized void watch(){
  71. //观众等待
  72. if (flag){
  73. try {
  74. this.wait();
  75. } catch (InterruptedException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. System.out.println("观看 " + voice +" ing");
  80. this.notifyAll();
  81. this.flag = !flag;
  82. }
  83. }

可以看到,相比管程法的核心区别是:

TV 没有用一个容器存储数据 ,只是通过生产者是否生产,来决定 信号灯 的标志,以此 通知消费者来消费。

显然这两种实现方法,有不同的适用场景,那就是决定于生产者消费者是否有数据沟通。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/614544
推荐阅读
相关标签
  

闽ICP备14008679号