当前位置:   article > 正文

用线程池实现多线程调用并使用回调函数实现函数调用_回调函数放在多线程

回调函数放在多线程
* 创建线程或者进程的开销是很大的
* 为了防止频繁的创建、销毁线程,提高程序的运行效率
* 往往会建立一个线程池用于多线程程序的调度
* 下面的程序就是完整的线程池实现
*
* 主要通过互斥量和条件变量实现同步

threadpool.h

  1. #ifndef _THREADPOOL_H_
  2. #define _THREADPOOL_H_
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <pthread.h>
  7. /* 线程体数据结构*/
  8. typedef struct runner
  9. {
  10. void (*callback)(void *arg);//回调函数指针
  11. void *arg; //回调函数参数
  12. struct runner *next;
  13. }thread_runner;
  14. /* 线程池数据结构*/
  15. typedef struct pool
  16. {
  17. pthread_mutex_t mutex; //互斥量
  18. pthread_cond_t cond; //条件变量
  19. thread_runner* runner_head; //线程池中所有等待任务的头指针
  20. thread_runner* runner_tail; //线程池中所有等待任务的尾指针
  21. int shutdown; //线程池是否销毁,0没有注销,1注销
  22. pthread_t* threads; //所有线程
  23. int max_thread_size; //线程池中允许的活动线程数目
  24. }thread_pool;
  25. void run(void *arg);
  26. void threadpool_init(thread_pool *pool, int max_thread_size);
  27. void threadpool_add_runner(thread_pool *pool, void (*callback)(void *arg), void *arg);
  28. void threadpool_destroy(thread_pool **ppool);
  29. #endif //_THREADPOOL_H_

