当前位置:   article > 正文

C++ 生产者消费者 四种情况_c++生产者消费者

c++生产者消费者

生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源。本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型,我会给出四种情况下的 C++11 并发解决方案。


一、单生产者-单消费者模型

         顾名思义,单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。C++11实现单生产者单消费者模型的代码如下:

  1. #include <iostream>
  2. #include <condition_variable>
  3. #include <mutex>
  4. #include <thread>
  5.  
  6. static const int repository_size = 10;//循环队列的大小
  7. static const int item_total = 20;//要生产的产品数目
  8.  
  9. std::mutex mtx;//互斥量,保护产品缓冲区
  10.  
  11. std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
  12. std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
  13.  
  14. int item_buffer[repository_size];
  15.  
  16. static std::size_t read_position = 0;//消费者读取产品的位置
  17. static std::size_t write_position = 0;//生产者写入产品的位置
  18.  
  19. std::chrono::seconds t(1);//a  new feature of c++ 11 standard
  20.  
  21. void produce_item(int i)
  22. {
  23.     std::unique_lock<std::mutex> lck(mtx); ///首先获得锁
  24.     while (((write_position + 1) % repository_size) == read_position)
  25.     {
  26.         std::cout << "Producer is waiting for an empty slot..." << std::endl;
  27.         repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
  28.     }                            //当缓冲区满了之后我们就不能添加产品了
  29.  
  30.     item_buffer[write_position] = i;//写入产品
  31.     write_position++;
  32.  
  33.     if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
  34.     {
  35.         write_position = 0;
  36.     }
  37.  
  38.     repo_not_empty.notify_all();//通知消费者产品库不为空
  39.  
  40.     //lck.unlock();//解锁
  41. }
  42.  
  43. int consume_item()
  44. {
  45.     int data;
  46.     std::unique_lock<std::mutex> lck(mtx);
  47.     while (write_position == read_position)
  48.     {
  49.         std::cout << "Consumer is waiting for items..." << std::endl;
  50.         repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
  51.     }                            
  52.  
  53.     data = item_buffer[read_position];//读取产品
  54.     read_position++;
  55.  
  56.     if (read_position >= repository_size)
  57.     {
  58.         read_position = 0;
  59.     }
  60.         
  61.     repo_not_full.notify_all();//通知产品库不满
  62.     //lck.unlock();
  63.  
  64.     return data;
  65. }
  66.  
  67. void Producer_thread()
  68. {
  69.     for (int i = 1; i <= item_total; ++i)
  70.     {
  71.         //std::this_thread::sleep_for(t);
  72.         std::cout << "生产者生产第" << i  << "个产品" << std::endl;
  73.         produce_item(i);
  74.     }
  75. }
  76.  
  77. void Consumer_thread()
  78. {
  79.     static int cnt = 0;
  80.     while (1)
  81.     {
  82.         //std::this_thread::sleep_for(t);
  83.         int item = consume_item();
  84.         std::cout << "消费者消费第" << item << "个产品" << std::endl;
  85.  
  86.         if (++cnt == item_total)
  87.             break;
  88.     }
  89. }
  90.  
  91. int main()
  92. {
  93.     std::thread producer(Producer_thread); // 创建生产者线程.
  94.     std::thread consumer(Consumer_thread); // 创建消费之线程.
  95.     producer.join();
  96.     consumer.join();
  97. }

