赞
踩
threadpool.h
- #ifndef _THREADPOOL_H_
- #define _THREADPOOL_H_
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <pthread.h>
-
- /* 线程体数据结构*/
- typedef struct runner
- {
- void (*callback)(void *arg);//回调函数指针
- void *arg; //回调函数参数
- struct runner *next;
- }thread_runner;
-
- /* 线程池数据结构*/
- typedef struct pool
- {
- pthread_mutex_t mutex; //互斥量
- pthread_cond_t cond; //条件变量
- thread_runner* runner_head; //线程池中所有等待任务的头指针
- thread_runner* runner_tail; //线程池中所有等待任务的尾指针
- int shutdown; //线程池是否销毁,0没有注销,1注销
- pthread_t* threads; //所有线程
- int max_thread_size; //线程池中允许的活动线程数目
- }thread_pool;
-
- void run(void *arg);
- void threadpool_init(thread_pool *pool, int max_thread_size);
- void threadpool_add_runner(thread_pool *pool, void (*callback)(void *arg), void *arg);
- void threadpool_destroy(thread_pool **ppool);
-
- #endif //_THREADPOOL_H_
//======================= threadpool.c =========================== #include "threadpool.h" /********************************************************** * 初始化线程 * 参数: * pool:指向线程池结构有效地址的动态指针 * max_thread_size:最大的线程数 **********************************************************/ void threadpool_init(thread_pool *pool, int max_thread_size) { int iLoop = 0; /*线程池初始化操作*/ pthread_mutex_init(&(pool->mutex), NULL); //初始化互斥量 pthread_cond_init(&(pool->cond), NULL); //初始化条件变量 pool->shutdown = 0; //线程池默认没有注销 pool->threads = (pthread_t *)malloc(max_thread_size * sizeof(pthread_t)); //创建所有分离线程 pool->runner_head = NULL; pool->runner_tail = NULL; pool->max_thread_size = max_thread_size; /*创建线程操作*/ for(iLoop; iLoop < max_thread_size; iLoop++) { pthread_attr_t attr; //定义线程对象 pthread_attr_init(&attr); //初始化线程的属性 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); //设置脱离状态的属性(决定这个线程在终止时是否可以被结合) pthread_create(&(pool->threads[iLoop]), &attr, (void *)run, (void *)pool); /*threads[i] 动态创建线程; *第一个参数为指向线程标识符的指针。 *第二个参数用来设置线程属性。 *第三个参数是线程运行函数的起始地址。 *最后一个参数是运行函数的参数。*/ } printf("threadpool_init-> create %d detached thread\n"); } /********************************************************** * 线程体,创建线程后调用的函数 * 参数: * arg:接收创建线程后传递的参数 **********************************************************/ void run(void *arg) { thread_pool *pool = (thread_pool *)arg; while(1) { pthread_mutex_lock(&(pool->mutex)); //加锁 printf("run-> locked!\n"); /*如果等待队列为0并且线程池未销毁,则处于阻塞状态 */ while(pool->runner_head == NULL && !pool->shutdown) { pthread_cond_wait(&(pool->cond), &(pool->mutex)); } /*如果线程已经销毁*/ if(pool->shutdown) { pthread_mutex_unlock(&(pool->mutex)); //解锁 printf("run-> unlock and thread exit!\n"); pthread_exit(NULL); } thread_runner *runner = pool->runner_head; //取链表的头元素 pool->runner_head = runner->next; pthread_mutex_unlock(&(pool->mutex)); //解锁 printf("run-> unlocked!\n"); (runner->callback)(runner->arg); //调用回调函数,执行任务 free(runner); //释放线程操作 runner = NULL; printf("run-> runned and free runner!\n"); } pthread_exit(NULL); } /********************************************************** * 向线程池加入任务 * 参数: * pool:指向线程池结构有效地址的动态指针 * callback:线程回调函数 * arg:回调函数参数 **********************************************************/ void threadpool_add_runner(thread_pool *pool, void(*callback)(void *arg), void *arg) { thread_runner *newrunner = (thread_runner *)malloc(sizeof(thread_runner));//构建一个新任务 newrunner->callback = callback; newrunner->arg = arg; newrunner->next = NULL; pthread_mutex_lock(&(pool->mutex)); //加锁 printf("threadpool_add_runner-> locked\n"); /*将新任务加入到等待队列中,如果等待队列为空,直接运行当前的线程 */ if(pool->runner_head != NULL) { pool->runner_tail->next = newrunner; pool->runner_tail = newrunner; } else { pool->runner_head = newrunner; pool->runner_tail = newrunner; } pthread_mutex_unlock(&(pool->mutex)); //解锁 printf("threadpool_add_runner-> unlocked\n"); pthread_cond_signal(&(pool->cond)); //唤醒一个等待线程 printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n"); } /********************************************************** * 销毁线程池 * 参数: * ppool:指向线程池结构有效地址的动态指针地址(二级指针) **********************************************************/ void threadpool_destroy(thread_pool **ppool) { thread_pool *pool = *ppool; /*判断线程池是否注销,防止二次销毁*/ if(!pool->shutdown) { pool->shutdown = 1; pthread_cond_broadcast(&(pool->cond)); //唤醒所有的等待线程,线程池要销毁了 sleep(1); //等待所有的线程终止 printf("threadpool_destroy-> wakeup all waitting threads\n"); free(pool->threads); //回收空间 /*销毁等待队列*/ thread_runner *head = NULL; while(pool->runner_head != NULL) { head = pool->runner_head; pool->runner_head = pool->runner_head->next; free(head); } printf("thread_destroy-> all runners freed\n"); pthread_mutex_destroy(&(pool->mutex)); //销毁条件变量 pthread_cond_destroy(&(pool->cond)); //销毁互斥量 printf("thread_destroy-> mutex and cond destroyed\n"); free(pool); pool = NULL; (*ppool) = NULL; printf("threadpool_destroy-> pool freed\n"); } } //======================= end threadpool.c ===========================
#include "threadpool.h" void threadrun(void *arg) { int i = *(int *)arg; printf("threadrun result == %d\n", i); } int main(int argc, char *argv[]) { thread_pool *pool = (thread_pool *)malloc(sizeof(thread_pool)); threadpool_init(pool, 9); int i; int tmp[10]; for(i = 0; i < 10; i++) { tmp[i] = i; threadpool_add_runner(pool, threadrun, &tmp[i]); } sleep(1); threadpool_destroy(&pool); printf("main-> %p\n", pool); printf("main-> test over\n"); return 0; }
- #Makefile
- main: mian.o threadpool.o
- gcc -o main test.o threadpool.o -lpthread
- mian.o: threadpool.h
- gcc -c main.c -lpthread
- hreadpool.o: threadpool.h
- gcc -c threadpool.c -lpthread
-
- .PHONY:clean
- clean:
- rm -f *.o main
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。