当前位置:   article > 正文

多线程的同步问题的经典案例生产者-消费者模式剖析_第2关:线程同步之生产者-消费者问题

第2关:线程同步之生产者-消费者问题

今天是4月23日(世界读书日),给大家推荐三本尼恩高并发三步曲《java高并发核心编程卷1+卷2+卷3》,帮大家打好3高底子,奔向技术自由,实现辉煌人生。
在这里插入图片描述

那接下来我们就剖析多线程安全的生产者-消费者模式的具体实现。

什么是生产者-消费者问题

生产者-消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Bounded-Buffer Problem),是一个多线程同步问题的经典案例。

生产者-消费者问题描述了两个访问共享缓冲区的线程,即生产者线程和消费者线程,在实际运行时会发生的问题。生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。消费者线程的主要功能是从缓冲区提取(或消耗)数据。

生产者―消费者问题关键是:
1)保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中为空时消耗数据。
2)保证在生产者加入过程、消费者消耗过程中,不会产生错误的数据和行为。

生产者-消费者问题不仅仅是一个多线程同步问题的经典案例,而且业内已经将解决该问题的方案,抽象成为了一种设计模式——“生产者-消费者”模式。

什么是生产者-消费者模式

生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向数据缓冲区(DataBuffer)加入数据,消费者线程则从DataBuffer消耗数据。生
产者和消费者、内存缓冲区之间的关系结构图如下:
在这里插入图片描述
生产者-消费者模式中,有4个关键点:
(1)生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。
(2)数据缓冲区是有容量上限的。数据缓冲区满后,生产者不能再加入数据;DataBuffer空时,消费者不能再取出数据。
(3 )数据缓冲区是线程安全的。在并发操作数据区的过程中,不能出现数据不一致情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况。
(4)生产者或者消费者线程在空闲时,需要尽可能阻塞而不是执行无效的空操作,尽量节约CPU资源。

线程不安全的生产者-消费者模式实现

根据生产者―消费者模式的结构图和描述先来实现一个非线程安全版本:包含了数据缓冲区(DataBuffer)类、生产者(Producer)类、消费者(Consumer)类。
首先定义其数据缓冲区类,具体的代码如下:

//共享数据区,类定义
class NotSafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    //保存具体数据元素
    private List<T> dataList = new LinkedList<>();

    //保存元素数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 从数据区取出一个元素
     */
    public T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

在add()实例方法中,加入元素之前首先会对amount是否达到上限进行判断,如果数据区满了,则不能加入数据;在fetch()实例方法中,消耗元素前首先会对amount是否大于零进行判断,如果数据区空了,就不能取出数据。

生产者-消费者模式中,数据缓冲区(DataBuffer)类以及相应的生产、消费动作(Action)是可变的,生产者类、消费者类的执行逻辑是不同的,那本着“分离变与不变”的软件设计基本原则,可以将生产者类、消费者类与具体的生产、消费Action解耦,从而使得生产者类、消费者类的代码在后续可以复用,生产者、消费者逻辑与对应Action解耦后的类结构图如下:
在这里插入图片描述
通用Producer类组合了一个Callable类型的成员action实例,代表了生产数据所需要执行的实际动作,需要在构造Producer实例时传入。通用生产者类的代码具体如下:

/**
 * 生产者任务的定义
 * Created by 尼恩@疯狂创客圈.
 */
public class Producer implements Runnable {
    //生产的时间间隔,产一次等待的时间,默认为200ms
    public static final int PRODUCE_GAP = 200;

    //总次数
    // 注意:
    // 不是单个的次数
    // 是所有生产者的总的生产次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;

