当前位置:   article > 正文

多线程之生产者与消费者模型_线程生产者消费者模型

线程生产者消费者模型

在这个模型中我们需要有三个主要类,分别是商品类,生产者类以及消费者类。
其中运行机制和我们平时的生产者与消费者的关系机制大同小异。
在这个模型中,我们需要定义一个容器来实现消费者与生产者之间的解耦,为了更贴近实际,采用FIFO原则的队列作为容器为最佳选择,并且此模型还遵循着以下两条规则:
1.当生产者生产商品数量超过容器限制时,停止生产;
2.当消费者消费商品导致容器为空时,停止消费。
以下通过两种方式来实现这个模型。
第一种方式:wait()方法和notify()方法
注意:在这个方法中需要把容器作为对象赋锁。
首先定义一个商品类:

class Goods {
    private String name;
    private String id;
    private Double price;

    public Goods(String name, String id, Double price) {
        this.name = name;
        this.id = id;
        this.price = price;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Goods{" +
                "name='" + name + '\'' +
                ", id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

定义一个生产者类:

class Producer implements Runnable {
    private final Queue<Goods> queue;

    public Producer(Queue<Goods> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (this.queue) {
                if (queue.size() >= 3) {
                    System.out.println(Thread.currentThread().getName() + "容器已满,停止生产");
                    try {
                        this.queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    String id = UUID.randomUUID().toString();
                    String name = "包子";
                    Double price = new Random().nextDouble();
                    Goods goods = new Goods(id, name, price);
                    queue.add(goods);
                    System.out.println(Thread.currentThread().getName() + "生产了一个" + goods);
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

定义一个消费者类:

class Consumer implements Runnable {
    private Queue<Goods> queue;

    public Consumer(Queue<Goods> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (this.queue) {
                if (this.queue.isEmpty()) {
                    System.out.println(Thread.currentThread().getName() + "容器已空,开始生产");
                    this.queue.notifyAll();
                } else {
                    Goods goods = this.queue.poll();
                    System.out.println(Thread.currentThread().getName() + "消费了一个" + goods);
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

最后定义一个测试用例:

public static void code1() {
        //生产者
        //消费者
        //容器
        Queue<Goods> queue = new LinkedList<>();
        Runnable produce = new Producer(queue);
        Runnable consumer = new Consumer(queue);

        //生产者线程
        for (int i = 0; i < 5; i++) {
            new Thread(produce, "生产者-" + i).start();
        }

        //消费者线程
        for (int i = 0; i < 2; i++) {
            new Thread(consumer, "消费者-" + i).start();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

方法二:使用阻塞队列(这个方法好像还有一点问题,回头再重新修改)
由于BlockingQueue接口自带阻塞的功能,所以也不需要格外加锁。
如果该队列已满,该线程被阻塞;如果该队列已空,该线程也被阻塞。
首先定义一个商品类:

class Goods1 {
    private String name;
    private String id;
    private Double price;

    public Goods1(String name, String id, Double price) {
        this.name = name;
        this.id = id;
        this.price = price;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Goods{" +
                "name='" + name + '\'' +
                ", id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

定义一个生产者类:

class Producer1 implements Runnable {
    private BlockingQueue<Goods1> queue;

    public Producer1(BlockingQueue<Goods1> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String id = UUID.randomUUID().toString();
        String name = "包子";
        Double price = new Random().nextDouble();
        Goods1 goods = new Goods1(id, name, price);
        while (true) {
            System.out.println(Thread.currentThread().getName() + "准备生产!");
            try {
                Thread.sleep(500);
                queue.put(goods);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "生产完成:" + goods);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

定义一个消费者类:

class Consumer1 implements Runnable {
    private BlockingQueue<Goods1> queue;

    public Consumer1(BlockingQueue<Goods1> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println(Thread.currentThread().getName() + "准备消费!");
            try {
                Thread.sleep(500);
                Goods1 goods = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "消费完成:" + goods);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

最后定义一个测试用例:

public static void main(String[] args) {
        BlockingQueue<Goods1> queue = new ArrayBlockingQueue<>(3);
        Runnable produce = new Producer1(queue);
        Runnable consumer = new Consumer1(queue);
        //生产者线程
        for (int i = 0; i < 3; i++) {
            new Thread(produce, "生产者-" + i).start();
        }


        //消费者线程
        for (int i = 0; i < 2; i++) {
            new Thread(consumer, "消费者-" + i).start();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

以上就是实现生产者与消费者模型的两种方式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/661630
推荐阅读
相关标签
  

闽ICP备14008679号