当前位置:   article > 正文

多线程之生产者和消费者模型_多线程的生产者与消费者模型

多线程的生产者与消费者模型

这么经典的问题,要多看几次,多思考。

https://blog.csdn.net/chenxun_2010/article/details/49848865

(1)单消费者单生产者模式

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <condition_variable>
  5. std::mutex mtx; //互斥变量
  6. std::condition_variable repo_not_full;//条件变量
  7. std::condition_variable repo_not_empty;//条件变量
  8. static const int item_total = 20;//一共要生产的个数
  9. static const int repository_size = 10;//仓库的能够存储的个数
  10. int product_position = 0;//buf中Product位置
  11. int consume_position = 0;//buf中consume位置
  12. int item_buffer[repository_size]; //仓库buffer
  13. void produter_item(int i)
  14. {
  15. std::unique_lock<std::mutex> lck(mtx);//锁住
  16. //如果不加一 会直接死锁
  17. while((product_position + 1) % repository_size == consume_position ){
  18. std::cout << "The item repository is full;" << std::endl;
  19. repo_not_full.wait(lck);//等待repo_not_full的notify命令
  20. }
  21. item_buffer[product_position] = i;
  22. product_position++;
  23. if(product_position == repository_size){
  24. product_position = 0;
  25. }
  26. repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
  27. }
  28. int consumer_item()
  29. {
  30. std::unique_lock<std::mutex>lck(mtx);
  31. int data;
  32. while(product_position == consume_position)
  33. {
  34. std::cout << "The item repository is empty." << std::endl;
  35. repo_not_empty.wait(lck);
  36. }
  37. data = item_buffer[consume_position];
  38. consume_position++;
  39. if(consume_position == repository_size){
  40. consume_position = 0;
  41. }
  42. repo_not_full.notify_all();
  43. return data;
  44. }
  45. //生产线程
  46. void Producter_thread()
  47. {
  48. for(int i = 0;i < item_total; i++){
  49. std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
  50. produter_item(i);//执行producter动作
  51. }
  52. }
  53. //消费线程
  54. void Consumer_thread()
  55. {
  56. int cnt = 0;
  57. while(1){
  58. int item = consumer_item();//执行consume动作
  59. std::cout << "消费者消费第"<< item << "个产品." << std::endl;
  60. cnt++;//个数+1
  61. if(cnt == item_total){
  62. std::cout << "consumer_thread is finish." << std::endl;
  63. break; //结束
  64. }
  65. }
  66. }
  67. int main ()
  68. {
  69. std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
  70. std::thread consumer(Consumer_thread);//同上
  71. producer.join();//等producer结束
  72. std::cout << "Producter_thread is end;" << std::endl;
  73. consumer.join();//等consumer结束再结束main函数
  74. std::cout << "consumer_thread is end;" << std::endl;
  75. }

(2) 多消费者,单生产者模型

多消费者模型,需要在单消费者模型上添加一个控制消费产品总数的变量,也是互斥访问的,有线程访问时候就加1。

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <condition_variable>
  5. #include <vector>
  6. std::mutex mtx; //互斥变量
  7. std::mutex consumer_count_mtx;//消费者互斥变量
  8. std::condition_variable repo_not_full;//条件变量
  9. std::condition_variable repo_not_empty;//条件变量
  10. static const int item_total = 20;//一共要生产的个数
  11. static const int repository_size = 10;//仓库的能够存储的个数
  12. static std::size_t consumer_items = 0; //消费者线程得到的产品数量
  13. static std::size_t product_position = 0;//buf中Product位置
  14. static std::size_t consume_position = 0;//buf中consume位置
  15. int item_buffer[repository_size]; //仓库buffer
  16. void produter_item(int i)
  17. {
  18. std::unique_lock<std::mutex> lck(mtx);//锁住
  19. //如果不加一 会直接死锁
  20. while((product_position + 1) % repository_size == consume_position ){
  21. std::cout << "仓库满了." << std::endl;
  22. repo_not_full.wait(lck);//等待repo_not_full的notify命令
  23. }
  24. item_buffer[product_position] = i;
  25. product_position++;
  26. if(product_position == repository_size){
  27. product_position = 0;
  28. }
  29. repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
  30. }
  31. int consumer_item()
  32. {
  33. std::unique_lock<std::mutex>lck(mtx);
  34. int data;
  35. while(product_position == consume_position)
  36. {
  37. std::cout << "仓库空了." << std::endl;
  38. repo_not_empty.wait(lck);
  39. }
  40. data = item_buffer[consume_position];
  41. consume_position++;
  42. if(consume_position == repository_size){
  43. consume_position = 0;
  44. }
  45. repo_not_full.notify_all();
  46. return data;
  47. }
  48. //生产线程
  49. void Producter_thread()
  50. {
  51. for(int i = 0;i < item_total; i++){
  52. std::cout << "生产者生产第"<< i <<"个产品."<< std::endl;
  53. produter_item(i);//执行producter动作
  54. }
  55. }
  56. //消费线程
  57. void Consumer_thread()
  58. {
  59. int exit = 0;
  60. while(1){
  61. std::unique_lock<std::mutex> lck(consumer_count_mtx);
  62. consumer_items++;
  63. if(consumer_items <= item_total){
  64. int item = consumer_item();//执行consume动作
  65. std::cout << "消费者消费第"<< item << "个产品." << std::endl;
  66. }else{
  67. exit = 1;
  68. }
  69. if(exit == 1){
  70. std::cout << "消费完了所有的产品。." << std::endl;
  71. break; //结束
  72. }
  73. }
  74. std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
  75. }
  76. int main ()
  77. {
  78. std::thread producer(Producter_thread);//开启一个producer线程,入口是Producter_thread函数
  79. std::cout << "生产者线程id是:" << producer.get_id() << std::endl;
  80. std::vector<std::thread> thread_vector;//定义一个线程向量
  81. //将开启5个Consumer_thread线程
  82. for(int i = 0; i < 5; i++){
  83. thread_vector.push_back(std::thread(Consumer_thread));
  84. }
  85. producer.join();//等producer结束
  86. std::cout << "生产者线程结束." << std::endl;
  87. for(auto &thr:thread_vector){
  88. thr.join();//等consumer结束再结束main函数
  89. }
  90. }

