#include #include #include #include #include #include #include using name._c++线程池">
当前位置:   article > 正文

线程池 - C++_c++线程池


1. 基本概念

线程池(thread pool): 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。

2. 线程池的组成

2.1 线程池管理器


2.2 工作线程


2.3 任务接口


2.4 任务队列


3. 线程池工作的四种情况


3.1 主程序当前没有任务要执行,线程池中任务队列为空闲状态


3.2 主程序添加小于等于线程池中线程数量得任务


3.3 主程序添加任务数量大于当前线程池中线程数量的任务


3.4 主程序添加任务数量大于当前线程池中线程数量的任务,且任务缓冲队列已满


4. 线程池的C++实现

参考连接: Thread poolThreadPool


  • 任务队列(Task Quene)
  • 线程池(Thread Pool)
  • 完成队列(Completed Tasks)


  1. #ifndef _THREADPOOL_HH
  2. #define _THREADPOOL_HH
  3. #include <vector>
  4. #include <utility>
  5. #include <queue>
  6. #include <thread>
  7. #include <functional>
  8. #include <mutex>
  9. #include "Condition.hh"
  10. class ThreadPool{
  11. public:
  12. static const int kInitThreadsSize = 3;
  13. enum taskPriorityE { level0, level1, level2, };
  14. typedef std::function<void()> Task;
  15. typedef std::pair<taskPriorityE, Task> TaskPair;
  16. ThreadPool();
  17. ~ThreadPool();
  18. void start();
  19. void stop();
  20. void addTask(const Task&);
  21. void addTask(const TaskPair&);
  22. private:
  23. ThreadPool(const ThreadPool&);//禁止复制拷贝.
  24. const ThreadPool& operator=(const ThreadPool&);
  25. struct TaskPriorityCmp
  26. {
  27. bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2)
  28. {
  29. return p1.first > p2.first; //first的小值优先
  30. }
  31. };
  32. void threadLoop();
  33. Task take();
  34. typedef std::vector<std::thread*> Threads;
  35. typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks;
  36. Threads m_threads;
  37. Tasks m_tasks;
  38. std::mutex m_mutex;
  39. Condition m_cond;
  40. bool m_isStarted;
  41. };
  42. #endif
  43. //Cpp
  44. #include <assert.h>
  45. #include "Logger.hh" // debug
  46. #include "CurrentThread.hh" // debug
  47. #include "ThreadPool.hh"
  48. ThreadPool::ThreadPool()
  49. :m_mutex(),
  50. m_cond(m_mutex),
  51. m_isStarted(false)
  52. {
  53. }
  54. ThreadPool::~ThreadPool()
  55. {
  56. if(m_isStarted)
  57. {
  58. stop();
  59. }
  60. }
  61. void ThreadPool::start()
  62. {
  63. assert(m_threads.empty());
  64. m_isStarted = true;
  65. m_threads.reserve(kInitThreadsSize);
  66. for (int i = 0; i < kInitThreadsSize; ++i)
  67. {
  68. m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this)));
  69. }
  70. }
  71. void ThreadPool::stop()
  72. {
  73. LOG_TRACE << "ThreadPool::stop() stop.";
  74. {
  75. std::unique_lock<std::mutex> lock(m_mutex);
  76. m_isStarted = false;
  77. m_cond.notifyAll();
  78. LOG_TRACE << "ThreadPool::stop() notifyAll().";
  79. }
  80. for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it)
  81. {
  82. (*it)->join();
  83. delete *it;
  84. }
  85. m_threads.clear();
  86. }
  87. void ThreadPool::threadLoop()
  88. {
  89. LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start.";
  90. while(m_isStarted)
  91. {
  92. Task task = take();
  93. if(task)
  94. {
  95. task();
  96. }
  97. }
  98. LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit.";
  99. }
  100. void ThreadPool::addTask(const Task& task)
  101. {
  102. std::unique_lock<std::mutex> lock(m_mutex);
  103. /*while(m_tasks.isFull())
  104. {//when m_tasks have maxsize
  105. cond2.wait();
  106. }
  107. */
  108. TaskPair taskPair(level2, task);
  109. m_tasks.push(taskPair);
  110. m_cond.notify();
  111. }
  112. void ThreadPool::addTask(const TaskPair& taskPair)
  113. {
  114. std::unique_lock<std::mutex> lock(m_mutex);
  115. /*while(m_tasks.isFull())
  116. {//when m_tasks have maxsize
  117. cond2.wait();
  118. }
  119. */
  120. m_tasks.push(taskPair);
  121. m_cond.notify();
  122. }
  123. ThreadPool::Task ThreadPool::take()
  124. {
  125. std::unique_lock<std::mutex> lock(m_mutex);
  126. //always use a while-loop, due to spurious wakeup
  127. while(m_tasks.empty() && m_isStarted)
  128. {
  129. LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait.";
  130. m_cond.wait(lock);
  131. }
  132. LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup.";
  133. Task task;
  134. Tasks::size_type size = m_tasks.size();
  135. if(!m_tasks.empty() && m_isStarted)
  136. {
  137. task = m_tasks.top().second;
  138. m_tasks.pop();
  139. assert(size - 1 == m_tasks.size());
  140. /*if (TaskQueueSize_ > 0)
  141. {
  142. cond2.notify();
  143. }*/
  144. }
  145. return task;
  146. }

