赞
踩
- #ifndef _THREADPOOL_H_
- #define _THREADPOOL_H_
/* How many workers the manager thread asks to exit in one shrink step. */
#define EXIT_NUM 10
/* Upper bound on how many workers the manager thread starts in one grow step. */
#define ADD_NUM 10
-
- #include<stdbool.h>
- #include<stdlib.h>
- #include<string.h>
- #include<unistd.h>
- #include<stdio.h>
- #include<pthread.h>
-
-
/* One queued unit of work: a callback and the opaque argument passed to it. */
typedef struct Task {
    void (*function)(void *arg); /* routine executed by a worker thread */
    void *arg;                   /* handed to function; worker() free()s it afterwards when non-NULL */
} task;
-
-
- //线程池
- typedef struct ThreadPool
- {
- task* task_list; //任务队列
- int task_front; //任务队列头
- int task_behind; //任务队列尾
- int task_num; //队列中任务数
- int task_capacity; //任务队列容量
-
- pthread_t manage_thread; //管理者线程
- int min_thread_num; //最小线程数
- int max__thread_num; //最大线程数
- int live_thread_num; //存活线程数
- int busy_thread_num; //工作线程数
- int exit_thread_num; //要退出的线程数
-
- pthread_mutex_t pool_lock; //线程池锁
- pthread_mutex_t busy_num_lock; //忙线程锁
- pthread_cond_t can_add; //任务队列可添加
- pthread_cond_t can_work; //空闲线程可工作
-
- bool shutdown; //是否销毁线程池标志
-
- }thread_pool;
-
-
- //创建线程池
- thread_pool* create_thread_pool(int pthread_min, int pthread_max, int task_capacity);
-
- //添加任务
- int add_task(thread_pool* pool,void(*function)(void*),void* arg);
-
- //管理者线程函数
- void* manage(void* arg);
-
- //工作者线程函数
- void* worker(void* arg);
-
- //查询忙线程数
- int query_busy(thread_pool* pool);
-
- //查询存活线程数
- int query_live(thread_pool* pool);
-
- //修改线程池运行状态
- void shutdown_pool(thread_pool* pool);
-
- //释放资源
- void exit_pool(thread_pool* pool);
-
- #endif // _THREADPOOL_H_
-
- #include"ThreadPool.h"
-
-
-
- //创建线程池
- thread_pool* create_thread_pool(int pthread_min, int pthread_max, int task_capacity)
- {
- //开辟线程池
- thread_pool* pool = (thread_pool*)malloc(sizeof(thread_pool));
-
- if (pool == NULL){
- printf("create thread_pool error!\n");
- return NULL;
- }
- memset(pool, 0, sizeof(thread_pool));
-
-
- //创建任务队列
- pool->task_list = (task*)malloc(sizeof(task)*task_capacity);
- if(pool->task_list == NULL){
- printf("create task_list error!\n");
- free(pool);
- pool = NULL;
- return NULL;
- }
- memset(pool->task_list, 0, sizeof(task)*task_capacity);
-
- //初始化锁和条件变量
- pthread_mutex_init(&pool->pool_lock, NULL);
- pthread_mutex_init(&pool->busy_num_lock, NULL);
- pthread_cond_init(&pool->can_add, NULL);
- pthread_cond_init(&pool->can_work, NULL);
-
- //初始化任务队列
- pool->task_capacity = task_capacity;
- pool->task_front = 0;
- pool->task_behind = 0;
- pool->task_num = 0;
-
- //初始化工作线程信息
- pool->min_thread_num = pthread_min;
- pool->max__thread_num = pthread_max;
- pool->live_thread_num = 0;
- pool->exit_thread_num = 0;
- pool->busy_thread_num = 0;
-
- //线程池工作信息
- pool->shutdown = false;
-
- //创建管理线程
- pthread_create(&pool->manage_thread, PTHREAD_CREATE_JOINABLE, manage, pool);
-
- //创建初始工作线程 (初始工作线程 = 最小工作线程)
- pthread_mutex_lock(&pool->pool_lock);
- pthread_t thread;
- for (int i = 0; i < pthread_min; i++){
- pthread_create(&thread, PTHREAD_CREATE_JOINABLE, worker, pool);
- pool->live_thread_num++;
- }
- pthread_mutex_unlock(&pool->pool_lock);
- return pool;
- }
-
-
- //释放资源
- void exit_pool(thread_pool* pool) {
-
- thread_pool* p = pool;
- int live_num = 0;
-
- do {
- pthread_mutex_lock(&pool->pool_lock);
- live_num = pool->live_thread_num;
- if (live_num > 0) {
- pool->exit_thread_num = pool->live_thread_num;
- pthread_cond_signal(&pool->can_work);
- }
- pthread_mutex_unlock(&pool->pool_lock);
-
- } while (live_num > 0);
-
- //释放任务队列
- while (pool->task_num != 0) {
- if (pool->task_list[pool->task_front].arg != NULL) {
- free(pool->task_list[pool->task_front].arg);
- pool->task_list[pool->task_front].arg = NULL;
- }
- pool->task_front = (pool->task_front + 1) % pool->task_capacity;
- pool->task_num--;
- }
-
- //释放堆内存
-
- free(pool->task_list);
- pool->task_list = NULL;
-
- //销毁锁和条件变量
- pthread_mutex_destroy(&pool->busy_num_lock);
- pthread_mutex_destroy(&pool->pool_lock);
- pthread_cond_destroy(&pool->can_add);
- pthread_cond_destroy(&pool->can_work);
- }
-
-
- //添加任务
- int add_task(thread_pool* pool,void(*function)(void*), void* arg)
- {
- pthread_mutex_lock(&pool->pool_lock);
-
- while (pool->task_num == pool->task_capacity && pool->shutdown == false){ //任务队列满 阻塞等待can_add条件变量
- pthread_cond_wait(&pool->can_add, &pool->pool_lock);
- }
-
- if (pool->shutdown == true) { //线程池关闭 不再添加任务
-
- if (arg != NULL) {
- free(arg);
- arg = NULL;
- }
- return -1;
- }
-
- //向任务队列中添加任务
- pool->task_list[pool->task_behind].function = function;
- pool->task_list[pool->task_behind].arg = arg;
- pool->task_behind = (pool->task_behind + 1)%pool->task_capacity; //队尾移动
-
- if (pool->task_num == 0) { //任务队列不为空 -- 通知阻塞在can_work上的工作线程工作
- pthread_cond_signal(&pool->can_work);
- }
- pool->task_num++;
- pthread_mutex_unlock(&pool->pool_lock);
- return 0;
- }
-
-
- //管理者线程执行函数
- void* manage(void* arg)
- {
- thread_pool* pool = (thread_pool*)arg;
-
- while (1) {
-
- //退出部分闲置线程
- pthread_mutex_lock(&pool->pool_lock);
- if (pool->busy_thread_num * 2 < pool->live_thread_num
- && pool->task_num == 0
- && pool->live_thread_num - EXIT_NUM > pool->min_thread_num){
- pthread_mutex_unlock(&pool->pool_lock);
-
- sleep(1);
- pthread_mutex_lock(&pool->pool_lock);
- if (pool->busy_thread_num * 2 < pool->live_thread_num
- &&pool->task_num == 0
- && pool->live_thread_num - EXIT_NUM > pool->min_thread_num
- ) {
- pool->exit_thread_num = EXIT_NUM;
- pthread_mutex_unlock(&pool->pool_lock);
- int live_num = 0;
- int min_num = 0;
- int i = 0;
- do {
- pthread_mutex_lock(&pool->pool_lock);
- live_num = pool->live_thread_num;
- min_num = pool->live_thread_num;
- if (live_num > min_num) {
- pthread_cond_signal(&pool->can_work);
- }
- pthread_mutex_unlock(&pool->pool_lock);
- i++;
- live_num--;
- } while (live_num > min_num && i < EXIT_NUM);
- }
- else {
- pthread_mutex_unlock(&pool->pool_lock);
- }
- }
- else {
- pthread_mutex_unlock(&pool->pool_lock);
- }
-
-
- //添加线程
- pthread_mutex_lock(&pool->pool_lock);
- if (pool->live_thread_num + ADD_NUM <= pool->max__thread_num
- && pool->task_num >= pool->task_capacity * 0.9){ //存活线程数+添加线程数 <= 最大线程数 且 任务队列到达临界值
-
- pthread_t thread;
- //创建新线程
- for (int i = 0; i < ADD_NUM && pool->live_thread_num < pool->max__thread_num;i++) {
- pthread_create(&thread, PTHREAD_CREATE_JOINABLE, worker, pool);
- pool->live_thread_num++;
- }
- }
- pthread_mutex_unlock(&pool->pool_lock);
-
- //检查线程池状态
- pthread_mutex_lock(&pool->pool_lock);
- if (pool->shutdown) {
- pthread_mutex_unlock(&pool->pool_lock);
- exit_pool(pool);
- pthread_detach(pthread_self());
- pthread_exit(NULL);
- }
- pthread_mutex_unlock(&pool->pool_lock);
- sleep(1);
- }
- }
-
-
- //工作者线程执行函数
- void* worker(void* arg)
- {
- thread_pool* pool = (thread_pool*)arg;
-
- while (1) {
-
- pthread_mutex_lock(&pool->pool_lock);
- while (pool->task_num == 0){ //判断是否有任务
- pthread_cond_wait(&pool->can_work,&pool->pool_lock); //无任务时阻塞等待can_work条件变量
-
- if (pool->exit_thread_num > 0){ //判定是否要退出线程
- pool->live_thread_num--;
- pthread_mutex_unlock(&pool->pool_lock);
- pthread_detach(pthread_self());
- pthread_exit(NULL);
- }
- }
-
- //从任务队列中取出任务
- task obj = pool->task_list[pool->task_front];
-
- //修改队列头节点
- pool->task_front = (pool->task_front + 1) % pool->task_capacity; //任务取出 队头后移
- if (pool->task_num == pool->task_capacity) { //任务队列 :满->不满 通知添加任务函数添加任务
- pthread_cond_signal(&pool->can_add);
- }
- pool->task_num--;
- pthread_mutex_unlock(&pool->pool_lock);
-
- //修改忙线程数
- pthread_mutex_lock(&pool->busy_num_lock);
- pool->busy_thread_num++;
- pthread_mutex_unlock(&pool->busy_num_lock);
-
- //执行任务
- obj.function(obj.arg);
- if (obj.arg != NULL) { //任务函数参数为堆内存时 此处需要释放内存
- free(obj.arg);
- obj.arg = NULL;
- }
-
- //修改忙线程数
- pthread_mutex_lock(&pool->busy_num_lock);
- pool->busy_thread_num--;
- pthread_mutex_unlock(&pool->busy_num_lock);
- }
- }
-
-
- //查询忙线程数
- int query_busy(thread_pool* pool)
- {
- pthread_mutex_lock(&pool->busy_num_lock);
- int temp = pool->busy_thread_num;
- pthread_mutex_unlock(&pool->busy_num_lock);
- return temp;
- }
-
-
- //查询存活线程数
- int query_live(thread_pool* pool)
- {
- pthread_mutex_lock(&pool->pool_lock);
- int temp = pool->live_thread_num;
- pthread_mutex_unlock(&pool->pool_lock);
- return temp;
- }
-
-
- //关闭线程池
- void shutdown_pool(thread_pool* pool)
- {
- pthread_mutex_lock(&pool->pool_lock);
- pool->shutdown = true;
- pthread_mutex_unlock(&pool->pool_lock);
- }
- #include<stdio.h>
- #include"ThreadPool.h"
- #include<stdlib.h>
- #include<string.h>
-
/* Sample task callbacks; defined after main(). */
void func1(void *arg);
void func2(void *arg);
void func3(void *arg);

