赞
踩
这么经典的问题,要多看几次,多思考。
https://blog.csdn.net/chenxun_2010/article/details/49848865
(1)单消费者单生产者模式
- #include <iostream>
- #include <mutex>
- #include <thread>
- #include <condition_variable>
-
- std::mutex mtx; //互斥变量
- std::condition_variable repo_not_full;//条件变量
- std::condition_variable repo_not_empty;//条件变量
-
- static const int item_total = 20;//一共要生产的个数
- static const int repository_size = 10;//仓库的能够存储的个数
-
- int product_position = 0;//buf中Product位置
- int consume_position = 0;//buf中consume位置
-
- int item_buffer[repository_size]; //仓库buffer
-
-
- void produter_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);//锁住
- //如果不加一 会直接死锁
- while((product_position + 1) % repository_size == consume_position ){
- std::cout << "The item repository is full;" << std::endl;
- repo_not_full.wait(lck);//等待repo_not_full的notify命令
- }
- item_buffer[product_position] = i;
- product_position++;
- if(product_position == repository_size){
- product_position = 0;
- }
- repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
-
- }
-
- int consumer_item()
- {
- std::unique_lock<std::mutex>lck(mtx);
- int data;
- while(product_position == consume_position)
- {
- std::cout << "The item repository is empty." << std::endl;
- repo_not_empty.wait(lck);
- }
- data = item_buffer[consume_position];
- consume_position++;
- if(consume_position == repository_size){
- consume_position = 0;
- }
- repo_not_full.notify_all();
- return data;
- }
-
-
-
- //生产线程
- void Producter_thread()
- {
- for(int i = 0;i < item_total; i++){
- std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
- produter_item(i);//执行producter动作
- }
- }
- //消费线程
- void Consumer_thread()
- {
- int cnt = 0;
- while(1){
- int item = consumer_item();//执行consume动作
- std::cout << "消费者消费第"<< item << "个产品." << std::endl;
- cnt++;//个数+1
- if(cnt == item_total){
- std::cout << "consumer_thread is finish." << std::endl;
- break; //结束
- }
- }
-
- }
-
- int main ()
- {
- std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
- std::thread consumer(Consumer_thread);//同上
- producer.join();//等producer结束
- std::cout << "Producter_thread is end;" << std::endl;
- consumer.join();//等consumer结束再结束main函数
- std::cout << "consumer_thread is end;" << std::endl;
- }
-

(2) 多消费者,单生产者模型
多消费者模型,需要在单消费者模型上添加一个控制消费产品总数的变量,也是互斥访问的,有线程访问时候就加1。
- #include <iostream>
- #include <mutex>
- #include <thread>
- #include <condition_variable>
- #include <vector>
-
- std::mutex mtx; //互斥变量
- std::mutex consumer_count_mtx;//消费者互斥变量
-
- std::condition_variable repo_not_full;//条件变量
- std::condition_variable repo_not_empty;//条件变量
-
- static const int item_total = 20;//一共要生产的个数
- static const int repository_size = 10;//仓库的能够存储的个数
-
- static std::size_t consumer_items = 0; //消费者线程得到的产品数量
- static std::size_t product_position = 0;//buf中Product位置
- static std::size_t consume_position = 0;//buf中consume位置
-
- int item_buffer[repository_size]; //仓库buffer
-
-
- void produter_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);//锁住
- //如果不加一 会直接死锁
- while((product_position + 1) % repository_size == consume_position ){
- std::cout << "仓库满了." << std::endl;
- repo_not_full.wait(lck);//等待repo_not_full的notify命令
- }
- item_buffer[product_position] = i;
- product_position++;
- if(product_position == repository_size){
- product_position = 0;
- }
- repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
-
- }
-
- int consumer_item()
- {
- std::unique_lock<std::mutex>lck(mtx);
- int data;
- while(product_position == consume_position)
- {
- std::cout << "仓库空了." << std::endl;
- repo_not_empty.wait(lck);
- }
- data = item_buffer[consume_position];
- consume_position++;
- if(consume_position == repository_size){
- consume_position = 0;
- }
- repo_not_full.notify_all();
- return data;
- }
-
-
-
- //生产线程
- void Producter_thread()
- {
- for(int i = 0;i < item_total; i++){
- std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
- produter_item(i);//执行producter动作
- }
- }
- //消费线程
- void Consumer_thread()
- {
- int exit = 0;
- while(1){
- std::unique_lock<std::mutex> lck(consumer_count_mtx);
- consumer_items++;
- if(consumer_items <= item_total){
- int item = consumer_item();//执行consume动作
- std::cout << "消费者消费第"<< item << "个产品." << std::endl;
- }else{
- exit = 1;
- }
- if(exit == 1){
- std::cout << "消费完了所有的产品。." << std::endl;
- break; //结束
- }
- }
- std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
-
- }
-
- int main ()
- {
- std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
- std::cout << "生产者线程id是:" << producer.get_id() << std::endl;
- std::vector<std::thread> thread_vector;//定义一个线程向量
- //将开启5个Consumer_thread线程
- for(int i = 0; i < 5; i++){
- thread_vector.push_back(std::thread(Consumer_thread));
- }
- producer.join();//等producer结束
- std::cout << "生产者线程结束." << std::endl;
- for(auto &thr:thread_vector){
- thr.join();//等consumer结束再结束main函数
- }
- }
-
-

