赞
踩
本文旨在纯c实现线程池。
线程池线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
1.减少了线程的创建与销毁
2.异步解耦,在具体事务处理过程中大大的提高了效率。
主要是为了让具体事务得到异步方式同时处理,这样可以大大提高效率。比如说主线程只负责抛任务到有很多线程的线程池里面,抛完之后主线程就继续抛任务,在这个抛任务的同时,子线程就在完成相应的任务,也就是业务处理。比较耗时的操作如数据库操作,io处理等,都可以用线程池来提高效率。
将任务抛到线程池中,先放到线程池中的任务队列,然后各线程再从任务队列中拿任务来处理,处理后返回结果。
我们在使用线程池的时候,线程池应该提供哪些api?
主要就是以下三种:
1.线程池的初始化(创建) init/create
2.往池里面抛任务push_task
3.线程池的销毁 deinit/destroy
其他的扩展接口可有可无,这三个是重中之重。
线程内主要就是实现任务队列
,执行队列
,以及一个池管理组件
。池管理组件负责任务队列与执行队列之间的调度等让他们有秩序的工作,可以理解为中间人什么的。
typedef struct NTASK {
void (*task_func)(void *arg);
void *user_data;
struct NTASK *prev;
struct NTASK *next;
} task_t;
每个任务有自己要执行的事件,所以这里定义了一个万能函数指针。每个任务都有自己的数据,再定义一个万能指针来指向它。因为是任务队列,所以有前驱和后继。
typedef struct NTHREADPOLL {
worker_t *workers;
task_t *tasks;
pthread_cond_t cond;
pthread_mutex_t mutex;
} thread_poll_t;
这里需要定义两个指针来指向任务队列与执行队列,因为要管理。这里还定义了一个条件变量与一把互斥锁。当执行队列的线程同时向任务队列取任务的时候需要对任务队列加锁。当任务很少的时候,不需要这么多线程运行,就需要让他们条件变量等待,条件变量使线程可以睡眠等待
某种条件出现不继续执行也就不占用CPU了。
typedef struct NWORKER {
pthread_t id;
int termination;
struct NTHREADPOLL *thread_poll;
struct NWORKER *prev;
struct NWORKER *next;
} worker_t;
执行队列是一个个的线程,需要有各自的线程id以供外界识别回收等等操作。这个termination就是一个线程的状态,用来判断线程状态将线程销毁退出。(当然,也可以直接销毁线程,但是在销毁的时候不好判断线程正处于什么状态就把线程给关闭了)。
下面宏定义的 ‘’ 符号不要看成注释了,这是个续行符
,有了续航符才能在多行输入。
头插法添加:
#define LL_ADD(item, list)do{ \
item->prev=NULL; \
item->next=list; \
if(list!=NULL){ \
list->prev=item; \
} \
list=item; \
}while(0)
删除:
#define LL_REMOVE(item, list)do{ \
if(item->prev!=NULL){ \
item->prev->next=item->next; \
} \
if(item->next!=NULL){ \
item->next->prev=item->prev; \
} \
if(list==item){ \
list=item->next; \
} \
item->prev=item->next=NULL; \
}while(0)
创建其实就是创建初始化thread_poll_t结构体,然后按照给定的宏创建线程数量到执行队列。
int thread_poll_create(thread_poll_t *thread_poll, int thread_num) { if (thread_num < 1)thread_num = 1; memset(thread_poll, 0, sizeof(thread_poll_t)); //init cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t)); //init mutex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t)); // one thread one worker int idx = 0; for (idx = 0; idx < thread_num; idx++) { worker_t *worker = malloc(sizeof(worker_t)); if (worker == NULL) { perror("worker malloc err\n"); return idx; } memset(worker, 0, sizeof(worker_t)); worker->thread_poll = thread_poll; int ret = pthread_create(&worker->id, NULL, thread_callback, worker); if (ret) { perror("pthread_create err\n"); free(worker); return idx; } LL_ADD(worker, thread_poll->workers); } return idx; }
就是给task队列增加一个任务,然后用signal通知cond(条件变量)。
int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) {
pthread_mutex_lock(&thread_poll->mutex);
LL_ADD(task, thread_poll->tasks);
pthread_cond_signal(&thread_poll->cond);
pthread_mutex_unlock(&thread_poll->mutex);
}
销毁将所有线程的termination置1,然后广播cond,让所有线程池的线程条件变量收到信号继续执行,具体怎么销毁退出往下看就能知道这样设计的优雅。
void thread_destroy(thread_poll_t *thread_poll) {
worker_t *worker = NULL;
for (worker = thread_poll->workers; worker != NULL; worker = worker->next) {
worker->termination = 1;
}
pthread_mutex_lock(&thread_poll->mutex);
pthread_cond_broadcast(&thread_poll->cond);
pthread_mutex_unlock(&thread_poll->mutex);
}
task_t *get_task(worker_t *worker) { while (1) { pthread_mutex_lock(&worker->thread_poll->mutex); //条件变量 等待任务到来 while (worker->thread_poll->workers == NULL) { if (worker->termination)break; pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex); } if (worker->termination) { pthread_mutex_unlock(&worker->thread_poll->mutex); return NULL; } task_t *task = worker->thread_poll->tasks; if (task) { LL_REMOVE(task, worker->thread_poll->tasks); } pthread_mutex_unlock(&worker->thread_poll->mutex); if (task != NULL) { return task; } } } void *thread_callback(void *arg) { worker_t *worker = (worker_t *) arg; while (1) { task_t *task = get_task(worker); if (task == NULL) { free(worker); pthread_exit("thread termination\n"); } task->task_func(task); } }
这里我们创建了1000个task,开了10个thread。记住task以及task的参数,由task的func来销毁。
记得编译的时候pthread不是Linux下的默认的库,也就是在链接的时候,无法找到phread库中个函数的入口地址,于是链接会失败。记得在gcc编译的时候,附加要加 -lpthread参数。
#include <pthread.h> #include <memory.h> #include <malloc.h> #include <stdio.h> #include <unistd.h> #define LL_ADD(item, list)do{ \ item->prev=NULL; \ item->next=list; \ if(list!=NULL){ \ list->prev=item; \ } \ list=item; \ }while(0) #define LL_REMOVE(item, list)do{\ if(item->prev!=NULL){ \ item->prev->next=item->next; \ } \ if(item->next!=NULL){ \ item->next->prev=item->prev; \ } \ if(list==item){ \ list=item->next; \ } \ item->prev=item->next=NULL; \ }while(0) typedef struct NTASK { void (*task_func)(void *arg); void *user_data; struct NTASK *prev; struct NTASK *next; } task_t; typedef struct NWORKER { pthread_t id; int termination; struct NTHREADPOLL *thread_poll; struct NWORKER *prev; struct NWORKER *next; } worker_t; typedef struct NTHREADPOLL { worker_t *workers; task_t *tasks; pthread_cond_t cond; pthread_mutex_t mutex; } thread_poll_t; task_t *get_task(worker_t *worker) { while (1) { pthread_mutex_lock(&worker->thread_poll->mutex); //条件变量 等待任务到来 while (worker->thread_poll->tasks == NULL) { if (worker->termination)break; pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex); } if (worker->termination) { pthread_mutex_unlock(&worker->thread_poll->mutex); return NULL; } task_t *task = worker->thread_poll->tasks; if (task) { LL_REMOVE(task, worker->thread_poll->tasks); } pthread_mutex_unlock(&worker->thread_poll->mutex); if (task != NULL) { return task; } } } void *thread_callback(void *arg) { worker_t *worker = (worker_t *) arg; while (1) { task_t *task = get_task(worker); if (task == NULL) { free(worker); pthread_exit("thread termination\n"); } task->task_func(task); } } int thread_poll_create(thread_poll_t *thread_poll, int thread_num) { if (thread_num < 1)thread_num = 1; memset(thread_poll, 0, sizeof(thread_poll_t)); //init cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t)); //init mutex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t)); // one thread one worker int idx = 0; for (idx = 0; idx < thread_num; idx++) { worker_t *worker = malloc(sizeof(worker_t)); if (worker == NULL) { perror("worker malloc err\n"); return idx; } memset(worker, 0, sizeof(worker_t)); worker->thread_poll = thread_poll; int ret = pthread_create(&worker->id, NULL, thread_callback, (void *)worker); if (ret) { perror("pthread_create err\n"); free(worker); return idx; } LL_ADD(worker, thread_poll->workers); } return idx; } void thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) { pthread_mutex_lock(&thread_poll->mutex); LL_ADD(task, thread_poll->tasks); pthread_cond_signal(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex); } void thread_destroy(thread_poll_t *thread_poll) { worker_t *worker = NULL; for (worker = thread_poll->workers; worker != NULL; worker = worker->next) { worker->termination = 1; } pthread_mutex_lock(&thread_poll->mutex); pthread_cond_broadcast(&thread_poll->cond); pthread_mutex_unlock(&thread_poll->mutex); } void counter(task_t *task) { int idx = *(int *) task->user_data; printf("idx:%d pthread_id:%lu\n", idx, pthread_self()); free(task->user_data); free(task); } #define THREAD_COUNT 10 #define TASK_COUNT 1001 int main() { thread_poll_t thpol; int threadret= thread_poll_create(&thpol,THREAD_COUNT); if(threadret!=THREAD_COUNT){ printf("poll creat error\n"); return -1; } int i=0; for(int i=0;i<TASK_COUNT;i++){ task_t *task=malloc(sizeof(task_t)); if(task==NULL){ printf("task malloc error\n"); return -1; } task->task_func=counter; task->user_data=malloc(sizeof(int)); *(int *)(task->user_data)=i; thread_poll_push_task(&thpol,task); } //提早销毁线程可能任务还没全都处理完 //thread_destroy(&thpol); getchar(); return 0; }
线程到底初始化多少呢?如果是计算密集型就不用太多的线程,如果是任务密集型可以多几个。以下是经验值,不一定一定按照这个来。
计算密集型:强计算,计算时间较长,线程数量与cpu核心数成比例即可,如1:1。
任务密集型:处理任务,io操作。可以开多一点,如cpu核心数的2倍。
随着任务越来越多,线程不够用怎么办?我们可以开一个监控线程,设n=running线程 / 总线程。当n>上水位时,监控线程创建几个线程;当n<下水位时,监控线程销毁几个线程。可以设置30%和70%。
本文参考纯c手写线程池
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。