/* Number of task batches main() has started so far. */
static int var = 0;
-
- int main(int argc,char** argv)
- {
- thread_pool* pool = create_thread_pool(10, 160, 500); //参数:最小线程数,最大线程数,任务队列容量
-
- int a = 1, b = 3, c = 4;
- while (1) {
- var++;
- for (int i = 0; i < 500; i++) {
-
- /*add_task(pool, func1, NULL); //空参数
- add_task(pool, func2, NULL);
- add_task(pool, func3, NULL);*/
-
- int* arg_1 = (int*)malloc(4); //堆内存传参
- int* arg_2 = (int*)malloc(4);
- int* arg_3 = (int*)malloc(4);
- *arg_1 = a;
- *arg_2 = b;
- *arg_3 = c;
- add_task(pool, func1, arg_1);
- add_task(pool, func2, arg_2);
- add_task(pool, func3, arg_3);
- }
-
- //sleep(1);
- if (var == 50) { break; } //任务量:500*50
-
- }
-
- shutdown_pool(pool);
-
- sleep(3);
-
- free(pool);
-
- return 0;
- }
-
-
/* Sample task 1: a placeholder that does nothing with its argument. */
void func1(void *arg)
{
    (void)arg; /* intentionally unused */
}
-
/* Sample task 2: a placeholder that does nothing with its argument. */
void func2(void *arg)
{
    (void)arg; /* intentionally unused */
}
-
/* Sample task 3: a placeholder that does nothing with its argument. */
void func3(void *arg)
{
    (void)arg; /* intentionally unused */
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。