赞
踩
维基百科上对线程池的简要介绍:
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function<void()> > tasks; // synchronization std::mutex queue_mutex; std::condition_variable condition; bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) workers.emplace_back( [this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) worker.join(); } #endif
// create thread pool with 4 worker threads
ThreadPool pool(4);
// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);
// get result from future
std::cout << result.get() << std::endl;
std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
workers
:存储线程池中 std::thread 的容器tasks
:任务队列queue_mutex
:任务队列的互斥锁condition
:任务队列的条件变量stop
:线程池是否停止的标志位inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
size_t
类型的参数,初始化线程池中线程的数量for (size_t i = 0; i < threads; ++i)
{
workers.emplace_back(
[this]
{
for (;;)
{
//...
}
}
);
}
std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task();
std::function<void()>
类型的变量 taskcondition.wait()
等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行task()
,也就是上一步从队列头部取出的任务template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
enqueue
函数模板,用于任务的入队列F&& f
,这里预期是一个任意的 callable 对象Args&&... args
,一个可变模板参数,会在编译期展开参数包std::future
类型的对象,用于获取任务的执行结果,std::future
的模板参数使用 std::result_of
萃取可调用对象的返回值类型std::result_of
就已经是 deprecated,可以使用 std::invoke_result
类型萃取继续往下看 enqueue 函数的实现:
using return_type = typename std::result_of<F(Args...)>::type;
std::result_of
类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_typeauto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
这一段做了好几件事,一步一步拆解:
std::make_shared
构建一个 std::shared_ptr
std::packaged_task
模板是用于包装 callable 对象,使用了前面推导出的 return_type
类型来实例化模板std::make_shared
需要调用实例类型的构造函数,而 std::packaged_task
的构造函数需要一个可调用对象,所以这里使用 std::bind
将可变模板参数绑定给 f(对 std::bind
不熟悉的建议先行查阅资料),std::forward
转发一下类型std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
std::future
对象condition.wait()
会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行std::future
对象遵循 RAII 原则,释放所有资源
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。