当前位置:   article > 正文

C语言简易线程池实现_c线程池代码怎么编写

c线程池代码怎么编写

  1. #ifndef _THREADPOOL_H_
  2. #define _THREADPOOL_H_
  3. #define EXIT_NUM 10
  4. #define ADD_NUM 10
  5. #include<stdbool.h>
  6. #include<stdlib.h>
  7. #include<string.h>
  8. #include<unistd.h>
  9. #include<stdio.h>
  10. #include<pthread.h>
  11. //任务结构体
  12. typedef struct Task
  13. {
  14. void (*function)(void* arg);
  15. void* arg;
  16. }task;
  17. //线程池
  18. typedef struct ThreadPool
  19. {
  20. task* task_list; //任务队列
  21. int task_front; //任务队列头
  22. int task_behind; //任务队列尾
  23. int task_num; //队列中任务数
  24. int task_capacity; //任务队列容量
  25. pthread_t manage_thread; //管理者线程
  26. int min_thread_num; //最小线程数
  27. int max__thread_num; //最大线程数
  28. int live_thread_num; //存活线程数
  29. int busy_thread_num; //工作线程数
  30. int exit_thread_num; //要退出的线程数
  31. pthread_mutex_t pool_lock; //线程池锁
  32. pthread_mutex_t busy_num_lock; //忙线程锁
  33. pthread_cond_t can_add; //任务队列可添加
  34. pthread_cond_t can_work; //空闲线程可工作
  35. bool shutdown; //是否销毁线程池标志
  36. }thread_pool;
  37. //创建线程池
  38. thread_pool* create_thread_pool(int pthread_min, int pthread_max, int task_capacity);
  39. //添加任务
  40. int add_task(thread_pool* pool,void(*function)(void*),void* arg);
  41. //管理者线程函数
  42. void* manage(void* arg);
  43. //工作者线程函数
  44. void* worker(void* arg);
  45. //查询忙线程数
  46. int query_busy(thread_pool* pool);
  47. //查询存活线程数
  48. int query_live(thread_pool* pool);
  49. //修改线程池运行状态
  50. void shutdown_pool(thread_pool* pool);
  51. //释放资源
  52. void exit_pool(thread_pool* pool);
  53. #endif // _THREADPOOL_H_

  1. #include"ThreadPool.h"
  2. //创建线程池
  3. thread_pool* create_thread_pool(int pthread_min, int pthread_max, int task_capacity)
  4. {
  5. //开辟线程池
  6. thread_pool* pool = (thread_pool*)malloc(sizeof(thread_pool));
  7. if (pool == NULL){
  8. printf("create thread_pool error!\n");
  9. return NULL;
  10. }
  11. memset(pool, 0, sizeof(thread_pool));
  12. //创建任务队列
  13. pool->task_list = (task*)malloc(sizeof(task)*task_capacity);
  14. if(pool->task_list == NULL){
  15. printf("create task_list error!\n");
  16. free(pool);
  17. pool = NULL;
  18. return NULL;
  19. }
  20. memset(pool->task_list, 0, sizeof(task)*task_capacity);
  21. //初始化锁和条件变量
  22. pthread_mutex_init(&pool->pool_lock, NULL);
  23. pthread_mutex_init(&pool->busy_num_lock, NULL);
  24. pthread_cond_init(&pool->can_add, NULL);
  25. pthread_cond_init(&pool->can_work, NULL);
  26. //初始化任务队列
  27. pool->task_capacity = task_capacity;
  28. pool->task_front = 0;
  29. pool->task_behind = 0;
  30. pool->task_num = 0;
  31. //初始化工作线程信息
  32. pool->min_thread_num = pthread_min;
  33. pool->max__thread_num = pthread_max;
  34. pool->live_thread_num = 0;
  35. pool->exit_thread_num = 0;
  36. pool->busy_thread_num = 0;
  37. //线程池工作信息
  38. pool->shutdown = false;
  39. //创建管理线程
  40. pthread_create(&pool->manage_thread, PTHREAD_CREATE_JOINABLE, manage, pool);
  41. //创建初始工作线程 (初始工作线程 = 最小工作线程)
  42. pthread_mutex_lock(&pool->pool_lock);
  43. pthread_t thread;
  44. for (int i = 0; i < pthread_min; i++){
  45. pthread_create(&thread, PTHREAD_CREATE_JOINABLE, worker, pool);
  46. pool->live_thread_num++;
  47. }
  48. pthread_mutex_unlock(&pool->pool_lock);
  49. return pool;
  50. }
  51. //释放资源
  52. void exit_pool(thread_pool* pool) {
  53. thread_pool* p = pool;
  54. int live_num = 0;
  55. do {
  56. pthread_mutex_lock(&pool->pool_lock);
  57. live_num = pool->live_thread_num;
  58. if (live_num > 0) {
  59. pool->exit_thread_num = pool->live_thread_num;
  60. pthread_cond_signal(&pool->can_work);
  61. }
  62. pthread_mutex_unlock(&pool->pool_lock);
  63. } while (live_num > 0);
  64. //释放任务队列
  65. while (pool->task_num != 0) {
  66. if (pool->task_list[pool->task_front].arg != NULL) {
  67. free(pool->task_list[pool->task_front].arg);
  68. pool->task_list[pool->task_front].arg = NULL;
  69. }
  70. pool->task_front = (pool->task_front + 1) % pool->task_capacity;
  71. pool->task_num--;
  72. }
  73. //释放堆内存
  74. free(pool->task_list);
  75. pool->task_list = NULL;
  76. //销毁锁和条件变量
  77. pthread_mutex_destroy(&pool->busy_num_lock);
  78. pthread_mutex_destroy(&pool->pool_lock);
  79. pthread_cond_destroy(&pool->can_add);
  80. pthread_cond_destroy(&pool->can_work);
  81. }
  82. //添加任务
  83. int add_task(thread_pool* pool,void(*function)(void*), void* arg)
  84. {
  85. pthread_mutex_lock(&pool->pool_lock);
  86. while (pool->task_num == pool->task_capacity && pool->shutdown == false){ //任务队列满 阻塞等待can_add条件变量
  87. pthread_cond_wait(&pool->can_add, &pool->pool_lock);
  88. }
  89. if (pool->shutdown == true) { //线程池关闭 不再添加任务
  90. if (arg != NULL) {
  91. free(arg);
  92. arg = NULL;
  93. }
  94. return -1;
  95. }
  96. //向任务队列中添加任务
  97. pool->task_list[pool->task_behind].function = function;
  98. pool->task_list[pool->task_behind].arg = arg;
  99. pool->task_behind = (pool->task_behind + 1)%pool->task_capacity; //队尾移动
  100. if (pool->task_num == 0) { //任务队列不为空 -- 通知阻塞在can_work上的工作线程工作
  101. pthread_cond_signal(&pool->can_work);
  102. }
  103. pool->task_num++;
  104. pthread_mutex_unlock(&pool->pool_lock);
  105. return 0;
  106. }
  107. //管理者线程执行函数
  108. void* manage(void* arg)
  109. {
  110. thread_pool* pool = (thread_pool*)arg;
  111. while (1) {
  112. //退出部分闲置线程
  113. pthread_mutex_lock(&pool->pool_lock);
  114. if (pool->busy_thread_num * 2 < pool->live_thread_num
  115. && pool->task_num == 0
  116. && pool->live_thread_num - EXIT_NUM > pool->min_thread_num){
  117. pthread_mutex_unlock(&pool->pool_lock);
  118. sleep(1);
  119. pthread_mutex_lock(&pool->pool_lock);
  120. if (pool->busy_thread_num * 2 < pool->live_thread_num
  121. &&pool->task_num == 0
  122. && pool->live_thread_num - EXIT_NUM > pool->min_thread_num
  123. ) {
  124. pool->exit_thread_num = EXIT_NUM;
  125. pthread_mutex_unlock(&pool->pool_lock);
  126. int live_num = 0;
  127. int min_num = 0;
  128. int i = 0;
  129. do {
  130. pthread_mutex_lock(&pool->pool_lock);
  131. live_num = pool->live_thread_num;
  132. min_num = pool->live_thread_num;
  133. if (live_num > min_num) {
  134. pthread_cond_signal(&pool->can_work);
  135. }
  136. pthread_mutex_unlock(&pool->pool_lock);
  137. i++;
  138. live_num--;
  139. } while (live_num > min_num && i < EXIT_NUM);
  140. }
  141. else {
  142. pthread_mutex_unlock(&pool->pool_lock);
  143. }
  144. }
  145. else {
  146. pthread_mutex_unlock(&pool->pool_lock);
  147. }
  148. //添加线程
  149. pthread_mutex_lock(&pool->pool_lock);
  150. if (pool->live_thread_num + ADD_NUM <= pool->max__thread_num
  151. && pool->task_num >= pool->task_capacity * 0.9){ //存活线程数+添加线程数 <= 最大线程数 且 任务队列到达临界值
  152. pthread_t thread;
  153. //创建新线程
  154. for (int i = 0; i < ADD_NUM && pool->live_thread_num < pool->max__thread_num;i++) {
  155. pthread_create(&thread, PTHREAD_CREATE_JOINABLE, worker, pool);
  156. pool->live_thread_num++;
  157. }
  158. }
  159. pthread_mutex_unlock(&pool->pool_lock);
  160. //检查线程池状态
  161. pthread_mutex_lock(&pool->pool_lock);
  162. if (pool->shutdown) {
  163. pthread_mutex_unlock(&pool->pool_lock);
  164. exit_pool(pool);
  165. pthread_detach(pthread_self());
  166. pthread_exit(NULL);
  167. }
  168. pthread_mutex_unlock(&pool->pool_lock);
  169. sleep(1);
  170. }
  171. }
  172. //工作者线程执行函数
  173. void* worker(void* arg)
  174. {
  175. thread_pool* pool = (thread_pool*)arg;
  176. while (1) {
  177. pthread_mutex_lock(&pool->pool_lock);
  178. while (pool->task_num == 0){ //判断是否有任务
  179. pthread_cond_wait(&pool->can_work,&pool->pool_lock); //无任务时阻塞等待can_work条件变量
  180. if (pool->exit_thread_num > 0){ //判定是否要退出线程
  181. pool->live_thread_num--;
  182. pthread_mutex_unlock(&pool->pool_lock);
  183. pthread_detach(pthread_self());
  184. pthread_exit(NULL);
  185. }
  186. }
  187. //从任务队列中取出任务
  188. task obj = pool->task_list[pool->task_front];
  189. //修改队列头节点
  190. pool->task_front = (pool->task_front + 1) % pool->task_capacity; //任务取出 队头后移
  191. if (pool->task_num == pool->task_capacity) { //任务队列 :满->不满 通知添加任务函数添加任务
  192. pthread_cond_signal(&pool->can_add);
  193. }
  194. pool->task_num--;
  195. pthread_mutex_unlock(&pool->pool_lock);
  196. //修改忙线程数
  197. pthread_mutex_lock(&pool->busy_num_lock);
  198. pool->busy_thread_num++;
  199. pthread_mutex_unlock(&pool->busy_num_lock);
  200. //执行任务
  201. obj.function(obj.arg);
  202. if (obj.arg != NULL) { //任务函数参数为堆内存时 此处需要释放内存
  203. free(obj.arg);
  204. obj.arg = NULL;
  205. }
  206. //修改忙线程数
  207. pthread_mutex_lock(&pool->busy_num_lock);
  208. pool->busy_thread_num--;
  209. pthread_mutex_unlock(&pool->busy_num_lock);
  210. }
  211. }
  212. //查询忙线程数
  213. int query_busy(thread_pool* pool)
  214. {
  215. pthread_mutex_lock(&pool->busy_num_lock);
  216. int temp = pool->busy_thread_num;
  217. pthread_mutex_unlock(&pool->busy_num_lock);
  218. return temp;
  219. }
  220. //查询存活线程数
  221. int query_live(thread_pool* pool)
  222. {
  223. pthread_mutex_lock(&pool->pool_lock);
  224. int temp = pool->live_thread_num;
  225. pthread_mutex_unlock(&pool->pool_lock);
  226. return temp;
  227. }
  228. //关闭线程池
  229. void shutdown_pool(thread_pool* pool)
  230. {
  231. pthread_mutex_lock(&pool->pool_lock);
  232. pool->shutdown = true;
  233. pthread_mutex_unlock(&pool->pool_lock);
  234. }
  1. #include<stdio.h>
  2. #include"ThreadPool.h"
  3. #include<stdlib.h>
  4. #include<string.h>
  5. void func1(void* arg);
  6. void func2(void* arg);
  7. void func3(void* arg);
  8. static int var = 0;
  9. int main(int argc,char** argv)
  10. {
  11. thread_pool* pool = create_thread_pool(10, 160, 500); //参数:最小线程数,最大线程数,任务队列容量
  12. int a = 1, b = 3, c = 4;
  13. while (1) {
  14. var++;
  15. for (int i = 0; i < 500; i++) {
  16. /*add_task(pool, func1, NULL); //空参数
  17. add_task(pool, func2, NULL);
  18. add_task(pool, func3, NULL);*/
  19. int* arg_1 = (int*)malloc(4); //堆内存传参
  20. int* arg_2 = (int*)malloc(4);
  21. int* arg_3 = (int*)malloc(4);
  22. *arg_1 = a;
  23. *arg_2 = b;
  24. *arg_3 = c;
  25. add_task(pool, func1, arg_1);
  26. add_task(pool, func2, arg_2);
  27. add_task(pool, func3, arg_3);
  28. }
  29. //sleep(1);
  30. if (var == 50) { break; } //任务量:500*50
  31. }
  32. shutdown_pool(pool);
  33. sleep(3);
  34. free(pool);
  35. return 0;
  36. }
  37. void func1(void* arg) {
  38. //printf("------func --- 1 --num = %d\n",*(int*)arg);
  39. }
  40. void func2(void* arg) {
  41. //printf("------func --- 2 --num = %d\n", *(int*)arg);
  42. }
  43. void func3(void* arg) {
  44. //printf("------func --- 3 --num = %d\n",*(int*)arg);
  45. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/621829
推荐阅读
相关标签
  

闽ICP备14008679号