当前位置:   article > 正文

解读github上流行的ThreadPool源码_github 线程池

github 线程池

前言

偶然发现github上有个ThreadPool项目(GitHub - progschj/ThreadPool: A simple C++11 Thread Pool implementation ),star数居然3k+,里面也就两个文件,一个ThreadPool.h,一个example.cpp

看了一下,项目代码是cpp11写的。老实说,代码极其简洁又难懂。

下面是ThreadPool.h可以看看,有个直观印象。

  1. #ifndef THREAD_POOL_H
  2. #define THREAD_POOL_H
  3. #include <vector>
  4. #include <queue>
  5. #include <memory>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <future>
  10. #include <functional>
  11. #include <stdexcept>
  12. class ThreadPool {
  13. public:
  14. ThreadPool(size_t);
  15. template<class F, class... Args>
  16. auto enqueue(F&& f, Args&&... args)
  17. -> std::future<typename std::result_of<F(Args...)>::type>;
  18. ~ThreadPool();
  19. private:
  20. // need to keep track of threads so we can join them
  21. std::vector< std::thread > workers;
  22. // the task queue
  23. std::queue< std::function<void()> > tasks;
  24. // synchronization
  25. std::mutex queue_mutex;
  26. std::condition_variable condition;
  27. bool stop;
  28. };
  29. // the constructor just launches some amount of workers
  30. inline ThreadPool::ThreadPool(size_t threads)
  31. : stop(false)
  32. {
  33. for(size_t i = 0;i<threads;++i)
  34. workers.emplace_back(
  35. [this]
  36. {
  37. for(;;)
  38. {
  39. std::function<void()> task;
  40. {
  41. std::unique_lock<std::mutex> lock(this->queue_mutex);
  42. this->condition.wait(lock,
  43. [this]{ return this->stop || !this->tasks.empty(); });
  44. if(this->stop && this->tasks.empty())
  45. return;
  46. task = std::move(this->tasks.front());
  47. this->tasks.pop();
  48. }
  49. task();
  50. }
  51. }
  52. );
  53. }
  54. // add new work item to the pool
  55. template<class F, class... Args>
  56. auto ThreadPool::enqueue(F&& f, Args&&... args)
  57. -> std::future<typename std::result_of<F(Args...)>::type>
  58. {
  59. using return_type = typename std::result_of<F(Args...)>::type;
  60. auto task = std::make_shared< std::packaged_task<return_type()> >(
  61. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  62. );
  63. std::future<return_type> res = task->get_future();
  64. {
  65. std::unique_lock<std::mutex> lock(queue_mutex);
  66. // don't allow enqueueing after stopping the pool
  67. if(stop)
  68. throw std::runtime_error("enqueue on stopped ThreadPool");
  69. tasks.emplace([task](){ (*task)(); });
  70. }
  71. condition.notify_one();
  72. return res;
  73. }
  74. // the destructor joins all threads
  75. inline ThreadPool::~ThreadPool()
  76. {
  77. {
  78. std::unique_lock<std::mutex> lock(queue_mutex);
  79. stop = true;
  80. }
  81. condition.notify_all();
  82. for(std::thread &worker: workers)
  83. worker.join();
  84. }
  85. #endif

example.cpp是对线程池ThreadPool.h的调用。

  1. #include <iostream>
  2. #include <vector>
  3. #include <chrono>
  4. #include "ThreadPool.h"
  5. int main()
  6. {
  7. ThreadPool pool(4);
  8. std::vector< std::future<int> > results;
  9. for(int i = 0; i < 8; ++i) {
  10. results.emplace_back(
  11. pool.enqueue([i] {
  12. std::cout << "hello " << i << std::endl;
  13. std::this_thread::sleep_for(std::chrono::seconds(1));
  14. std::cout << "world " << i << std::endl;
  15. return i*i;
  16. })
  17. );
  18. }
  19. for(auto && result: results)
  20. std::cout << result.get() << ' ';
  21. std::cout << std::endl;
  22. return 0;
  23. }

看了以上代码应该是要劝退不少人了,反正我的第一感觉是这样的。

ThreadPool分析

ThreadPool类中有:

5个成员变量
  • std::vector< std::thread > workers 用于存放线程的数组,用vector容器保存

  • std::queue< std::function<void()> > tasks 用于存放任务的队列,用queue队列进行保存。任务类型为std::function<void()>。因为 std::function是通用多态函数封装器,也就是说本质上任务队列中存放的是一个个函数

  • std::mutex queue_mutex 一个访问任务队列的互斥锁,在插入任务或者线程取出任务都需要借助互斥锁进行安全访问

  • std::condition_variable condition 一个用于通知线程任务队列状态的条件变量,若有任务则通知线程可以执行,否则进入wait状态

  • bool stop 标识线程池的状态,用于构造与析构中对线程池状态的了解