4.1 队列

队列作为先进先出的数据结构,当有可用的工作时,线程从队列中获取工作并执行。如果两个线程同时执行相同的工作会出现程序崩溃。为了避免这种问题,需要再标准C++ Queue上实现一个包装器,使用mutex来限制并发访问。

  1. void enqueue(T& t) {
  2. std::unique_lock<std::mutex> lock(m_mutex);
  3. m_queue.push(t);
  4. }


4.2 提交函数


5. 测试程序

5.1 start()、stop()


  1. int main(){
  2. {
  3. ThreadPool threadPool;
  4. threadPool.start();
  5. getchar();}
  6. getchar();
  7. return 0;
  8. }
  1. ./test.out
  2. 2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start.
  3. 2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait.
  4. 2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start.
  5. 2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait.
  6. 2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start.
  7. 2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait.
  8. 2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start.
  9. 2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait.
  10. 2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop.
  11. 2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll().
  12. 2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup.
  13. 2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit.
  14. 2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup.
  15. 2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit.
  16. 2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup.
  17. 2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit.
  18. 2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup.
  19. 2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit.

5.2 addTask()、 PriorityTaskQueue


  1. std::mutex g_mutex;
  2. void priorityFunc()
  3. {
  4. for (int i = 1; i < 4; ++i)
  5. {
  6. std::this_thread::sleep_for(std::chrono::seconds(1));
  7. std::lock_guard<std::mutex> lock(g_mutex);
  8. LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
  9. }
  10. }
  11. void testFunc()
  12. {
  13. // loop to print character after a random period of time
  14. for (int i = 1; i < 4; ++i)
  15. {
  16. std::this_thread::sleep_for(std::chrono::seconds(1));
  17. std::lock_guard<std::mutex> lock(g_mutex);
  18. LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
  19. }
  20. }
  21. int main()
  22. {
  23. ThreadPool threadPool;
  24. threadPool.start();
  25. for(int i = 0; i < 5 ; i++)
  26. threadPool.addTask(testFunc);
  27. std::this_thread::sleep_for(std::chrono::seconds(1));
  28. threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc));
  29. getchar();
  30. return 0;
  31. }
  1. ./test.out
  2. 2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start.
  3. 2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
  4. 2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start.
  5. 2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
  6. 2018-11-25 18:24:20.887610 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4122 start.
  7. 2018-11-25 18:24:20.887620 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
  8. 2018-11-25 18:24:21.887779 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4120] output
  9. 2018-11-25 18:24:21.887813 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output
  10. 2018-11-25 18:24:21.888909 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output
  11. 2018-11-25 18:24:22.888049 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4120] output
  12. 2018-11-25 18:24:22.888288 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output
  13. 2018-11-25 18:24:22.889978 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output
  14. 2018-11-25 18:24:23.888467 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4120] output
  15. 2018-11-25 18:24:23.888724 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
  16. 2018-11-25 18:24:23.888778 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output
  17. 2018-11-25 18:24:23.888806 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
  18. 2018-11-25 18:24:23.890413 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output
  19. 2018-11-25 18:24:23.890437 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
  20. 2018-11-25 18:24:24.889247 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [1at thread [ 4120] output
  21. 2018-11-25 18:24:24.891187 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output
  22. 2018-11-25 18:24:24.893163 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output
  23. 2018-11-25 18:24:25.889567 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [2at thread [ 4120] output
  24. 2018-11-25 18:24:25.891477 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output
  25. 2018-11-25 18:24:25.893450 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output
  26. 2018-11-25 18:24:26.890295 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [3at thread [ 4120] output
  27. 2018-11-25 18:24:26.890335 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4120 wait.
  28. 2018-11-25 18:24:26.892265 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output
  29. 2018-11-25 18:24:26.892294 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4121 wait.
  30. 2018-11-25 18:24:26.894274 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output
  31. 2018-11-25 18:24:26.894299 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4122 wait.
  32. 2018-11-25 18:24:35.359003 [TRACE] [ThreadPool.cpp:37] [stop] ThreadPool::stop() stop.
  33. 2018-11-25 18:24:35.359043 [TRACE] [ThreadPool.cpp:42] [stop] ThreadPool::stop() notifyAll().
  34. 2018-11-25 18:24:35.359061 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.
  35. 2018-11-25 18:24:35.359067 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4120 exit.
  36. 2018-11-25 18:24:35.359080 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.
  37. 2018-11-25 18:24:35.359090 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4122 exit.
  38. 2018-11-25 18:24:35.359123 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
  39. 2018-11-25 18:24:35.359130 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4121 exit.

3. 课程的线程池 

  1. // project4.cpp: 定义控制台应用程序的入口点。
  2. //
  3. #include "stdafx.h"
  4. #include <iostream>
  5. #include <vector>
  6. #include <map>
  7. #include <string>
  8. #include <thread>
  9. #include <list>
  10. #include <mutex>
  11. #include <future>
  12. using namespace std;
  13. class A {
  14. public:
  15. atomic<int> atm;
  16. A()
  17. {
  18. atm = 0;
  19. //auto atm2 = atm; //这种定义时初始化操作不允许,显示 “尝试引用已删除的函数”编译器内部肯定把拷贝构造函数给干掉了用 = delete
  20. //atomic<int> atm3 = atm;
  21. //atomic<int> atm2;
  22. //atm2 = atm; //尝试引用已删除的函数,拷贝赋值运算符也不让用
  23. //load():以原子方式读atomic对象的值
  24. atomic<int> atm2(atm.load()); //
  25. auto atm3(atm.load());
  26. //store()以原子方式写入内容
  27. atm2.store(12);
  28. atm2 = 12;
  29. }
  30. void outMsgRecvQueue()
  31. {
  32. //int command = 0;
  33. //while (true)
  34. //{
  35. // std::unique_lock<std::mutex> sbguard1(my_mutex1); //临界进去
  36. // my_cond.wait(sbguard1, [this] { //this,可以参考 未归类知识点,第八节
  37. // if (!msgRecvQueue.empty())
  38. // return true; // 该lambda返回true,则wait就返回,流程走下来,互斥锁被本线程拿到。
  39. // return false; //解锁并休眠,卡在wait等待被再次唤醒
  40. // });
  41. // //现在互斥锁锁着,流程走下来了,队列里有数据;
  42. // command = msgRecvQueue.front(); //返回第一个元素,但不检查元素是否存在;
  43. // msgRecvQueue.pop_front(); //移除第一个元素,但不返回;
  44. // sbguard1.unlock(); //因为unique_lock的灵活性,我们可以随时unlock解锁,以免锁住太长时间
  45. // cout << "outMsgRecvQueue()执行,取出一个元素" << command << endl;
  46. //} //end while
  47. while (true)
  48. {
  49. cout << atm << endl; //读atm是个原子操作,但是整个这一行代码并不是个原子操作;
  50. }
  51. }
  52. void inMsgRecvQueue() //unlock()
  53. {
  54. //for (int i = 0; i < 100000; ++i)
  55. //{
  56. // cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl;
  57. // std::unique_lock<std::mutex> sbguard1(my_mutex1);
  58. // msgRecvQueue.push_back(i); //假设这个数字i就是我收到的命令,我直接弄到消息队列里边来;
  59. // my_cond.notify_one(); //我们尝试把wait()的线程唤醒,其实现在outMsgRecvQueue()中的my_cond.wait()已经醒了,但光醒了没有用,你这里要是不把互斥量撒开,醒了他也要堵在另外一个线程的wait()那里;
  60. //}
  61. for (int i = 0; i < 1000000; ++i)
  62. {
  63. atm += 1; //原子操作
  64. //atm = atm + 1; //不是原子操作
  65. }
  66. return;
  67. }
  68. private:
  69. std::list<int> msgRecvQueue; //容器(消息队列),专门用于代表玩家给咱们发送过来的命令。
  70. std::mutex my_mutex1; //创建了一个互斥量 (一把锁头)
  71. std::condition_variable my_cond; //生成一个条件对象
  72. };
  73. int main()
  74. {
  75. //一:补充一些知识点
  76. //1.1)虚假唤醒:wait中要有第二参数(lambda)并且这个lambda中要正确判断要处理的公共数据是否存在;
  77. //wait(),notify_one(),notify_all()
  78. //1.2)atomic ,10,11节都有介绍
  79. //二:浅谈线程池
  80. //2.1)场景设想
  81. //服务器程序,--》客户端, 每来 一个客户端,就创建 一个新线程为该客户提供服务。
  82. //a)网络游戏,2万玩家不可能给每个玩家创建个新线程,此程序写法在这种场景下不通;
  83. //b)程序稳定性问题:编写的代码中,偶尔创建一个线程这种代码,这种写法,就让人感到不安;
  84. //线程池:把一堆线程弄到一起,统一管理。这种统一管理调度,循环利用线程的方式,就叫线程池;
  85. //2.2)实现方式
  86. //在程序启时,我一次性的创建好一定数量的线程。108100-200,更让人放心,觉得程序代码更稳定;
  87. //三:线程创建数量谈
  88. //3.1)线程开的数量极限问题,2000个线程基本就是极限;再创建线程就崩溃;
  89. //3.2)线程创建数量建议
  90. //a)采用某些技术开发程序;api接口提供商建议你 创建线程数量 = cpu数量,cpu *2 ,cpu *2 +2,遵照专业建议和指示来,专业意见确保程序高效率执行
  91. //b)创建多线程完成业务; 一个线程等于一条执行通路; 100要堵塞充值,我们这里开110个线程,那是很合适的;
  92. //c)1800个线程,建议,线程数量尽量不要超过500个,能控制在200个之内;
  93. //四:c++11多线程总结
  94. //windows,linux;
  95. A myobja;
  96. std::thread myOutnMsgObj(&A::outMsgRecvQueue, &myobja); //第二个参数是 引用,才能保证线程里 用的是同一个对象。
  97. std::thread myInMsgObj(&A::inMsgRecvQueue, &myobja);
  98. std::thread myInMsgObj2(&A::inMsgRecvQueue, &myobja);
  99. myInMsgObj.join();
  100. myOutnMsgObj.join();
  101. myInMsgObj2.join();
  102. //int abc = 10'00; c++新标准允许这么写
  103. return 0;
  104. }