赞
踩
生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源。本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型,我会给出四种情况下的 C++11 并发解决方案。
一、单生产者-单消费者模型
顾名思义,单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。C++11实现单生产者单消费者模型的代码如下:
- #include <iostream>
- #include <condition_variable>
- #include <mutex>
- #include <thread>
-
- static const int repository_size = 10;//循环队列的大小
- static const int item_total = 20;//要生产的产品数目
-
- std::mutex mtx;//互斥量,保护产品缓冲区
-
- std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
- std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
-
- int item_buffer[repository_size];
-
- static std::size_t read_position = 0;//消费者读取产品的位置
- static std::size_t write_position = 0;//生产者写入产品的位置
-
- std::chrono::seconds t(1);//a new feature of c++ 11 standard
-
- void produce_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx); ///首先获得锁
- while (((write_position + 1) % repository_size) == read_position)
- {
- std::cout << "Producer is waiting for an empty slot..." << std::endl;
- repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
- } //当缓冲区满了之后我们就不能添加产品了
-
- item_buffer[write_position] = i;//写入产品
- write_position++;
-
- if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
- {
- write_position = 0;
- }
-
- repo_not_empty.notify_all();//通知消费者产品库不为空
-
- //lck.unlock();//解锁
- }
-
- int consume_item()
- {
- int data;
- std::unique_lock<std::mutex> lck(mtx);
- while (write_position == read_position)
- {
- std::cout << "Consumer is waiting for items..." << std::endl;
- repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
- }
-
- data = item_buffer[read_position];//读取产品
- read_position++;
-
- if (read_position >= repository_size)
- {
- read_position = 0;
- }
-
- repo_not_full.notify_all();//通知产品库不满
- //lck.unlock();
-
- return data;
- }
-
- void Producer_thread()
- {
- for (int i = 1; i <= item_total; ++i)
- {
- //std::this_thread::sleep_for(t);
- std::cout << "生产者生产第" << i << "个产品" << std::endl;
- produce_item(i);
- }
- }
-
- void Consumer_thread()
- {
- static int cnt = 0;
- while (1)
- {
- //std::this_thread::sleep_for(t);
- int item = consume_item();
- std::cout << "消费者消费第" << item << "个产品" << std::endl;
-
- if (++cnt == item_total)
- break;
- }
- }
-
- int main()
- {
- std::thread producer(Producer_thread); // 创建生产者线程.
- std::thread consumer(Consumer_thread); // 创建消费之线程.
- producer.join();
- consumer.join();
- }
二、单生产者-多消费者模型
与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:
- #include <iostream>
- #include <condition_variable>
- #include <mutex>
- #include <thread>
- #include <vector>
- static const int repository_size = 10;//循环队列的大小
- static const int item_total = 20;//要生产的产品数目
-
- std::mutex mtx;//互斥量,保护产品缓冲区
- std::mutex mtx_counter;//互斥量,保护产品计数器
-
- std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
- std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
-
- int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
-
- static std::size_t read_position = 0;//消费者读取产品的位置
- static std::size_t write_position = 0;//生产者写入产品的位置
-
- static std::size_t item_counter = 0;//消费者消费产品计数器
-
- std::chrono::seconds t(1);//a new feature of c++ 11 standard
-
- void produce_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);
- //item buffer is full, just wait here.
- while (((write_position + 1) % repository_size) == read_position)
- {
- std::cout << "Producer is waiting for an empty slot..." << std::endl;
- repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
- } //当缓冲区满了之后我们就不能添加产品了
-
- item_buffer[write_position] = i;//写入产品
- write_position++;
-
- if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
- {
- write_position = 0;
- }
-
- repo_not_empty.notify_all();//通知消费者产品库不为空
-
- lck.unlock();//解锁
- }
-
- int consume_item()
- {
- int data;
- std::unique_lock<std::mutex> lck(mtx);
- // item buffer is empty, just wait here.
- while (write_position == read_position)
- {
- std::cout << "Consumer is waiting for items..." << std::endl;
- repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
- }
-
- data = item_buffer[read_position];//读取产品
- read_position++;
-
- if (read_position >= repository_size)
- {
- read_position = 0;
- }
-
- repo_not_full.notify_all();//通知产品库不满
- lck.unlock();
-
- return data;
- }
-
- void Producer_thread()
- {
- for (int i = 1; i <= item_total; ++i)
- {
- //std::this_thread::sleep_for(t);
- std::cout << "生产者生产第" << i << "个产品" << std::endl;
- produce_item(i);
- }
- }
-
- void Consumer_thread()
- {
- bool read_to_exit = false;
- while (1)
- {
- std::this_thread::sleep_for(t);
- std::unique_lock<std::mutex> lck(mtx_counter);
- if (item_counter < item_total)
- {
- int item = consume_item();
- ++item_counter;
- std::cout << "消费者线程" << std::this_thread::get_id()
- << "消费第" << item << "个产品" << std::endl;
- }
- else
- {
- read_to_exit = true;
- }
- if (read_to_exit == true)
- break;
- }
-
- std::cout << "Consumer thread " << std::this_thread::get_id()
- << " is exiting..." << std::endl;
-
- }
-
- int main()
- {
- std::thread producer(Producer_thread); // 创建生产者线程.
- std::vector<std::thread> thread_vector;
- for (int i = 0; i != 5; ++i)
- {
- thread_vector.push_back(std::thread(Consumer_thread));// 创建消费者线程.
- }
-
- producer.join();
- for (auto &thr : thread_vector)
- {
- thr.join();
- }
-
- }
-
三、多生产者-单消费者模型
与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:
- #include <iostream>
- #include <condition_variable>
- #include <mutex>
- #include <thread>
- #include <vector>
-
- static const int repository_size = 10;//循环队列的大小
- static const int item_total = 20;//要生产的产品数目
-
- std::mutex mtx;//互斥量,保护产品缓冲区
- std::mutex mtx_counter;
-
- std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
- std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
-
- int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
-
- static std::size_t read_position = 0;//消费者读取产品的位置
- static std::size_t write_position = 0;//生产者写入产品的位置
-
- static std::size_t item_counter = 0;//计数器
-
- std::chrono::seconds t(1);//a new feature of c++ 11 standard
-
- void produce_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);
- // item buffer is full, just wait here.
- while (((write_position + 1) % repository_size) == read_position)
- {
- std::cout << "Producer is waiting for an empty slot..." << std::endl;
- repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
- } //当缓冲区满了之后我们就不能添加产品了
-
- item_buffer[write_position] = i;//写入产品
- write_position++;
-
- if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
- {
- write_position = 0;
- }
-
- repo_not_empty.notify_all();//通知消费者产品库不为空
-
- lck.unlock();//解锁
- }
-
- int consume_item()
- {
- int data;
- std::unique_lock<std::mutex> lck(mtx);
- // item buffer is empty, just wait here.
- while (write_position == read_position)
- {
- std::cout << "Consumer is waiting for items..." << std::endl;
- repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
- }
-
- data = item_buffer[read_position];//读取产品
- read_position++;
-
- if (read_position >= repository_size)
- {
- read_position = 0;
- }
-
- repo_not_full.notify_all();//通知产品库不满
- lck.unlock();
-
- return data;
- }
-
- void Producer_thread()
- {
- bool read_to_exit = false;
- while (1)
- {
- std::unique_lock<std::mutex> lck(mtx_counter);
- if (item_counter < item_total)
- {
- ++item_counter;
- produce_item(item_counter);
- std::cout << "生产者线程 " << std::this_thread::get_id()
- << "生产第 " << item_counter << "个产品" << std::endl;
- }
- else
- {
- read_to_exit = true;
- }
-
- if (read_to_exit == true)
- break;
- }
-
- std::cout << "Producer thread " << std::this_thread::get_id()
- << " is exiting..." << std::endl;
-
- }
-
- void Consumer_thread()
- {
- static int cnt = 0;
- while (1)
- {
- std::this_thread::sleep_for(t);
- int item = consume_item();
- std::cout << "消费者消费第" << item << "个产品" << std::endl;
-
- if (++cnt == item_total)
- break;
- }
- }
-
- int main()
- {
- std::vector<std::thread> thread_vector;
- for (int i = 0; i != 5; ++i)
- {
- thread_vector.push_back(std::thread(Producer_thread));// 创建消费者线程.
- }
-
- std::thread consumer(Consumer_thread); // 创建消费之线程.
-
- for (auto &thr : thread_vector)
- {
- thr.join();
- }
-
- consumer.join();
- }
-
四、多生产者-多消费者模型
该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。
- #include <iostream>
- #include <condition_variable>
- #include <mutex>
- #include <thread>
- #include <vector>
-
- static const int repository_size = 10;//循环队列的大小
- static const int item_total = 20;//要生产的产品数目
-
- std::mutex mtx;//互斥量,保护产品缓冲区
- std::mutex producer_count_mtx;
- std::mutex consumer_count_mtx;
-
- std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
- std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
-
- int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
-
- static std::size_t read_position = 0;//消费者读取产品的位置
- static std::size_t write_position = 0;//生产者写入产品的位置
-
- static size_t produced_item_counter = 0;
- static size_t consumed_item_counter = 0;
-
- std::chrono::seconds t(1);//a new feature of c++ 11 standard
- std::chrono::microseconds t1(1000);
-
- void produce_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);
- // item buffer is full, just wait here.
- while (((write_position + 1) % repository_size) == read_position)
- {
- std::cout << "Producer is waiting for an empty slot..." << std::endl;
- repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
- } //当缓冲区满了之后我们就不能添加产品了
-
- item_buffer[write_position] = i;//写入产品
- write_position++;
-
- if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
- {
- write_position = 0;
- }
-
- repo_not_empty.notify_all();//通知消费者产品库不为空
-
- lck.unlock();//解锁
- }
-
- int consume_item()
- {
- int data;
- std::unique_lock<std::mutex> lck(mtx);
- // item buffer is empty, just wait here.
- while (write_position == read_position)
- {
- std::cout << "Consumer is waiting for items..." << std::endl;
- repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
- }
-
- data = item_buffer[read_position];//读取产品
- read_position++;
-
- if (read_position >= repository_size)
- {
- read_position = 0;
- }
-
- repo_not_full.notify_all();//通知产品库不满
- lck.unlock();
-
- return data;
- }
-
- void Producer_thread()
- {
- bool ready_to_exit = false;
- while (1)
- {
- //std::this_thread::sleep_for(t);
- std::unique_lock<std::mutex> lock(producer_count_mtx);
- if (produced_item_counter < item_total)
- {
- ++produced_item_counter;
- produce_item(produced_item_counter);
- std::cout << "生产者线程 " << std::this_thread::get_id()
- << "生产第 " << produced_item_counter << "个产品" << std::endl;
- }
- else
- {
- ready_to_exit = true;
- }
-
- lock.unlock();
-
- if (ready_to_exit == true)
- {
- break;
- }
- }
-
- std::cout << "Producer thread " << std::this_thread::get_id()
- << " is exiting..." << std::endl;
- }
-
- void Consumer_thread()
- {
- bool read_to_exit = false;
- while (1)
- {
- std::this_thread::sleep_for(t1);
- std::unique_lock<std::mutex> lck(consumer_count_mtx);
- if (consumed_item_counter < item_total)
- {
- int item = consume_item();
- ++consumed_item_counter;
- std::cout << "消费者线程" << std::this_thread::get_id()
- << "消费第" << item << "个产品" << std::endl;
- }
- else
- {
- read_to_exit = true;
- }
-
- if (read_to_exit == true)
- {
- break;
- }
- }
-
- std::cout << "Consumer thread " << std::this_thread::get_id()
- << " is exiting..." << std::endl;
- }
-
- int main()
- {
- std::vector<std::thread> thread_vector1;
- std::vector<std::thread> thread_vector2;
- for (int i = 0; i != 5; ++i)
- {
- thread_vector1.push_back(std::thread(Producer_thread));// 创建生产者线程.
- thread_vector2.push_back(std::thread(Consumer_thread));// 创建消费者线程.
-
- }
-
- for (auto &thr1 : thread_vector1)
- {
- thr1.join();
- }
-
- for (auto &thr2 : thread_vector2)
- {
- thr2.join();
- }
- }
原文链接:https://blog.csdn.net/chenxun_2010/article/details/49848865
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。