赞
踩
什么是生产者消费者模型?
生产者和消费者之间通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责消费数据,从缓冲区获取。生产者和消费者只和缓冲区交互,没有直接联系。
其中的同步互斥关系:
生产者之间互斥,消费者之间也互斥
生产者与消费者之间既又同步也有互斥(缓冲区满时,只能消费完再生产;缓冲区空时,只能生产完再消费)
注意:缓存区要先进先出,所以一般用队列实现
为什么要用生产者-消费者模型?
多个线程同时对一个资源类进行操作,会并发执行里面的方法。书写代码的时候最好让资源类的线程类分开书写,降低代码的耦合性,资源类只负责填入属性和方法,而线程类负责调用它里面的方法。
对于生产者消费者问题,需要定义三个类:生产者线程类、消费者线程类、缓冲资源类(核心)
package 线程基础.生产者消费者模型.synchronized_wait_notify版本; import java.util.LinkedList; public class BufferResources { private int maxSize = 10; //这里用list作为缓冲区,也可以替换为队列 private LinkedList list = new LinkedList<Integer>(); public synchronized void consume() { while (list.size() == 0) { System.out.println(Thread.currentThread().getName() + " 当前缓冲区为空,等待生产中..."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //消费数据,从First开始消费,模拟队列 Integer value = (Integer) list.removeFirst(); System.out.println(Thread.currentThread().getName() + " 消费成功:" + value.toString() + " 当前缓冲区size = " + list.size()); //唤醒所有处于wait状态的线程(包括生产者和消费者) notifyAll(); } public synchronized void product(Integer value){ while (list.size() == maxSize) { System.out.println(Thread.currentThread().getName() + " 当前缓冲区满了,等待消费中..."); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //生产数据 list.add(value); System.out.println(Thread.currentThread().getName() + " 生产成功:" + value.toString() + " 当前缓冲区size = " + list.size()); //唤醒所有处于wait状态的线程(包括生产者和消费者) notifyAll(); } }
package 线程基础.生产者消费者模型.synchronized_wait_notify版本; import java.util.Random; public class Producter extends Thread { private BufferResources bufferResources; Random random = new Random(); //构造时需要指定缓冲区 public Producter(BufferResources bufferResources) { this.bufferResources = bufferResources; } @Override public void run() { //生产 this.bufferResources.product(random.nextInt()); } }
package 线程基础.生产者消费者模型.synchronized_wait_notify版本;
public class Consumer extends Thread {
private BufferResources bufferResources;
//构造时需要指定缓冲区
public Consumer(BufferResources bufferResources) {
this.bufferResources = bufferResources;
}
@Override
public void run() {
this.bufferResources.consume();
}
}
package 线程基础.生产者消费者模型; public class Test { public static void main(String[] args) { BufferResources bufferResources = new BufferResources(); //十个生产者线程 for (int i = 0; i < 10; i++) { new Producter(bufferResources).start(); } //十个消费者线程 for (int i = 0; i < 10; i++) { new Consumer(bufferResources).start(); } } }
测试结果
生产者消费者交错进行生产消费,不会相互影响,资源类的数据一致性得到保证
上述判断缓冲区是否为满或者空的语句为什么用while循环判断,而不是用if判断?
为了避免虚拟唤醒问题,也就是notifyAll之后,所有线程被唤醒,有可能出现先执行的线程使得后面的线程的不再满足运行条件的现象,如果直接结束wait会造成逻辑错误。具体可参考百度。
上述的唤醒为什么要用notifyAll?而不用notify?
因为notify只会唤醒一个线程去竞争锁,当生产者刚生产完使得缓冲区满的时候,如果又唤醒了一个生产者线程,那程序就会死锁,也叫假死问题。
用notifyAll真的合理吗?
其实不合理,因为按照逻辑来说,每生产完一个资源后,应该唤醒消费者去消费,而不需要把生产者也唤醒(消费资源也一样,不需要再唤醒消费者)。这个问题可通过如下的condition实现方式来解决。
package 线程基础.生产者消费者模型.lock_condition版本; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BufferResources { private int maxSize = 10; //这里用list作为缓冲区,也可以替换为队列 private LinkedList list = new LinkedList<Integer>(); //创建锁 private Lock lock = new ReentrantLock(); //生产者对应Condition private Condition producerCondition = lock.newCondition(); //消费者对应Condition private Condition consumerCondition = lock.newCondition(); public void consume() { lock.lock(); try{ while (list.size() == 0) { System.out.println(Thread.currentThread().getName() + " 当前缓冲区为空,等待生产中..."); try { //消费者进入等待状态 consumerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //进行消费数据,从First开始消费,模拟队列 Integer value = (Integer) list.removeFirst(); System.out.println(Thread.currentThread().getName() + " 消费成功:" + value.toString() + " 当前缓冲区size = " + list.size()); //消费完毕后,只唤醒生产者 producerCondition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { //一定要用finally执行解锁 lock.unlock(); } } public void product(Integer value){ lock.lock(); try{ while (list.size() == maxSize) { System.out.println(Thread.currentThread().getName() + " 当前缓冲区满了,等待消费中..."); //生产者进入等待状态 producerCondition.await(); } //进生产数据 list.add(value); System.out.println(Thread.currentThread().getName() + " 生产成功:" + value.toString() + " 当前缓冲区size = " + list.size()); //生产完毕后,只唤醒消费者 consumerCondition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { //一定要用finally执行解锁 lock.unlock(); } } }
与之前的版本完全一致
测试结果
生产者消费者交错进行生产消费,不会相互影响,资源类的数据一致性得到保证
分析:用lock的condition实现的好处就是能实现精准地唤醒一类线程
上述的缓冲区都是通过LinkedList简单模拟队列来实现的,但实际上比较常用的是阻塞队列(代码简单),因为它内部已经实现了数据操作的阻塞和互斥功能,具体如下:
也就是说,生产和消费的代码只需要用阻塞队列进行插入和删除,不需要写额外的互斥和阻塞逻辑了
package 线程基础.生产者消费者模型.阻塞队列版本; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class BufferResources { private int maxSize = 10; //阻塞队列作为缓冲区 private BlockingQueue buffer = new LinkedBlockingQueue(maxSize); public void consume() { try { Integer value = (Integer) buffer.take(); System.out.println(Thread.currentThread().getName() + " 消费成功:" + value.toString() + " 当前缓冲区size = " + buffer.size()); } catch (InterruptedException e) { e.printStackTrace(); } } public void product(Integer value) { try { buffer.put(value); System.out.println(Thread.currentThread().getName() + " 生产成功:" + value.toString() + " 当前缓冲区size = " + buffer.size()); } catch (InterruptedException e) { e.printStackTrace(); } } }
与之前的版本完全一致
测试结果
阻塞队列怎么实现阻塞和互斥逻辑的?
其实和我们之前自己实现的思路一样,参考如下LinkedBlockingQueue的put插入源码:
三种方式实现生产者消费者模型:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。