3个成员函数
  • ThreadPool(size_t) 线程池的构造函数

  • auto enqueue(F&& f, Args&&... args) 将任务添加到线程池的任务队列中

  • ~ThreadPool() 线程池的析构函数

    1. class ThreadPool {
    2. public:
    3. ThreadPool(size_t);
    4. template<class F, class... Args>
    5. auto enqueue(F&& f, Args&&... args)
    6. -> std::future<typename std::result_of<F(Args...)>::type>;
    7. ~ThreadPool();
    8. private:
    9. // need to keep track of threads so we can join them
    10. std::vector< std::thread > workers;
    11. // the task queue
    12. std::queue< std::function<void()> > tasks;
    13. // synchronization
    14. std::mutex queue_mutex;
    15. std::condition_variable condition;
    16. bool stop;
    17. };
    构造函数解析
    1. inline ThreadPool::ThreadPool(size_t threads)
    2. : stop(false)
    3. {
    4. for(size_t i = 0;i<threads;++i)
    5. workers.emplace_back(
    6. [this]
    7. {
    8. for(;;)
    9. {
    10. std::function<void()> task;
    11. {
    12. std::unique_lock<std::mutex> lock(this->queue_mutex);
    13. this->condition.wait(lock,
    14. [this]{ return this->stop || !this->tasks.empty(); });
    15. if(this->stop && this->tasks.empty())
    16. return;
    17. task = std::move(this->tasks.front());
    18. this->tasks.pop();
    19. }
    20. task();
    21. }
    22. }
    23. );
    24. }

    构造函数定义为inline。

    接收参数threads表示线程池中要创建多少个线程。

    初始化成员变量stopfalse,即表示线程池启动着。

    然后进入for循环,依次创建threads个线程,并放入线程数组workers中。

    在vector中,emplace_back()成员函数的作用是在容器尾部插入一个对象,作用效果与push_back()一样,但是两者有略微差异,即emplace_back(args)中放入的对象的参数,而push_back(OBJ(args))中放入的是对象。即emplace_back()直接在容器中以传入的参数直接调用对象的构造函数构造新的对象,而push_back()中先调用对象的构造函数构造一个临时对象,再将临时对象拷贝到容器内存中。

    我们知道,在C++11中,创建线程的方式为:

     std::thread t(fun);    //fun为线程的执行函数

    所以,上述workers.emplace_back()中,我们传入的lambda表达式就是创建线程的fun()函数。

    下面来分析下该lambda表达式:

    1. [this]{
    2. for(;;)
    3. {
    4. std::function<void()> task;
    5. {
    6. std::unique_lock<std::mutex> lock(this->queue_mutex);
    7. this->condition.wait(lock,
    8. [this]{ return this->stop || !this->tasks.empty(); });
    9. if(this->stop && this->tasks.empty())
    10. return;
    11. task = std::move(this->tasks.front());
    12. this->tasks.pop();
    13. }
    14. task();
    15. }
    16. }

    lambda表达式的格式为:

    [ 捕获 ] ( 形参 ) 说明符(可选) 异常说明 attr -> 返回类型 { 函数体 }

    所以上述lambda表达式为 [ 捕获 ] { 函数体 } 类型。

    该lambda表达式捕获线程池指针this用于在函数体中使用(调用线程池成员变量stop、tasks等)

    分析函数体,for(;;)为一个死循环,表示每个线程都会反复这样执行,这其实每个线程池中的线程都会这样。

    在循环中,,先创建一个封装void()函数的std::function对象task,用于接收后续从任务队列中弹出的真实任务。

    在C++11中,

    std::unique_lock<std::mutex> lock(this->queue_mutex);

    可以在退出作用区域时自动解锁,无需显式解锁。所以,{}起的作用就是在退出 } 时自动回释放线程池的queue_mutex。

    在{}中,我们先对任务队列加锁,然后根据条件变量判断条件是否满足。

    1. void wait(unique_lock<mutex>& lock, _Predicate p)
    2. {
    3. while (!p())
    4. wait(lock);
    5. }

    为条件标量wait的运行机制, wait在p 为false的状态下,才会进入wait(lock)状态。当前线程阻塞直至条件变量被通知

    this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });

    所以p表示上述代码中的lambda表达式[this]{ return this->stop || !this->tasks.empty(); },其中this->stop为false, !this->tasks.empty()也为false。即其表示若线程池已停止或者任务队列中不为空,则不会进入到wait状态。

    由于刚开始创建线程池,线程池表示未停止,且任务队列为空,所以每个线程都会进入到wait状态。