3、多生产者单消费者模型
- #include <iostream>
- #include <mutex>
- #include <thread>
- #include <condition_variable>
- #include <vector>
-
- std::mutex mtx; //互斥变量
- std::mutex producer_count_mtx;//消费者互斥变量
-
- std::condition_variable repo_not_full;//条件变量
- std::condition_variable repo_not_empty;//条件变量
-
- static const int item_total = 20;//一共要生产的个数
- static const int repository_size = 10;//仓库的能够存储的个数
-
- static std::size_t consumer_items = 0; //消费者线程得到的产品数量
- static std::size_t producter_items = 0;//生产者线程生产的产品数量
- static std::size_t product_position = 0;//buf中Product位置
- static std::size_t consume_position = 0;//buf中consume位置
-
- int item_buffer[repository_size]; //仓库buffer
-
- std::chrono::seconds t(1);
-
-
- void produter_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);//锁住
- //如果不加一 会直接死锁
- while((product_position + 1) % repository_size == consume_position ){
- std::cout << "仓库满了." << std::endl;
- repo_not_full.wait(lck);//等待repo_not_full的notify命令
- }
- item_buffer[product_position] = i;
- product_position++;
- if(product_position == repository_size){
- product_position = 0;
- }
- repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
-
- }
-
- int consumer_item()
- {
- std::unique_lock<std::mutex>lck(mtx);
- int data;
- while(product_position == consume_position)
- {
- std::cout << "仓库空了." << std::endl;
- repo_not_empty.wait(lck);
- }
- data = item_buffer[consume_position];
- consume_position++;
- if(consume_position == repository_size){
- consume_position = 0;
- }
- repo_not_full.notify_all();
- return data;
- }
-
-
-
- //生产线程
- void Producter_thread()
- {
- int exit = 0;
-
- while(1){
- std::unique_lock<std::mutex> lck(producer_count_mtx);
- producter_items++;
- if(producter_items <= item_total){
- std::cout << "生产第" << producter_items << "产品." << std::endl;
- produter_item(producter_items - 1);
- }else{
- exit = 1;
- }
- if(exit == 1){
- std::cout << "生产数量已经达到要求." << std::endl;
- break;
- }
- }
- std::cout << "生产者线程" << std::this_thread::get_id() << "结束." << std::endl;
- }
- //消费线程
- void Consumer_thread()
- {
- while(1){
- std::this_thread::sleep_for(t);
- if(consumer_items <= item_total){
- int item = consumer_item();//执行consume动作
- std::cout << "消费者消费"<< item << "产品." << std::endl;
- }
- consumer_items++;
- if(consumer_items == item_total){
- break;
- }
- }
-
- }
-
- int main ()
- {
- //定义5个生产者线程和一个消费者线程
- std::vector<std::thread> thread_vector;
- for(int i = 0; i < 5; i++){
- thread_vector.push_back(std::thread(Producter_thread));
- }
- std::thread consumer(Consumer_thread);
-
- for(auto &thr: thread_vector){
- thr.join();
- }
- consumer.join();
- std::cout << "消费者线程结束." << std::endl;
- }
-
-