3、多生产者单消费者模型

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <condition_variable>
  5. #include <vector>
  6. std::mutex mtx; //互斥变量
  7. std::mutex producer_count_mtx;//消费者互斥变量
  8. std::condition_variable repo_not_full;//条件变量
  9. std::condition_variable repo_not_empty;//条件变量
  10. static const int item_total = 20;//一共要生产的个数
  11. static const int repository_size = 10;//仓库的能够存储的个数
  12. static std::size_t consumer_items = 0; //消费者线程得到的产品数量
  13. static std::size_t producter_items = 0;//生产者线程生产的产品数量
  14. static std::size_t product_position = 0;//buf中Product位置
  15. static std::size_t consume_position = 0;//buf中consume位置
  16. int item_buffer[repository_size]; //仓库buffer
  17. std::chrono::seconds t(1);
  18. void produter_item(int i)
  19. {
  20. std::unique_lock<std::mutex> lck(mtx);//锁住
  21. //如果不加一 会直接死锁
  22. while((product_position + 1) % repository_size == consume_position ){
  23. std::cout << "仓库满了." << std::endl;
  24. repo_not_full.wait(lck);//等待repo_not_full的notify命令
  25. }
  26. item_buffer[product_position] = i;
  27. product_position++;
  28. if(product_position == repository_size){
  29. product_position = 0;
  30. }
  31. repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
  32. }
  33. int consumer_item()
  34. {
  35. std::unique_lock<std::mutex>lck(mtx);
  36. int data;
  37. while(product_position == consume_position)
  38. {
  39. std::cout << "仓库空了." << std::endl;
  40. repo_not_empty.wait(lck);
  41. }
  42. data = item_buffer[consume_position];
  43. consume_position++;
  44. if(consume_position == repository_size){
  45. consume_position = 0;
  46. }
  47. repo_not_full.notify_all();
  48. return data;
  49. }
  50. //生产线程
  51. void Producter_thread()
  52. {
  53. int exit = 0;
  54. while(1){
  55. std::unique_lock<std::mutex> lck(producer_count_mtx);
  56. producter_items++;
  57. if(producter_items <= item_total){
  58. std::cout << "生产第" << producter_items << "产品." << std::endl;
  59. produter_item(producter_items - 1);
  60. }else{
  61. exit = 1;
  62. }
  63. if(exit == 1){
  64. std::cout << "生产数量已经达到要求." << std::endl;
  65. break;
  66. }
  67. }
  68. std::cout << "生产者线程" << std::this_thread::get_id() << "结束." << std::endl;
  69. }
  70. //消费线程
  71. void Consumer_thread()
  72. {
  73. while(1){
  74. std::this_thread::sleep_for(t);
  75. if(consumer_items <= item_total){
  76. int item = consumer_item();//执行consume动作
  77. std::cout << "消费者消费"<< item << "产品." << std::endl;
  78. }
  79. consumer_items++;
  80. if(consumer_items == item_total){
  81. break;
  82. }
  83. }
  84. }
  85. int main ()
  86. {
  87. //定义5个生产者线程和一个消费者线程
  88. std::vector<std::thread> thread_vector;
  89. for(int i = 0; i < 5; i++){
  90. thread_vector.push_back(std::thread(Producter_thread));
  91. }
  92. std::thread consumer(Consumer_thread);
  93. for(auto &thr: thread_vector){
  94. thr.join();
  95. }
  96. consumer.join();
  97. std::cout << "消费者线程结束." << std::endl;
  98. }

