1 void ThreadTask(ThreadPool*p) { 2 3 Task_t now; //储存当前任务的一些信息,函数地址,参数等等 4 void *ret; 5 std::unique_lock<std::mutex> tLock(p->mutexCond); //p->mutexCond是公用的 6 tLock.unlock(); 7 while (1) { 8 AGAIN: 9 while (1) //循环防止虚假唤醒,检测是否要销毁的循环 10 { 11 12 p->KillLock.lock(); //如果发现线程唤醒之后要销毁,则销毁 13 if (p->Kill > 0) { 14 --p->Kill; 15 p->KillLock.unlock(); 16 return; 17 } 18 p->KillLock.unlock(); 19 20 21 p->TaskQLock.lock(); 22 if (p->TaskQ->Empty()) { //队列为空则阻塞到条件变量上 23 p->TaskQLock.unlock(); 24 25 p->SleepLock.lock(); //阻塞线程数加一 26 p->Sleep++; 27 p->SleepLock.unlock(); 28 29 tLock.lock(); 30 /* 31 这个位置调试了好久,tLock是一个模板式锁,当构造(mutex&)的时候会立刻试着锁上, 32 同时标记本tLock是持有者 (步进调试出来的), 33 当析构的时候会释放锁,但是要循环使用的话,每一个线程都要有一个自己的tLock,内部对同一个mutex上锁 34 为了解决我condition_variable是循环的,所以使用tLock显式的Lock和unLock,构造放在循环外, 35 同时立刻解锁 36 */ 37 p->TaskCondVar.wait(tLock); //等待条件变量,参数必须是"模板锁".. 38 tLock.unlock(); 39 /* 40 wait有重载方法,第二个参数可以是一个谓词,false的时候阻塞,并且被唤醒后会再次判断是否为false 41 不过,这里唤醒之后我还要判断是否要销毁线程,故用基本的就好,自己写循环 42 */ 43 44 p->SleepLock.lock(); //阻塞线程数减一 45 p->Sleep--; 46 p->SleepLock.unlock(); 47 48 } 49 else { 50 p->TaskQLock.unlock(); 51 break; 52 } 53 54 } 55 56 57 p->TaskQLock.lock(); //获得一个任务 58 now =p->TaskQ->Pop(); 59 p->TaskQLock.unlock(); 60 if (now.work == NULL) 61 goto AGAIN; //Pop出错,返回继续 62 63 //开始执行一个任务,修改任务状态 64 if (now.key == 0) //代表无需返回值的任务 65 { 66 try 67 { 68 now.work(now.arg); 69 } 70 catch (...) 71 { 72 perror("An error occured in a NoReturn work,stopped"); 73 continue; 74 } 75 76 } 77 else { //需要返回值的任务 78 p->RetTreeLock.lock_shared(); //读锁 79 p->RetTree[now.key].flag = RUNNING; 80 p->RetTreeLock.unlock_shared(); //读锁释放 81 82 try { 83 ret = now.work(now.arg); //执行任务 84 } 85 catch (...) { 86 perror("An error occured in a ReturnAble work,stopped"); 87 p->RetTreeLock.lock_shared(); 88 89 p->RetTree[now.key].val = NULL; //虽然任务失败,ret也许没意义,但是还是给调用者 90 p->RetTree[now.key].flag = ERR; //修改任务状态为失败 91 p->RetTreeLock.unlock_shared(); 92 continue; 93 } 94 95 p->RetTreeLock.lock_shared(); 96 p->RetTree[now.key].flag = OK; //任务执行完成 97 p->RetTree[now.key].val = ret; 98 p->RetTreeLock.unlock_shared(); 99 100 } 101 } 102 103 return; 104 }
上面部分是工作者线程,循环取任务,try执行,条件变量阻塞,push任务的时候会唤醒一个,而且本身就会有执行完发现有任务,继续执行不等待的,所以不会有明明有任务,却线程都在等待的情况
值得一说的一点是,由于map不管是用哈希表,还是红黑树,对单独一个节点的内容值操作都不影响其他节点,故这里用读写锁去锁定 result map,当要增删节点的时候,写锁定,否则读锁定,增删节点会影响其他节点
还有,这篇的函数都是类外函数,是线程池的友元函数,因为线程函数不可以是类内成员函数(Linux下是这样,C++11好像不限制,但是写成友元也没得影响,这里通过p指针代替this指针手动传递)
下面是线程管理者模块,自动根据任务多少动态的调整线程数,在最小线程数和最大线程数之间 为什么需要动态调整这些线程数呢? 想象一个场景,4个服务窗口,四个大汉排在前面进行很长的业务,后来的人
也许只是一个很短的任务却不得不等待前面的人结束任务, 就像在分组交换和报文交换方式一样.相对更公平合理一点,概括来说就是,在大量业务的情况下,使得每个业务都能相对平均的执行完,而不是前面的很快,
后面的纯等
1 void Control(ThreadPool* p) { 2 /* 动态管理线程数量,每过一段时间查看一次线程数量, 3 连续3次全部忙线程则增加EACH_MODIFCATION个,连续3次少于一半忙碌则减少EACH_MODIFCATION个 4 为什么要这么做?使得每个任务都有比较平均的执行时间,而不是先到的任务快速执行,后面的任务队列很长 5 有点像分组交换和报文交换 6 7 */ 8 9 int v=0; //指示向量v 10 unsigned sleepy; 11 int modi = EACH_MODIFCATION; 12 std::thread* tmp; 13 14 15 while (1) { 16 17 18 #ifdef _WIN32 19 Sleep(2000); //睡眠2000ms 20 #else 21 usleep(500 * 1000); //Linux 22 #endif // _WIN32 23 24 25 if (p->PoolAlive != true) 26 break; 27 28 sleepy = p->Sleep; //只读可以不获取锁,没什么影响 29 30 if (sleepy > p->sumThreads / 2&&p->sumThreads>p->minThreads) { //过半在睡眠,且不是最小线程数 31 --v; 32 } 33 else if (sleepy == 0&&p->sumThreads<p->maxThreads) { //全部在忙,且不是最大线程数 34 ++v; 35 } 36 37 if (v < -2) { 38 ++v; //还原v,再次满线程再加 39 modi = EACH_MODIFCATION; 40 41 //如果减少线程后小于最小线程,则减差值 42 if (p->sumThreads - EACH_MODIFCATION < p->minThreads) 43 modi = p->sumThreads - p->minThreads; 44 p->KillLock.lock(); 45 p->Kill += modi; 46 p->KillLock.unlock(); 47 p->sumThreads -= modi; //总线程数预先调整 48 for (int i = 0; i < modi; i++) { 49 p->TaskCondVar.notify_one(); //唤醒一个线程,让其结束 50 } 51 } 52 else if(v>2) { 53 --v; 54 modi = EACH_MODIFCATION; 55 if (p->sumThreads + EACH_MODIFCATION > p->maxThreads) 56 modi = p->maxThreads - p->sumThreads; 57 for (int i = 0; i < modi; ++i) { //创建modi个线程 58 tmp = new std::thread(ThreadTask, p); 59 tmp->detach(); 60 delete tmp; 61 } 62 p->sumThreads += modi; 63 64 } 65 } //控制循环 66 67 //结束线程池 68 p->Kill = 88888;//销毁所有任务线程 69 70 71 do{ 72 p->TaskCondVar.notify_all(); //唤醒所有的阻塞线程,逻辑上没问题,如果有没唤醒的... 73 #ifdef _WIN32 74 Sleep(200); 75 #else 76 usleep(1000 * 200); 77 #endif // _WIN32 78 79 } while (88888 - p->Kill == p->sumThreads); //直到所有工作线程结束(每个线程退出会把Kill减一) 80 81 return; 82 }
线程池管理者不是detach的,为了实现在析构掉整个线程池的时候,确保所有已经在执行的任务都会被执行完,(新来的任务不一定会再执行)
析构的时候join它