当前位置:   article > 正文

生产者消费者问题_pqr共享一个缓冲区,pq构成一对生产者消费者

pqr共享一个缓冲区,pq构成一对生产者消费者

问题描述

线程模型中的经典问题

生产者和消费者分别为两个线程(或进程),共享一个固定大小的缓冲区(存储空间),生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
在这里插入图片描述

生产者
  • 制造数据并且在缓冲区未满时写入,否则等待;
  • 在缓冲区为空的情况下写入数据并唤醒消费者读取数据;
消费者
  • 在缓冲区不为空时读出数据,否则等待;
  • 在缓冲区满的情况下消费数据并唤醒生产者写入数据。
问题关键
  • 对缓冲区进行互斥处理,缓冲区为临界区,防止竞争;

  • 正确实现生产者和消费者的同步关系,防止出现死锁。

解决方案

wait()和notify()方法的实现

缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

关于wait()和notify()
  • wait(): 调用任何对象的wait()方法会让当前线程进入等待状态,会把当前的锁释放,然后让出CPU,直到另一个线程调用同一个对象的notify()或notifyAll()方法。

  • notify(): 唤醒因调用这个对象wait()方法而阻塞的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁。

  • wait()/notify()直接隶属于Object 类,所有对象都拥有这一对方法。

  • 这一对方法却必须在 synchronized方法或块中调用,只有在synchronized 方法或块中当前线程才占有锁,才有锁可以释放。调用这一对方法的对象上的锁必须为当前线程所拥有,这样才有锁可以释放。因此,这一对方法调用必须放置在这样的 synchronized方法或块中,该方法或块的上锁对象就是调用这一对方法的对象。若不满足这一条件,则程序虽然仍能编译,但在运行时会出现IllegalMonitorStateException异常。

  • 调用 notify() 方法导致解除阻塞的线程是从因调用该对象的 wait() 方法而阻塞的线程中随机选取的,我们无法预料哪一个线程将会被选择,所以编程时要特别小心,避免因这种不确定性而产生问题

  • 除了 notify(),还有一个方法 notifyAll() 也可起到类似作用,唯一的区别在于,调用 notifyAll() 方法将把因调用该对象的 wait() 方法而阻塞的所有线程一次性全部解除阻塞。当然,只有获得锁的那一个线程才能进入可执行状态。

  • wait()和notify()必须成对存在。

代码实现
public class ProducerConsumer {
    // 记录产品的数量
    private static volatile int count = 0;
    // 限定产品的总数量为10
    private static final int FULL = 10;
    // 锁对象
    private static final String LOCK = "LOCK";

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        // 线程池执行
        ExecutorService service = Executors.newFixedThreadPool(8);
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
    }

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    while (count == FULL) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
                    LOCK.notifyAll();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    while (count == 0) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
                    LOCK.notifyAll();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
结果
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-8消费者消费,目前总共有0
pool-1-thread-7生产者生产,目前总共有1
pool-1-thread-6消费者消费,目前总共有0
pool-1-thread-5生产者生产,目前总共有1
pool-1-thread-3生产者生产,目前总共有2
pool-1-thread-4消费者消费,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-7生产者生产,目前总共有1
pool-1-thread-3生产者生产,目前总共有2
pool-1-thread-1生产者生产,目前总共有3
pool-1-thread-8消费者消费,目前总共有2
pool-1-thread-6消费者消费,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-5生产者生产,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-5生产者生产,目前总共有1
pool-1-thread-6消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-7生产者生产,目前总共有2
pool-1-thread-8消费者消费,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-7生产者生产,目前总共有1
pool-1-thread-8消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-6消费者消费,目前总共有0
pool-1-thread-5生产者生产,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-5生产者生产,目前总共有2
pool-1-thread-8消费者消费,目前总共有1
pool-1-thread-1生产者生产,目前总共有2
pool-1-thread-6消费者消费,目前总共有1
pool-1-thread-7生产者生产,目前总共有2
pool-1-thread-4消费者消费,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-5生产者生产,目前总共有1
pool-1-thread-7生产者生产,目前总共有2
pool-1-thread-1生产者生产,目前总共有3
pool-1-thread-6消费者消费,目前总共有2
pool-1-thread-8消费者消费,目前总共有1
pool-1-thread-3生产者生产,目前总共有2
pool-1-thread-4消费者消费,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-8消费者消费,目前总共有0
pool-1-thread-7生产者生产,目前总共有1
pool-1-thread-5生产者生产,目前总共有2
pool-1-thread-6消费者消费,目前总共有1
pool-1-thread-3生产者生产,目前总共有2
pool-1-thread-2消费者消费,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-7生产者生产,目前总共有2
pool-1-thread-6消费者消费,目前总共有1
pool-1-thread-5生产者生产,目前总共有2
pool-1-thread-8消费者消费,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-5生产者生产,目前总共有2
pool-1-thread-6消费者消费,目前总共有1
pool-1-thread-2消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-7生产者生产,目前总共有1
pool-1-thread-8消费者消费,目前总共有0
pool-1-thread-1生产者生产,目前总共有1
pool-1-thread-4消费者消费,目前总共有0
pool-1-thread-3生产者生产,目前总共有1
pool-1-thread-7生产者生产,目前总共有2
pool-1-thread-5生产者生产,目前总共有3
pool-1-thread-6消费者消费,目前总共有2
pool-1-thread-2消费者消费,目前总共有1
pool-1-thread-8消费者消费,目前总共有0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

