当前位置:   article > 正文

C++多线程学习(十、生产者消费者模式)_c++ 多生产者和消费者

c++ 多生产者和消费者

目录

生产者消费者模式

简单的实现步骤:

代码【内涵注释】:

代码注解:

if (FW.ConsumePersonNum== WarehouseSize):

if (FW.ProductPersonPosition== ProductNum):


生产者消费者模式

生产者消费者模式是常用的多线程编程模式,用于解决生产者和消费者之间的数据交互问题。在该模式中,生产者负责产生数据,而消费者负责处理数据。通过多线程的方式,生产者将数据放入一个共享的缓冲区中,而消费者从缓冲区中获取数据进行处理。

简单的实现步骤:

1. 创建一个共享的缓冲区,用于存储生产者产生的数据。缓冲区可以是一个数组、队列或者其他数据结构。

2. 创建一个互斥锁(mutex)来保护对缓冲区的访问。互斥锁用于确保同时只有一个线程能够访问缓冲区。

3. 创建一个条件变量(condition variable)用于在生产者产生数据和消费者处理数据之间进行通信。条件变量用于阻塞线程,直到某个条件满足。

4. 创建一个生产者线程,用于产生数据。生产者线程将使用互斥锁来保护对缓冲区的访问,并在产生数据后通过条件变量通知消费者线程。

5. 创建一个或多个消费者线程,用于处理数据。消费者线程将使用互斥锁来保护对缓冲区的访问,并在缓冲区为空时通过条件变量等待通知。

6. 生产者线程产生数据后,获取互斥锁并将数据放入缓冲区。然后通过条件变量通知一个或多个消费者线程。

7. 消费者线程在处理数据前,获取互斥锁并从缓冲区中获取数据。如果缓冲区为空,则通过条件变量等待通知。

8. 当生产者线程产生数据并将数据放入缓冲区后,需要释放互斥锁,以便其他线程可以访问缓冲区。

9. 当消费者线程处理完数据后,需要释放互斥锁,以便其他线程可以访问缓冲区。

通过使用互斥锁和条件变量,可以实现线程间的同步和通信,确保生产者和消费者之间的正确交互。

