赞
踩
该项目是一个实现多线程池的简单项目,我将它作为简单的C++项目入门,代码并不多但是精读该代码对于学习C++的新特性以及线程实现有很好的作用。本文会详细讲解代码及使用的库特性,作为初学者如有错误敬请指正。
- #ifndef THREAD_POOL_H
- #define THREAD_POOL_H
-
- #include <vector>
- #include <queue>
- #include <memory>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <future>
- #include <functional>
- #include <stdexcept>
memory库:智能指针库,操作用std::shared_ptr或std::make_shared创建指针,通过该库创建的指针可以在不再需要指针后自动释放,避免在线程未结束前指针被释放引起崩溃。
thread库:线程库,通过std::thread创建线程,并通过.join()保护线程防止在其他线程未结束之前主线程结束,.joinabel()判断是否可以保护线程。
mutex库:互斥锁库,配合线程使用防止不同线程对同一个变量进行操作,整个文件中只能有一个互斥锁定义std::mutex mtx。最简单的操作方式是在写操作之前mtx.lock()锁进程,用mtx.unlock()解锁进程。最最常用的方式是std::unique_lock<std:mutex> lg(mtx),当构造函数调用时,互斥量自动锁定,当析构函数调用时,互斥量自动释放。并且它只能在局部作用域中使用。相比于std::lock_guard<std:mutex> lg(mtx),std::unique_lock<std:mutex> lg(mtx)有能够限时等待的操作方式。
condition_variable库:条件变量库,std::condition_variable cv配合互斥锁使用,cv.wait(lock,false):需要等待。cv.notify_one():通知消费者来取任务。
future库:异步编程库,在该项目中很重要,因为future可以在不调用拷贝构造函数的情况下获取线程,不是为了节省内存!!!!!是因为线程不能调用拷贝构造函数。
- class ThreadPool {
- public:
- ThreadPool(size_t);
- template<class F, class... Args>
- auto enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type>;
- ~ThreadPool();
- private:
- std::vector< std::thread > workers;
- std::queue< std::function<void()> > tasks;
-
- std::mutex queue_mutex;
- std::condition_variable condition;
- bool stop;
- };
文件中只有一个类,有一个构造函数、一个析构函数以及一个成员函数enqueue,其中成员函数enqueue是为了添加线程。成员有5个,workers线程数组,tasks任务队列(函数作为元素),queue_mutex互斥锁,condition环境变量,stop线程池终止变量。
在学习过程中最难了解的是enqueue的定义,其用到了万能引用&&。在C++中左值引用为&,右值引用为&&,在函数模板中才有万能模板&&可以根据数据进行左值引用和右值引用的调整。其中f是函数,args是函数参数,...表示函数参数数量不受限制。 -> std::future<typename std::result_of<F(Args...)>::type>表示auto的返回值固定为std::future。
- inline ThreadPool::ThreadPool(size_t threads)
- : stop(false)
- {
- for(size_t i = 0;i<threads;++i)
- workers.emplace_back(
- [this]
- {
- for(;;)
- {
- std::function<void()> task;
-
- {
- std::unique_lock<std::mutex> lock(this->queue_mutex);
- this->condition.wait(lock,
- [this]{ return this->stop || !this->tasks.empty(); });
- if(this->stop && this->tasks.empty())
- return;
- task = std::move(this->tasks.front());
- this->tasks.pop();
- }
-
- task();
- }
- }
- );
- }
首先为stop进行赋值,之后对线程进行任务分配,workers通过使用emplace_back进行push_back,同样因为push_back会调用拷贝构造函数,而thread不能使用拷贝构造函数。
在emplace_back中使用了lambda表达式并捕获类内所有局部变量(即this指针所能获取的所有成员)。由于使用了内联函数的原因,不能使用while只能使用for(;;)。其中一些空的{}是为了作为局部作用域,解除互斥锁。
如果在for(;;)中没有任务可分配了那么就等待任务到达分配,如果stop被置true那么就直接结束构造函数。有任务立即执行!!!!!!
非常重要的一点std::move为从一个对象转移到另一个对象而不经过拷贝构造函数的操作,最后task执行。
- template<class F, class... Args>
- auto ThreadPool::enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type>
- {
- using return_type = typename std::result_of<F(Args...)>::type;
-
- auto task = std::make_shared< std::packaged_task<return_type()> >(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- );
-
- std::future<return_type> res = task->get_future();
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
-
- if(stop)
- throw std::runtime_error("enqueue on stopped ThreadPool");
-
- tasks.emplace([task](){ (*task)(); });
- }
- condition.notify_one();
- return res;
- }
该成员函数使用auto作为返回值类型,并通过->指定返回类型,其中std::result_of<F(Args...)>::type用以获取函数返回结果,typename确定类型,std::future来保存结果。
为了方便之后指定类型,使用using return_type = typename std::result_of<F(Args...)>::type保存输出结果的类型。
这里对task使用了std::make_shared进行智能指针的使用,std::packaged_task封装异步操作,使其能不调用拷贝构造函数的情况下获取结果。
使用std::bind进行函数参数绑定,std::forward完美转发使其能够在左值引用和右值引用通用。
利用test指针获取.get_future()函数获取future并通过.get()获取值。这里有个重要小技巧,可以通过promise对象跨线程获取值。
最后通过*task获取最终结果,在加入任务成功后利用condition.notify_one()通知线程过来取任务。来活了兄弟们!!!
- inline ThreadPool::~ThreadPool()
- {
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
- stop = true;
- }
- condition.notify_all();
- for(std::thread &worker: workers)
- worker.join();
- }
互斥锁全局只有一个,在任务结束之前不允许其他线程调用,因此这里的操作必须在所有线程都执行结束后才能进行,首先互斥锁上锁,将stop置true。
condition.notify_all()通知各线程将任务队列中的所有任务做完,之后检查每个线程都是否结束,最后结束主线程。
找工作学习的第一个小项目,如有错误敬请指正讨论,一起提高。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。