当前位置:   article > 正文

线程池666666

线程池666666

1. 作用

线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。

2. 实现原理

线程池内部由任务队列、工作线程和管理者线程组成。

任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。

工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。

管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。

注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。

3. 手撕线程池

参考来源:爱编程的大丙。

【1】threadpool.c:

  1. #include "threadpool.h"
  2. #include <pthread.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. #include <stdio.h>
  7. #define NUMBER 2 //管理者线程增加或减少的工作线程数量
  8. //任务结构体
  9. typedef struct Task {
  10. void (*func)(void* arg);
  11. void* arg;
  12. } Task;
  13. //线程池结构体
  14. struct ThreadPool {
  15. //任务队列,视为环形队列
  16. Task* taskQ;
  17. int queueCapacity; //队列容量
  18. int queueSize; //当前任务个数
  19. int queueFront; //队头 -> 取任务
  20. int queueRear; //队尾 -> 加任务
  21. //线程相关
  22. pthread_t managerID; //管理者线程ID
  23. pthread_t* threadIDs; //工作线程ID
  24. int minNum; //工作线程最小数量
  25. int maxNum; //工作线程最大数量
  26. int busyNum; //工作线程忙的数量
  27. int liveNum; //工作线程存活数量
  28. int exitNum; //要销毁的工作线程数量
  29. pthread_mutex_t mutexPool; //锁整个线程池
  30. pthread_mutex_t mutexBusy; //锁busyNum
  31. pthread_cond_t notFull; //任务队列是否满
  32. pthread_cond_t notEmpty; //任务队列是否空
  33. //线程池是否销毁
  34. int shutdown; //释放为1,否则为0
  35. };
  36. /***************************************************************
  37. * 函 数: threadPoolCreate
  38. * 功 能: 创建线程池并初始化
  39. * 参 数: min---工作线程的最小数量
  40. * max---工作线程的最大数量
  41. * capacity---任务队列的最大容量
  42. * 返回值: 创建的线程池的地址
  43. **************************************************************/
  44. ThreadPool* threadPoolCreate(int min, int max, int capacity)
  45. {
  46. //申请线程池空间
  47. ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
  48. do {//此处循环只是为了便于失败释放空间,只会执行一次
  49. if (pool == NULL) {
  50. printf("pool create error!\n");
  51. break;
  52. }
  53. //申请任务队列空间,并初始化
  54. pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);
  55. if (pool->taskQ == NULL) {
  56. printf("Task create error!\n");
  57. break;
  58. }
  59. pool->queueCapacity = capacity;
  60. pool->queueSize = 0;
  61. pool->queueFront = 0;
  62. pool->queueRear = 0;
  63. //初始化互斥锁和条件变量
  64. if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
  65. pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
  66. pthread_cond_init(&pool->notFull, NULL) != 0 ||
  67. pthread_cond_init(&pool->notEmpty, NULL) != 0)
  68. {
  69. printf("mutex or cond create error!\n");
  70. break;
  71. }
  72. //初始化shutdown
  73. pool->shutdown = 0;
  74. //初始化线程相关参数
  75. pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
  76. if (pool->threadIDs == NULL) {
  77. printf("threadIDs create error!\n");
  78. break;
  79. }
  80. memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
  81. pool->minNum = min;
  82. pool->maxNum = max;
  83. pool->busyNum = 0;
  84. pool->liveNum = min;
  85. pool->exitNum = 0;
  86. //创建管理者线程和工作线程
  87. pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程
  88. for (int i = 0; i < min; ++i) {
  89. pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程
  90. }
  91. return pool;
  92. } while (0);
  93. //申请资源失败,释放已分配的资源
  94. if (pool && pool->taskQ) free(pool->taskQ);
  95. if (pool && pool->threadIDs) free(pool->threadIDs);
  96. if (pool) free(pool);
  97. return NULL;
  98. }
  99. /***************************************************************
  100. * 函 数: threadPoolDestroy
  101. * 功 能: 销毁线程池
  102. * 参 数: pool---要销毁的线程池
  103. * 返回值: 0表示销毁成功,-1表示销毁失败
  104. **************************************************************/
  105. int threadPoolDestroy(ThreadPool* pool)
  106. {
  107. if (!pool) return -1;
  108. //关闭线程池
  109. pool->shutdown = 1;
  110. //阻塞回收管理者线程
  111. pthread_join(pool->managerID, NULL);
  112. //唤醒所有工作线程,让其自杀
  113. for (int i = 0; i < pool->liveNum; ++i) {
  114. pthread_cond_signal(&pool->notEmpty);
  115. }
  116. //释放所有互斥锁和条件变量
  117. pthread_mutex_destroy(&pool->mutexBusy);
  118. pthread_mutex_destroy(&pool->mutexPool);
  119. pthread_cond_destroy(&pool->notEmpty);
  120. pthread_cond_destroy(&pool->notFull);
  121. //释放堆空间
  122. if (pool->taskQ) {
  123. free(pool->taskQ);
  124. pool->taskQ = NULL;
  125. }
  126. if (pool->threadIDs) {
  127. free(pool->threadIDs);
  128. pool->threadIDs = NULL;
  129. }
  130. free(pool);
  131. pool = NULL;
  132. return 0;
  133. }
  134. /***************************************************************
  135. * 函 数: threadPoolAdd
  136. * 功 能: 生产者往线程池的任务队列中添加任务
  137. * 参 数: pool---线程池
  138. * func---函数指针,要执行的任务地址
  139. * arg---func指向的函数的实参
  140. * 返回值: 无
  141. **************************************************************/
  142. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
  143. {
  144. pthread_mutex_lock(&pool->mutexPool);
  145. //任务队列满,阻塞生产者
  146. while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
  147. pthread_cond_wait(&pool->notFull, &pool->mutexPool);
  148. }
  149. //判断线程池是否关闭
  150. if (pool->shutdown) {
  151. pthread_mutex_unlock(&pool->mutexPool);
  152. return;
  153. }
  154. //添加任务进pool->taskQ
  155. pool->taskQ[pool->queueRear].func = func;
  156. pool->taskQ[pool->queueRear].arg = arg;
  157. pool->queueSize++;
  158. pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
  159. pthread_cond_signal(&pool->notEmpty);//唤醒工作线程
  160. pthread_mutex_unlock(&pool->mutexPool);
  161. }
  162. /***************************************************************
  163. * 函 数: getThreadPoolBusyNum
  164. * 功 能: 获取线程池忙的工作线程数量
  165. * 参 数: pool---线程池
  166. * 返回值: 忙的工作线程数量
  167. **************************************************************/
  168. int getThreadPoolBusyNum(ThreadPool* pool)
  169. {
  170. pthread_mutex_lock(&pool->mutexBusy);
  171. int busyNum = pool->busyNum;
  172. pthread_mutex_unlock(&pool->mutexBusy);
  173. return busyNum;
  174. }
  175. /***************************************************************
  176. * 函 数: getThreadPoolAliveNum
  177. * 功 能: 获取线程池存活的工作线程数量
  178. * 参 数: pool---线程池
  179. * 返回值: 存活的工作线程数量
  180. **************************************************************/
  181. int getThreadPoolAliveNum(ThreadPool* pool)
  182. {
  183. pthread_mutex_lock(&pool->mutexPool);
  184. int liveNum = pool->liveNum;
  185. pthread_mutex_unlock(&pool->mutexPool);
  186. return liveNum;
  187. }
  188. /***************************************************************
  189. * 函 数: worker
  190. * 功 能: 工作线程的执行函数
  191. * 参 数: arg---实参传入,这里传入的是线程池
  192. * 返回值: 空指针
  193. **************************************************************/
  194. void* worker(void* arg)
  195. {
  196. ThreadPool* pool = (ThreadPool*)arg;
  197. while (1) {
  198. /* 1.取出任务队列中的队头任务 */
  199. pthread_mutex_lock(&pool->mutexPool);
  200. //无任务就阻塞线程
  201. while (pool->queueSize == 0 && !pool->shutdown) {
  202. pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
  203. //唤醒后,判断是不是要销毁线程
  204. if (pool->exitNum > 0) {//线程自杀
  205. pool->exitNum--;//销毁指标-1
  206. if (pool->liveNum > pool->minNum) {
  207. pool->liveNum--;//活着的工作线程-1
  208. pthread_mutex_unlock(&pool->mutexPool);
  209. threadExit(pool);
  210. }
  211. }
  212. }
  213. //线程池关闭了就退出线程
  214. if (pool->shutdown) {
  215. pthread_mutex_unlock(&pool->mutexPool);
  216. threadExit(pool);
  217. }
  218. //取出pool中taskQ的任务
  219. Task task;
  220. task.func = pool->taskQ[pool->queueFront].func;
  221. task.arg = pool->taskQ[pool->queueFront].arg;
  222. pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头
  223. pool->queueSize--;
  224. //通知生产者添加任务
  225. pthread_cond_signal(&pool->notFull);
  226. pthread_mutex_unlock(&pool->mutexPool);
  227. /* 2.设置pool的busyNum+1 */
  228. pthread_mutex_lock(&pool->mutexBusy);
  229. pool->busyNum++;
  230. pthread_mutex_unlock(&pool->mutexBusy);
  231. /* 3.执行取出的任务 */
  232. printf("thread %ld start working ...\n", pthread_self());
  233. task.func(task.arg);
  234. free(task.arg);
  235. task.arg = NULL;
  236. printf("thread %ld end working ...\n", pthread_self());
  237. /* 4.设置pool的busyNum-1 */
  238. pthread_mutex_lock(&pool->mutexBusy);
  239. pool->busyNum--;
  240. pthread_mutex_unlock(&pool->mutexBusy);
  241. }
  242. return NULL;
  243. }
  244. /***************************************************************
  245. * 函 数: manager
  246. * 功 能: 管理者线程的执行函数
  247. * 参 数: arg---实参传入,这里传入的是线程池
  248. * 返回值: 空指针
  249. **************************************************************/
  250. void* manager(void* arg)
  251. {
  252. ThreadPool* pool = (ThreadPool*)arg;
  253. while (!pool->shutdown) {
  254. /* 每隔3秒检测一次 */
  255. sleep(3);
  256. /* 获取pool中相关变量 */
  257. pthread_mutex_lock(&pool->mutexPool);
  258. int taskNum = pool->queueSize; //任务队列中的任务数量
  259. int liveNum = pool->liveNum; //存活的工作线程数量
  260. int busyNum = pool->busyNum; //忙碌的工作线程数量
  261. pthread_mutex_unlock(&pool->mutexPool);
  262. /* 功能一:增加工作线程,每次增加NUMBER个 */
  263. //当任务个数大于存活工作线程数,且存活工作线程数小于最大值
  264. if (taskNum > liveNum && liveNum < pool->maxNum) {
  265. pthread_mutex_lock(&pool->mutexPool);
  266. int counter = 0;
  267. for (int i = 0; i < pool->maxNum && counter < NUMBER
  268. && pool->liveNum < pool->maxNum; ++i)
  269. {
  270. if (pool->threadIDs[i] == 0) {
  271. pthread_create(&pool->threadIDs[i], NULL, worker, pool);
  272. counter++;
  273. pool->liveNum++;
  274. }
  275. }
  276. pthread_mutex_unlock(&pool->mutexPool);
  277. }
  278. /* 功能二:销毁工作线程,每次销毁NUMBER个 */
  279. //当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数
  280. if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
  281. pthread_mutex_lock(&pool->mutexPool);
  282. pool->exitNum = NUMBER;
  283. //唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀
  284. for (int i = 0; i < NUMBER; ++i) {
  285. pthread_cond_signal(&pool->notEmpty);
  286. }
  287. pthread_mutex_unlock(&pool->mutexPool);
  288. }
  289. }
  290. return NULL;
  291. }
  292. /***************************************************************
  293. * 函 数: threadExit
  294. * 功 能: 工作线程退出函数,将工作线程的ID置为0,然后退出
  295. * 参 数: pool---线程池
  296. * 返回值: 无
  297. **************************************************************/
  298. void threadExit(ThreadPool* pool)
  299. {
  300. //将pool->threadIDs中的ID改为0
  301. pthread_t tid = pthread_self();
  302. for (int i = 0; i < pool->maxNum; i++) {
  303. if (pool->threadIDs[i] == tid) {
  304. pool->threadIDs[i] = 0;
  305. printf("threadExit() called, %ld exiting...\n", tid);
  306. break;
  307. }
  308. }
  309. pthread_exit(NULL);//退出
  310. }