代码【内涵注释】:

  1. #include <iostream>
  2. #include <thread>
  3. #include <deque>//双向队列
  4. #include <mutex>
  5. #include<condition_variable>
  6. using namespace std;
  7. const int ProductNum = 100;//产品个数
  8. const int WarehouseSize = 10;//仓库大小
  9. //仓库类
  10. template<class T>
  11. class Warehouse
  12. {
  13. public:
  14. Warehouse()
  15. {
  16. size_t ProductPersonNum = 0;
  17. size_t ConsumePersonNum = 0;
  18. size_t ProductPersonPosition = 0;
  19. size_t ConsumePersonPosition = 0;
  20. };
  21. public:
  22. deque<T> SaveProduct; //存储产品
  23. mutex mtx; //生产者和消费者的互斥量
  24. mutex mtx_Product; //生产计数互斥量
  25. mutex mtx_Consume; //消费计数互斥量
  26. condition_variable Warehouse_noFull; //条件变量:仓库没满
  27. condition_variable Warehouse_noEmpty; //条件变量:仓库不是空的
  28. size_t ProductPersonNum;//生产者计数
  29. size_t ConsumePersonNum;//消费者计数
  30. //确定当前是哪个位置的生产者或消费者需要进行操作。
  31. size_t ProductPersonPosition;//生产者位置
  32. size_t ConsumePersonPosition;//消费者位置
  33. };
  34. //工厂类
  35. template<class T>
  36. class Factory
  37. {
  38. public:
  39. //任务派发,具体的实现在protected里面
  40. //生产者操作
  41. void ProducterTask()
  42. {
  43. bool bReadyExit = false;
  44. while (true)
  45. {
  46. unique_lock<mutex> lock(FactoryWarehouse.mtx_Product);//加锁产品互斥量
  47. //线程结束条件
  48. if (FactoryWarehouse.ProductPersonNum< ProductNum)//小于就继续做计数过程
  49. {
  50. FactoryWarehouse.ProductPersonNum++;
  51. //生产产品
  52. //this_thread::sleep_for(1s);//假设要1s生产产品
  53. T item = FactoryWarehouse.ProductPersonNum;
  54. cout << "生产者的ID:" << this_thread::get_id() << endl;
  55. cout << "货源号:" << item << endl;
  56. InputFWarehouse(FactoryWarehouse, item);
  57. }
  58. else//否则
  59. {
  60. bReadyExit = true;
  61. }
  62. lock.unlock();
  63. if (bReadyExit)
  64. {
  65. break;
  66. }
  67. }
  68. }
  69. //消费者操作
  70. void ConsumerTask()
  71. {
  72. bool bReadyExit = false;
  73. while (true)
  74. {
  75. unique_lock<mutex> lock(FactoryWarehouse.mtx_Consume);//加锁消费者互斥量
  76. if (FactoryWarehouse.ConsumePersonNum < ProductNum)
  77. {
  78. T item = GetProductInFWarehouse(FactoryWarehouse);
  79. //消费产品
  80. //this_thread::sleep_for(1s);
  81. cout << "消费者的ID:" << this_thread::get_id() << endl;
  82. cout << "消费的货源号:" << item << endl;
  83. FactoryWarehouse.ConsumePersonNum++;
  84. }
  85. else
  86. {
  87. bReadyExit = true;
  88. }
  89. lock.unlock();
  90. if (bReadyExit)
  91. {
  92. break;
  93. }
  94. }
  95. }
  96. protected:
  97. Warehouse<T> FactoryWarehouse;//工厂仓库
  98. //把产品放到仓库里面
  99. void InputFWarehouse(Warehouse<T>& FW ,T item)
  100. {
  101. unique_lock<mutex> lock(FW.mtx);//加锁
  102. FW.SaveProduct.push_back(item);
  103. //当ProductPersonPosition达到了ProductNum时,将ConsumePersonPosition重置为0。这样可以保证循环使用消费者线程来消费产品。
  104. if (FW.ProductPersonPosition== ProductNum)
  105. {
  106. FW.ConsumePersonPosition = 0;
  107. }
  108. FW.Warehouse_noEmpty.notify_all();//唤醒所有线程
  109. }
  110. //从仓库中取出产品
  111. T GetProductInFWarehouse(Warehouse<T>& FW)
  112. {
  113. unique_lock<mutex> lock(FW.mtx);
  114. while (FW.SaveProduct.empty())
  115. {
  116. cout << "无货物,请等待." << endl;
  117. FW.Warehouse_noEmpty.wait(lock);//没有货源,进行等待
  118. }
  119. T Data = FW.SaveProduct.front();
  120. FW.SaveProduct.pop_front();
  121. if (FW.ConsumePersonNum== WarehouseSize)
  122. {
  123. FW.ConsumePersonPosition = 0;
  124. }
  125. FW.Warehouse_noFull.notify_all();
  126. lock.unlock();
  127. return Data;
  128. }
  129. };
  130. int main()
  131. {
  132. //测试
  133. cout << "主线程ID:" << this_thread::get_id() << endl;
  134. Factory<int> myFactory;
  135. //4个生产者
  136. thread Producter1(&Factory<int>::ProducterTask, &myFactory);
  137. thread Producter2(&Factory<int>::ProducterTask, &myFactory);
  138. thread Producter3(&Factory<int>::ProducterTask, &myFactory);
  139. thread Producter4(&Factory<int>::ProducterTask, &myFactory);
  140. //5个消费者
  141. thread Consumer1(&Factory<int>::ConsumerTask, &myFactory);
  142. thread Consumer2(&Factory<int>::ConsumerTask, &myFactory);
  143. thread Consumer3(&Factory<int>::ConsumerTask, &myFactory);
  144. thread Consumer4(&Factory<int>::ConsumerTask, &myFactory);
  145. thread Consumer5(&Factory<int>::ConsumerTask, &myFactory);
  146. Producter1.join();
  147. Producter2.join();
  148. Producter3.join();
  149. Producter4.join();
  150. Consumer1.join();
  151. Consumer2.join();
  152. Consumer3.join();
  153. Consumer4.join();
  154. Consumer5.join();
  155. return 0;
  156. }

代码注解:

if (FW.ConsumePersonNum== WarehouseSize):

当消费者线程的数量达到了仓库的大小时,说明所有的消费者线程已经使用完了,需要重新开始计数。即将ConsumePersonPosition重置为0,下一个消费者线程从头开始使用。

if (FW.ProductPersonPosition== ProductNum):

当生产者线程的数量达到了产品的数量时,说明所有的产品已经生产完了,需要重新开始计数。即将ConsumePersonPosition重置为0,下一个生产者线程从头开始生产。

这样做的目的是实现循环使用生产者和消费者线程的效果,使得每个生产者和消费者线程都能循环使用,不会因为达到边界而停止工作。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/267899
推荐阅读
相关标签
  

闽ICP备14008679号