赞
踩
在 Java 多线程编程中,还有一个非常重要的设计模式,它就是:生产者和消费者模型。
这种模型可以充分发挥 cpu 的多线程特性,通过一些平衡手段能有效的提升系统整体处理数据的速度,减轻系统负载,提高程序的效率和稳定性,同时实现模块之间的解耦。
那什么是生产者和消费者模型呢?
简单的说,生产者和消费者之间不直接进行交互,而是通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责处理数据,从缓冲区获取。
大致流程图如下:
对于最简单的生产者和消费者模型,总结下来,大概有以下几个特点:
生产者和消费者模型作为一个非常重要的设计模型,它的优点在于:
生产者和消费者模型的应用场景非常多,例如 Java 的线程池任务执行框架、消息中间件 rabbitMQ 等,因此掌握生产者和消费者模型,对于开发者至关重要。
下面我们通过几个案例,一起来了解一下生产者和消费者设计模型的实践思路。
生产者和消费者模型,最简单的一种技术实践方案就是基于线程的 wait() / notify() 方法,也就是通知和唤醒机制,可以将两个操作实现解耦,具体代码实践如下。
/**
* 缓冲区容器类
*/
public class Container {
/**
* 缓冲区最大容量
*/
private int capacity = 3;
/**
* 缓冲区
*/
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加数据到缓冲区
* @param value
*/
public synchronized void add(Integer value) {
if(list.size() >= capacity){
System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
try {
// 进入等待状态
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
//唤醒其他所有处于wait()的线程,包括消费者和生产者
notifyAll();
}
/**
* 从缓冲区获取数据
*/
public synchronized void get() {
if(list.size() == 0){
System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
try {
// 进入等待状态
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从头部获取数据,并移除元素
Integer val = list.removeFirst();
System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
//唤醒其他所有处于wait()的线程,包括消费者和生产者
notifyAll();
}
}
/**
* 生产者类
*/
public class Producer extends Thread{
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.add(i);
}
}
}
/**
* 消费者类
*/
public class Consumer extends Thread{
private Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
container.get();
}
}
}
/**
* 测试类
*/
public class MyThreadTest {
public static void main(String[] args) {
Container container = new Container();
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
}
}
运行结果如下:
生产者:Thread-0,add:0
生产者:Thread-0,add:1
生产者:Thread-0,add:2
生产者:Thread-0,缓冲区已满,生产者进入waiting...
消费者:Thread-1,value:0
消费者:Thread-1,value:1
消费者:Thread-1,value:2
消费者:Thread-1,缓冲区为空,消费者进入waiting...
生产者:Thread-0,add:3
生产者:Thread-0,add:4
生产者:Thread-0,add:5
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5
从日志上可以很清晰的看到,生产者线程生产一批数据之后,当缓冲区已经满了,会进入等待状态,此时会通知消费者线程;消费者线程处理完数据之后,当缓冲区没有数据时,也会进入等待状态,再次通知生产者线程。
除此之外,我们还可以利用ReentrantLock
和Condition
类中的 await() / signal() 方法实现生产者和消费者模型。
缓冲区容器类,具体代码实践如下。
/**
* 缓冲区容器类
*/
public class Container {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int capacity = 3;
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加数据到缓冲区
* @param value
*/
public void add(Integer value) {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() >= capacity){
System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
// 进入等待状态
condition.await();
}
System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
//唤醒其他所有处于wait()的线程,包括消费者和生产者
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
/**
* 从缓冲区获取数据
*/
public void get() {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() == 0){
System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
// 进入等待状态
condition.await();
}
// 从头部获取数据,并移除元素
Integer val = list.removeFirst();
System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
//唤醒其他所有处于wait()的线程,包括消费者和生产者
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
}
生产者、消费者、测试类代码,跟上文一致,运行结果和上文介绍的也是一样。
上面介绍的都是一个生产者线程和一个消费者线程,模型比较简单。实际上,在业务开发中,经常会出现多个生产者线程和多个消费者线程,按照以上的实现思路,会出现什么情况呢?
有可能会出现程序假死现象!下面我们来分析一下案例,假如有两个生产者线程 a1、a2,两个消费者线程 b1、b2,执行过程如下:
遇到这种情况,应该如何解决呢?
因为ReentrantLock
和Condition
的结合,编程具有高度灵活性,我们可以采用这种组合解决多生产者和多消费者中的假死问题。
具体实现逻辑如下:
/**
* 缓冲区容器类
*/
public class ContainerDemo {
private Lock lock = new ReentrantLock();
private Condition producerCondition = lock.newCondition();
private Condition consumerCondition = lock.newCondition();
private int capacity = 3;
private LinkedList<Integer> list = new LinkedList<Integer>();
/**
* 添加数据到缓冲区
* @param value
*/
public void add(Integer value) {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() >= capacity){
System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");
// 生产者进入等待状态
producerCondition.await();
}
System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);
list.add(value);
// 唤醒所有消费者处于wait()的线程
consumerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
/**
* 从缓冲区获取数据
*/
public void get() {
boolean flag = false;
try {
flag = lock.tryLock(3, TimeUnit.SECONDS);
if(list.size() == 0){
System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");
// 消费者进入等待状态
consumerCondition.await();
}
// 从头部获取数据,并移除元素
Integer val = list.removeFirst();
System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);
// 唤醒所有生产者处于wait()的线程
producerCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(flag){
lock.unlock();
}
}
}
}
/**
* 生产者
*/
public class Producer extends Thread{
private ContainerDemo container;
private Integer value;
public Producer(ContainerDemo container, Integer value) {
this.container = container;
this.value = value;
}
@Override
public void run() {
container.add(value);
}
}
/**
* 消费者
*/
public class Consumer extends Thread{
private ContainerDemo container;
public Consumer(ContainerDemo container) {
this.container = container;
}
@Override
public void run() {
container.get();
}
}
/**
* 测试类
*/
public class MyThreadTest {
public static void main(String[] args) {
ContainerDemo container = new ContainerDemo();
List<Thread> threadList = new ArrayList<>();
// 初始化6个生产者线程
for (int i = 0; i < 6; i++) {
threadList.add(new Producer(container, i));
}
// 初始化6个消费者线程
for (int i = 0; i < 6; i++) {
threadList.add(new Consumer(container));
}
// 启动线程
for (Thread thread : threadList) {
thread.start();
}
}
}
运行结果如下:
生产者:Thread-0,add:0
生产者:Thread-1,add:1
生产者:Thread-2,add:2
生产者:Thread-3,缓冲区已满,生产者进入waiting...
生产者:Thread-4,缓冲区已满,生产者进入waiting...
生产者:Thread-5,缓冲区已满,生产者进入waiting...
消费者:Thread-6,value:0
消费者:Thread-7,value:1
生产者:Thread-3,add:3
生产者:Thread-4,add:4
生产者:Thread-5,add:5
消费者:Thread-8,value:2
消费者:Thread-9,value:3
消费者:Thread-10,value:4
消费者:Thread-11,value:5
通过ReentrantLock
定义两个Condition
,一个表示生产者的Condition
,一个表示消费者的Condition
,唤醒的时候调用对应的signalAll()
方法就可以解决假死现象。
最后我们来总结一下,对于生产者和消费者模型,通过合理的编程实现,可以充分充分发挥 cpu 多线程的特性,显著的提升系统处理数据的效率。
对于生产者和消费者模型中的假死现象,可以使用ReentrantLock
定义两个Condition
,进行交叉唤醒,以解决假死问题。
1、https://www.cnblogs.com/xrq730/p/4855663.html
最近无意间获得一份阿里大佬写的技术笔记,内容涵盖 Spring、Spring Boot/Cloud、Dubbo、JVM、集合、多线程、JPA、MyBatis、MySQL 等技术知识。需要的小伙伴可以点击如下链接获取,资源地址:技术资料笔记。
不会有人刷到这里还想白嫖吧?点赞对我真的非常重要!在线求赞。加个关注我会非常感激!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。