赞
踩
如果多次使用线程,那么就需要多次的创建并撤销线程。但是创建/撤销的过程会消耗资源。线程池是一种数据结构,其中维护着多个线程,这避免了在处理短时间任务时,创建与销毁线程的代价。即在程序开始运行前预先创建一定数量的线程放入空闲队列中,这些线程都是处于阻塞状态,基本不消耗CPU,只占用较小的内存空间,程序在运行时,只需要从线程池中拿来用就可以了,大大提高了程序运行效率。
任务结构体
线程池结构体
线程工作函数
用户接口:
#include <stdio.h> #include<pthread.h> #include<unistd.h> typedef struct Task{ //任务结构体 void (*function) (void *arg); //函数指针 void* arg; //函数的参数 }Task; typedef struct threadPool{ //任务相关 Task *task; //任务队列的指针 int taskMaxNum; //任务队列的最大长度 int taskNum; //任务数量 int head; //任务队列的队头,C语言没有queue,需要手写queue int tail; //任务队列的队尾 //普通线程相关 pthread_t *workId; //线程数组的指针 int minNum; //线程池最小线程数,用户指定 int maxNum; //线程池最大线程数,用户指定 int liveNum; //线程池当前存活线程数 int busyNum; //线程池当前忙碌线程数,作为是否需要增加/减少线程数量的依据 //管理线程相关 pthread_t manageId; int deleteNum; //线程太多时,每一次要取消的线程数量 int shutDown; //判断线程池是否要销毁,1表示要销毁 //锁相关 pthread_mutex_t mutexPool; //互斥锁,锁住整个线程池 pthread_cond_t condFull; //条件锁,任务队列满了则阻塞,用于添加任务 pthread_cond_t condEmpty; //条件锁,任务队列空了则阻塞,用于取出任务 }threadPool; //普通线程工作函数 void* worker(void* arg); //管理线程工作函数 void* manager(void* arg); //新建线程池 threadPool* createPool(int minNum, int maxNum, int taskMaxNum); //添加任务,pool表示线程池,task表示任务的函数指针,arg表示函数的参数 int addtask(threadPool *pool, void (*func)(void*), void* arg); //查看线程数量 int getThreadNum(threadPool *pool); //查看忙线程数量 int getBusyThreadNum(threadPool *pool); //查看任务数量 int getTaskNum(threadPool *pool); //销毁线程池 int deletePool(threadPool *pool);
#include<stdio.h> #include "C-myThreadPool.h" #include<stdlib.h> // 普通线程工作函数,每次从任务队头取出任务并执行,如果没有任务则阻塞,参数为线程池 void* worker(void* arg){ threadPool* pool = (threadPool *)arg; //循环执行 while(1){ pthread_mutex_lock(&pool->mutexPool);//上锁,线程池为临界资源,需要进行线程同步 //销毁线程池 if(pool->shutDown){ pthread_mutex_unlock(&pool->mutexPool); pthread_exit(NULL); } //线程终止,即管理线程中设置了要取消的线程deleteNum的值,让空闲的线程自己自杀 while(pool->deleteNum>0){ //deleteNum大于0 pool->deleteNum--; pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); for(int i=0;i<pool->maxNum;++i){ //找到当前线程 if(pool->workId[i] == pthread_self()){ pool->workId[i] = 0; //重置线程池中线程数组的编号,0表示空闲 } } pthread_exit(NULL); //终止当前线程 } //从任务队列中取出任务,并执行 int taskNum = pool->taskNum; while(!pool->shutDown && taskNum==0){ // 如果当前没有任务 pthread_cond_wait(&pool->condEmpty, &pool->mutexPool); // 阻塞等待 } //如果在等待任务的时候,客户端调用销毁线程池,则需要再次判断一次 if(pool->shutDown){ pthread_mutex_unlock(&pool->mutexPool); pthread_exit(NULL); } Task task; task.function= pool->task[pool->head].function; //取出任务队列头结点 task.arg = pool->task[pool->head].arg; pool->head = (++pool->head) % pool->taskMaxNum; //头结点后移,%操作模拟队列 pool->taskNum--; //任务数量-1 pthread_mutex_unlock(&pool->mutexPool); // 解锁 pthread_cond_signal(&pool->condFull); //对该条件锁解锁,这样如果任务队列满了,告诉生产者就可以生产任务了 printf("thread %ld begin working\n", (long)pthread_self()); task.function(task.arg); //执行任务 pthread_mutex_lock(&pool->mutexPool); //上锁 pool->busyNum++; pthread_mutex_unlock(&pool->mutexPool); // 解锁 printf("thread %ld end working\n", (long)pthread_self()); pthread_mutex_lock(&pool->mutexPool); //上锁 pool->busyNum--; pthread_mutex_unlock(&pool->mutexPool); // 解锁 } return NULL; } // 管理线程工作函数 void* manager(void* arg){ threadPool* pool = (threadPool *)arg; while(!pool->shutDown){ //如果线程池没有被销毁 sleep(5);//每隔5秒检测一次 pthread_mutex_lock(&pool->mutexPool); int liveNum = pool->liveNum; int taskNum = pool->taskNum; pthread_mutex_unlock(&pool->mutexPool); //如果存活的线程太少,则增加线程,每次增加2个 if(taskNum > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int num=0; //已经增加的线程数量,假设每次增加2个线程 for(int i=0;i<pool->maxNum && num<2 && pool->liveNum<pool->maxNum; ++i){ if(pool->workId[i]==0){ pthread_create(&pool->workId[i], NULL, worker, pool); //新建线程 num++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); printf("2 threads are added by manager\n"); } //如果存活的线程太多,则减少线程,每次减少2个 if(taskNum*2 < liveNum && liveNum > pool->minNum) { pthread_mutex_lock(&pool->mutexPool); if(pool->liveNum-2 >= pool->minNum)pool->deleteNum=2; //这里不能指定杀死某个线程,因为并不知道线程的状态,要让空闲的线程自己自杀 pthread_mutex_unlock(&pool->mutexPool); printf("2 threads are deleted by manager\n"); } } return NULL; } // 新建线程池 threadPool* createPool(int minNum, int maxNum, int taskMaxNum){ // 任务相关 threadPool *pool = (threadPool*)(malloc(sizeof(threadPool))); pool->task = (Task*)(malloc(sizeof(Task)*taskMaxNum)); pool->taskMaxNum = taskMaxNum; pool->head = 0; pool->tail = 0; // 普通线程相关 pool->workId = (pthread_t *)(malloc(sizeof(pthread_t)*maxNum)); pool->minNum = minNum; pool->maxNum = maxNum; pool->liveNum = 3; pool->busyNum = 0; for(int i=0;i<pool->maxNum;++i){ pool->workId[i]=0; // 初始化id均为0,0表示该位置没有对应的线程 } for(int i=0;i<pool->minNum;++i){ // 开始启动最小数量的线程 pthread_create(&pool->workId[i], NULL, worker, pool); } // 管理线程相关,直接启动管理线程 pthread_create(&pool->manageId, NULL, manager, pool); pool->deleteNum = 1; // 每次取消一个线程 pool->shutDown = 0; // 锁相关 pthread_mutex_init(&pool->mutexPool, NULL); pthread_cond_init(&pool->condFull, NULL); pthread_cond_init(&pool->condEmpty, NULL); return pool; //pool在共享堆内存,所以不会自动释放 } //添加任务,pool表示线程池,task表示任务的函数指针,arg表示函数的参数 int addtask(threadPool *pool, void (*func)(void*), void* arg){ pthread_mutex_lock(&pool->mutexPool); //加锁 //任务队列满了,则阻塞等待 while(pool->taskNum == pool->taskMaxNum && !pool->shutDown){ pthread_cond_wait(&pool->condFull, &pool->mutexPool); } if(pool->shutDown){ pthread_mutex_unlock(&pool->mutexPool); } pool->task[pool->tail].function = func; pool->task[pool->tail].arg = arg; pool->taskNum++; pool->tail = (++pool->tail) % pool->taskMaxNum; //任务队列尾部向后移动1 pthread_cond_signal(&pool->condEmpty); //对该条件变量进行解锁,这样等待的线程可以拿任务了 pthread_mutex_unlock(&pool->mutexPool); return 0; } //查看线程数量 int getThreadNum(threadPool *pool){ pthread_mutex_lock(&pool->mutexPool); //读加锁是为了防止极端情况:读了一半然后被其它线程修改 int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return liveNum; } //查看忙线程数量 int getBusyThreadNum(threadPool *pool){ pthread_mutex_lock(&pool->mutexPool); //读加锁是为了防止极端情况:读了一半然后被其它线程修改 int BusyThreadNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); return BusyThreadNum; } //查看任务数量 int getTaskNum(threadPool *pool){ pthread_mutex_lock(&pool->mutexPool); //读加锁是为了防止极端情况:读了一半然后被其它线程修改 int taskNum = pool->taskNum; pthread_mutex_unlock(&pool->mutexPool); return taskNum; } // 销毁线程池 int deletePool(threadPool *pool){ if(pool==NULL){ return -1; // 如果线程池已经不存在了,则返回-1 } pool->shutDown=1; pthread_join(pool->manageId, NULL); //阻塞回收管理子线程 for(int i=0;i<pool->maxNum;++i) //唤醒所有等待任务的普通子线程 pthread_cond_broadcast(&pool->condEmpty); //释放pool中的两个堆内存 if(pool->task)free(pool->task); if(pool->workId) free(pool->workId); //回收锁 pthread_mutex_destroy(&pool->mutexPool); pthread_cond_destroy(&pool->condFull); pthread_cond_destroy(&pool->condEmpty); //回收线程池 free(pool); pool=NULL; return 0; }
#include <stdio.h> #include <stdlib.h> #include "C-myThreadPool.h" void test_task(void *arg){ int num = *(int*)arg; printf("thread %ld is working, number = %d\n", (long)pthread_self(), num); sleep(1); } int main(int count, char** arg){ //测试代码 threadPool* pool =createPool(3, 10, 50); for(int i=0;i<100;++i){ int *num = (int*)malloc(sizeof(int)); *num = i; addtask(pool, test_task, num); } //每两秒检测一次,如果还存在任务或者忙的线程,则等待 while( getTaskNum(pool) || getBusyThreadNum(pool) ){ sleep(2); } //销毁线程池 deletePool(pool); return 0; }
部分运行结果
(注意:在linux命令行下运行gcc,需要 -l 指定链接的库文件pthread,执行 gcc main.c -l pthread -o output即可)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。