二、单生产者-多消费者模型
           与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:

  1. #include <iostream>
  2. #include <condition_variable>
  3. #include <mutex>
  4. #include <thread>
  5. #include <vector>
  6. static const int repository_size = 10;//循环队列的大小
  7. static const int item_total = 20;//要生产的产品数目
  8.  
  9. std::mutex mtx;//互斥量,保护产品缓冲区
  10. std::mutex mtx_counter;//互斥量,保护产品计数器
  11.  
  12. std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
  13. std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
  14.  
  15. int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
  16.  
  17. static std::size_t read_position = 0;//消费者读取产品的位置
  18. static std::size_t write_position = 0;//生产者写入产品的位置
  19.  
  20. static std::size_t item_counter = 0;//消费者消费产品计数器
  21.  
  22. std::chrono::seconds t(1);//a  new feature of c++ 11 standard
  23.  
  24. void produce_item(int i)
  25. {
  26.     std::unique_lock<std::mutex> lck(mtx);
  27.     //item buffer is full, just wait here.
  28.     while (((write_position + 1) % repository_size) == read_position)
  29.     {
  30.         std::cout << "Producer is waiting for an empty slot..." << std::endl;
  31.         repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
  32.     }                            //当缓冲区满了之后我们就不能添加产品了
  33.  
  34.     item_buffer[write_position] = i;//写入产品
  35.     write_position++;
  36.  
  37.     if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
  38.     {
  39.         write_position = 0;
  40.     }
  41.  
  42.     repo_not_empty.notify_all();//通知消费者产品库不为空
  43.  
  44.     lck.unlock();//解锁
  45. }
  46.  
  47. int consume_item()
  48. {
  49.     int data;
  50.     std::unique_lock<std::mutex> lck(mtx);
  51.     // item buffer is empty, just wait here.
  52.     while (write_position == read_position)
  53.     {
  54.         std::cout << "Consumer is waiting for items..." << std::endl;
  55.         repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
  56.     }
  57.  
  58.     data = item_buffer[read_position];//读取产品
  59.     read_position++;
  60.  
  61.     if (read_position >= repository_size)
  62.     {
  63.         read_position = 0;
  64.     }
  65.  
  66.     repo_not_full.notify_all();//通知产品库不满
  67.     lck.unlock();
  68.  
  69.     return data;
  70. }
  71.  
  72. void Producer_thread()
  73. {
  74.     for (int i = 1; i <= item_total; ++i)
  75.     {
  76.         //std::this_thread::sleep_for(t);
  77.         std::cout << "生产者生产第" << i << "个产品" << std::endl;
  78.         produce_item(i);
  79.     }
  80. }
  81.  
  82. void Consumer_thread()
  83. {
  84.     bool read_to_exit = false;
  85.     while (1)
  86.     {
  87.         std::this_thread::sleep_for(t);
  88.         std::unique_lock<std::mutex> lck(mtx_counter);
  89.         if (item_counter < item_total)
  90.         {
  91.             int item = consume_item();
  92.             ++item_counter;
  93.             std::cout << "消费者线程" << std::this_thread::get_id()
  94.                 << "消费第" << item << "个产品" << std::endl;
  95.         }
  96.         else
  97.         {
  98.             read_to_exit = true;
  99.         }
  100.         if (read_to_exit == true)
  101.             break;
  102.     }
  103.  
  104.     std::cout << "Consumer thread " << std::this_thread::get_id()
  105.         << " is exiting..." << std::endl;
  106.  
  107. }
  108.  
  109. int main()
  110. {
  111.     std::thread producer(Producer_thread); // 创建生产者线程.
  112.     std::vector<std::thread> thread_vector;
  113.     for (int i = 0; i != 5; ++i)
  114.     {
  115.         thread_vector.push_back(std::thread(Consumer_thread));// 创建消费者线程.
  116.     }
  117.     
  118.     producer.join();
  119.     for (auto &thr : thread_vector)
  120.     {
  121.         thr.join();
  122.     }
  123.  
  124. }