4、多生产者多消费者模型

  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <condition_variable>
  5. #include <vector>
  6. std::mutex mtx; //互斥变量
  7. std::mutex consumer_count_mtx;//消费者互斥变量
  8. std::mutex producer_count_mtx;//生产者互斥变量
  9. std::condition_variable repo_not_full;//条件变量
  10. std::condition_variable repo_not_empty;//条件变量
  11. static const int item_total = 20;//一共要生产的个数
  12. static const int repository_size = 10;//仓库的能够存储的个数
  13. static std::size_t producer_items = 0;//生产者线程生产的产品数量
  14. static std::size_t consumer_items = 0; //消费者线程得到的产品数量
  15. static std::size_t product_position = 0;//buf中Product位置
  16. static std::size_t consume_position = 0;//buf中consume位置
  17. int item_buffer[repository_size]; //仓库buffer
  18. std::chrono::seconds t(1);
  19. void produter_item(int i)
  20. {
  21. std::unique_lock<std::mutex> lck(mtx);//锁住
  22. //如果不加一 会直接死锁
  23. while((product_position + 1) % repository_size == consume_position ){
  24. std::cout << "仓库满了." << std::endl;
  25. repo_not_full.wait(lck);//等待repo_not_full的notify命令
  26. }
  27. item_buffer[product_position] = i;
  28. product_position++;
  29. if(product_position == repository_size){
  30. product_position = 0;
  31. }
  32. repo_not_empty.notify_all();//通知别的等待进程,buf内不是空的了
  33. }
  34. int consumer_item()
  35. {
  36. std::unique_lock<std::mutex>lck(mtx);
  37. int data;
  38. while(product_position == consume_position)
  39. {
  40. std::cout << "仓库空了." << std::endl;
  41. repo_not_empty.wait(lck);
  42. }
  43. data = item_buffer[consume_position];
  44. consume_position++;
  45. if(consume_position == repository_size){
  46. consume_position = 0;
  47. }
  48. repo_not_full.notify_all();
  49. return data;
  50. }
  51. //生产线程
  52. void Producter_thread()
  53. {
  54. int exit = 0;
  55. while(1){
  56. std::unique_lock<std::mutex> lck(producer_count_mtx);
  57. producer_items ++;
  58. if(producer_items <= item_total){
  59. std::cout << "生产者生产第"<< producer_items <<"个产品."<< std::endl;
  60. produter_item(producer_items);//执行producter动作
  61. }else{
  62. exit = 1;
  63. }
  64. if(exit == 1){
  65. std::cout << "生产完了所有产品." << std::endl;
  66. break;
  67. }
  68. }
  69. std::cout << "生产者线程:" << std::this_thread::get_id() <<"结束." << std::endl;
  70. }
  71. //消费线程
  72. void Consumer_thread()
  73. {
  74. int exit = 0;
  75. while(1){
  76. std::this_thread::sleep_for(t);
  77. std::unique_lock<std::mutex> lck(consumer_count_mtx);
  78. consumer_items++;
  79. if(consumer_items <= item_total){
  80. int item = consumer_item();//执行consume动作
  81. std::cout << "消费者消费第"<< item << "个产品." << std::endl;
  82. }else{
  83. exit = 1;
  84. }
  85. if(exit == 1){
  86. std::cout << "消费完了所有的产品." << std::endl;
  87. break; //结束
  88. }
  89. }
  90. std::cout << "消费者线程:" << std::this_thread::get_id() << "结束." << std::endl;
  91. }
  92. int main ()
  93. {
  94. std::vector<std::thread> producer_vector;//定义一个生产者线程向量
  95. std::vector<std::thread> consumer_vector;//定义一个消费者线程向量
  96. //将开启5个Consumer_thread线程
  97. for(int i = 0; i < 5; i++){
  98. producer_vector.push_back(std::thread(Producter_thread));
  99. consumer_vector.push_back(std::thread(Consumer_thread));
  100. }
  101. //等待生产者线程结束
  102. for(auto &thr:producer_vector){
  103. thr.join();
  104. }
  105. //等待消费者线程结束
  106. for(auto &thr:consumer_vector){
  107. thr.join();//等consumer结束再结束main函数
  108. }
  109. }

总结:

多线程的时候,要在两个以上线程公用的变量前面加锁,阻止访问冲突。要像更加灵活的运用锁,就要和条件变量配合来使用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/661617
推荐阅读
相关标签
  

闽ICP备14008679号