当前位置:   article > 正文

C++百行项目,实现多线程池,ThreadPool代码解析_c++ threadpool

c++ threadpool

1、介绍

        该项目是一个实现多线程池的简单项目,我将它作为简单的C++项目入门,代码并不多但是精读该代码对于学习C++的新特性以及线程实现有很好的作用。本文会详细讲解代码及使用的库特性,作为初学者如有错误敬请指正。

2、头文件

2.1、头文件包含的库

  1. #ifndef THREAD_POOL_H
  2. #define THREAD_POOL_H
  3. #include <vector>
  4. #include <queue>
  5. #include <memory>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <future>
  10. #include <functional>
  11. #include <stdexcept>

        memory库:智能指针库,操作用std::shared_ptr或std::make_shared创建指针,通过该库创建的指针可以在不再需要指针后自动释放,避免在线程未结束前指针被释放引起崩溃。

        thread库:线程库,通过std::thread创建线程,并通过.join()保护线程防止在其他线程未结束之前主线程结束,.joinabel()判断是否可以保护线程。

        mutex库:互斥锁库,配合线程使用防止不同线程对同一个变量进行操作,整个文件中只能有一个互斥锁定义std::mutex mtx。最简单的操作方式是在写操作之前mtx.lock()锁进程,用mtx.unlock()解锁进程。最最常用的方式是std::unique_lock<std:mutex> lg(mtx),当构造函数调用时,互斥量自动锁定,当析构函数调用时,互斥量自动释放。并且它只能在局部作用域中使用。相比于std::lock_guard<std:mutex> lg(mtx),std::unique_lock<std:mutex> lg(mtx)有能够限时等待的操作方式。

        condition_variable库:条件变量库,std::condition_variable cv配合互斥锁使用,cv.wait(lock,false):需要等待。cv.notify_one():通知消费者来取任务。

        future库:异步编程库,在该项目中很重要,因为future可以在不调用拷贝构造函数的情况下获取线程,不是为了节省内存!!!!!是因为线程不能调用拷贝构造函数。

2.2、类

  1. class ThreadPool {
  2. public:
  3. ThreadPool(size_t);
  4. template<class F, class... Args>
  5. auto enqueue(F&& f, Args&&... args)
  6. -> std::future<typename std::result_of<F(Args...)>::type>;
  7. ~ThreadPool();
  8. private:
  9. std::vector< std::thread > workers;
  10. std::queue< std::function<void()> > tasks;
  11. std::mutex queue_mutex;
  12. std::condition_variable condition;
  13. bool stop;
  14. };

        文件中只有一个类,有一个构造函数、一个析构函数以及一个成员函数enqueue,其中成员函数enqueue是为了添加线程。成员有5个,workers线程数组,tasks任务队列(函数作为元素),queue_mutex互斥锁,condition环境变量,stop线程池终止变量。

        在学习过程中最难了解的是enqueue的定义,其用到了万能引用&&。在C++中左值引用为&,右值引用为&&,在函数模板中才有万能模板&&可以根据数据进行左值引用和右值引用的调整。其中f是函数,args是函数参数,...表示函数参数数量不受限制。 -> std::future<typename std::result_of<F(Args...)>::type>表示auto的返回值固定为std::future。

2.3、构造函数

  1. inline ThreadPool::ThreadPool(size_t threads)
  2. : stop(false)
  3. {
  4. for(size_t i = 0;i<threads;++i)
  5. workers.emplace_back(
  6. [this]
  7. {
  8. for(;;)
  9. {
  10. std::function<void()> task;
  11. {
  12. std::unique_lock<std::mutex> lock(this->queue_mutex);
  13. this->condition.wait(lock,
  14. [this]{ return this->stop || !this->tasks.empty(); });
  15. if(this->stop && this->tasks.empty())
  16. return;
  17. task = std::move(this->tasks.front());
  18. this->tasks.pop();
  19. }
  20. task();
  21. }
  22. }
  23. );
  24. }

        首先为stop进行赋值,之后对线程进行任务分配,workers通过使用emplace_back进行push_back,同样因为push_back会调用拷贝构造函数,而thread不能使用拷贝构造函数。

        在emplace_back中使用了lambda表达式并捕获类内所有局部变量(即this指针所能获取的所有成员)。由于使用了内联函数的原因,不能使用while只能使用for(;;)。其中一些空的{}是为了作为局部作用域,解除互斥锁。

        如果在for(;;)中没有任务可分配了那么就等待任务到达分配,如果stop被置true那么就直接结束构造函数。有任务立即执行!!!!!!

        非常重要的一点std::move为从一个对象转移到另一个对象而不经过拷贝构造函数的操作,最后task执行。

2.4、任务添加成员函数

  1. template<class F, class... Args>
  2. auto ThreadPool::enqueue(F&& f, Args&&... args)
  3. -> std::future<typename std::result_of<F(Args...)>::type>
  4. {
  5. using return_type = typename std::result_of<F(Args...)>::type;
  6. auto task = std::make_shared< std::packaged_task<return_type()> >(
  7. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  8. );
  9. std::future<return_type> res = task->get_future();
  10. {
  11. std::unique_lock<std::mutex> lock(queue_mutex);
  12. if(stop)
  13. throw std::runtime_error("enqueue on stopped ThreadPool");
  14. tasks.emplace([task](){ (*task)(); });
  15. }
  16. condition.notify_one();
  17. return res;
  18. }

        该成员函数使用auto作为返回值类型,并通过->指定返回类型,其中std::result_of<F(Args...)>::type用以获取函数返回结果,typename确定类型,std::future来保存结果。

        为了方便之后指定类型,使用using return_type = typename std::result_of<F(Args...)>::type保存输出结果的类型。

        这里对task使用了std::make_shared进行智能指针的使用,std::packaged_task封装异步操作,使其能不调用拷贝构造函数的情况下获取结果。

        使用std::bind进行函数参数绑定,std::forward完美转发使其能够在左值引用和右值引用通用。

        利用test指针获取.get_future()函数获取future并通过.get()获取值。这里有个重要小技巧,可以通过promise对象跨线程获取值。

        最后通过*task获取最终结果,在加入任务成功后利用condition.notify_one()通知线程过来取任务。来活了兄弟们!!!

2.5、析构函数

  1. inline ThreadPool::~ThreadPool()
  2. {
  3. {
  4. std::unique_lock<std::mutex> lock(queue_mutex);
  5. stop = true;
  6. }
  7. condition.notify_all();
  8. for(std::thread &worker: workers)
  9. worker.join();
  10. }

        互斥锁全局只有一个,在任务结束之前不允许其他线程调用,因此这里的操作必须在所有线程都执行结束后才能进行,首先互斥锁上锁,将stop置true。

        condition.notify_all()通知各线程将任务队列中的所有任务做完,之后检查每个线程都是否结束,最后结束主线程。

3、结语

        找工作学习的第一个小项目,如有错误敬请指正讨论,一起提高。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/1008743
推荐阅读
相关标签
  

闽ICP备14008679号