当前位置:   article > 正文

C++多线程生产者消费者的实现_c++多线程生产者消费者怎么实现

c++多线程生产者消费者怎么实现

分为四种情况:单生产者单消费者;单生产者多消费者;多生产者单消费者;多生产者多消费者。

单生产者单消费者:

  1. #include<iostream>
  2. #include<chrono>
  3. #include<mutex>
  4. #include<condition_variable>
  5. #include<thread>
  6. using namespace std;
  7. static int pool_max = 10, max_size = 15, pool_size = 0, producer_count = 0, consumer_count = 0;
  8. /* 产品池容量大小 最多生产产品的个数 当前池中的产品个数 生产了个产品数量 消费了的产品数量 */
  9. mutex m_c, m_p, m;
  10. /* 多消费者时的互斥量 多生产者时的互斥量 产品池的互斥量 */
  11. condition_variable p_not_full, p_not_empty;
  12. /* 生产者等待产品池不满 消费者等待产品池非空 */
  13. void ProducerS ()
  14. {
  15. while (true)
  16. {
  17. unique_lock<mutex> lck(m);
  18. while (pool_size == pool_max)
  19. {
  20. cout << "Pool full\t Wait..." << endl;
  21. p_not_full.wait(lck);
  22. }
  23. pool_size++;
  24. producer_count++;
  25. cout << "Producer generate : " << producer_count << endl;
  26. if (pool_size == 1)
  27. {
  28. cout << "Notify consumer...\n";
  29. p_not_empty.notify_one();
  30. }
  31. if (producer_count == max_size) break;
  32. lck.unlock();
  33. this_thread::sleep_for(chrono::milliseconds(1000));
  34. }
  35. cout << "Producer Single exit\t id is : " << this_thread::get_id() << endl;
  36. }
  37. void ConsumerS()
  38. {
  39. while (true)
  40. {
  41. unique_lock<mutex> lck(m);
  42. while (pool_size == 0)
  43. {
  44. cout << "Pool empty\t Wait...\n";
  45. p_not_empty.wait(lck);
  46. }
  47. pool_size--;
  48. consumer_count++;
  49. cout << "Consumer consume : " << consumer_count << endl;
  50. if (pool_size == pool_max-1)
  51. {
  52. cout << "Notify producer...\n";
  53. p_not_full.notify_one();
  54. }
  55. if (consumer_count == max_size) break;
  56. lck.unlock();
  57. this_thread::sleep_for(chrono::milliseconds(1500));
  58. }
  59. cout << "Consumer Single exit\t id is : " << this_thread::get_id() << endl;
  60. }
  61. void TestPsCs()
  62. {
  63. thread t_c(ConsumerS);
  64. thread t_p(ProducerS);
  65. t_c.join();
  66. t_p.join();
  67. }
  68. int main(void)
  69. {
  70. TestPsCs();
  71. return 0;
  72. }

运行截图:

