赞
踩
线程池是用于解决多线程项目中的效率问题的,因此本文将从多线程入手,引出线程池的需求与作用,从而分析其原理与实现。
如若现在出现以下场景:多个客户端同时访问服务器时,若服务器对所有客户端以串行的方式进行统筹响应,则将会大大降低业务效率。
即:当某个程序需要同时进行多个操作(如io操作)时,便可以考虑使用多线程的并行思路进行解决。
但线程是需要占用系统资源的,当所需线程过多,如百万并发服务器时,是无法分配百万线程资源的,那么该如何解决该问题呢?
所谓池式组件就是通过*“提前向系统申请一些过量资源,并将资源交给自定义算法在软件内部进行分配”*的一种高性能组件。
作为中间件,池式组件将会对上层业务/程序/逻辑透明,即是否存在都不影响使用,但会影响使用时的性能与稳定性等。
而线程池则是针对多线程问题提出的一种池式组件,我们将在线程池中提前申请一些过量线程后,向线程池提交我们想要实现的任务,由线程池内部管理,将任务交给特定线程进行执行。
组成一个线程池的核心包括:执行线程队列,管理组件,任务队列
线程池应当包含接口如下:
线程池的工作逻辑为:管理组件管理着执行线程队列执行任务。
因此,下面将从基础开始,即以任务队列,执行线程队列,管理组件的顺序进行实现。
typedef void (*callback)(void*);
struct nTask{
callback func; // 函数地址
void* arg; // 函数参数
// 链表结构
struct nTask *next;
struct nTask *prev;
};
可以预知的是执行线程至少需要访问manager的锁资源
struct nWorker{
pthread_t threadid;
int terminal; // 线程退出标识
struct Manager* manager; // 需要访问manager资源
struct nWorker* next;
struct nWorker* prev;
};
链表操作将以define的方式进行实现
#define LIST_INSERT(item, list) do { \
item->prev = NULL; \
item->next = list; \
if ((list) != NULL) (list)->prev = item; \
(list) = item; \
} while(0)
#define LIST_REMOVE(item, list) do { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
管理组件管理的内容便是任务队列与执行线程队列。
为了保证多线程过程中的临界资源安全,以及解决线程同步问题,需要使用互斥锁与条件变量。
typedef struct Manager {
struct nTask *tasks; // 任务队列
struct nWorker *workers; // 执行线程队列
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件变量
} ThreadPool;
其中,管理组件+任务队列+执行线程队列=简易线程池
// 初始化的任务即为对Manager结构体中的各变量进行初始化 // 包括:cond,mutex, 各workers int nThreadPoolCreate(ThreadPool *pool, int numWorkers) { // 保证输入参数安全性 if (pool == NULL) return -1; if (numWorkers < 1) numWorkers = 1; memset(pool, 0, sizeof(ThreadPool)); // 参数初始化 pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t)); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t)); // 各workers分别初始化 int i = 0; for (i = 0;i < numWorkers;i ++) { struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker)); if (worker == NULL) { perror("malloc"); return -2; } memset(worker, 0, sizeof(struct nWorker)); worker->manager = pool; int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker); if (ret) { perror("pthread_create"); free(worker); return -3; } LIST_INSERT(worker, pool->workers); } return 0; }
// 销毁的工作即使各线程停止工作 int nThreadPoolDestory(ThreadPool *pool, int nWorker) { struct nWorker *worker = NULL; // 置位各worker停止标志位 for (worker = pool->workers;worker != NULL;worker = worker->next) { worker->terminate; } pthread_mutex_lock(&pool->mutex); // 各worker同步关闭 pthread_cond_broadcast(&pool->cond); pthread_mutex_unlock(&pool->mutex); pool->workers = NULL; pool->tasks = NULL; return 0; }
// 获取任务并执行 static void *nThreadPoolCallback(void *arg) { struct nWorker *worker = (struct nWorker*)arg; while (1) { // 访问临界资源需上锁 pthread_mutex_lock(&worker->manager->mutex); while (worker->manager->tasks == NULL) { if (worker->terminate) break; // 条件变量保证线程同步 pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex); } // 若需要退出线程,则线程结束 if (worker->terminate) { pthread_mutex_unlock(&worker->manager->mutex); break; } struct nTask *task = worker->manager->tasks; LIST_REMOVE(task, worker->manager->tasks); pthread_mutex_unlock(&worker->manager->mutex); // 任务执行 task->func(task->arg); free(task); } free(worker); }
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {
pthread_mutex_lock(&pool->mutex);
LIST_INSERT(task, pool->tasks);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
好了,到这里一个简单的线程池即可完成了,xdm可以自行编写main函数进行测试。
如果觉得有帮助可以点一下赞,小编未来也可能会更新一些学习到的C++知识,感兴趣的可以关注一手,共同讨论学习进步。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。