阻塞队列BlockingQueue的实现

BlockingQueue即阻塞队列,在某些情况下对阻塞队列的访问可能会造成阻塞。

被阻塞的情况主要有如下两种:
  • 当队列满了的时候进行入队列操作
  • 当队列空了的时候进行出队列操作
  • 当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
  • 阻塞队列是线程安全的。
BlockingQueue接口的一些方法:
操作抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove(o)poll(o)take(o)poll(timeout, timeunit)
检查element(o)peek(o)

这四类方法分别对应的是:

  • ThrowsException:如果操作不能马上进行,则抛出异常
  • SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  • Blocks:如果操作不能马上进行,操作会被阻塞
  • TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false
BlockingQueue方法对比
方法描述
boolean add(E e)如果可能,向队列中添加一个元素。否则,它抛出异常。
boolean offer(E e)如果能添加元素,则将元素添加到队列中,而不抛出异常。 它在失败时返回false,在成功时返回true。
E remove()删除队列的头。如果队列为空,它会抛出异常。此方法返回已移除的项目。
E poll()从队列中删除元素。如果队列为空而不是抛出异常,则返回null。
Eelement()查看队列的头,而不从队列中删除它。 如果队列为空,它会抛出异常。
E peek()查看队列,如果队列为空而不是抛出异常,则返回null。
void put(E e)put方法只存在于****BlockingQueue类型的阻塞队列中,使用put方法向已满的队列添加新元素时,代码会阻塞在put处
E take()take方法只存在于****BlockingQueue类型的阻塞队列中,获得空队列的头部元素时,会阻塞在获取的位置

下面看由阻塞队列实现的生产者消费者模型,这里使用take()和put()方法,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。

代码实现
public class ProducerConsumer {
    // 记录产品的数量
    private static volatile int count = 0;
    // 创建阻塞队列
    private final BlockingQueue blockingQueue = new ArrayBlockingQueue(10);

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        ExecutorService service = Executors.newFixedThreadPool(8);
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
    }

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(1);
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生产者生产,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.take();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

可重入锁ReentrantLock的实现

ReentrantLock简介
  • 通过ReentrantLock对象的lock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。
  • 可重入锁,也叫做递归锁,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。
  • 从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
  • Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

如果用ReentrantLock替代,可以把代码改造为:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。
  • 顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。
  • 和synchronized不同的是,ReentrantLock可以尝试获取锁:
if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。
  • 所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。
  • ReentrantLock可以替代synchronized进行同步;
  • ReentrantLock获取锁更安全;
  • 必须先获取到锁,再进入try {…}代码块,最后使用finally保证释放锁;
  • 可以使用tryLock()尝试获取锁。
Condition简介

synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?

答案是使用Condition对象来实现wait和notify的功能。

我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLock和Condition来实现:

class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;

  • signal()会唤醒某个等待线程;

  • signalAll()会唤醒所有等待线程;

唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}
  • 1
  • 2
  • 3
  • 4
  • 5

可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

代码实现
public class ProducerConsumer {
    // 记录产品的数量
    private static volatile int count = 0;
    private static final Integer FULL = 10;
    //创建一个锁对象
    private Lock lock = new ReentrantLock();
    //创建两个条件变量,一个为缓冲区已满,一个为缓冲区非空
    private final Condition alreadyFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        ExecutorService service = Executors.newFixedThreadPool(8);
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
    }

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                //获取锁
                lock.lock();
                try {
                    while (count == FULL) {
                        try {
                            alreadyFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生产者生产,目前总共有" + count);
                    //唤醒消费者
                    notEmpty.signal();
                } finally {
                    //释放锁
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            notEmpty.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                    alreadyFull.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

信号量Semaphore的实现

  • Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。
  • Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行
public class ProducerConsumer {

    // 记录产品的数量
    private static volatile int count = 0;
    //创建三个信号量
    final Semaphore producer = new Semaphore(10);
    final Semaphore consumer = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        ExecutorService service = Executors.newFixedThreadPool(8);
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());
    }

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                try {
                    producer.acquire();
                    mutex.acquire();
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生产者生产,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    consumer.release();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    consumer.acquire();
                    mutex.acquire();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消费者消费,目前总共有" + count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    producer.release();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

管道输入输出流PipedInputStream和PipedOutputStream实现

  • 在java的io包下,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
  • 使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的,多个生成者和多个消费者者之间则不行
代码实现
public class ProducerConsumer {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();

    {
        try {
            pis.connect(pos);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ProducerConsumer producerConsumer = new ProducerConsumer();
        ExecutorService service = Executors.newFixedThreadPool(2);
        service.execute(producerConsumer.new Producer());
        service.execute(producerConsumer.new Consumer());

    }

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while(true) {
                    Thread.sleep(1000);
                    int num = (int) (Math.random() * 255);
                    System.out.println(Thread.currentThread().getName() + "生产者生产了一个数字,该数字为: " + num);
                    pos.write(num);
                    pos.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    pos.close();
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while(true) {
                    Thread.sleep(1000);
                    int num = pis.read();
                    System.out.println("消费者消费了一个数字,该数字为:" + num);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    pos.close();
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/267921
推荐阅读
相关标签
  

闽ICP备14008679号