【2】threadpool.h:

  1. #ifndef _THREADPOOL_H
  2. #define _THREADPOOL_H
  3. typedef struct ThreadPool ThreadPool;
  4. //创建线程池并初始化
  5. ThreadPool* threadPoolCreate(int min, int max, int capacity);
  6. //销毁线程池
  7. int threadPoolDestroy(ThreadPool* pool);
  8. //给线程池添加任务
  9. void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
  10. //获取当前忙碌的工作线程的数量
  11. int getThreadPoolBusyNum(ThreadPool* pool);
  12. //获取当前存活的工作线程的数量
  13. int getThreadPoolAliveNum(ThreadPool* pool);
  14. /*********************其它函数**********************/
  15. void* worker(void* arg);//工作线程的执行函数
  16. void* manager(void* arg);//管理者线程的执行函数
  17. void threadExit(ThreadPool* pool);//线程退出函数
  18. #endif

【3】main.c:

  1. #include <stdio.h>
  2. #include "threadpool.h"
  3. #include <stdlib.h>
  4. #include <unistd.h>
  5. #include <pthread.h>
  6. //任务函数,所有线程都执行此任务
  7. void testFunc(void* arg)
  8. {
  9. int* num = (int*)arg;
  10. printf("thread %ld is working, number = %d\n", pthread_self(), *num);
  11. sleep(1);
  12. }
  13. int main()
  14. {
  15. //创建线程池: 最少3个工作线程,最多10个,任务队列容量为100
  16. ThreadPool* pool = threadPoolCreate(3, 10, 100);
  17. //加入100个任务于任务队列
  18. for (int i = 0; i < 100; ++i) {
  19. int* num = (int*)malloc(sizeof(int));
  20. *num = i + 100;
  21. threadPoolAdd(pool, testFunc, num);
  22. }
  23. //销毁线程池
  24. sleep(30);//保证任务全部运行完毕
  25. threadPoolDestroy(pool);
  26. return 0;
  27. }

【4】运行结果:

......

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号