先上代码,声明部分
1 #ifndef _THREAD_POOL_ 2 #define _THREAD_POOL_ 3 4 #pragma once 5 6 #include<thread> 7 #include<condition_variable> 8 #include<mutex> 9 #include<shared_mutex> 10 #include"Loop_Queue.h" 11 #include<map> 12 #include<vector> 13 #include<exception> 14 15 #ifdef _WIN32 //sleep函数 16 #include<Windows.h> 17 #else 18 #include<unistd.h> 19 #endif // WIN32 20 21 22 #define EACH_MODIFCATION 2 //每次调整的线程数 23 24 25 26 27 28 enum RetStatus { // 运行状态的枚举 29 OK = 0, TODO,NOSUCHWORK, RUNNING, ERR 30 }; 31 32 33 typedef struct { //返回结构体,返回值和运行结果 34 RetStatus flag; 35 void *val; 36 }Ret_t; 37 38 typedef struct Task_t{ 39 int key; 40 void* (*work)(void*); 41 void* arg; 42 43 inline explicit Task_t(const int a=0) { //转换构造函数,使得可以(T)5 这样使用 44 key = a; 45 work = NULL; 46 arg = NULL; 47 } 48 }Task_t; 49 50 51 void fun() { 52 Task_t t(5); 53 Task_t a = (Task_t)5; 54 } 55 56 57 58 class ThreadPool 59 { 60 friend void ThreadTask(ThreadPool *pthis); 61 friend void Control(ThreadPool *p); 62 63 64 std::condition_variable TaskCondVar; 65 std::mutex mutexCond; 66 std::mutex TaskQLock; 67 Loop_Queue<Task_t> *TaskQ = NULL; 68 69 70 std::shared_timed_mutex RetTreeLock; 71 std::map<int, Ret_t> RetTree; //结果树 72 73 std::mutex keyidLock; 74 unsigned int keyid=1; 75 76 std::mutex KillLock; 77 unsigned Kill = 0; //要销毁的线程数 78 79 std::mutex SleepLock; 80 unsigned Sleep = 0; //阻塞线程数 81 82 unsigned sumThreads; //总线程数 仅由管理者修改 83 unsigned minThreads; //最小线程数 初始化后不修改 84 unsigned maxThreads; //最大线程数 初始化后不修改 85 bool PoolAlive = true; //线程池是否要回收(存活) 86 87 88 89 std::thread *Controller=NULL; 90 91 92 public: 93 94 Ret_t GetResult(int key); //根据key返回任务执行状态 95 96 int PushTask(void*(*task)(void*),void*argc=NULL,bool ifneedRet=false); //类外调用,提交任务,是否需要返回值 97 98 99 100 public: 101 ThreadPool(int minThreads=4,int maxThreads=32,int maxTaskQue=1000); 102 ~ThreadPool(); 103 }; 104 105 106 107 108 #endif // !_THREAD_POOL_
windows的sleep和Linux的sleep不一样,所以只好#ifdef ,大概是Linux: usleep(微妙) sleep(秒) windows:Sleep(毫秒),只有这部分必须要涉及平台特性
构造和析构:
1 ThreadPool::ThreadPool(int minThreads,int maxThreads,int maxTaskQueue) //栈区列表初始化(本来有要初始化的,现在没了) 2 { 3 //不得不说,用c++特性的锁还能简化mutex的初始化销毁等等操作 4 this->maxThreads = maxThreads; 5 this->minThreads = minThreads; 6 sumThreads = minThreads; 7 TaskQ = new Loop_Queue<Task_t>(maxTaskQueue); //构造队列 8 9 10 11 //线程要最后创建以免上面的对象还没构造完成就被访问, 12 Controller = new std::thread(Control, this); //线程池管理者 13 14 for (int i = 0; i < minThreads; ++i) { 15 std::thread* aThread = new std::thread(ThreadTask, this); 16 aThread->detach(); //暂时就让子线程自生自灭... 17 18 19 delete aThread; 20 //new的空间还是要释放,这里我研究了好一阵,堆上new的空间detach之后立刻delete没问题 21 //防止内存泄漏,虽然一次8字节,不过就是没了指针,没有直接管理线程的手段了,getID,nativeHandle等等 22 23 } 24 25 26 27 } 28 29 ThreadPool::~ThreadPool() //析构有好几层析构,会比较慢 30 { 31 this->PoolAlive = false; //唯一修改点,代表要回收线程池 32 33 if(Controller->joinable()) 34 Controller->join(); //把线程池管理者回收了,以确定全部线程被销毁 35 36 37 delete Controller; 38 delete TaskQ; 39 40 }
构造函数的一个要点:必须要先把一些东西都创建好了,再启动线程,因为线程可能会访问那些(尚未创建好)的元素
1 //#include<stdio.h> 2 //#define _CRT_SECURE_NO_WARNINGS 3 #include<iostream> 4 #include<map> 5 #include<string> 6 #include"ThreadPool.h" 7 #include<Windows.h> 8 using namespace std; 9 10 11 bool ifPrime(unsigned a) { 12 if (a < 2) { 13 return false; 14 } 15 for (unsigned i = 2; i < a; ++i) { 16 if (a%i == 0) 17 return false; 18 } 19 return true; 20 } 21 22 23 24 void *test(void *arg) { 25 int a = (int)arg; 26 27 vector<unsigned> *prime=new vector<unsigned>; 28 29 for (unsigned i = a; i < a+10'0000; ++i) { 30 //Sleep(20); 31 if(ifPrime(i)) 32 prime->push_back(i); 33 } 34 for (int i = 0; i < prime->size(); ++i) { 35 cout << (*prime)[i] << "\t"; 36 } 37 38 39 40 return NULL; 41 42 } 43 44 45 46 int main() { 47 48 freopen("out.txt", "w", stdout); 49 ThreadPool MyPool(2,32); 50 MyPool.PushTask(test, (void*)2); 51 Sleep(400); 52 MyPool.PushTask(test, (void*)10'0002); 53 MyPool.PushTask(test, (void*)20'0002); 54 Sleep(400); 55 MyPool.PushTask(test, (void*)30'0002); //模拟不定时插入任务 56 MyPool.PushTask(test, (void*)40'0002); //模拟不定时插入任务 57 MyPool.PushTask(test, (void*)50'0002); //模拟不定时插入任务 58 MyPool.PushTask(test, (void*)60'0002); //模拟不定时插入任务 59 Sleep(2000); 60 MyPool.PushTask(test, (void*)70'0002); //模拟不定时插入任务 61 MyPool.PushTask(test, (void*)80'0002); //模拟不定时插入任务 62 MyPool.PushTask(test, (void*)90'0002); //模拟不定时插入任务 63 64 65 Sleep(1000*2000); 66 67 68 return 0; 69 }
这个任务就是求素数集合,
可以看到,刚开始运行,4线程,1个主线程,1个管理者线程,剩余两个工作线程满载,在我8核CPU上占用率正好是25%
运行一段时间之后,线程数由管理者线程不断增加(由于所有线程都在忙),从右侧来看CPU占用率是略有突变的上升,正是每次开启两个任务线程,且都忙
由于我开了录屏占用一部分CPU之外,几乎Cases程序占用了所有CPU,没有损耗太多CPU
再过一段时间,部分任务线程休眠,但是不到总量的一半,线程管理者不动,再过一段时间,大多数线程都阻塞在条件变量上,过半了总线程数,于是开始销毁线程,
可以看到,线程总数也是每次两个两个削减,直到到最少线程数
所有Push的任务结束