    //生产的动作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Producer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        if (this.gap <= 0) {
            this.gap = PRODUCE_GAP;
        }
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    }

    public Producer(Callable action) {
        this.action = action;
        this.gap = PRODUCE_GAP;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    }

    @Override
    public void run() {
        while (true) {

            try {
                //执行生产动作
                Object out = action.call();
                //输出生产的结果
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "轮生产:" + out);
                }
                //每一轮生产之后,稍微等待一下
                sleepMilliSeconds(gap);

                //增加生产轮次
                TURN.incrementAndGet();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

通用Consumer类也组合了一个Callable类型的成员action实例,代表了消费者所需要执行的实际消耗动作,需要在构造Consumer实例时传入。通用Consumer类的代码具体如下:

/**
 * 消费者任务的定义
 * Created by 尼恩@疯狂创客圈.
 */
public class Consumer implements Runnable {

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费总次数
    // 注意:
    // 不是单个消费者的次数
    // 是所有消费者的总的消费次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;

    //消费的动作
    Callable action = null;

    //消费一次等待的时间,默认为1000ms
    int gap = CONSUME_GAP;

    public Consumer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();

    }

    public Consumer(Callable action) {
        this.action = action;
        this.gap = gap;
        this.gap = CONSUME_GAP;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    }

    @Override
    public void run() {
        while (true) {
            //增加消费次数
            TURN.incrementAndGet();
            try {
                //执行消费动作
                Object out = action.call();
                if (null != out) {
                    Print.tcfo("第" + TURN.get() + "轮消费:" + out);
                }
                //每一轮消费之后,稍微等待一下
                sleepMilliSeconds(gap);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

在完成了数据缓冲区类的定义、生产者类定义、消费者类的定义之后,接下来定义一下数据缓冲区实例、生产动作和消费动作,具体的代码如下:

public class NotSafePetStore {
    //共享数据区,实例对象
    private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();

    //生产者执行的动作
    static Callable<IGoods> produceAction = () ->
    {
        //首先生成一个随机的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享数据区
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };
    //消费者执行的动作
    static Callable<IGoods> consumerAction = () ->
    {
        // 从PetStore获取商品
        IGoods goods = null;
        try {
            goods = notSafeDataBuffer.fetch();

        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

利用以上NotSafePetStore类所定义的三个静态成员,可以快速组装出一个简单的生产者-消费者模式的Java实现版本,具体的代码如下:

public static void main(String[] args) throws InterruptedException {
        System.setErr(System.out);

        // 同时并发执行的线程数
        final int THREAD_TOTAL = 20;
        //线程池,用于多线程模拟测试
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
        for (int i = 0; i < 5; i++) {
            //生产者线程每生产一个商品,间隔500ms
            threadPool.submit(new Producer(produceAction, 500));
            //消费者线程每消费一个商品,间隔1500ms
            threadPool.submit(new Consumer(consumerAction, 1500));
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在NotSafePetStore的main()方法中,利用for循环向线程池提交了5个生产者线程和5个消费者实例。每个生产者实例生产一个商品间隔500毫秒;消费者实例每消费一个商品间隔1500毫秒;也就是说,生产的速度大于消费的速度。

执行结果如下:
在这里插入图片描述
从以上异常可以看出,在向数据缓冲区进行元素的增加或者提取时,多个线程在并发执行对amount、dataList两个成员操作时次序已经混乱,导致了数据不一致和线程安全问题。

线程安全的生产者-消费者模式实现

解决生产者-消费者模式的线程安全问题有多种方式,比较低级的办法是用synchronized+wait+notify方式解决,比较推荐的方法是用信号量(Semaphore)、Blockingqueue、disruptor 的方式来解决,那么咱们就挨个来看下。

synchronized+wait()+notify() 的实现方式

使用synchronized解决生产者和消费者模式,首先我们需要找出临界区资源和临界区代码块。

首先,我们来看下什么是临界区资源。临界区资源表示一种可以被多个线程使用的公共资源或共享数据,但是每一次只能有一个线程使用它。一旦临界区资源被占用,想使用该资源的其他线程则必须等待。在并发情况下,临界区资源是受保护的对象。

接下来,我们再来看下什么是临界区代码块。临界区代码段(Critical Section)是每个线程中访问临界资源的那段代码,多个线程必须互斥地对临界区资源进行访问。线程进入临界区代码段之前,必须在进入区申请资源,申请成功之后进行临界区代码段,执行完成之后释放资源。临界区代码段的进入和退出如下:
在这里插入图片描述
最后,我们来看下竟态条件(Race Conditions)可能是由于在访问临界区代码段时没有互斥地访问而导致的特殊情况。如果多个线程在临界区代码段的并发执行结果可能因为代码的执行顺序不同而出现不同的结果,我们就说这时在临界区出现了竞态条件问题。

那咱们回过头来看生产者-消费者模式, 这个模式中, 生产者和消费者都需要操作DataBuffer(数据缓冲区)中,可以知道,临界区代码段在DataBuffer(数据缓冲区)中。在数据缓冲区中,主要是数据进行操作, 那么 由两个临界区资源,分别是amount和dataList。 由生产者-消费者模式的关键点我们可知, 生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。那么添加数据和消耗数据是临界区代码,即其add()和fetch()两个方法。 那么创建建一个安全的数据缓存区类SafeDataBuffer类,在其add()和fetch()两个实例方法的public声明后面加上synchronized关键字即可。那线程安全的SafeDataBuffer类代码如下:

//共享数据区,类定义
class SafeDataBuffer<T> {
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    //保存数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public synchronized void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
    }

    /**
     * 从数据区取出一个元素
     */
    public synchronized T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

由于其他的代码没有发生变化,我们执行看下结果:

在这里插入图片描述
运行这个线程安全的生产者-消费者模式的实现版本,等待一段时间,之前出现的amount数量和dataList的长度不相等的受检异常没有再抛出;之前出现的数据不一致情况以及线程安全问题也被完全解除。

目前的SafeDataBuffer类中,还存在一个隐蔽、但是又很耗性能的问题:消费者每一轮消费,不管数据区是否为空,都需要进行数据区的询问和判断。循环的代码如下:

    /**
     * 从数据区取出一个元素
     */
    public synchronized T fetch() throws Exception {
        if (amount.get() <= 0) {
            Print.tcfo("队列已经空了!");
            return null;
        }
        ....
     .}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

当数据区空时(amount <= 0),消费者无法取出数据,但是仍然做一个无用的数据区询问工作,白白耗费了CPU的时间片

对于生产者来说,也存在类似的无效轮询问题。当数据区满时,生产者无法加入数据,这时候生产者执行add(T element)方法也白白耗费了CPU的时间片。

 /**
     * 向数据区增加一个元素
     */
    public synchronized void add(T element) throws Exception {
        if (amount.get() > MAX_AMOUNT) {
            Print.tcfo("队列已经满了!");
            return;
        }
      ....
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在生产者或者消费者空闲时节约CPU时间片,免去巨大的CPU资源浪费的方法是使用“等待-通知”方式进行生产者与消费者之间的线程通信。具体实现:
(1)在数据区满(amount.get() > MAX_AMOUNT)时,可以让生产者等待,等到下次数据区中可以加入数据时,给生产者发通知,让生产者唤醒。
(2)在数据区空(amount <= 0)时,可以让消费者等待,等到下次数据区中可以取出数据时,消费者才能被唤醒。
(3)可以在消费者取出一个数据后,由消费者去唤醒等待的生产者。
(4)可以在生产者加入一个数据后,由生产者去唤醒等待的消费者。

Java语言中“等待-通知”方式的线程间的通信使用对象的wait()、notify()两类方法来实现。每个Java对象都有wait()、notify()两类实例方法,并且wait()、notify()方法和对象的监视器是紧密相关的。

Java对象中的wait()、notify()两类方法就如同信号开关,用来进行等待方和通知方之间的交互。

对象的wait()方法的主要作用是让当前线程阻塞并等待被唤醒。wait()方法与对象监视器紧密相关,使用wait()方法时也一定需要放在同步块中。wait()方法的调用方法如下:

synchronized(locko)
{
//同步保护的代码块
locko.wait();
...
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

对象的notify()方法的主要作用是唤醒在等待的线程。notify()方法与对象监视器紧密相关,使用notify()方法时也需要放在同步块中。notify()方法的调用方法如下:

synchronized(locko)
{
//同步保护的代码块
locko.notify();
...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

为了避免空轮询导致CPU时间片浪费,提高生产者-消费者实现版本的性能,接下来演示使用“等待-通知”的方式在生产者与消费者之间进行线程间通信。

使用“等待-通知”机制通信的生产者-消费者实现版本定义三个同步对象,具体如下:
(1)LOCK_OBJECT:用于临界区同步,临界区资源为数据缓冲区的dataList变量和amount 变量。

(2)NOT_FULL:用于数据缓冲区的未满条件等待和通知。生产者在添加元素前,需要判断数据区是否已满,如果是,生产者进入NOT_FULL的同步区去等待被通知,只要消费者消耗一个元素,数据区就是未满的,进入NOT_FULL的同步区发送通知。

(3)NOT_EMPTY:用于数据缓冲区的非空条件等待和通知。消费者在消耗元素前需要判断数据区是否已空,如果是,消费者进入NOT_EMPTY的同步区等待被通知,只要生产者添加一个元素,数据区就是非空的,生产者会进入NOT_EMPTY的同步区发送通知。

具体代码如下:

public class CommunicatePetStore {

    public static final int MAX_AMOUNT = 10; //数据区长度


    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private List<T> dataList = new LinkedList<>();
        //保存数量
        private volatile int amount = 0;

        private final Object LOCK_OBJECT = new Object();
        private final Object NOT_FULL = new Object();
        private final Object NOT_EMPTY = new Object();

        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            synchronized (NOT_FULL) {
                while (amount >= MAX_AMOUNT) {
                    Print.tcfo("队列已经满了!");
                    //等待未满通知
                    NOT_FULL.wait();
                }
            }
            synchronized (LOCK_OBJECT) {

                if (amount < MAX_AMOUNT) { // 加上双重检查,模拟双检锁在单例模式中应用
                    dataList.add(element);
                    amount++;
                }
            }
            synchronized (NOT_EMPTY) {
                //发送未空通知
                NOT_EMPTY.notify();
            }


        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            synchronized (NOT_EMPTY) {
                while (amount <= 0) {
                    Print.tcfo("队列已经空了!");
                    //等待未空通知
                    NOT_EMPTY.wait();
                }
            }

            T element = null;
            synchronized (LOCK_OBJECT) {
                if (amount > 0) {  // 加上双重检查,模拟双检锁在单例模式中应用
                    element = dataList.remove(0);
                    amount--;
                }
            }

            synchronized (NOT_FULL) {
                //发送未满通知
                NOT_FULL.notify();
            }
            return element;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

那以上就是使用synchronized+wait+notify实现的线程安全的生产者-消费者模式。虽然线程安全问题顺利解决,但是以上的解决方式使用了SafeDataBuffer的实例的对象锁作为同步锁,这样一来,所有的生产、消费动作在执行过程中都需要抢占同一个同步锁,最终的结果是所有的生产、消费动作都被串行化了。而且在锁竞争激烈的情况下,synchronized锁会膨胀升级为重量级锁,严重的影响的程序的性能。

那接下我们看下信号量(semaphore)实现线程安全的生产者-消费者模式。

信号量实现(Semaphore)

信号量是Dijkstra在1965年提出的一种方法,它使用一个整型变量来累计唤醒次数,供以后使用。在他的建议中引入了一个新的变量类型,称作信号量(semaphore)。一个信号量的取值可以为0(表示没有保存下来的唤醒操作)或者正值(表示有一个或多个唤醒操作)。

Dijkstra建议设立两种操作:down和up(分别为一般化后的sleep和wakeup)。对一个信号量执行down操作,则是检查其值是否大于0。若该值大于0,则将其减1(即用掉一个保存的唤醒信号)并继续;若该值为0,则进程将睡眠,而且此时down操作并未结束。检查数值、修改变量值以及可能发生的睡眠操作均作为一个单一的、不可分割的原子操作完成。保证一旦一个信号量操作开始,则在该操作完成或阻塞之前,其他进程均不允许访问该信号量。这种原子性对于解决同步问题和避免竞争条件是绝对必要的。所谓原子操作,是指一组相关联的操作要么都不间断地执行,要么不执行。

up操作对信号量的值增1。如果一个或多个进程在该信号量上睡眠,无法完成一个先前的down操作,则由系统选择其中的一个(如随机挑选)并允许该进程完成它的down操作。于是,对一个有进程在其上睡眠的信号量执行一次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了一个。信号量的值增加1和唤醒一个进程同样也是不可分割的,不会有某个进程因执行up而阻塞,正如前面的模型中不会有进程因执行wakeup而阻塞一样。

在Dijkstra原来的论文中,他分别使用名称P和V而不是down和up,荷兰语中,Proberen的意思是尝试,Verhogen的含义是增加或升高。

从物理上说明信号量的P、V操作的含义。 P(S)表示申请一个资源,S.value>0表示有资源可用,其值为资源的数目;S.value=0表示无资源可用;S.value<0, 则|S.value|表示S等待队列中的进程个数。V(S)表示释放一个资源,信号量的初值应该大于等于0。P操作相当于“等待一个信号”,而V操作相当于“发送一个信号”,在实现同步过程中,V操作相当于发送一个信号说合作者已经完成了某项任务,在实现互斥过程中,V操作相当于发送一个信号说临界资源可用了。实际上,在实现互斥时,P、V操作相当于申请资源和释放资源。

该解决方案使用了三个信号量:一个称为full,用来记录充满缓冲槽数目,一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex的初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称作二元信号量。如果每个进程在进入临界区前都执行down操作,并在刚刚退出时执行一个up操作,就能够实现互斥。

信号量的另一种用途是用于实现同步,信号量full和empty用来保证某种事件的顺序发生或不发生。在本例中,它们保证当缓冲区满的时候生产者停止运行,以及当缓冲区空的时候消费者停止运行。

对于无界缓冲区的生产者—消费者问题,两个进程共享一个不限大小的公共缓冲区。由于是无界缓冲区(仓库是无界限制的),即生产者不用关心仓库是否满,只管往里面生产东西,但是消费者还是要关心仓库是否空。所以生产者不会因得不到缓冲区而被阻塞,不需要对空缓冲区进行管理,可以去掉在有界缓冲区中用来管理空缓冲区的信号量及其PV操作。

在JUC中的信号量Semaphore属于共享锁。Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore维护了一组虚拟许可,其数量可以通过构造器的参数指定。线程在访问共享资源前必须使用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。线程访问完成资源后,必须使用Semaphore的release()方法释放许可。更形象的说法是:Semaphore是一个是许可管理器。

JUC包中Semaphore类的主要方法大致如下:
(1)Semaphore(permits)
构造一个Semaphore实例,初始化其管理的许可数量为permits参数值。
(2)Semaphore(permits,fair)
构造一个Semaphore实例,初始化其管理的许可数量为permits参数值,并且可以设置是否以公平模式(fair参数是否为true)进行许可的发放。
(3)availablePermits()
获取Semaphore对象可用的许可数量。
(4)acquire()
当前线程尝试获取Semaphore对象的一个许可。此过程是阻塞的,线程会一直等待Semaphore发放一个许可,直到发生以下任意一件事:

  • 当前线程获取了一个可用的许可。
  • 当前线程被中断,就会抛出 InterruptedException 异常,并停止等待,继续往下执行。
    (5)acquire(permits)
    当前线程尝试以阻塞方式获取permits个许可。此过程是阻塞的,线程会一直等待Semaphore发放permits个许可。如果没有足够的许可而当前线程被中断,就会抛出InterruptedException异常并终止阻塞。
    (6)acquireUninterruptibly()
    当前线程尝试以阻塞方式获取一个许可,阻塞的过程不可中断,直到成功获取一个许可。
    (7)acquireUninterruptibly(permits)
    当前线程尝试以阻塞方式获取permits个许可,阻塞的过程不可中断,直到成功获取permits个许可。
    (8)tryAcquire()
    当前线程尝试去获取一个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了一个许可,就返回true。如果当前线程没有获得到许可,就返回false。
    (9)tryAcquire(permits)
    当前线程尝试去获取permits个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了permits个许可,就返回true。如果当前线程没有获得到permits个许可,就返回false。
    (10)tryAcquire(timeout,TimeUnit)
    限时获取一个许可。此过程是阻塞的,它会一直等待许可,直到发生以下任意一件事:
  • 当前线程获取了一个许可,则会停止等待,继续执行,并返回 true 。
  • 当前线程等待 timeout 后超时,则会停止等待,继续执行 ,并返回 false 。
  • 当前线程在 timeout 时间内被中断,则会抛出 InterruptedException 异常,并停止等待,继续
    执行。
    (11)tryAcquire(permits,timeout,TimeUnit)
    与tryAcquire(timeout,TimeUnit)方法在逻辑上基本相同,不同之处在于:在获取许可的数量上不同,此方法用于获取permits个许可。
    (12)release()
    当前线程释放一个可用的许可。
    (13)release(permits)
    当前线程释放permits个可用的许可。
    (14)drainPermits()
    当前线程获得剩余的所有可用许可。
    (15)hasQueuedThreads()
    判断当前Semaphore对象上是否存在正在等待许可的线程。
    (16)getQueueLength()
    获取当前Semaphore对象上正在等待许可的线程数量。

那接下来我们就看下使用Semaphore实现的生产者-消费者模式的代码,主要是针对临界区资源和临界区代码进行修改,具体修改如下:

public class SemaphorePetStore {
    public static final int MAX_AMOUNT = 10; //数据区长度

    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private LinkedBlockingDeque<T> dataList = new LinkedBlockingDeque<>(MAX_AMOUNT);

        //保存数量
        private volatile int amount = 0;
        // 每次处理的次数
        private static final int times = 100;

        //信号量标识
        private static AtomicInteger signal = new AtomicInteger(0);


        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            while (amount < times) {
                if (signal.get() >= 0 && dataList.size() == 0) {
                    synchronized (signal) {
                        //生产者: P操作 -1
                        Print.fo("生产者: P操作 -1 ");
                        signal.incrementAndGet();
                        Print.fo("生产者: 生产,放入一个对象");
                        dataList.add(element);
                        amount++;
                        //生产者: P操作 -1
                        Print.fo("生产者: V操作 +1");
                        signal.decrementAndGet();
                        Print.fo("生产者: 通知消费者,生产者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            T element = null;
            while (amount < times) {
                if (signal.get() <= 0 && dataList.size() > 0) {
                    synchronized (signal) {
                        //消费者: P操作 -1
                        Print.fo("消费者: P操作 -1 ");
                        signal.decrementAndGet();
                        Print.fo("消费者: 消费,取出一个对象");
                        element = dataList.take();
                        amount--;
                        //生产者: P操作 -1
                        Print.fo("消费者: V操作 +1");
                        signal.incrementAndGet();
                        Print.fo("消费者: 通知生产者,消费者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    }
                } else {
                    Thread.sleep(10);
                }
            }
            return element;
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

由于其他代码未做更改,小伙伴可参考前面的线程不安全的生产者类、消费者类以及组装生产者-消费者模式的实现。部分执行结果如下:
在这里插入图片描述

Blockingqueue 实现

在多线程环境中,通过BlockingQueue(阻塞队列)可以很容易地实现多线程之间数据共享和通信。

阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法。
(1)阻塞添加
所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直队列元素不满时,才重新唤醒线程执行元素添加操作。

(2)阻塞删除
阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程再执行删除操作。

BlockingQueue的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,具体如下:
在这里插入图片描述
ArrayBlockingQueue是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识队列的头部和尾部在数组中的位置。ArrayBlockingQueue的添加和删除操作都是共用同一个锁对象,由此意味着添加和删除无法并行运行,这一点不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的锁分离,从而添加和删除操作完全并行。Doug Lea之所以没有这样去做,是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。

LinkedBlockingQueue是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

DelayQueue中的元素只有当其指定的延迟时间到了,才能从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。DelayQueue使用场景较少,但是相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

基于优先级的阻塞队列和DelayQueue类似,PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

相对于有缓冲的阻塞队列(如LinkedBlockingQueue)来说,SynchronousQueue少了中间缓冲区(如仓库)的环节。如果有仓库,生产者直接把商品批发给仓库,不需要关心仓库最终会将这些商品发给哪些消费者,由于仓库可以中转部分商品,总体来说有仓库进行生产和消费的吞吐量高一些。反过来说,又因为仓库的引入,使得商品从生产者到消费者中间增加了额外的交易环节,单个商品的及时响应性能可能会降低,所以对单个消息的响应要求高的场景可以使用SynchronousQueue。声明一个SynchronousQueue有两种不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略。非公平模式(默认情况)的SynchronousQueue采用非公平锁,同时配合一个LIFO堆栈(TransferStack内部实例)来管理多余的生产者和消费者。对于后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。

了解完阻塞队列的基本方法、主要类型之后,下面通过ArrayBlockingQueue队列实现一个生产者-消费者的案例。

具体的代码在前面的生产者和消费者实现基础上进行迭代——Consumer(消费者)和Producer(生产者)通过ArrayBlockingQueue队列获取和添加元素。其中,消费者调用了take()方法获取元素,当队列没有元素就阻塞;生产者调用put()方法添加元素,当队列满时就阻塞。通过这种方式便实现生产者-消费者模式,比直接使用等待唤醒机制或者Condition条件队列更加简单。基于ArrayBlockingQueue的生产者和消费者实现版本具体的UML类图如下

在这里插入图片描述
出于“分离变与不变”的原则,此版本的Producer(生产者)、Consumer(消费者)等的逻辑不用变化,直接复用前面原的代码即可。此版本DataBuffer(共享数据区)需要变化,使用一个ArrayBlockingQueue用于缓存数据,具体的代码如下:

public class ArrayBlockingQueuePetStore {

    public static final int MAX_AMOUNT = 10; //数据区长度


    //共享数据区,类定义
    static class DateBuffer<T> {
        //保存数据
        private ArrayBlockingQueue<T> dataList = new ArrayBlockingQueue<>(MAX_AMOUNT);


        // 向数据区增加一个元素
        public void add(T element) throws Exception {
            dataList.put(element);
        }

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception {
            return dataList.take();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

运行程序,部分执行结果如下:
在这里插入图片描述

无锁实现

用BlockigQueue队列实现生产者和消费者是一个不错的选择。它可以很自然地实现作为生产者和消费者的内存缓冲区。但是BlockigQueue队列并不是一个高性能的实现,它完全使用锁和阻塞等待来实现线程间的同步。在高并发场合,它的性能并不是特别的优越。

如果我们使用CAS来实现生产者-消费者模式,也同样可以获得可观的性能提升。不过正如大家所见,使用CAS进行编程是非常困难的,但有一个好消息是,目前有一个现成的Disruptor框架,它已经帮助我们实现了这一个功能。

Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。它使用无锁的方式实现了一个环形队列(RingBuffer),非常适合实现生产者-消费者模式,比如事件和消息的发布。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor框架别出心裁地使用了环形队列来代替普通线形队列,这个环形队列内部实现为一个普通的数组。对于一般的队列,势必要提供队列同步head和尾部tail两个指针,用于出队和入队,这样无疑增加了线程协作的复杂度。但如果队列是环形的,则只需要对外提供一个当前位置cursor,利用这个指针既可以进行入队操作,也可以进行出队操作。由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队,序列就加1),Disruptor框架要求我们必须将数组的大小设置为2的整数次方。这样通过sequence &(queueSize-1)就能立即定位到实际的元素位置index,这比取余(%)操作快得多。

如果queueSize是2的整数次幂,则这个数字的二进制表示必然是10、100、1000、10000等形式。因此,queueSize-1的二进制则是一个全1的数字。因此它可以将sequence限定在queueSize-1范围内,并且不会有任何一位是浪费的。

RingBuffer的结构如下:
在这里插入图片描述
其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

RingBuffer的指针(Sequence)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据,该数据是Long类型,不用担心会爆掉。有人计算过: long的范围最大可以达到9223372036854775807,一年365 * 24 * 60 * 60 = 31536000秒,每秒产生1W条数据,也可以使用292年。

Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上面的seq),
在Disruptor中生产者分为单生产者和多生产者,在枚举类ProducerType中定义单生产(SINGLE)和多生产(MULTI)。而消费者并没有区分。

单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费的位置,并进行消费。单生产者线程写数据的流程比较简单,具体如下:
(1)申请写入m个元素;
(2)若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
(3)若是返回的正确,则生产者开始写入元素。
在这里插入图片描述

采用多生产者时,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是又会碰到新问题:如何防止读取的时候,读到还未写的元素。那么Disruptor引入了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。多生产者流程如下:
(1)申请写入m个元素;
(2)若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
(3)生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。
在这里插入图片描述

那么生产者和消费者模式在RingBuffer上的情况如下
在这里插入图片描述
生产者向缓冲区中写入数据,而消费者从中读取数据。生产者写入数据时,使用CAS操作,消费者读取数据时,为了防止多个消费者处理同一个数据,也使用CAS操作进行数据保护。

基于Disruptor的高性能生产者和消费者模式的类图如下:
在这里插入图片描述
MsgEven 是存放数据对象的载体,具体代码如下:

public class MsgEven {

    private IGoods goods;

    public IGoods getGoods() {
        return goods;
    }

    public void setGoods(IGoods goods) {
        this.goods = goods;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

消费者的作用是读取数据进行处理。这里,数据的读取已经由Disruptor框架进行封装了,onEvent()方法为框架的回调方法。因此,只需要简单地进行数据处理即可。具体代码如下:

public class Consumer  implements EventHandler<MsgEven> {

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;


    public Consumer() {
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    }


    @Override
    public void onEvent(MsgEven msgEven, long sequence, boolean endOfBatch)  {
  
        Print.tcfo("消费者中:"+sequence+"商品信息:"+msgEven.getGoods());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

需要一个产生MsgEven 对象的工厂类GoodsFactory。它会在Disruptor框架系统初始化时,构造所有的缓冲区中的对象实例,具体代码如下:

public class GoodsFactory implements EventFactory<MsgEven> {
    @Override
    public MsgEven newInstance() {
        return new MsgEven();
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

生产者需要一个RingBuffer的引用,也就是环形缓冲区。它有一个重要的方法add()将产生的数据推入缓冲区。方法add()接收一个IGood对象。add()方法的功能就是将传入的IGood对象中的数据提取出来,并装载到环形缓冲区中。具体代码如下:

public class Produer {

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;



    private  final RingBuffer<MsgEven> ringBuffer ;

    public Produer(RingBuffer<MsgEven> ringBuffer) {
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
        this.ringBuffer = ringBuffer;
    }

    public  void add(IGoods goods){
        // 1.ringBuffer 事件队列 下一个槽
        long sequence = ringBuffer.next();
        try {
            //2.取出空的事件队列
            MsgEven    msgEven= ringBuffer.get(sequence);
            msgEven.setGoods(goods);
            //3.获取事件队列传递的数据
            Print.cfo("生产者名称:"+name+",生产商品:"+goods.toString());
        }finally {
            //4.发布事件
            ringBuffer.publish(sequence);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

我们的生产者、消费者和数据都已经准备就绪,只差一个统筹规划的主函数将所有的内容整合起来。具体代码如下:

public class DisruptorPetStore {
    public static void main(String[] args) throws InterruptedException {
        // 1.创建工厂
        GoodsFactory dateBufferFactory= new GoodsFactory();
       //2.创建ringBuffer 大小,大小一定要是2的N次方
        int bufferSize=1024*1024;

        //3.创建Disruptor
        Disruptor<MsgEven>  disruptor = new Disruptor<MsgEven>(dateBufferFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
        //4.设置事件处理器 即消费者
        disruptor.handleEventsWith(new Consumer());
        // 5.启动
        disruptor.start();

        // 6.创建RingBuffer容器
        RingBuffer<MsgEven> ringBuffer= disruptor.getRingBuffer();

        //7.创建生产者
        Produer produer = new Produer(ringBuffer);

        for (int l=0;true;l++){
           IGoods goods = Goods.produceOne();
            produer.add(goods);
            Thread.sleep(100);

        }
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

部分执行结果如下:
在这里插入图片描述

学习生产者-消费者模式学习的思想, 消息队列、缓存中也有生产者-消费者模式的思想,小伙伴可以自行研究。

尼恩高并发三步曲《java高并发核心编程卷1+卷2+卷3》,关注尼恩公众号《技术自由圈》可以领电子部哦。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/661546
推荐阅读
相关标签
  

闽ICP备14008679号