当前位置:   article > 正文

线程池的工作原理

线程池的工作原理

一、应用场景

传统并发变成的缺陷:

1.创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多
2. 活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足

  1. 遇到特别耗时的任务 ,需要把它当成一个任务放在线程池里处理, 比较复杂的流程可拆分成单个简单的任务再放在线程池中
  2. 用于异步解耦,如日志的罗盘、 数据库的访问
  3. 线程资源的开销与cpu核心之间的平衡选择

I/O密集型任务:
当应用程序执行大量I/O操作(例如,网络通信或磁盘读写)时,线程池可以通过创建足够数量的线程来处理等待I/O操作的线程,从而最大化CPU的利用率。线程数 = 2* cpu核数 + 2
CPU密集型任务:
对于计算密集型任务,线程数应接近或等于CPU核心数,以避免过多的上下文切换和资源竞争,从而保持CPU的高效运行。线程数 = cpu核数
混合型任务:
在既有CPU密集型又有I/O密集型任务的情况下,线程池可以根据任务的特性动态调整线程数量,以实现资源的最优分配。 线程数 = (io等待时间 + cpu运算时间)* 核心数 / cpu 运算时间

二、工作原理

线程池由一个任务队列和一组处理队列的线程组成。一旦工作进程需要处理某个可能“阻塞”的操作,不用自己操作,工作进程将其作为一个任务放到线程池的队列,接着会被某个空闲线程提取处理。
在这里插入图片描述

工作进程把阻塞操作转给线程池

线程池是一个生产消费模型,由生产者线程发布任务。一般组成:
1.任务队列: 存放所有的任务。每个任务包含任务回调函数 、任务的上下文参数等
2.执行队列:存放所有的线程,管理线程的空闲和工作状态。
线程池中的线程队列不是绝对必须的组件,但它提供了多种便利和优势,使得线程池的管理和任务调度更加高效。
线程队列的特性:

简化管理:线程队列可以帮助简化线程的管理,使得线程池可以轻松跟踪哪些线程是空闲的,哪些是忙碌的。
支持实现负载均衡算法,确保任务在工作线程之间均匀分配,避免某些线程过载而其他线程空闲。
提高效率:通过线程队列,线程池可以快速地为新任务分配可用线程,从而减少任务等待时间和提高整体处理效率。
动态调整:如果线程池实现了动态调整线程数量的功能,线程队列可以作为一个缓冲,帮助平滑地进行线程的添加或移除。
工作窃取:在工作窃取模型中,线程队列是核心组件,允许忙碌的线程从其他线程的队列中窃取任务,以此平衡负载。
避免资源浪费:没有线程队列的情况下,线程池可能需要不断地创建和销毁线程,这会浪费资源并增加系统负担。
实现复杂的调度策略:线程队列使得实现复杂的调度策略(如优先级调度、轮询调度等)变得更加容易。
同步和协调:线程队列可以与同步机制(如条件变量)结合使用,以协调线程间的工作,避免忙等。
错误恢复:如果任务执行失败,线程队列可以帮助线程池恢复执行,例如通过重新调度失败的任务。
资源限制:线程队列有助于限制活跃线程的数量,防止系统资源的过度消耗。
然而,如果你的应用场景非常简单,或者有特殊的性能要求,你可能会设计一个不包含线程队列的线程池。例如,你可能有一个固定数量的线程,它们不断地轮询任务队列来寻找任务,而不是等待任务分配。这种方法可能会减少一些管理开销,但在大多数情况下,线程队列提供的好处会超过其带来的复杂性。

3.管理组件:条件变量和互斥锁,条件变量通常前后加上互斥锁 、减去互斥锁配合操作

三、主要函数

创建线程池后,可使用事件驱动型框架epoll投递任务到线程池,最后销毁线程池。主要函数:

1.创建线程池

thread_pool_t* thread_pool_init()
{
    int             err;
    pthread_t       tid;
    uint_t          n;
    pthread_attr_t  attr;
	thread_pool_t   *tp=NULL;
    
	tp = (thread_pool_t * )calloc(1,sizeof(thread_pool_t));

	if(tp == NULL){
	    fprintf(stderr, "thread_pool_init: calloc failed!\n");
	}
    //初始化线程池的基本参数,如最大线程数、最小线程数、任务队列容量等
	thread_pool_init_default(tp, NULL);

    //初始化线程属性对象,并根据需要设置属性,如栈大小、调度策略等
    thread_pool_queue_init(&tp->queue);
    
	//创建互斥锁
    if (thread_mutex_create(&tp->mtx) != T_OK) {    
		free(tp);
        return NULL;
    }
	//创建条件变量
    if (thread_cond_create(&tp->cond) != T_OK) {    
        (void) thread_mutex_destroy(&tp->mtx);
		free(tp);
        return NULL;
    }

    err = pthread_attr_init(&attr);
    if (err) {
        fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
		free(tp);
        return NULL;
    }

   //设置将要创建线程的默认状(detached thread),在这种状态下,线程结束后,系统会自动回收其资源,无需调用 pthread_join()
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err) {
        fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
		free(tp);
        return NULL;
    }

   //创建线程,设置线程回调函数
    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
        if (err) {
            fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
			free(tp);
            return NULL;
        }
    }

    (void) pthread_attr_destroy(&attr);

    return tp;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