4、多生产者多消费者模型
- #include <iostream>
- #include <mutex>
- #include <thread>
- #include <condition_variable>
- #include <vector>
-
- std::mutex mtx; //互斥变量
- std::mutex consumer_count_mtx;//消费者互斥变量
- std::mutex producer_count_mtx;//生产者互斥变量
-
- std::condition_variable repo_not_full;//条件变量
- std::condition_variable repo_not_empty;//条件变量
-
- static const int item_total = 20;//一共要生产的个数
- static const int repository_size = 10;//仓库的能够存储的个数
-
- static std::size_t producer_items = 0;//生产者线程生产的产品数量
- static std::size_t consumer_items = 0; //消费者线程得到的产品数量
- static std::size_t product_position = 0;//buf中Product位置
- static std::size_t consume_position = 0;//buf中consume位置
-
- int item_buffer[repository_size]; //仓库buffer
-
- std::chrono::seconds t(1);
-
- void produter_item(int i)
- {
- std::unique_lock<std::mutex> lck(mtx);//锁住
- //如果不加一 会直接死锁
- while((product_position + 1) % repository_size == consume_position ){
- std::cout << "仓库满了." << std::endl;
- repo_not_full.wait(lck);//等待repo_not_full的notify命令
- }
- item_buffer[product_position] = i;
- product_position++;
- if(product_position == repository_size){
- product_position = 0;
- }
- repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
-
- }
-
- int consumer_item()
- {
- std::unique_lock<std::mutex>lck(mtx);
- int data;
- while(product_position == consume_position)
- {
- std::cout << "仓库空了." << std::endl;
- repo_not_empty.wait(lck);
- }
- data = item_buffer[consume_position];
- consume_position++;
- if(consume_position == repository_size){
- consume_position = 0;
- }
- repo_not_full.notify_all();
- return data;
- }
-
-
-
- //生产线程
- void Producter_thread()
- {
- int exit = 0;
- while(1){
- std::unique_lock<std::mutex> lck(producer_count_mtx);
- producer_items ++;
- if(producer_items <= item_total){
- std::cout << "生产者生产第"<< producer_items <<"个产品."<< std::endl;
- produter_item(producer_items);//执行producter动作
- }else{
- exit = 1;
- }
- if(exit == 1){
- std::cout << "生产完了所有产品." << std::endl;
- break;
- }
- }
- std::cout << "生产者线程:" << std::this_thread::get_id() <<"结束." << std::endl;
- }
- //消费线程
- void Consumer_thread()
- {
- int exit = 0;
- while(1){
- std::this_thread::sleep_for(t);
- std::unique_lock<std::mutex> lck(consumer_count_mtx);
- consumer_items++;
- if(consumer_items <= item_total){
- int item = consumer_item();//执行consume动作
- std::cout << "消费者消费第"<< item << "个产品." << std::endl;
- }else{
- exit = 1;
- }
- if(exit == 1){
- std::cout << "消费完了所有的产品." << std::endl;
- break; //结束
- }
- }
- std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
-
- }
-
- int main ()
- {
- std::vector<std::thread> producer_vector;//定义一个生产者线程向量
- std::vector<std::thread> consumer_vector;//定义一个消费者线程向量
- //将开启5个Consumer_thread线程
- for(int i = 0; i < 5; i++){
- producer_vector.push_back(std::thread(Producter_thread));
- consumer_vector.push_back(std::thread(Consumer_thread));
- }
- //等待生产者线程结束
- for(auto &thr:producer_vector){
- thr.join();
- }
- //等待消费者线程结束
- for(auto &thr:consumer_vector){
- thr.join();//等consumer结束再结束main函数
- }
- }
-
-
-

总结:
多线程的时候,要在两个以上线程公用的变量前面加锁,阻止访问冲突。要像更加灵活的运用锁,就要和条件变量配合来使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。