当前位置:   article > 正文

c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)_c++ 线程池 开源库

c++ 线程池 开源库

前言

维基百科上对线程池的简要介绍:

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

ThreadPool 项目地址

progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。

项目源码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(
            [this]
            {
                for (;;)
                {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] 
                        { 
                            return this->stop || !this->tasks.empty(); 
                        });

                        if (this->stop && this->tasks.empty())
                            return;

                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
    );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers)
        worker.join();
}

#endif
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

基本用法

// create thread pool with 4 worker threads
ThreadPool pool(4);

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);

// get result from future
std::cout << result.get() << std::endl;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

类成员变量

std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
  • 1
  • 2
  • 3
  • 4
  • 5
  • workers:存储线程池中 std::thread 的容器
  • tasks:任务队列
  • queue_mutex:任务队列的互斥锁
  • condition:任务队列的条件变量
  • stop:线程池是否停止的标志位

类成员函数

构造函数的签名

inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
  • 1
  • 2
  • 构造函数传入一个 size_t 类型的参数,初始化线程池中线程的数量
  • 初始化列表将 stop 标志位初始化为 false

创建线程

for (size_t i = 0; i < threads; ++i)
{
    workers.emplace_back(
        [this]
        {
            for (;;)
            {
                //...
            }
        }
    );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 使用 for 循环创建 threads 个线程,将线程加入 workers 容器
  • lambda 表达式用于创建线程,捕获 this,lambda 表达式中包含一个无限循环

线程默认的任务

std::function<void()> task;

{
    std::unique_lock<std::mutex> lock(this->queue_mutex);
    this->condition.wait(lock, [this] 
    { 
        return this->stop || !this->tasks.empty(); 
    });

    if (this->stop && this->tasks.empty())
        return;

    task = std::move(this->tasks.front());
    this->tasks.pop();
}

task();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 首先声明了一个 std::function<void()> 类型的变量 task
  • 在互斥锁保护任务队列后,调用 condition.wait() 等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行
  • 如果标志位 stop 为 true,且任务队列为空,此任务将退出
  • 以上条件都通过后,将从任务队列中取出一个任务 task,移动到局部变量 task 中(吐槽下:距离 c++11 标准的发布已经过去了十几年,现在还不明白这一条的,就很难评价了)
  • 执行 task(),也就是上一步从队列头部取出的任务

向任务队列中添加一个任务

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>;
  • 1
  • 2
  • 3
  • enqueue 函数模板,用于任务的入队列
  • F&& f,这里预期是一个任意的 callable 对象
  • Args&&... args,一个可变模板参数,会在编译期展开参数包
  • 返回值是一个 std::future 类型的对象,用于获取任务的执行结果,std::future 的模板参数使用 std::result_of 萃取可调用对象的返回值类型
  • 注意,c++17 后 std::result_of 就已经是 deprecated,可以使用 std::invoke_result 类型萃取

继续往下看 enqueue 函数的实现:

using return_type = typename std::result_of<F(Args...)>::type;
  • 1
  • 使用 std::result_of 类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_type
auto task = std::make_shared< std::packaged_task<return_type()> >(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
  • 1
  • 2
  • 3

这一段做了好几件事,一步一步拆解:

  • std::make_shared 构建一个 std::shared_ptr
  • std::packaged_task 模板是用于包装 callable 对象,使用了前面推导出的 return_type 类型来实例化模板
  • std::make_shared 需要调用实例类型的构造函数,而 std::packaged_task 的构造函数需要一个可调用对象,所以这里使用 std::bind 将可变模板参数绑定给 f(对 std::bind 不熟悉的建议先行查阅资料),std::forward 转发一下类型
  • 简单来说,以上只是构建一个 callable 对象的包装器
std::future<return_type> res = task->get_future();
{
    std::unique_lock<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if (stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 从 task 中获取 std::future 对象
  • 使用大括号控制代码块,在这个代码块中上锁
  • 如果线程池已经停止,抛出异常
  • 否则正常执行,将 task 推入到队列尾部
  • 条件变量通知一个等待的线程,这个时候,构造函数中 condition.wait() 会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行
  • 最后返回 std::future 对象

析构函数

遵循 RAII 原则,释放所有资源

{
    std::unique_lock<std::mutex> lock(queue_mutex);
    stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
    worker.join();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 上锁,将停止标志位置为 true
  • 通知所有等待的线程
  • 等待所有线程终止

总结

该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/763423
推荐阅读
相关标签
  

闽ICP备14008679号