(借用 Linux条件变量pthread_condition细节(为何先加锁,pthread_cond_wait为何先解锁,返回时又加锁)_pthread_cond_signal 为什么加锁-CSDN博客 一张图便于说明wait的过程)

在线程池刚刚创建,所有的线程都阻塞在了此处,即wait处。

若后续条件变量来了通知,线程就会继续往下进行:

  1. if(this->stop && this->tasks.empty())
  2. return;

这样,将任务队列中的第一个任务用task标记,然后将任务队列中该任务弹出。(此处线程实在获得了任务队列中的互斥锁的情况下进行的,从上图可以看出,在条件标量唤醒线程后,线程在wait周期内得到了任务队列的互斥锁才会继续往下执行。所以最终只会有一个线程拿到任务,不会发生惊群效应)

在退出了{ },我们队任务队列的所加的锁也释放了,然后我们的线程就可以执行我们拿到的任务task了,执行完毕之后,线程又进入了死循环。

至此,我们分析了ThreadPool的构造函数。

添加任务函数解析
  1. template<class F, class... Args>
  2. auto ThreadPool::enqueue(F&& f, Args&&... args)
  3. -> std::future<typename std::result_of<F(Args...)>::type>
  4. {
  5. using return_type = typename std::result_of<F(Args...)>::type;
  6. auto task = std::make_shared< std::packaged_task<return_type()> >(
  7. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  8. );
  9. std::future<return_type> res = task->get_future();
  10. {
  11. std::unique_lock<std::mutex> lock(queue_mutex);
  12. // don't allow enqueueing after stopping the pool
  13. if(stop)
  14. throw std::runtime_error("enqueue on stopped ThreadPool");
  15. tasks.emplace([task](){ (*task)(); });
  16. }
  17. condition.notify_one();
  18. return res;
  19. }

添加任务的函数本来不难理解,但是作者增加了许多新的C++11特性,这样就变得难以理解了。

  1. template<class F, class... Args>
  2. auto ThreadPool::enqueue(F&& f, Args&&... args)
  3. -> std::future<typename std::result_of<F(Args...)>::type>

equeue是一个模板函数,其类型形参为F与Args。其中class... Args表示多个类型形参。

auto用于自动推导出equeue的返回类型,函数的形参为(F&& f, Args&&... args),其中&&表示右值引用。表示接受一个F类型的f,与若干个Args类型的args。

-> std::future<typename std::result_of<F(Args...)>::type>

表示返回类型,与lambda表达式中的表示方法一样。

返回的是什么类型呢?

  1. typename std::result_of<F(Args...)>::type //获得以Args为参数的F的函数类型的返回类型
  2. std::future<typename std::result_of<F(Args...)>::type>
  3. //std::future用来访问异步操作的结果

所以,最终返回的是放在std::future中的F(Args…)返回类型的异步执行结果。

举个简单的例子来理解吧:

  1. // 来自 packaged_task 的 future
  2. std::packaged_task<int()> task([](){ return 7; }); // 包装函数,将lambda表达式进行包装
  3. std::future<int> f1 = task.get_future(); // 定义一个future对象f1,存放int型的值。此处已经表明:将task挂载到线程上执行,然后返回的结果才会保存到f1中
  4. std::thread(std::move(task)).detach(); // 将task函数挂载在线程上运行
  5. f1.wait(); //f1等待异步结果的输入
  6. f1.get(); //f1获取到的异步结果
  7. struct S {
  8. double operator()(char, int&);
  9. float operator()(int) { return 1.0;}
  10. };
  11. std::result_of<S(char, int&)>::type d = 3.14; // d 拥有 double 类型,等价于double d = 3.14
  12. std::result_of<S(int)>::type x = 3.14; // x 拥有 float 类型,等价于float x = 3.14

经过上述两个简单的小例子可以知道:

  1. -> std::future<typename std::result_of<F(Args...)>::type>
  2. //等价于
  3. //F(Args...) 为 int f(args)
  4. //std::result_of<F(Args...)>::type 表示为 int
  5. //std::future<int> f1
  6. //return f1
  7. //在后续我们根据f1.get就可以取出存放在里面的int值
  8. //最终返回了一个F(Args...)类型的值,而这个值是存储在std::future中,因为线程是异步处理的

接着分析:

 
 using return_type = typename std::result_of<F(Args...)>::type;

