赞
踩
在传统服务器结构中,常用一个总的监听线程监听新用户连接,当有一个新用户进入时,服务器就开启一个新的线程,用于处理这个用户的数据收发,这个线程只服务于这个用户,当用户与服务器端连接关闭以后,服务器将销毁这个线程
然而频繁地开辟与销毁线程会极大地占用系统资源,线程池的基本思想是提前创建好一些线程,有新任务到来时,则在线程队列中找到一个空闲线程来处理,处理完后线程不退出重回空闲状态继续等待新任务。如果没有空闲线程,则将任务存放到队列, 等待线程池内有线程空闲以后再从任务队列取出任务进行处理。这样就避免了线程创建和销毁的系统资源开销
线程池应用场景:一个应用要频繁的创建和销毁线程,但任务执行的时间又非常短,这样线程创建和销毁的系统资源开销就不容忽视,这时就可以使用线程池了
线程池包含四个部分:
i. 线程队列,存储创建的多个线程
ii. 任务队列,存储需要处理的任务,使用 list 链表实现,当有新任务到来时,将节点插入到 list 尾部,当空闲线程读取任务进行处理时,则读取 list 头部节点
iii. 信号量 sem,指示任务队列中的任务个数,当有新任务加入时,执行 sem_post 操作+1,空闲线程通过 sem_wait 等待任务队列变成非空状态
iv. 互斥量 mutex, 用于执行添加/移除 list 节点时,进行多线程保护
a. 创建并开始多个线程,线程执行函数是个 while 循环,循环等待任务队列不为空,一旦不为空,则读取 list 头节点,获取任务执行方法和参数,然后执行任务,完成后释放头结点空间
b. 添加新任务到队列,申请新任务节点空间,用于存储新任务执行方法和参数,然后将新任务节点插入到 list 的尾部
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- #include <semaphore.h>
- #include <pthread.h>
-
- #define MAXNUM_THREAD 3
- #define MAXNUM_JOB 10
-
- typedef void* (*Func)(void *arg);
-
- struct job {
- Func func;
- void *arg;
- struct job *next;
- };
-
- struct pool {
- pthread_t threads[MAXNUM_THREAD];
- struct job *head;
- pthread_mutex_t mutex; // used for add/remove list node action
- sem_t sem; // indicate threads can get one job node from list
- };
-
- void* threadFunc(void* arg)
- {
- Func jobFunc;
- void *jobArg;
- struct job *head;
- struct pool *pl = (struct pool*)arg;
-
- while (1)
- {
- sem_wait(&(pl->sem)); // wait new job node added to list
- pthread_mutex_lock(&(pl->mutex)); // wait other threads complete list action
- head = pl->head;
- pl->head = pl->head->next; // unlink head node from list, free after func done
- pthread_mutex_unlock(&(pl->mutex));
-
- jobFunc = head->func;
- jobArg = head->arg;
- jobFunc(jobArg);
- free(head); // free unlinked head node
- }
- }
-
- void poolInit(struct pool *pl)
- {
- pl->head = NULL;
- pthread_mutex_init(&(pl->mutex), NULL);
- sem_init(&(pl->sem), 0, 0);
-
- for (int i=0; i<MAXNUM_THREAD; i++)
- {
- pthread_create(&(pl->threads[i]), NULL, threadFunc, (void *)pl);
- printf("thread created: %u\n", (unsigned int)(pl->threads[i]));
- }
- }
-
- void addJob(struct pool *pl, Func func, void *arg)
- {
- struct job* jobNode = (struct job*)malloc(sizeof(struct job));
- jobNode->func = func;
- jobNode->arg = arg;
- jobNode->next = NULL;
-
- pthread_mutex_lock(&(pl->mutex));
- if (NULL == pl->head) // add job node to list tail
- {
- pl->head = jobNode;
- }
- else
- {
- struct job *head = pl->head;
- while (head->next)
- {
- head = head->next;
- }
- head->next = jobNode;
- }
- pthread_mutex_unlock(&(pl->mutex));
- sem_post(&(pl->sem)); // threads can get job node from list
- }
-
- void* jobFunc(void* arg)
- {
- int index = *((int*)arg);
- printf("job: %d started by thread: %u\n", index, (unsigned int)pthread_self());
- sleep(rand()%5);
- printf("job: %d ended\n", index);
- }
-
- int main()
- {
- int i = 0;
- int jobIds[MAXNUM_JOB] = {0};
- struct pool pl;
-
- poolInit(&pl);
- for (i=0; i<MAXNUM_JOB; i++)
- {
- jobIds[i] = i;
- printf("job: %d added\n", i);
- addJob(&pl, jobFunc, (void*)&jobIds[i]); // add job node to list
- sleep(rand()%2);
- }
-
- for (i=0; i<MAXNUM_THREAD; i++)
- {
- pthread_join(pl.threads[i], NULL);
- }
- pthread_mutex_destroy(&(pl.mutex));
- sem_destroy(&(pl.sem));
-
- return 0;
- }
-
运行结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。