threadpool.c

  1. //======================= threadpool.c ===========================
  2. #include "threadpool.h"
  3. /**********************************************************
  4. * 初始化线程
  5. * 参数:
  6. * pool:指向线程池结构有效地址的动态指针
  7. * max_thread_size:最大的线程数
  8. **********************************************************/
  9. void threadpool_init(thread_pool *pool, int max_thread_size)
  10. {
  11. int iLoop = 0;
  12. /*线程池初始化操作*/
  13. pthread_mutex_init(&(pool->mutex), NULL); //初始化互斥量
  14. pthread_cond_init(&(pool->cond), NULL); //初始化条件变量
  15. pool->shutdown = 0; //线程池默认没有注销
  16. pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t)); //创建所有分离线程
  17. pool->runner_head = NULL;
  18. pool->runner_tail = NULL;
  19. pool->max_thread_size = max_thread_size;
  20. /*创建线程操作*/
  21. for(iLoop; iLoop < max_thread_size; iLoop++)
  22. {
  23. pthread_attr_t attr; //定义线程对象
  24. pthread_attr_init(&attr); //初始化线程的属性
  25. pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //设置脱离状态的属性(决定这个线程在终止时是否可以被结合)
  26. pthread_create(&(pool->threads[iLoop]), &attr, (void *)run, (void *)pool); /*threads[i] 动态创建线程;
  27. *第一个参数为指向线程标识符的指针。
  28. *第二个参数用来设置线程属性。
  29. *第三个参数是线程运行函数的起始地址。
  30. *最后一个参数是运行函数的参数。*/
  31. }
  32. printf("threadpool_init-> create %d detached thread\n");
  33. }
  34. /**********************************************************
  35. * 线程体,创建线程后调用的函数
  36. * 参数:
  37. * arg:接收创建线程后传递的参数
  38. **********************************************************/
  39. void run(void *arg)
  40. {
  41. thread_pool *pool = (thread_pool *)arg;
  42. while(1)
  43. {
  44. pthread_mutex_lock(&(pool->mutex)); //加锁
  45. printf("run-> locked!\n");
  46. /*如果等待队列为0并且线程池未销毁,则处于阻塞状态 */
  47. while(pool->runner_head == NULL && !pool->shutdown)
  48. {
  49. pthread_cond_wait(&(pool->cond), &(pool->mutex));
  50. }
  51. /*如果线程已经销毁*/
  52. if(pool->shutdown)
  53. {
  54. pthread_mutex_unlock(&(pool->mutex)); //解锁
  55. printf("run-> unlock and thread exit!\n");
  56. pthread_exit(NULL);
  57. }
  58. thread_runner *runner = pool->runner_head; //取链表的头元素
  59. pool->runner_head = runner->next;
  60. pthread_mutex_unlock(&(pool->mutex)); //解锁
  61. printf("run-> unlocked!\n");
  62. (runner->callback)(runner->arg); //调用回调函数,执行任务
  63. free(runner); //释放线程操作
  64. runner = NULL;
  65. printf("run-> runned and free runner!\n");
  66. }
  67. pthread_exit(NULL);
  68. }
  69. /**********************************************************
  70. * 向线程池加入任务
  71. * 参数:
  72. * pool:指向线程池结构有效地址的动态指针
  73. * callback:线程回调函数
  74. * arg:回调函数参数
  75. **********************************************************/
  76. void threadpool_add_runner(thread_pool *pool, void(*callback)(void *arg), void *arg)
  77. {
  78. thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));//构建一个新任务
  79. newrunner->callback = callback;
  80. newrunner->arg = arg;
  81. newrunner->next = NULL;
  82. pthread_mutex_lock(&(pool->mutex)); //加锁
  83. printf("threadpool_add_runner-> locked\n");
  84. /*将新任务加入到等待队列中,如果等待队列为空,直接运行当前的线程 */
  85. if(pool->runner_head != NULL)
  86. {
  87. pool->runner_tail->next = newrunner;
  88. pool->runner_tail = newrunner;
  89. }
  90. else
  91. {
  92. pool->runner_head = newrunner;
  93. pool->runner_tail = newrunner;
  94. }
  95. pthread_mutex_unlock(&(pool->mutex)); //解锁
  96. printf("threadpool_add_runner-> unlocked\n");
  97. pthread_cond_signal(&(pool->cond)); //唤醒一个等待线程
  98. printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");
  99. }
  100. /**********************************************************
  101. * 销毁线程池
  102. * 参数:
  103. * ppool:指向线程池结构有效地址的动态指针地址(二级指针)
  104. **********************************************************/
  105. void threadpool_destroy(thread_pool **ppool)
  106. {
  107. thread_pool *pool = *ppool;
  108. /*判断线程池是否注销,防止二次销毁*/
  109. if(!pool->shutdown)
  110. {
  111. pool->shutdown = 1;
  112. pthread_cond_broadcast(&(pool->cond)); //唤醒所有的等待线程,线程池要销毁了
  113. sleep(1); //等待所有的线程终止
  114. printf("threadpool_destroy-> wakeup all waitting threads\n");
  115. free(pool->threads); //回收空间
  116. /*销毁等待队列*/
  117. thread_runner *head = NULL;
  118. while(pool->runner_head != NULL)
  119. {
  120. head = pool->runner_head;
  121. pool->runner_head = pool->runner_head->next;
  122. free(head);
  123. }
  124. printf("thread_destroy-> all runners freed\n");
  125. pthread_mutex_destroy(&(pool->mutex)); //销毁条件变量
  126. pthread_cond_destroy(&(pool->cond)); //销毁互斥量
  127. printf("thread_destroy-> mutex and cond destroyed\n");
  128. free(pool);
  129. pool = NULL;
  130. (*ppool) = NULL;
  131. printf("threadpool_destroy-> pool freed\n");
  132. }
  133. }
  134. //======================= end threadpool.c ===========================

main.c

  1. #include "threadpool.h"
  2. void threadrun(void *arg)
  3. {
  4. int i = *(int *)arg;
  5. printf("threadrun result == %d\n", i);
  6. }
  7. int main(int argc, char *argv[])
  8. {
  9. thread_pool *pool = (thread_pool *)malloc(sizeof(thread_pool));
  10. threadpool_init(pool, 9);
  11. int i;
  12. int tmp[10];
  13. for(i = 0; i < 10; i++)
  14. {
  15. tmp[i] = i;
  16. threadpool_add_runner(pool, threadrun, &tmp[i]);
  17. }
  18. sleep(1);
  19. threadpool_destroy(&pool);
  20. printf("main-> %p\n", pool);
  21. printf("main-> test over\n");
  22. return 0;
  23. }

Makefile

  1. #Makefile
  2. main: mian.o threadpool.o
  3. gcc -o main test.o threadpool.o -lpthread
  4. mian.o: threadpool.h
  5. gcc -c main.c -lpthread
  6. hreadpool.o: threadpool.h
  7. gcc -c threadpool.c -lpthread
  8. .PHONY:clean
  9. clean:
  10. rm -f *.o main


本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号