赞
踩
– 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个。
Resource.java
package com.demo.ProducerConsumer; /** * 资源 * @author lixiaoxi * */ public class Resource { /*资源序号*/ private int number = 0; /*资源标记*/ private boolean flag = false; /** * 生产资源 */ public synchronized void create() { if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费; try { wait();//让生产线程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生产一个 System.out.println(Thread.currentThread().getName() + "生产者------------" + number); flag = true;//将资源标记为已经生产 notify();//唤醒在等待操作资源的线程(队列) } /** * 消费资源 */ public synchronized void destroy() { if (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消费者****" + number); flag = false; notify(); } }
Producer.java
package com.demo.ProducerConsumer; /** * 生产者 * @author lixiaoxi * */ public class Producer implements Runnable{ private Resource resource; public Producer(Resource resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } resource.create(); } } }
Consumer.java
package com.demo.ProducerConsumer; /** * 消费者 * @author lixiaoxi * */ public class Consumer implements Runnable{ private Resource resource; public Consumer(Resource resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } resource.destroy(); } } }
ProducerConsumerTest.java
package com.demo.ProducerConsumer;
public class ProducerConsumerTest {
public static void main(String args[]) {
Resource resource = new Resource();
new Thread(new Producer(resource)).start();//生产者线程
new Thread(new Consumer(resource)).start();//消费者线程
}
}
打印结果:
以上打印结果可以看出没有任何问题。
ProducerConsumerTest.java
package com.demo.ProducerConsumer;
public class ProducerConsumerTest {
public static void main(String args[]) {
Resource resource = new Resource();
new Thread(new Producer(resource)).start();//生产者线程
new Thread(new Producer(resource)).start();//生产者线程
new Thread(new Consumer(resource)).start();//消费者线程
new Thread(new Consumer(resource)).start();//消费者线程
}
}
运行结果:
通过以上打印结果发现问题:
Resource.java
package com.demo.ProducerConsumer; /** * 资源 * @author lixiaoxi * */ public class Resource { /*资源序号*/ private int number = 0; /*资源标记*/ private boolean flag = false; /** * 生产资源 */ public synchronized void create() { while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费; try { wait();//让生产线程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生产一个 System.out.println(Thread.currentThread().getName() + "生产者------------" + number); flag = true;//将资源标记为已经生产 notify();//唤醒在等待操作资源的线程(队列) } /** * 消费资源 */ public synchronized void destroy() { while (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消费者****" + number); flag = false; notify(); } }
运行结果:
Resource.java
package com.demo.ProducerConsumer; /** * 资源 * @author lixiaoxi * */ public class Resource { /*资源序号*/ private int number = 0; /*资源标记*/ private boolean flag = false; /** * 生产资源 */ public synchronized void create() { while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费; try { wait();//让生产线程等待 } catch (InterruptedException e) { e.printStackTrace(); } } number++;//生产一个 System.out.println(Thread.currentThread().getName() + "生产者------------" + number); flag = true;//将资源标记为已经生产 notifyAll();//唤醒在等待操作资源的线程(队列) } /** * 消费资源 */ public synchronized void destroy() { while (!flag) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "消费者****" + number); flag = false; notifyAll(); } }
运行结果:
以上就大功告成了,没有任何问题。
再来梳理一下整个流程。按照示例,生产者消费者交替运行,每次生产后都有对应的消费者,测试类创建实例,如果是生产者先运行,进入run()方法,进入create()方法,flag默认为false,number+1,生产者生产一个产品,flag置为true,同时调用notifyAll()方法,唤醒所有正在等待的线程,接下来如果还是生产者运行呢?这是flag为true,进入while循环,执行wait()方法,接下来如果是消费者运行的话,调用destroy()方法,这时flag为true,消费者购买了一次产品,随即将flag置为false,并唤醒所有正在等待的线程。这就是一次完整的多生产者对应多消费者的问题。
上面的代码有一个问题,就是我们为了避免所有的线程都处于等待的状态,使用了notifyAll方法来唤醒所有的线程,即notifyAll唤醒的是自己方和对方线程。如果我需要只是唤醒对方的线程,比如:生产者只能唤醒消费者的线程,消费者只能唤醒生产者的线程。
在jdk1.5当中为我们提供了多线程的升级解决方案:
将同步synchronized替换成了Lock操作。
将Object中的wait,notify,notifyAll方法替换成了Condition对象。
可以只唤醒对方的线程。
完整代码:
Resource1.java
package com.demo.ProducerConsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 资源 * @author lixiaoxi * */ public class Resource1 { /*资源序号*/ private int number = 0; /*资源标记*/ private boolean flag = false; private Lock lock = new ReentrantLock(); //使用lock建立生产者的condition对象 private Condition condition_pro = lock.newCondition(); //使用lock建立消费者的condition对象 private Condition condition_con = lock.newCondition(); /** * 生产资源 */ public void create() throws InterruptedException { try{ lock.lock(); //先判断标记是否已经生产了,如果已经生产,等待消费 while(flag){ //生产者等待 condition_pro.await(); } //生产一个 number++; System.out.println(Thread.currentThread().getName() + "生产者------------" + number); //将资源标记为已经生产 flag = true; //生产者生产完毕后,唤醒消费者的线程(注意这里不是signalAll) condition_con.signal(); }finally{ lock.unlock(); } } /** * 消费资源 */ public void destroy() throws InterruptedException{ try{ lock.lock(); //先判断标记是否已经消费了,如果已经消费,等待生产 while(!flag){ //消费者等待 condition_con.await(); } System.out.println(Thread.currentThread().getName() + "消费者****" + number); //将资源标记为已经消费 flag = false; //消费者消费完毕后,唤醒生产者的线程 condition_pro.signal(); }finally{ lock.unlock(); } } }
Producer1.java
package com.demo.ProducerConsumer; /** * 生产者 * @author lixiaoxi * */ public class Producer1 implements Runnable{ private Resource1 resource; public Producer1(Resource1 resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); resource.create(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Consumer1.java
package com.demo.ProducerConsumer; /** * 消费者 * @author lixiaoxi * */ public class Consumer1 implements Runnable{ private Resource1 resource; public Consumer1(Resource1 resource) { this.resource = resource; } @Override public void run() { while (true) { try { Thread.sleep(10); resource.destroy(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
ProducerConsumerTest1.java
package com.demo.ProducerConsumer;
public class ProducerConsumerTest1 {
public static void main(String args[]) {
Resource1 resource = new Resource1();
new Thread(new Producer1(resource)).start();//生产者线程
new Thread(new Producer1(resource)).start();//生产者线程
new Thread(new Consumer1(resource)).start();//消费者线程
new Thread(new Consumer1(resource)).start();//消费者线程
}
}
运行结果:
1、如果生产者、消费者都是1个,那么flag标记可以用if判断。这里有多个,必须用while判断。
2、在while判断的同时,notify函数可能唤醒本类线程(如一个消费者唤醒另一个消费者),这会导致所有消费者忙等待,程序无法继续往下执行。使用notifyAll函数代替notify可以解决这个问题,notifyAll可以保证非本类线程被唤醒(消费者线程能唤醒生产者线程,反之也可以),解决了忙等待问题。
生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程中,并不要求只有一个生产者和一个消费者。可以多个生产者对应多个消费者,可以一个生产者对应一个消费者,可以多个生产者对应一个消费者。
假死就发生在上面三种场景下。假死指的是全部线程都进入了WAITING状态,那么程序就不再执行任何业务功能了,整个项目呈现停滞状态。
比方说有生产者A和生产者B,缓冲区由于空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,于是继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。
上面的分析可以看出,假死出现的原因是因为notify的是同类,所以非单生产者/单消费者的场景,可以采取两种方法解决这个问题:
(1)synchronized用notifyAll()唤醒所有线程、ReentrantLock用signalAll()唤醒所有线程。
(2)用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就可以了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。