单生产者多消费者:

  1. #include<iostream>
  2. #include<chrono>
  3. #include<mutex>
  4. #include<condition_variable>
  5. #include<thread>
  6. using namespace std;
  7. static int pool_max = 10, max_size = 15, pool_size = 0, producer_count = 0, consumer_count = 0;
  8. /* 产品池容量大小 最多生产产品的个数 当前池中的产品个数 生产了个产品数量 消费了的产品数量 */
  9. mutex m_c, m_p, m;
  10. /* 多消费者时的互斥量 多生产者时的互斥量 产品池的互斥量 */
  11. condition_variable p_not_full, p_not_empty;
  12. /* 生产者等待产品池不满 消费者等待产品池非空 */
  13. void ProducerS ()
  14. {
  15. while (true)
  16. {
  17. unique_lock<mutex> lck(m);
  18. while (pool_size == pool_max)
  19. {
  20. cout << "Pool full\t Wait..." << endl;
  21. p_not_full.wait(lck);
  22. }
  23. pool_size++;
  24. producer_count++;
  25. cout << "Producer generate : " << producer_count << endl;
  26. if (pool_size == 1)
  27. {
  28. cout << "Notify consumer...\n";
  29. p_not_empty.notify_one();
  30. }
  31. if (producer_count == max_size) break;
  32. lck.unlock();
  33. this_thread::sleep_for(chrono::milliseconds(1000));
  34. }
  35. cout << "Producer Single exit\t id is : " << this_thread::get_id() << endl;
  36. }
  37. void ConsumerM()
  38. {
  39. while (true)
  40. {
  41. unique_lock<mutex> lck_c(m_c);
  42. if (consumer_count == max_size) break;
  43. unique_lock<mutex> lck(m);
  44. while (pool_size == 0)
  45. {
  46. cout << "Pool empty\t Wait...\n";
  47. p_not_empty.wait(lck);
  48. }
  49. pool_size--;
  50. consumer_count++;
  51. cout << "Consumer : " << this_thread::get_id() << "\tconsume : " << consumer_count << endl;
  52. if (pool_size == pool_max-1)
  53. {
  54. cout << "Notify producer...\n";
  55. p_not_full.notify_one();
  56. }
  57. if (consumer_count == max_size) break;
  58. lck_c.unlock();
  59. lck.unlock();
  60. this_thread::sleep_for(chrono::milliseconds(1500));
  61. }
  62. cout << "Consumer Single exit\t id is : " << this_thread::get_id() << endl;
  63. }
  64. void TestPsCM()
  65. {
  66. const int c_t_i = 5;
  67. thread t_c[c_t_i];
  68. for (int i = 0; i<c_t_i; ++i)
  69. t_c[i] = thread(ConsumerM);
  70. thread t_p(ProducerS);
  71. for (int i = 0; i<c_t_i; ++i)
  72. t_c[i].join();
  73. t_p.join();
  74. }
  75. int main(void)
  76. {
  77. TestPsCM();
  78. return 0;
  79. }

运行截图:

多生产者单消费者:

  1. #include<iostream>
  2. #include<chrono>
  3. #include<mutex>
  4. #include<condition_variable>
  5. #include<thread>
  6. using namespace std;
  7. static int pool_max = 10, max_size = 15, pool_size = 0, producer_count = 0, consumer_count = 0;
  8. /* 产品池容量大小 最多生产产品的个数 当前池中的产品个数 生产了个产品数量 消费了的产品数量 */
  9. mutex m_c, m_p, m;
  10. /* 多消费者时的互斥量 多生产者时的互斥量 产品池的互斥量 */
  11. condition_variable p_not_full, p_not_empty;
  12. /* 生产者等待产品池不满 消费者等待产品池非空 */
  13. void ProducerM ()
  14. {
  15. while (true)
  16. {
  17. unique_lock<mutex> lck_p(m_p);
  18. if (producer_count == max_size) break;
  19. unique_lock<mutex> lck(m);
  20. while (pool_size == pool_max)
  21. {
  22. cout << "Pool full\t Wait..." << endl;
  23. p_not_full.wait(lck);
  24. }
  25. pool_size++;
  26. producer_count++;
  27. cout << "Producer : " << this_thread::get_id() << "\tgenerate : " << producer_count << endl;
  28. if (pool_size == 1)
  29. {
  30. cout << "Notify consumer...\n";
  31. p_not_empty.notify_one();
  32. }
  33. if (producer_count == max_size) break;
  34. lck.unlock();
  35. lck_p.unlock();
  36. this_thread::sleep_for(chrono::milliseconds(1000));
  37. }
  38. cout << "Producer Single exit\t id is : " << this_thread::get_id() << endl;
  39. }
  40. void ConsumerS()
  41. {
  42. while (true)
  43. {
  44. unique_lock<mutex> lck(m);
  45. while (pool_size == 0)
  46. {
  47. cout << "Pool empty\t Wait...\n";
  48. p_not_empty.wait(lck);
  49. }
  50. pool_size--;
  51. consumer_count++;
  52. cout << "Consumer : " << this_thread::get_id() << "\tconsume : " << consumer_count << endl;
  53. if (pool_size == pool_max-1)
  54. {
  55. cout << "Notify producer...\n";
  56. p_not_full.notify_one();
  57. }
  58. if (consumer_count == max_size) break;
  59. lck.unlock();
  60. this_thread::sleep_for(chrono::milliseconds(1500));
  61. }
  62. cout << "Consumer Single exit\t id is : " << this_thread::get_id() << endl;
  63. }
  64. void TestPmCs()
  65. {
  66. const int p_t_i = 5;
  67. thread t_c(ConsumerS);
  68. thread t_p[p_t_i];
  69. for (int i = 0; i<p_t_i; ++i)
  70. t_p[i] = thread(ProducerM);
  71. for (int i = 0; i<p_t_i; ++i)
  72. t_p[i].join();
  73. t_c.join();
  74. }
  75. int main(void)
  76. {
  77. TestPmCs();
  78. return 0;
  79. }