2.投递任务线程池
在线程池中,任务队列可能被多个线程访问,包括投递任务的生产者线程和从队列中取出任务的工作线程。锁可以防止这些线程同时修改队列,从而避免冲突

int_t thread_task_post(thread_pool_t *tp, thread_task_t *task)
{
    if (thread_mutex_lock(&tp->mtx) != T_OK) {
        return T_ERROR;
    }

    if (tp->waiting >= tp->max_queue) {
        (void) thread_mutex_unlock(&tp->mtx);

        fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
                      tp->name, tp->waiting);
        return T_ERROR;
    }

    task->id = thread_pool_task_id++;
    task->next = NULL;

    //通知线程池有任务投递进来
    if (thread_cond_signal(&tp->cond) != T_OK) {
        (void) thread_mutex_unlock(&tp->mtx);
        return T_ERROR;
    }

    *tp->queue.last = task;
    tp->queue.last = &task->next;

    tp->waiting++;

    (void) thread_mutex_unlock(&tp->mtx);

    if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
                   task->id, tp->name);

    return T_OK;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

3.线程入口函数
线程创建时已指定。如果任务队列为空 ,线程阻塞 pthead_cond_wait 。任务投递后线程收到条件变量信号,抢占互斥锁,然后到任务队列取出任务执行。这里使用轮询调度方式:按照任务到达的顺序,将任务分配给工作线程。

还有其他任务调度方式:
优先级对应列:为每个任务分配一个优先级,优先级高的任务先执行。可以用堆等数据结构来快速访问最高优先级的任务。
工作窃取(Work Stealing)
策略:每个工作线程维护自己的任务队列。当一个线程完成自己的任务后,它可以从其他线程的队列中“窃取”任务。
最少工作优先(Least Work First Scheduling)
策略:优先分配任务给完成任务数量最少的线程。

static void *thread_pool_cycle(void *data)
{
    thread_pool_t *tp =(thread_pool_t * )data;

    int                 err;
    thread_task_t       *task;

    if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);

    for ( ;; ) {
        if (thread_mutex_lock(&tp->mtx) != T_OK) {
            return NULL;
        }

        tp->waiting--;

        while (tp->queue.first == NULL) {
            if (thread_cond_wait(&tp->cond, &tp->mtx)
                != T_OK)
            {
                (void) thread_mutex_unlock(&tp->mtx);
                return NULL;
            }
        }

        task = tp->queue.first;
        tp->queue.first = task->next;

        if (tp->queue.first == NULL) {
            tp->queue.last = &tp->queue.first;
        }
		
        if (thread_mutex_unlock(&tp->mtx) != T_OK) {
            return NULL;
        }
        if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
                       task->id, tp->name);
                       
        task->handler(task->ctx);
        
        if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
        task->next = NULL;
        
        thread_task_free(task);

        //notify 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

4.销毁线程池

void thread_pool_destroy(thread_pool_t *tp)
{
    uint_t           n;
    thread_task_t    task;
    volatile uint_t  lock;

    memset(&task,'\0', sizeof(thread_task_t));

    //执行thread_exit(0)线程退出任务
    task.handler = thread_pool_exit_handler;
    task.ctx = (void *) &lock;

    for (n = 0; n < tp->threads; n++) {
        lock = 1;

        if (thread_task_post(tp, &task) != T_OK) {
            return;
        }
        //循环使用 sched_yield() 让出 CPU 等待 lock 变为非阻塞状态(即线程安全退出)
        while (lock) {
            sched_yield();
        }
    }

    //销毁同步机制:条件变量互斥锁
    (void) thread_cond_destroy(&tp->cond);
    (void) thread_mutex_destroy(&tp->mtx);

    //释放内存池
	free(tp);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

好处总结:

1.复用线程资源
2.充分利用系统资源
3.异步执行耗时任务

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/680565
推荐阅读
相关标签
  

闽ICP备14008679号