当前位置:   article > 正文

C++线程池的实现_c++ 线程池的实现

c++ 线程池的实现

池式结构

在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、对象池等等。
池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作⽤。
 
线程池
通过使⽤线程池,我们可以有效降低多线程操作中任务申请和释放产⽣的性能消耗。特别是当我们每个线程的任务处理⽐较快时,系统
⼤部分性能消耗都花在了pthread_create以及释放线程的过程中。那既然是这样的话,何不在程序开始运⾏阶段提前创建好⼀堆线程,等
我们需要⽤的时候只要去这⼀堆线程中领⼀个线程,⽤完了再放回去,等程序运⾏结束时统⼀释放这⼀堆线程呢?,同时我们可以对线程进行统一的管理,能够根据处理消息的剩余多少,创建线程和销毁线程,按照这个想法,线程池出现了。
线线程程池解决的问题
1、解决任务处理,单线程处理比较慢。
2、阻塞IO
3、解决线程创建于销毁的成本问题
4、管理线程,能够动态的添加删除线程。
 
线程池适合场景
事实上,线程池并不是万能的。它有其特定的使⽤场合。线程池致⼒于减少线程本⾝的开销对应⽤所产⽣的影响,这是有前提的,前提
就是线程本⾝开销与线程执⾏任务相⽐不可忽略。如果线程本⾝的开销相对于线程任务执⾏开销⽽⾔是可以忽略不计的,那么此时线程池所
带来的好处是不明显的,⽐如对于FTP服务器以及Telnet服务器,通常传送⽂件的时间较长,开销较⼤,那么此时,我们采⽤线程池未必是
理想的⽅法,我们可以选择“即时创建,即时销毁”的策略。
总之线程池通常适合下⾯的⼏个场合:
(1)单位时间内处理任务频繁⽽且任务处理时间短
(2)对实时性要求较⾼。如果接受到任务后在创建线程,可能满⾜不了实时要求,因此必须采⽤线程池进⾏预创建。

线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:

1.任务队列,存储需要处理的任务,由工作的线程来处理这些任务
通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
已处理的任务会被从任务队列中删除
线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程


2.工作的线程(任务队列任务的消费者) ,N个
线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
工作的线程相当于是任务队列的消费者角色,
如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作


3.管理者线程(不处理任务队列中的任务),1个
它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
当任务过多的时候,可以适当的创建一些新的工作线程
当任务过少的时候,可以适当的销毁一些工作的线程

4.线程池的池 结构定义:

线程池的示意图如下:

​​​​​​​

 