运行截图:

多生产者多消费者:

  1. #include<iostream>
  2. #include<chrono>
  3. #include<mutex>
  4. #include<condition_variable>
  5. #include<thread>
  6. using namespace std;
  7. static int pool_max = 10, max_size = 15, pool_size = 0, producer_count = 0, consumer_count = 0;
  8. /* 产品池容量大小 最多生产产品的个数 当前池中的产品个数 生产了个产品数量 消费了的产品数量 */
  9. mutex m_c, m_p, m;
  10. /* 多消费者时的互斥量 多生产者时的互斥量 产品池的互斥量 */
  11. condition_variable p_not_full, p_not_empty;
  12. /* 生产者等待产品池不满 消费者等待产品池非空 */
  13. void ProducerM ()
  14. {
  15. while (true)
  16. {
  17. unique_lock<mutex> lck_p(m_p);
  18. if (producer_count == max_size) break;
  19. unique_lock<mutex> lck(m);
  20. while (pool_size == pool_max)
  21. {
  22. cout << "Pool full\t Wait..." << endl;
  23. p_not_full.wait(lck);
  24. }
  25. pool_size++;
  26. producer_count++;
  27. cout << "Producer : " << this_thread::get_id() << "\tgenerate : " << producer_count << endl;
  28. if (pool_size == 1)
  29. {
  30. cout << "Notify consumer...\n";
  31. p_not_empty.notify_one();
  32. }
  33. if (producer_count == max_size) break;
  34. lck.unlock();
  35. lck_p.unlock();
  36. this_thread::sleep_for(chrono::milliseconds(1000));
  37. }
  38. cout << "Producer Single exit\t id is : " << this_thread::get_id() << endl;
  39. }
  40. void ConsumerM()
  41. {
  42. while (true)
  43. {
  44. unique_lock<mutex> lck_c(m_c);
  45. if (consumer_count == max_size) break;
  46. unique_lock<mutex> lck(m);
  47. while (pool_size == 0)
  48. {
  49. cout << "Pool empty\t Wait...\n";
  50. p_not_empty.wait(lck);
  51. }
  52. pool_size--;
  53. consumer_count++;
  54. cout << "Consumer : " << this_thread::get_id() << "\tconsume : " << consumer_count << endl;
  55. if (pool_size == pool_max-1)
  56. {
  57. cout << "Notify producer...\n";
  58. p_not_full.notify_one();
  59. }
  60. if (consumer_count == max_size) break;
  61. lck.unlock();
  62. lck_c.unlock();
  63. this_thread::sleep_for(chrono::milliseconds(1500));
  64. }
  65. cout << "Consumer Single exit\t id is : " << this_thread::get_id() << endl;
  66. }
  67. void TestPmCm()
  68. {
  69. const int p_t_i = 5, c_t_i = 7;
  70. thread t_c[c_t_i];
  71. thread t_p[p_t_i];
  72. for (int i=0; i<c_t_i; ++i)
  73. t_c[i] = thread(ConsumerM);
  74. for (int i = 0; i<p_t_i; ++i)
  75. t_p[i] = thread(ProducerM);
  76. for (int i=0; i<c_t_i; ++i)
  77. t_c[i].join();
  78. for (int i = 0; i<p_t_i; ++i)
  79. t_p[i].join();
  80. }
  81. int main(void)
  82. {
  83. TestPmCm();
  84. return 0;
  85. }

运行截图:

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号