赞
踩
线程池内部维护了多个工作线程,每个工作线程都会去任务队列中拿取任务并执行,当执行完一个任务后不是马上销毁,而是继续保留执行其它任务。显然,线程池提高了多线程的复用率,减少了创建和销毁线程的时间。
线程池内部由任务队列、工作线程和管理者线程组成。
任务队列:存储需要处理的任务。每个任务其实就是具体的函数,在任务队列中存储函数指针和对应的实参。当工作线程获取任务后,就能根据函数指针来调用指定的函数。其实现可以是数组、链表、STL容器等。
工作线程:有N个工作线程,每个工作线程会去任务队列中拿取任务,然后执行具体的任务。当任务被处理后,任务队列中就不再有该任务了。当任务队列中没有任务时,工作线程就会阻塞。
管理者线程:周期性检测忙碌的工作线程数量和任务数量。当任务较多线程不够用时,管理者线程就会多创建几个工作线程来加快处理(不会超过工作线程数量的上限)。当任务较少线程空闲多时,管理者线程就会销毁几个工作线程来减少内存占用(不会低于工作线程数量的下限)。
注意:线程池中没有维护“生产者线程”,所谓的“生产者线程”就是往任务队列中添加任务的线程。
参考来源:爱编程的大丙。
【1】threadpool.c:
- #include "threadpool.h"
- #include <pthread.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <stdio.h>
-
- #define NUMBER 2 //管理者线程增加或减少的工作线程数量
-
- //任务结构体
- typedef struct Task {
- void (*func)(void* arg);
- void* arg;
- } Task;
-
-
- //线程池结构体
- struct ThreadPool {
- //任务队列,视为环形队列
- Task* taskQ;
- int queueCapacity; //队列容量
- int queueSize; //当前任务个数
- int queueFront; //队头 -> 取任务
- int queueRear; //队尾 -> 加任务
- //线程相关
- pthread_t managerID; //管理者线程ID
- pthread_t* threadIDs; //工作线程ID
- int minNum; //工作线程最小数量
- int maxNum; //工作线程最大数量
- int busyNum; //工作线程忙的数量
- int liveNum; //工作线程存活数量
- int exitNum; //要销毁的工作线程数量
- pthread_mutex_t mutexPool; //锁整个线程池
- pthread_mutex_t mutexBusy; //锁busyNum
- pthread_cond_t notFull; //任务队列是否满
- pthread_cond_t notEmpty; //任务队列是否空
- //线程池是否销毁
- int shutdown; //释放为1,否则为0
- };
-
- /***************************************************************
- * 函 数: threadPoolCreate
- * 功 能: 创建线程池并初始化
- * 参 数: min---工作线程的最小数量
- * max---工作线程的最大数量
- * capacity---任务队列的最大容量
- * 返回值: 创建的线程池的地址
- **************************************************************/
- ThreadPool* threadPoolCreate(int min, int max, int capacity)
- {
- //申请线程池空间
- ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
- do {//此处循环只是为了便于失败释放空间,只会执行一次
- if (pool == NULL) {
- printf("pool create error!\n");
- break;
- }
- //申请任务队列空间,并初始化
- pool->taskQ = (Task*)malloc(sizeof(Task) * capacity);
- if (pool->taskQ == NULL) {
- printf("Task create error!\n");
- break;
- }
- pool->queueCapacity = capacity;
- pool->queueSize = 0;
- pool->queueFront = 0;
- pool->queueRear = 0;
- //初始化互斥锁和条件变量
- if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
- pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
- pthread_cond_init(&pool->notFull, NULL) != 0 ||
- pthread_cond_init(&pool->notEmpty, NULL) != 0)
- {
- printf("mutex or cond create error!\n");
- break;
- }
- //初始化shutdown
- pool->shutdown = 0;
- //初始化线程相关参数
- pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
- if (pool->threadIDs == NULL) {
- printf("threadIDs create error!\n");
- break;
- }
- memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
- pool->minNum = min;
- pool->maxNum = max;
- pool->busyNum = 0;
- pool->liveNum = min;
- pool->exitNum = 0;
- //创建管理者线程和工作线程
- pthread_create(&pool->managerID, NULL, manager, pool);//创建管理线程
- for (int i = 0; i < min; ++i) {
- pthread_create(&pool->threadIDs[i], NULL, worker, pool);//创建工作线程
- }
- return pool;
- } while (0);
- //申请资源失败,释放已分配的资源
- if (pool && pool->taskQ) free(pool->taskQ);
- if (pool && pool->threadIDs) free(pool->threadIDs);
- if (pool) free(pool);
- return NULL;
- }
-
-
- /***************************************************************
- * 函 数: threadPoolDestroy
- * 功 能: 销毁线程池
- * 参 数: pool---要销毁的线程池
- * 返回值: 0表示销毁成功,-1表示销毁失败
- **************************************************************/
- int threadPoolDestroy(ThreadPool* pool)
- {
- if (!pool) return -1;
- //关闭线程池
- pool->shutdown = 1;
- //阻塞回收管理者线程
- pthread_join(pool->managerID, NULL);
- //唤醒所有工作线程,让其自杀
- for (int i = 0; i < pool->liveNum; ++i) {
- pthread_cond_signal(&pool->notEmpty);
- }
- //释放所有互斥锁和条件变量
- pthread_mutex_destroy(&pool->mutexBusy);
- pthread_mutex_destroy(&pool->mutexPool);
- pthread_cond_destroy(&pool->notEmpty);
- pthread_cond_destroy(&pool->notFull);
- //释放堆空间
- if (pool->taskQ) {
- free(pool->taskQ);
- pool->taskQ = NULL;
- }
- if (pool->threadIDs) {
- free(pool->threadIDs);
- pool->threadIDs = NULL;
- }
- free(pool);
- pool = NULL;
- return 0;
- }
-
-
- /***************************************************************
- * 函 数: threadPoolAdd
- * 功 能: 生产者往线程池的任务队列中添加任务
- * 参 数: pool---线程池
- * func---函数指针,要执行的任务地址
- * arg---func指向的函数的实参
- * 返回值: 无
- **************************************************************/
- void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
- {
- pthread_mutex_lock(&pool->mutexPool);
- //任务队列满,阻塞生产者
- while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
- pthread_cond_wait(&pool->notFull, &pool->mutexPool);
- }
- //判断线程池是否关闭
- if (pool->shutdown) {
- pthread_mutex_unlock(&pool->mutexPool);
- return;
- }
- //添加任务进pool->taskQ
- pool->taskQ[pool->queueRear].func = func;
- pool->taskQ[pool->queueRear].arg = arg;
- pool->queueSize++;
- pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
- pthread_cond_signal(&pool->notEmpty);//唤醒工作线程
- pthread_mutex_unlock(&pool->mutexPool);
- }
-
-
- /***************************************************************
- * 函 数: getThreadPoolBusyNum
- * 功 能: 获取线程池忙的工作线程数量
- * 参 数: pool---线程池
- * 返回值: 忙的工作线程数量
- **************************************************************/
- int getThreadPoolBusyNum(ThreadPool* pool)
- {
- pthread_mutex_lock(&pool->mutexBusy);
- int busyNum = pool->busyNum;
- pthread_mutex_unlock(&pool->mutexBusy);
- return busyNum;
- }
-
-
- /***************************************************************
- * 函 数: getThreadPoolAliveNum
- * 功 能: 获取线程池存活的工作线程数量
- * 参 数: pool---线程池
- * 返回值: 存活的工作线程数量
- **************************************************************/
- int getThreadPoolAliveNum(ThreadPool* pool)
- {
- pthread_mutex_lock(&pool->mutexPool);
- int liveNum = pool->liveNum;
- pthread_mutex_unlock(&pool->mutexPool);
- return liveNum;
- }
-
-
- /***************************************************************
- * 函 数: worker
- * 功 能: 工作线程的执行函数
- * 参 数: arg---实参传入,这里传入的是线程池
- * 返回值: 空指针
- **************************************************************/
- void* worker(void* arg)
- {
- ThreadPool* pool = (ThreadPool*)arg;
- while (1) {
- /* 1.取出任务队列中的队头任务 */
- pthread_mutex_lock(&pool->mutexPool);
- //无任务就阻塞线程
- while (pool->queueSize == 0 && !pool->shutdown) {
- pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
- //唤醒后,判断是不是要销毁线程
- if (pool->exitNum > 0) {//线程自杀
- pool->exitNum--;//销毁指标-1
- if (pool->liveNum > pool->minNum) {
- pool->liveNum--;//活着的工作线程-1
- pthread_mutex_unlock(&pool->mutexPool);
- threadExit(pool);
- }
- }
- }
- //线程池关闭了就退出线程
- if (pool->shutdown) {
- pthread_mutex_unlock(&pool->mutexPool);
- threadExit(pool);
- }
- //取出pool中taskQ的任务
- Task task;
- task.func = pool->taskQ[pool->queueFront].func;
- task.arg = pool->taskQ[pool->queueFront].arg;
- pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//移动队头
- pool->queueSize--;
- //通知生产者添加任务
- pthread_cond_signal(&pool->notFull);
- pthread_mutex_unlock(&pool->mutexPool);
-
- /* 2.设置pool的busyNum+1 */
- pthread_mutex_lock(&pool->mutexBusy);
- pool->busyNum++;
- pthread_mutex_unlock(&pool->mutexBusy);
-
- /* 3.执行取出的任务 */
- printf("thread %ld start working ...\n", pthread_self());
- task.func(task.arg);
- free(task.arg);
- task.arg = NULL;
- printf("thread %ld end working ...\n", pthread_self());
-
- /* 4.设置pool的busyNum-1 */
- pthread_mutex_lock(&pool->mutexBusy);
- pool->busyNum--;
- pthread_mutex_unlock(&pool->mutexBusy);
- }
- return NULL;
- }
-
-
- /***************************************************************
- * 函 数: manager
- * 功 能: 管理者线程的执行函数
- * 参 数: arg---实参传入,这里传入的是线程池
- * 返回值: 空指针
- **************************************************************/
- void* manager(void* arg)
- {
- ThreadPool* pool = (ThreadPool*)arg;
- while (!pool->shutdown) {
- /* 每隔3秒检测一次 */
- sleep(3);
-
- /* 获取pool中相关变量 */
- pthread_mutex_lock(&pool->mutexPool);
- int taskNum = pool->queueSize; //任务队列中的任务数量
- int liveNum = pool->liveNum; //存活的工作线程数量
- int busyNum = pool->busyNum; //忙碌的工作线程数量
- pthread_mutex_unlock(&pool->mutexPool);
-
- /* 功能一:增加工作线程,每次增加NUMBER个 */
- //当任务个数大于存活工作线程数,且存活工作线程数小于最大值
- if (taskNum > liveNum && liveNum < pool->maxNum) {
- pthread_mutex_lock(&pool->mutexPool);
- int counter = 0;
- for (int i = 0; i < pool->maxNum && counter < NUMBER
- && pool->liveNum < pool->maxNum; ++i)
- {
- if (pool->threadIDs[i] == 0) {
- pthread_create(&pool->threadIDs[i], NULL, worker, pool);
- counter++;
- pool->liveNum++;
- }
- }
- pthread_mutex_unlock(&pool->mutexPool);
- }
-
- /* 功能二:销毁工作线程,每次销毁NUMBER个 */
- //当忙的线程数*2 < 存活线程数,且存活线程数 > 最小线程数
- if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
- pthread_mutex_lock(&pool->mutexPool);
- pool->exitNum = NUMBER;
- //唤醒NUMBER个工作线程,让其解除阻塞,在worker函数中自杀
- for (int i = 0; i < NUMBER; ++i) {
- pthread_cond_signal(&pool->notEmpty);
- }
- pthread_mutex_unlock(&pool->mutexPool);
- }
- }
- return NULL;
- }
-
-
-
- /***************************************************************
- * 函 数: threadExit
- * 功 能: 工作线程退出函数,将工作线程的ID置为0,然后退出
- * 参 数: pool---线程池
- * 返回值: 无
- **************************************************************/
- void threadExit(ThreadPool* pool)
- {
- //将pool->threadIDs中的ID改为0
- pthread_t tid = pthread_self();
- for (int i = 0; i < pool->maxNum; i++) {
- if (pool->threadIDs[i] == tid) {
- pool->threadIDs[i] = 0;
- printf("threadExit() called, %ld exiting...\n", tid);
- break;
- }
- }
- pthread_exit(NULL);//退出
- }
【2】threadpool.h:
- #ifndef _THREADPOOL_H
- #define _THREADPOOL_H
-
- typedef struct ThreadPool ThreadPool;
-
- //创建线程池并初始化
- ThreadPool* threadPoolCreate(int min, int max, int capacity);
-
- //销毁线程池
- int threadPoolDestroy(ThreadPool* pool);
-
- //给线程池添加任务
- void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
-
- //获取当前忙碌的工作线程的数量
- int getThreadPoolBusyNum(ThreadPool* pool);
-
- //获取当前存活的工作线程的数量
- int getThreadPoolAliveNum(ThreadPool* pool);
-
- /*********************其它函数**********************/
- void* worker(void* arg);//工作线程的执行函数
- void* manager(void* arg);//管理者线程的执行函数
- void threadExit(ThreadPool* pool);//线程退出函数
-
- #endif
【3】main.c:
- #include <stdio.h>
- #include "threadpool.h"
- #include <stdlib.h>
- #include <unistd.h>
- #include <pthread.h>
-
- //任务函数,所有线程都执行此任务
- void testFunc(void* arg)
- {
- int* num = (int*)arg;
- printf("thread %ld is working, number = %d\n", pthread_self(), *num);
- sleep(1);
- }
-
- int main()
- {
- //创建线程池: 最少3个工作线程,最多10个,任务队列容量为100
- ThreadPool* pool = threadPoolCreate(3, 10, 100);
- //加入100个任务于任务队列
- for (int i = 0; i < 100; ++i) {
- int* num = (int*)malloc(sizeof(int));
- *num = i + 100;
- threadPoolAdd(pool, testFunc, num);
- }
- //销毁线程池
- sleep(30);//保证任务全部运行完毕
- threadPoolDestroy(pool);
- return 0;
- }
【4】运行结果:
......
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。