代码如下:

  1. //
  2. // Created by 张怀超 on 2022/4/27.
  3. //
  4. #include<pthread.h>
  5. #include<iostream>
  6. #include<unistd.h>
  7. using namespace std;
  8. //当发现线程,过少,或者过多当情况下,
  9. //一次创建和销毁当线程
  10. #define NUMBER 10
  11. // 任务结构体
  12. typedef struct Task
  13. {
  14. void (*function)(void* arg);
  15. void* arg;
  16. }Task;
  17. // 线程池结构体
  18. struct ThreadPool
  19. {
  20. // 任务队列
  21. Task* taskQ;
  22. int queueCapacity; // 容量
  23. int queueSize; // 当前任务个数
  24. int queueFront; // 队头 -> 取数据
  25. int queueRear; // 队尾 -> 放数据
  26. pthread_t managerID; // 管理者线程ID
  27. pthread_t *threadIDs; // 工作的线程ID
  28. int minNum; // 最小线程数量
  29. int maxNum; // 最大线程数量
  30. int busyNum; // 忙的线程的个数
  31. int liveNum; // 存活的线程的个数
  32. int exitNum; // 要销毁的线程个数
  33. pthread_mutex_t mutexPool; // 锁整个的线程池
  34. pthread_mutex_t mutexBusy; // 锁busyNum变量
  35. pthread_cond_t notFull; // 任务队列是不是满了
  36. pthread_cond_t notEmpty; // 任务队列是不是空了
  37. int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
  38. };
  39. typedef struct ThreadPool ThreadPool;
  40. // 创建线程池并初始化
  41. ThreadPool *threadPoolCreate(int min, int max, int queueSize);
  42. // 销毁线程池
  43. int threadPoolDestroy(ThreadPool* pool);
  44. // 给线程池添加任务
  45. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
  46. // 获取线程池中工作的线程的个数
  47. int threadPoolBusyNum(ThreadPool* pool);
  48. // 获取线程池中活着的线程的个数
  49. int threadPoolAliveNum(ThreadPool* pool);
  50. //
  51. // 工作的线程(消费者线程)任务函数
  52. void* worker(void* arg);
  53. // 管理者线程任务函数
  54. void* manager(void* arg);
  55. // 单个线程退出
  56. void threadExit(ThreadPool* pool);
  57. ThreadPool* threadPoolCreate(int min, int max, int queueSize)
  58. {
  59. ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
  60. do
  61. {
  62. if (pool == NULL)
  63. {
  64. printf("malloc threadpool fail...\n");
  65. break;
  66. }
  67. pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
  68. if (pool->threadIDs == NULL)
  69. {
  70. printf("malloc threadIDs fail...\n");
  71. break;
  72. }
  73. memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
  74. pool->minNum = min;
  75. pool->maxNum = max;
  76. pool->busyNum = 0;
  77. pool->liveNum = min; // 和最小个数相等
  78. pool->exitNum = 0;
  79. if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
  80. pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
  81. pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
  82. pthread_cond_init(&pool->notFull, NULL) != 0)
  83. {
  84. printf("mutex or condition init fail...\n");
  85. break;
  86. }
  87. // 任务队列
  88. pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
  89. pool->queueCapacity = queueSize;
  90. pool->queueSize = 0;
  91. pool->queueFront = 0;
  92. pool->queueRear = 0;
  93. pool->shutdown = 0;
  94. // 创建线程
  95. pthread_create(&pool->managerID, NULL, manager, pool);
  96. for (int i = 0; i < min; ++i)
  97. {
  98. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
  99. }
  100. return pool;
  101. } while (0);
  102. // 释放资源
  103. if (pool && pool->threadIDs) free(pool->threadIDs);
  104. if (pool && pool->taskQ) free(pool->taskQ);
  105. if (pool) free(pool);
  106. return NULL;
  107. }
  108. int threadPoolDestroy(ThreadPool* pool)
  109. {
  110. if (pool == NULL)
  111. {
  112. return -1;
  113. }
  114. // 关闭线程池
  115. pool->shutdown = 1;
  116. // 阻塞回收管理者线程
  117. pthread_join(pool->managerID, NULL);
  118. // 唤醒阻塞的消费者线程
  119. for (int i = 0; i < pool->liveNum; ++i)
  120. {
  121. pthread_cond_signal(&pool->notEmpty);
  122. }
  123. // 释放堆内存
  124. if (pool->taskQ)
  125. {
  126. free(pool->taskQ);
  127. }
  128. if (pool->threadIDs)
  129. {
  130. free(pool->threadIDs);
  131. }
  132. pthread_mutex_destroy(&pool->mutexPool);
  133. pthread_mutex_destroy(&pool->mutexBusy);
  134. pthread_cond_destroy(&pool->notEmpty);
  135. pthread_cond_destroy(&pool->notFull);
  136. free(pool);
  137. pool = NULL;
  138. return 0;
  139. }
  140. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
  141. {
  142. pthread_mutex_lock(&pool->mutexPool);
  143. while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
  144. {
  145. // 阻塞生产者线程
  146. pthread_cond_wait(&pool->notFull, &pool->mutexPool);
  147. }
  148. if (pool->shutdown)
  149. {
  150. pthread_mutex_unlock(&pool->mutexPool);
  151. return;
  152. }
  153. // 添加任务
  154. pool->taskQ[pool->queueRear].function = func;
  155. pool->taskQ[pool->queueRear].arg = arg;
  156. pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
  157. pool->queueSize++;
  158. pthread_cond_signal(&pool->notEmpty);
  159. pthread_mutex_unlock(&pool->mutexPool);
  160. }
  161. int threadPoolBusyNum(ThreadPool* pool)
  162. {
  163. pthread_mutex_lock(&pool->mutexBusy);
  164. int busyNum = pool->busyNum;
  165. pthread_mutex_unlock(&pool->mutexBusy);
  166. return busyNum;
  167. }
  168. int threadPoolAliveNum(ThreadPool* pool)
  169. {
  170. pthread_mutex_lock(&pool->mutexPool);
  171. int aliveNum = pool->liveNum;
  172. pthread_mutex_unlock(&pool->mutexPool);
  173. return aliveNum;
  174. }
  175. void* worker(void* arg)
  176. {
  177. ThreadPool* pool = (ThreadPool*)arg;
  178. while (1)
  179. {
  180. pthread_mutex_lock(&pool->mutexPool);
  181. // 当前任务队列是否为空
  182. while (pool->queueSize == 0 && !pool->shutdown)
  183. {
  184. // 阻塞工作线程
  185. pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
  186. // 判断是不是要销毁线程
  187. if (pool->exitNum > 0)
  188. {
  189. pool->exitNum--;
  190. if (pool->liveNum > pool->minNum)
  191. {
  192. pool->liveNum--;
  193. pthread_mutex_unlock(&pool->mutexPool);
  194. threadExit(pool);
  195. }
  196. }
  197. }
  198. // 判断线程池是否被关闭了
  199. if (pool->shutdown)
  200. {
  201. pthread_mutex_unlock(&pool->mutexPool);
  202. threadExit(pool);
  203. }
  204. // 从任务队列中取出一个任务
  205. Task task;
  206. task.function = pool->taskQ[pool->queueFront].function;
  207. task.arg = pool->taskQ[pool->queueFront].arg;
  208. // 移动头结点
  209. pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
  210. pool->queueSize--;
  211. // 解锁
  212. pthread_cond_signal(&pool->notFull);
  213. pthread_mutex_unlock(&pool->mutexPool);
  214. printf("thread %ld start working...\n", pthread_self());
  215. pthread_mutex_lock(&pool->mutexBusy);
  216. pool->busyNum++;
  217. pthread_mutex_unlock(&pool->mutexBusy);
  218. task.function(task.arg);
  219. free(task.arg);
  220. task.arg = NULL;
  221. printf("thread %ld end working...\n", pthread_self());
  222. pthread_mutex_lock(&pool->mutexBusy);
  223. pool->busyNum--;
  224. pthread_mutex_unlock(&pool->mutexBusy);
  225. }
  226. return NULL;
  227. }
  228. void* manager(void* arg)
  229. {
  230. ThreadPool* pool = (ThreadPool*)arg;
  231. while (!pool->shutdown)
  232. {
  233. // 每隔3s检测一次
  234. sleep(3);
  235. // 取出线程池中任务的数量和当前线程的数量
  236. pthread_mutex_lock(&pool->mutexPool);
  237. int queueSize = pool->queueSize;
  238. int liveNum = pool->liveNum;
  239. pthread_mutex_unlock(&pool->mutexPool);
  240. // 取出忙的线程的数量
  241. pthread_mutex_lock(&pool->mutexBusy);
  242. int busyNum = pool->busyNum;
  243. pthread_mutex_unlock(&pool->mutexBusy);
  244. // 添加线程
  245. // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
  246. if (queueSize > liveNum && liveNum < pool->maxNum)
  247. {
  248. pthread_mutex_lock(&pool->mutexPool);
  249. int counter = 0;
  250. for (int i = 0; i < pool->maxNum && counter < NUMBER
  251. && pool->liveNum < pool->maxNum; ++i)
  252. {
  253. if (pool->threadIDs[i] == 0)
  254. {
  255. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
  256. counter++;
  257. pool->liveNum++;
  258. }
  259. }
  260. pthread_mutex_unlock(&pool->mutexPool);
  261. }
  262. // 销毁线程
  263. // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
  264. if (busyNum * 2 < liveNum && liveNum > pool->minNum)
  265. {
  266. pthread_mutex_lock(&pool->mutexPool);
  267. pool->exitNum = NUMBER;
  268. pthread_mutex_unlock(&pool->mutexPool);
  269. // 让工作的线程自杀
  270. for (int i = 0; i < NUMBER; ++i)
  271. {
  272. pthread_cond_signal(&pool->notEmpty);
  273. }
  274. }
  275. }
  276. return NULL;
  277. }
  278. void threadExit(ThreadPool* pool)
  279. {
  280. pthread_t tid = pthread_self();
  281. for (int i = 0; i < pool->maxNum; ++i)
  282. {
  283. if (pool->threadIDs[i] == tid)
  284. {
  285. pool->threadIDs[i] = 0;
  286. printf("threadExit() called, %ld exiting...\n", tid);
  287. break;
  288. }
  289. }
  290. pthread_exit(NULL);
  291. }
  292. void taskFunc(void* arg)
  293. {
  294. int num = *(int*)arg;
  295. printf("thread %ld is working, number = %d\n",
  296. pthread_self(), num);
  297. sleep(1);
  298. }
  299. int main()
  300. {
  301. // 创建线程池
  302. ThreadPool* pool = threadPoolCreate(3, 10, 100);
  303. for (int i = 0; i < 100; ++i)
  304. {
  305. int* num = (int*)malloc(sizeof(int));
  306. *num = i + 100;
  307. threadPoolAdd(pool, taskFunc, num);
  308. }
  309. sleep(30);
  310. threadPoolDestroy(pool);
  311. return 0;
  312. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/621845
推荐阅读
相关标签
  

闽ICP备14008679号