三、多生产者-单消费者模型

          与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:

  1. #include <iostream>
  2. #include <condition_variable>
  3. #include <mutex>
  4. #include <thread>
  5. #include <vector>
  6.  
  7. static const int repository_size = 10;//循环队列的大小
  8. static const int item_total = 20;//要生产的产品数目
  9.  
  10. std::mutex mtx;//互斥量,保护产品缓冲区
  11. std::mutex mtx_counter;
  12.  
  13. std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
  14. std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
  15.  
  16. int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
  17.  
  18. static std::size_t read_position = 0;//消费者读取产品的位置
  19. static std::size_t write_position = 0;//生产者写入产品的位置
  20.  
  21. static std::size_t item_counter = 0;//计数器
  22.  
  23. std::chrono::seconds t(1);//a  new feature of c++ 11 standard
  24.  
  25. void produce_item(int i)
  26. {
  27.     std::unique_lock<std::mutex> lck(mtx);
  28.     // item buffer is full, just wait here.
  29.     while (((write_position + 1) % repository_size) == read_position)
  30.     {
  31.         std::cout << "Producer is waiting for an empty slot..." << std::endl;
  32.         repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
  33.     }                            //当缓冲区满了之后我们就不能添加产品了
  34.  
  35.     item_buffer[write_position] = i;//写入产品
  36.     write_position++;
  37.  
  38.     if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
  39.     {
  40.         write_position = 0;
  41.     }
  42.  
  43.     repo_not_empty.notify_all();//通知消费者产品库不为空
  44.  
  45.     lck.unlock();//解锁
  46. }
  47.  
  48. int consume_item()
  49. {
  50.     int data;
  51.     std::unique_lock<std::mutex> lck(mtx);
  52.     // item buffer is empty, just wait here.
  53.     while (write_position == read_position)
  54.     {
  55.         std::cout << "Consumer is waiting for items..." << std::endl;
  56.         repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
  57.     }
  58.  
  59.     data = item_buffer[read_position];//读取产品
  60.     read_position++;
  61.  
  62.     if (read_position >= repository_size)
  63.     {
  64.         read_position = 0;
  65.     }
  66.  
  67.     repo_not_full.notify_all();//通知产品库不满
  68.     lck.unlock();
  69.  
  70.     return data;
  71. }
  72.  
  73. void Producer_thread()
  74. {
  75.     bool read_to_exit = false;
  76.     while (1)
  77.     {
  78.         std::unique_lock<std::mutex> lck(mtx_counter);
  79.         if (item_counter < item_total)
  80.         {
  81.             ++item_counter;
  82.             produce_item(item_counter);
  83.             std::cout << "生产者线程 " << std::this_thread::get_id()
  84.                 << "生产第  " << item_counter << "个产品" << std::endl;
  85.         }
  86.         else
  87.         {
  88.             read_to_exit = true;
  89.         }
  90.  
  91.         if (read_to_exit == true)
  92.             break;
  93.     }
  94.  
  95.     std::cout << "Producer thread " << std::this_thread::get_id()
  96.         << " is exiting..." << std::endl;
  97.  
  98. }
  99.  
  100. void Consumer_thread()
  101. {
  102.     static int cnt = 0;
  103.     while (1)
  104.     {
  105.         std::this_thread::sleep_for(t);
  106.         int item = consume_item();
  107.         std::cout << "消费者消费第" << item << "个产品" << std::endl;
  108.  
  109.         if (++cnt == item_total)
  110.             break;
  111.     }
  112. }
  113.  
  114. int main()
  115. {
  116.     std::vector<std::thread> thread_vector;
  117.     for (int i = 0; i != 5; ++i)
  118.     {
  119.         thread_vector.push_back(std::thread(Producer_thread));// 创建消费者线程.
  120.     }
  121.  
  122.     std::thread consumer(Consumer_thread); // 创建消费之线程.
  123.  
  124.     for (auto &thr : thread_vector)
  125.     {
  126.         thr.join();
  127.     }
  128.  
  129.     consumer.join();
  130. }

