赞
踩
生产者和消费者分别为两个线程(或进程),共享一个固定大小的缓冲区(存储空间),生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
对缓冲区进行互斥处理,缓冲区为临界区,防止竞争;
正确实现生产者和消费者的同步关系,防止出现死锁。
缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。
wait(): 调用任何对象的wait()方法会让当前线程进入等待状态,会把当前的锁释放,然后让出CPU,直到另一个线程调用同一个对象的notify()或notifyAll()方法。
notify(): 唤醒因调用这个对象wait()方法而阻塞的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁。
wait()/notify()直接隶属于Object 类,所有对象都拥有这一对方法。
这一对方法却必须在 synchronized方法或块中调用,只有在synchronized 方法或块中当前线程才占有锁,才有锁可以释放。调用这一对方法的对象上的锁必须为当前线程所拥有,这样才有锁可以释放。因此,这一对方法调用必须放置在这样的 synchronized方法或块中,该方法或块的上锁对象就是调用这一对方法的对象。若不满足这一条件,则程序虽然仍能编译,但在运行时会出现IllegalMonitorStateException异常。
调用 notify() 方法导致解除阻塞的线程是从因调用该对象的 wait() 方法而阻塞的线程中随机选取的,我们无法预料哪一个线程将会被选择,所以编程时要特别小心,避免因这种不确定性而产生问题
除了 notify(),还有一个方法 notifyAll() 也可起到类似作用,唯一的区别在于,调用 notifyAll() 方法将把因调用该对象的 wait() 方法而阻塞的所有线程一次性全部解除阻塞。当然,只有获得锁的那一个线程才能进入可执行状态。
wait()和notify()必须成对存在。
public class ProducerConsumer { // 记录产品的数量 private static volatile int count = 0; // 限定产品的总数量为10 private static final int FULL = 10; // 锁对象 private static final String LOCK = "LOCK"; public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); // 线程池执行 ExecutorService service = Executors.newFixedThreadPool(8); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); } /** * 生产者 */ class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); LOCK.notifyAll(); } } } } /** * 消费者 */ class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); LOCK.notifyAll(); } } } } }
pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-8消费者消费,目前总共有0 pool-1-thread-7生产者生产,目前总共有1 pool-1-thread-6消费者消费,目前总共有0 pool-1-thread-5生产者生产,目前总共有1 pool-1-thread-3生产者生产,目前总共有2 pool-1-thread-4消费者消费,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-7生产者生产,目前总共有1 pool-1-thread-3生产者生产,目前总共有2 pool-1-thread-1生产者生产,目前总共有3 pool-1-thread-8消费者消费,目前总共有2 pool-1-thread-6消费者消费,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-5生产者生产,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-5生产者生产,目前总共有1 pool-1-thread-6消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-7生产者生产,目前总共有2 pool-1-thread-8消费者消费,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-7生产者生产,目前总共有1 pool-1-thread-8消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-6消费者消费,目前总共有0 pool-1-thread-5生产者生产,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-5生产者生产,目前总共有2 pool-1-thread-8消费者消费,目前总共有1 pool-1-thread-1生产者生产,目前总共有2 pool-1-thread-6消费者消费,目前总共有1 pool-1-thread-7生产者生产,目前总共有2 pool-1-thread-4消费者消费,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-5生产者生产,目前总共有1 pool-1-thread-7生产者生产,目前总共有2 pool-1-thread-1生产者生产,目前总共有3 pool-1-thread-6消费者消费,目前总共有2 pool-1-thread-8消费者消费,目前总共有1 pool-1-thread-3生产者生产,目前总共有2 pool-1-thread-4消费者消费,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-8消费者消费,目前总共有0 pool-1-thread-7生产者生产,目前总共有1 pool-1-thread-5生产者生产,目前总共有2 pool-1-thread-6消费者消费,目前总共有1 pool-1-thread-3生产者生产,目前总共有2 pool-1-thread-2消费者消费,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-7生产者生产,目前总共有2 pool-1-thread-6消费者消费,目前总共有1 pool-1-thread-5生产者生产,目前总共有2 pool-1-thread-8消费者消费,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-5生产者生产,目前总共有2 pool-1-thread-6消费者消费,目前总共有1 pool-1-thread-2消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-7生产者生产,目前总共有1 pool-1-thread-8消费者消费,目前总共有0 pool-1-thread-1生产者生产,目前总共有1 pool-1-thread-4消费者消费,目前总共有0 pool-1-thread-3生产者生产,目前总共有1 pool-1-thread-7生产者生产,目前总共有2 pool-1-thread-5生产者生产,目前总共有3 pool-1-thread-6消费者消费,目前总共有2 pool-1-thread-2消费者消费,目前总共有1 pool-1-thread-8消费者消费,目前总共有0
BlockingQueue即阻塞队列,在某些情况下对阻塞队列的访问可能会造成阻塞。
操作 | 抛异常 | 特定值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
检查 | element(o) | peek(o) |
这四类方法分别对应的是:
方法 | 描述 |
---|---|
boolean add(E e) | 如果可能,向队列中添加一个元素。否则,它抛出异常。 |
boolean offer(E e) | 如果能添加元素,则将元素添加到队列中,而不抛出异常。 它在失败时返回false,在成功时返回true。 |
E remove() | 删除队列的头。如果队列为空,它会抛出异常。此方法返回已移除的项目。 |
E poll() | 从队列中删除元素。如果队列为空而不是抛出异常,则返回null。 |
Eelement() | 查看队列的头,而不从队列中删除它。 如果队列为空,它会抛出异常。 |
E peek() | 查看队列,如果队列为空而不是抛出异常,则返回null。 |
void put(E e) | put方法只存在于****BlockingQueue类型的阻塞队列中,使用put方法向已满的队列添加新元素时,代码会阻塞在put处 |
E take() | take方法只存在于****BlockingQueue类型的阻塞队列中,获得空队列的头部元素时,会阻塞在获取的位置 |
下面看由阻塞队列实现的生产者消费者模型,这里使用take()和put()方法,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。
public class ProducerConsumer { // 记录产品的数量 private static volatile int count = 0; // 创建阻塞队列 private final BlockingQueue blockingQueue = new ArrayBlockingQueue(10); public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); ExecutorService service = Executors.newFixedThreadPool(8); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); } /** * 生产者 */ class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { blockingQueue.put(1); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消费者 */ class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { blockingQueue.take(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}
如果用ReentrantLock替代,可以把代码改造为:
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?
答案是使用Condition对象来实现wait和notify的功能。
我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:
class TaskQueue { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private Queue<String> queue = new LinkedList<>(); public void addTask(String s) { lock.lock(); try { queue.add(s); condition.signalAll(); } finally { lock.unlock(); } } public String getTask() { lock.lock(); try { while (queue.isEmpty()) { condition.await(); } return queue.remove(); } finally { lock.unlock(); } } }
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:
await()会释放当前锁,进入等待状态;
signal()会唤醒某个等待线程;
signalAll()会唤醒所有等待线程;
唤醒线程从await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}
可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。
public class ProducerConsumer { // 记录产品的数量 private static volatile int count = 0; private static final Integer FULL = 10; //创建一个锁对象 private Lock lock = new ReentrantLock(); //创建两个条件变量,一个为缓冲区已满,一个为缓冲区非空 private final Condition alreadyFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); ExecutorService service = Executors.newFixedThreadPool(8); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); } /** * 生产者 */ class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } //获取锁 lock.lock(); try { while (count == FULL) { try { alreadyFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); //唤醒消费者 notEmpty.signal(); } finally { //释放锁 lock.unlock(); } } } } /** * 消费者 */ class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } lock.lock(); try { while (count == 0) { try { notEmpty.await(); } catch (Exception e) { e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); alreadyFull.signal(); } finally { lock.unlock(); } } } } }
public class ProducerConsumer { // 记录产品的数量 private static volatile int count = 0; //创建三个信号量 final Semaphore producer = new Semaphore(10); final Semaphore consumer = new Semaphore(0); final Semaphore mutex = new Semaphore(1); public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); ExecutorService service = Executors.newFixedThreadPool(8); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); } /** * 生产者 */ class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { producer.acquire(); mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); consumer.release(); } } } } /** * 消费者 */ class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } try { consumer.acquire(); mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.release(); producer.release(); } } } } }
public class ProducerConsumer { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); ExecutorService service = Executors.newFixedThreadPool(2); service.execute(producerConsumer.new Producer()); service.execute(producerConsumer.new Consumer()); } /** * 生产者 */ class Producer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = (int) (Math.random() * 255); System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num); pos.write(num); pos.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 消费者 */ class Consumer implements Runnable { @Override public void run() { try { while(true) { Thread.sleep(1000); int num = pis.read(); System.out.println("消费者消费了一个数字,该数字为:" + num); } } catch (Exception e) { e.printStackTrace(); } finally { try { pos.close(); pis.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。