表示使用return_type表示F(Args...)的返回类型。

 
  auto task = std::make_shared< std::packaged_task<return_type()> >(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...)
          );

由上述小例子,我们已经知道std::packaged_task是一个包装函数,所以

复制代码

  auto sp = std::make_shared<C>(12);   --->   auto sp = new C(12)  //创建一个智能指针sp,其指向一个用12初始化的C类对象
      
  std::packaged_task<return_type()>   //表示包装一个返回值为return_type的函数
      
  auto task = std::make_shared< std::packaged_task<return_type()> > (std::bind(std::forward<F>(f), std::forward<Args>(args)...)   //创建一个智能指针task,其指向一个用std::bind(std::forward<F>(f), std::forward<Args>(args)... 来初始化的 std::packaged_task<return_type()> 对象
  ​
  //即  std::packaged_task<return_type()> t1(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  //然后task指向了t1,即task指向了返回值为return_type的f(args)
      
  std::packaged_task<int()> task(std::bind(f, 2, 11));    //将函数f(2,11)打包成task,其返回值为int

复制代码

 

所以最终,task指向了传递进来的函数。

   std::future<return_type> res = task->get_future();
  //res中保存了类型为return_type的变量,有task异步执行完毕才可以将值保存进去

所以,res会在异步执行完毕后即可获得所求。

复制代码

  {
      std::unique_lock<std::mutex> lock(queue_mutex);
  ​
      // don't allow enqueueing after stopping the pool
      if(stop)
          throw std::runtime_error("enqueue on stopped ThreadPool");
  ​
      tasks.emplace([task](){ (*task)(); });  //(*task)() ---> f(args)
  }

复制代码

在新的作用于内加锁,若线程池已经停止,则抛出异常。

否则,将task所指向的f(args)插入到tasks任务队列中。需要指出,这儿的emplace中传递的是构造函数的参数。

  condition.notify_one(); //任务加入任务队列后,需要去唤醒一个线程
  return res; //待线程执行完毕,将异步执行的结果返回
 

经过上述分析,这样将每个人物插入到任务队列中的过程就完成了。

析构函数解析

复制代码

  inline ThreadPool::~ThreadPool()
  {
      {
          std::unique_lock<std::mutex> lock(queue_mutex);
          stop = true;
      }
      condition.notify_all();
      for(std::thread &worker: workers)
          worker.join();
  }

复制代码

在析构函数中,先对任务队列中加锁,将停止标记设置为true,这样后续即使有新的插入任务操作也会执行失败。

使用条件变量唤醒所有线程,所有线程都会往下执行:

  if(this->stop && this->tasks.empty())
      return;

在stop设置为true且任务队列中为空时,对应的线程进而跳出循环结束。

  for(std::thread &worker: workers)
     worker.join();

将每个线程设置为join,等到每个线程结束完毕后,主线程再退出。

主函数解析

复制代码

  ThreadPool pool(4); //创建一个线程池,池中线程为4
  std::vector< std::future<int> > results;    //创建一个保存std::future<int>的数组,用于存储4个异步线程的结果
  ​
  for(int i = 0; i < 8; ++i) {    //创建8个任务
      results.emplace_back(   //一次保存每个异步结果
          pool.enqueue([i] {  //将每个任务插入到任务队列中,每个任务的功能均为“打印+睡眠1s+打印+返回结果”
              std::cout << "hello " << i << std::endl;
              std::this_thread::sleep_for(std::chrono::seconds(1));
              std::cout << "world " << i << std::endl;
              return i*i;
          })
      );
  }
  ​
  for(auto && result: results)    //一次取出保存在results中的异步结果
      std::cout << result.get() << ' ';
  std::cout << std::endl;

复制代码

需要对主函数中的任务函数进行说明:

复制代码

  [i] {   //将每个任务插入到任务队列中,每个任务的功能均为“打印+睡眠1s+打印+返回结果”
      std::cout << "hello " << i << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::cout << "world " << i << std::endl;
      return i*i;
  }

复制代码

这个lambda表达式用来表示一个匿名函数,该函数分写执行 打印-睡眠-打印-返回结果。

pool.enqueue(fun);

对应于类中的

  auto ThreadPool::enqueue(F&& f, Args&&... args) 
      -> std::future<typename std::result_of<F(Args...)>::type>

其中,F&& f 是lambda表达式(或者说fun)的形参,而参数为0。

std::future<typename std::result_of<F(Args...)>::type>

则用来保存 i*i 

对应的

std::result_of<F(Args...)>::type    //int型

上述是简要的分析。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/952332
推荐阅读
相关标签
  

闽ICP备14008679号