赞
踩
目录
线程池的组成主要分为3个部分。这三部分配合工作就可以得到一个完整的线程池:
1、任务队列,存储需要处理的任务。由工作的想程来处理这些任务
2、工作的线程 (任务队列任务的消费者),N个
3、管理者线程(不处理任务队列中的任务),1个
——当任务过多的时候,可以适当的创建一些新的工作线程
——当任务过少的时候,可以适当的销毁一些工作的线程
- #include "threadpool.h"
-
- //任务结构体
- typedef struct Task
- {
- void (*function) (void* arg);
- void* arg;
- }Task;
-
- //线程池结构体
- struct ThreadPool
- {
- //任务队列
- Task* taskQ;
- int queueCapacity; //容量
- int queueSize; //当前任务个数
- int queueFront; //队头->取数据
- int queueRear; //队尾->放数据
- pthread_t managerID; //管理者线程ID
- pthread_t *threadIDs; //工作的线程ID
- int minNum; //最小线程数量
- int maxNum; //最大线程数量
- int busyNum; //忙的线程的个数
- int liveNum; //存活的线程的个数
- int exitNum; //要销毁的线程个数
- pthread _mutex_t mutexPool; //锁整个的线程池
- pthread_mutex_t mutexBusy; //锁busyNum变量
- pthread_cond_t notFull; //任务队列是不是满了
- pthread_cond_t notEmpty; //任务队列是不是空了
- int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
- };
- typedef struct ThreadPool ThreadPool;
- ThreadPool* threadPoolCreate(int min, int max, int queueSize)
- {
- ThreadPool* pool=(ThreadPool*)malloc(sizeof (ThreadPool));
- do{
- if (pool == NULL)
- {
- printf ( "malloc threadpool fail ... \n");
- break;
- }
- pool->threadIDs = (pthread_t *) malloc(sizeof(pthread_t) *max);
- if(pool->threadIDs == NULL)
- {
- printf ("malloc threadIDs fail ... \n");
- break;
- }
- memset(pool->threadIDs,0, sizeof (pthread_t) * max);
- pool->minNum = min;
- pool->maxNum = max;
- pool->busyNum = 0;
- pool->liveNum = min; //和最小个数相等
- pool->exitNum = 0;
-
- if (pt.hread _mutex_init ( &pool->mutexPool,NUTI) !=0 ||
- pthread_mutex_init ( &pool->mutexBusy,NULL) !=0 ||
- pthread_cond_init (&pool->notEmpty,NULL) !=0 ||
- pthread_cond_init ( &pool->notFull,NULL) !=0 )
- {
- printf ( "mutex or condition init fail ...\n");
- break;
- }
-
- //任务队列
- pool->taskQ = malloc(sizeof (Task) * queueSize);
- pool->queueCapacity =qucuesizo;
- pool->queueSize= 0;
- pool->queueFront =0;
- pool->queueRear = 0;
- pool->shutdown = 0;
-
- //创建线程
- pthread_create ( &pool->managerID,NULL,manager,NULL);
- for (int i = 0; i < min; ++i)
- {
- pthread create (&pool->threadIDs[i],NULL,worker,NULL);
- }
- }whiie (O);
-
- //释放资源
- if(pool && pool->threadIDs) free(pool->threadIDs);
- if(pool && pool->taskQ) free(pool->taskQ);
- if(pool) free(pool);
-
- return NULL;
- }
- void* worker (void* arg)
- {
- ThreadPool* pool = (ThreadPool*)arg;
-
- while (1)
- {
- pthread_ mutex_lock(&pool->mtexPool) ;!当前任务队列是否为空
- while (pool->queuesize == 0 && !pool->shutdown )
- {
- //阻塞工作线程
- pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
- //判断是不是要销毁线程
- if (pool->exitNum > 0)
- {
- pool->exitNum--;
- pthread_ mutex_unlock(&pool->mtexPool) ;!当前任务队列是否为空
- pthread_exit (NULL);
- }
-
- }
- //判断线程池是否被关闭了
- if (pool->shutdown ){
- pthread_mutex_unlock(&pool->mutexPool);
- pthread_exit(NULL);
- }
- //从任务队列中取出一个任务
- Task task;
- task.function = pool->taskQ[pool->queueFront].function;
- task.arg = pool->taskQ[pool->queueFront].arg;
- //移动头结点
- pool->queueFront =(pool->queueFront + 1) % pool->queueCapacity;
- pool->queuesize--;
- //解锁
- pthread_cond_signal(&pool->notFull);
- pthread_mutex_unlock(&pool->mutexPool);
-
- printf("thread %ld start working ...\n");
- pthread mutex_lock (&pool->mutexBusy);
- pool->busyNum++;
- pthread mutex_unlock (&pool->mutexBusy) ;
-
- task.function(task.arg) ; //或者 (*task.function) (task.arg) 进行调用;
- free(task.arg);
- task.arg=NULL;
-
- printf("thread %ld end working ...\n");
- pthread_mutex_lock (&pool->mutexBusy);
- pool->busyNum--;
- pthread mutex_unlock (&pool->mutexBusy) ;
-
- }
-
- return NULL;
- }
- const int NUMBER = 2;
- void* manaqer(void* arg)
- {
- ThreadPool* pool = (ThreadPool+)arg;
- while(!pool->shutdown)
- {
- //每隔3s检测一次
- sleep(3);
-
- //取出线程池中任务的数量和当前线程的数量
- pthread_mutex_lock(&pool->muatexPool) ;
- int queueSize =pool->queuesize;
- int liveNum = pool->liveNum;
- pthread_mutex_unlock( &pool->mutexPool) ;
-
- //取出忙的线程的数量
- pthread_mutex_lock(&pool->mutexBusy);
- int busyNum = pool->busyNum;
- pthread_mutex__lock(&pool->mutexBusy) ;
-
- //添加线程
- // 任务的个数 > 存活的线程个数 && 存活的线程数 < 最大线程数
- if (queueSize > liveNum && liveNum< pool->maxNum)
- {
- pthread_mutex_lock(&pool->mutexPool);
- int counter = 0;
- for (int i = 0; i < pool->maxNum && counter < NUMBER
- && pool->liveNum < pool->maxNum ; ++i)
- {
- if (pool->threadIDs[i] == 0)
- {
- pthread_create( &pool->threadIDs[i],NULI,worker, pool);
- counter++;
- pool->liveNum++;
-
- }
- }
- pthread_mutex_unlock(&pool->mutexPool);
- }
- // 销毁线程
- // 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数
- if(busyNum * 2< liveNum && liveNum > pool->minNum)
- {
- pthread_mutex_lock(&pool->mutexPool);
- pool->exitNum = NUMBER;
- pthread_mutex_unlock(&pcol->mutexPool);
- // 让工作的线程自杀I
- for(int i=0;i<NUMBER; ++i)
- {
- pthread_cond_signal(&pool->notEmpty);
- }
- }
- }
- }
- void threadPoolAdd(ThreadPool* pool,void (*func)(void*), void* arg)
- {
- pthread_mutex_lock(&pool->mutexPool);
- while(pool->queueSize == pool->queueCapacity && !pool->shutdown)
- {
- //阻塞生产者线程
- pthread_cond_wait( &pool->notFull,&pool->mutexPool) ;
- }
- if (pool->shutdown)
- {
- pthread_mutex_unlock ( &pool->mutexPool) ;
- return;
- }
-
- //添加任务
- pool->taskQ[pool->queueRear].function = func;
- pool->taskQ[pool->queueRear].arg = arg;
- pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
- pool->queuesize++;
-
- pthread_cond_signal( &pool->notEmpty);
- pthread_mutex_unlock(&pool->mutexPool) ;
- }
- //获取线程池中工作的线程数量
- int threadPoolBusyNum ( ThreadPool* pool)
- {
- pthread_mutex_lock(&pool->mutexBusy);
- int busyNum = pool->busyNum;
- pthread_mutex_unlock(&pool->mutexBusy);
- return busyNum;
- }
-
-
- //获取线程池中活着的线程数量
- int threadPoolAliveNum (ThreadPool*pool)
- {
- pthread_mutex_lock(&pool->mutexPool);
- int busyNum = pool->liveNum;
- pthread_mutex_unlock(&pool->mutexPool);
- return aliveNum;
- }
- int threadPoolDestroy (ThreadPool* pool)
- {
- if(pool == NULL){
- return -1;
- }
-
- //关闭线程池
- pool->shutdown = 1;
- //阻塞回收管理者线程
- pthread_join(pool->nanagerID,NULL);
- //唤醒阻塞的消费者线程
- for (int i = 0; i < pool->liveNum; ++i){
- pthread_cond_signal( &pool->notEmpty) ;
- }
- //释放堆内存
- if(pool->taskQ){
- free(pool->taskQ);
- pool->taskQ=NULL;
- }
- if(pool->threadIDs){
- free(pool->threadIDs);
- pool->threadIDs=NULL;
- }
- free(pool);
- pool=NULL;
-
- pthread_mutex_destroy( &pool->mutexPool);
- pthread_mutex_destroy( &pool->mutexBusy);
- pthread_cond_ destroy( &pool->notEmpty);
- pthread_cond_destroy ( &pool->notFull);
-
- return 0;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。