四、多生产者-多消费者模型
          该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。

  1. #include <iostream>
  2. #include <condition_variable>
  3. #include <mutex>
  4. #include <thread>
  5. #include <vector>
  6.  
  7. static const int repository_size = 10;//循环队列的大小
  8. static const int item_total = 20;//要生产的产品数目
  9.  
  10. std::mutex mtx;//互斥量,保护产品缓冲区
  11. std::mutex producer_count_mtx;
  12. std::mutex consumer_count_mtx;
  13.  
  14. std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
  15. std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品
  16.  
  17. int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列
  18.  
  19. static std::size_t read_position = 0;//消费者读取产品的位置
  20. static std::size_t write_position = 0;//生产者写入产品的位置
  21.  
  22. static size_t produced_item_counter = 0;
  23. static size_t consumed_item_counter = 0;
  24.  
  25. std::chrono::seconds t(1);//a  new feature of c++ 11 standard
  26. std::chrono::microseconds t1(1000);
  27.  
  28. void produce_item(int i)
  29. {
  30.     std::unique_lock<std::mutex> lck(mtx);
  31.     // item buffer is full, just wait here.
  32.     while (((write_position + 1) % repository_size) == read_position)
  33.     {
  34.         std::cout << "Producer is waiting for an empty slot..." << std::endl;
  35.         repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
  36.     }                            //当缓冲区满了之后我们就不能添加产品了
  37.  
  38.     item_buffer[write_position] = i;//写入产品
  39.     write_position++;
  40.  
  41.     if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
  42.     {
  43.         write_position = 0;
  44.     }
  45.  
  46.     repo_not_empty.notify_all();//通知消费者产品库不为空
  47.  
  48.     lck.unlock();//解锁
  49. }
  50.  
  51. int consume_item()
  52. {
  53.     int data;
  54.     std::unique_lock<std::mutex> lck(mtx);
  55.     // item buffer is empty, just wait here.
  56.     while (write_position == read_position)
  57.     {
  58.         std::cout << "Consumer is waiting for items..." << std::endl;
  59.         repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
  60.     }
  61.  
  62.     data = item_buffer[read_position];//读取产品
  63.     read_position++;
  64.  
  65.     if (read_position >= repository_size)
  66.     {
  67.         read_position = 0;
  68.     }
  69.  
  70.     repo_not_full.notify_all();//通知产品库不满
  71.     lck.unlock();
  72.  
  73.     return data;
  74. }
  75.  
  76. void Producer_thread()
  77. {
  78.     bool ready_to_exit = false;
  79.     while (1)
  80.     {
  81.         //std::this_thread::sleep_for(t);
  82.         std::unique_lock<std::mutex> lock(producer_count_mtx);
  83.         if (produced_item_counter < item_total)
  84.         {
  85.             ++produced_item_counter;
  86.             produce_item(produced_item_counter);
  87.             std::cout << "生产者线程 " << std::this_thread::get_id()
  88.                 << "生产第  " << produced_item_counter << "个产品" << std::endl;
  89.         }
  90.         else
  91.         {
  92.             ready_to_exit = true;
  93.         }
  94.  
  95.         lock.unlock();
  96.  
  97.         if (ready_to_exit == true)
  98.         {
  99.             break;
  100.         }
  101.     }
  102.  
  103.     std::cout << "Producer thread " << std::this_thread::get_id()
  104.         << " is exiting..." << std::endl;
  105. }
  106.  
  107. void Consumer_thread()
  108. {
  109.     bool read_to_exit = false;
  110.     while (1)
  111.     {
  112.         std::this_thread::sleep_for(t1);
  113.         std::unique_lock<std::mutex> lck(consumer_count_mtx);
  114.         if (consumed_item_counter < item_total)
  115.         {
  116.             int item = consume_item();
  117.             ++consumed_item_counter;
  118.             std::cout << "消费者线程" << std::this_thread::get_id()
  119.                 << "消费第" << item << "个产品" << std::endl;
  120.         }
  121.         else
  122.         {
  123.             read_to_exit = true;
  124.         }
  125.  
  126.         if (read_to_exit == true)
  127.         {
  128.             break;
  129.         }
  130.     }
  131.  
  132.     std::cout << "Consumer thread " << std::this_thread::get_id()
  133.         << " is exiting..." << std::endl;
  134. }
  135.  
  136. int main()
  137. {
  138.     std::vector<std::thread> thread_vector1;
  139.     std::vector<std::thread> thread_vector2;
  140.     for (int i = 0; i != 5; ++i)
  141.     {
  142.         thread_vector1.push_back(std::thread(Producer_thread));// 创建生产者线程.
  143.         thread_vector2.push_back(std::thread(Consumer_thread));// 创建消费者线程.
  144.  
  145.     }
  146.  
  147.     for (auto &thr1 : thread_vector1)
  148.     {
  149.         thr1.join();
  150.     }
  151.  
  152.     for (auto &thr2 : thread_vector2)
  153.     {
  154.         thr2.join();
  155.     }
  156. }


原文链接:https://blog.csdn.net/chenxun_2010/article/details/49848865

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/267905
推荐阅读
相关标签
  

闽ICP备14008679号