赞
踩
大家都知道C++支持多线程开发,也就是支持多个任务并行运行,我们也知道线程的生命周期中包括创建、就绪、运行、阻塞、销毁等阶段,所以如果要执行的任务很多,每个任务都需要一个线程的话,那么频繁的创建、销毁线程会比较耗性能。
有了线程池就不用创建更多的线程来完成任务,它可以:
①降低资源的消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。
②提高相应速度,当任务到达的时候,任务可以不需要等到线程创建就能立刻执行。
③提高线程的可管理性,线程是稀缺资源,使用线程池可以统一的分配、调优和监控。
通俗的讲,线程池就是一个线程集合,里面已经提前创建好了若干个线程,当需要线程的时候到线程集合里获取一个即可,这样省去了创建线程的时间,当然也省去了系统回收线程的时间,当线程池里的线程都被使用了后,只能阻塞等待了,等待获取线程池后被释放的线程。
线程池主要包括任务队列,线程队列,线程池类,线程池管理类和任务类等组成部分。当线程池提交一个任务到线程池后,执行流程如下:
线程池先判断核心线程池里面的线程是否都在执行任务。如果不是都在执行任务,则创建一个新的工作线程来执行任务。如果核心线程池中的线程都在执行任务,则判断工作队列是否已满。如果工作队列没有满,则将新提交的任务存储到这个工作队列中,如果工作队列满了,线程池则判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理 ,也就是拒接策略。
一句话:管理一个任务队列,一个线程队列,然后每次去一个任务分配给一个线程去做,循环往复。
有什么问题?线程池一般是要复用线程,所以如果是取一个task分配给某一个thread,执行完之后再重新分配,在语言层面这是基本不能实现的:C++的thread都是执行一个固定的task函数,执行完之后线程也就结束了。所以该如何实现task和thread的分配呢?
让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。
这个循环该什么时候停止呢?
很简单,当线程池停止使用时,循环停止。
这样一来,就保证了thread函数的唯一性,而且复用线程执行task。
总结一下,我们的线程池的主要组成部分有二:
- #ifndef THREAD_POOL_H
- #define THREAD_POOL_H
-
- #include <vector>
- #include <queue>
- #include <memory>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <future>
- #include <functional>
- #include <stdexcept>
-
- class ThreadPool {
-
- public:
- ThreadPool(size_t); //构造函数
- template<class F, class... Args> //类模板
- auto enqueue(F&& f, Args&&... args)->std::future<decltype(func(args...))>;//任务入队
- ~ThreadPool(); //析构函数
-
- private:
- std::vector< std::thread > workers; //线程队列,每个元素为一个Thread对象
- std::queue< std::function<void()> > tasks; //任务队列,每个元素为一个函数对象
-
- std::mutex queue_mutex; //互斥量
- std::condition_variable condition; //条件变量
- bool stop; //停止
- };
-
- // 构造函数,把线程插入线程队列,插入时调用embrace_back(),用匿名函数lambda初始化Thread对象
- inline ThreadPool::ThreadPool(size_t threads) : stop(false){
-
- for(size_t i = 0; i<threads; ++i)
- workers.emplace_back(
- [this]
- {
- for(;;)
- {
- // task是一个函数类型,从任务队列接收任务
- std::function<void()> task;
- {
- //给互斥量加锁,锁对象生命周期结束后自动解锁
- std::unique_lock<std::mutex> lock(this->queue_mutex);
-
- //(1)当匿名函数返回false时才阻塞线程,阻塞时自动释放锁。
- //(2)当匿名函数返回true且受到通知时解阻塞,然后加锁。
- this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });
-
- if(this->stop && this->tasks.empty())
- return;
-
- //从任务队列取出一个任务
- task = std::move(this->tasks.front());
- this->tasks.pop();
- } // 自动解锁
- task(); // 执行这个任务
- }
- }
- );
- }
-
- // 添加新的任务到任务队列
- template<class F, class... Args>
- auto ThreadPool::enqueue(F&& f, Args&&... args)->std::future<decltype(func(args...))>
- {
- // 获取函数返回值类型
- using return_type = decltype(func(args...));
-
- // 创建一个指向任务的智能指针
- auto task = std::make_shared< std::packaged_task<return_type()> >(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- );
-
- std::future<return_type> res = task->get_future();
- {
- std::unique_lock<std::mutex> lock(queue_mutex); //加锁
- if(stop)
- throw std::runtime_error("enqueue on stopped ThreadPool");
-
- tasks.emplace([task](){ (*task)(); }); //把任务加入队列
- } //自动解锁
- condition.notify_one(); //通知条件变量,唤醒一个线程
- return res;
- }
-
- // 析构函数,删除所有线程
- inline ThreadPool::~ThreadPool()
- {
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
- stop = true;
- }
- condition.notify_all();
- for(std::thread &worker: workers)
- worker.join();
- }
-
- #endif
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
使用
- #include <iostream>
- #include <chrono>
- #include "ThreadPool.h"
-
- void func()
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- std::cout<<"worker thread ID:"<<std::this_thread::get_id()<<std::endl;
- }
-
- int main()
- {
- ThreadPool pool(4);
- while(1)
- {
- pool.enqueue(func);
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
打印
另一种实现,
其中线程池类 ThreadPool 维护了一个任务队列 tasks_,并且可以动态添加任务到队列中。线程池管理类 ThreadPoolManager 提供了获取线程池实例的接口,使用时只需要通过 ThreadPoolManager::getThreadPool() 获取线程池实例,然后调用 enqueue 方法添加任务即可
- #include <iostream>
- #include <queue>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <functional>
-
- using namespace std;
-
- // 任务类
- class Task {
- public:
- Task(function<void()> f) : func(f) {}
- void operator()() { func(); }
- private:
- function<void()> func;
- };
-
- // 线程池类
- class ThreadPool {
- public:
- ThreadPool(int size) : stop(false) {
- for (int i = 0; i < size; ++i) {
- threads.emplace_back([this] {
- while (true) {
- function<void()> task;
- {
- unique_lock<mutex> lock(mutex_);
- condition_.wait(lock, [this] { return stop || !tasks_.empty(); });
- if (stop && tasks_.empty()) return;
- task = move(tasks_.front());
- tasks_.pop();
- }
- task();
- }
- });
- }
- }
-
- ~ThreadPool() {
- {
- unique_lock<mutex> lock(mutex_);
- stop = true;
- }
- condition_.notify_all();
- for (auto& thread : threads) {
- thread.join();
- }
- }
-
- template<typename F>
- void enqueue(F&& f) {
- {
- unique_lock<mutex> lock(mutex_);
- tasks_.emplace(forward<F>(f));
- }
- condition_.notify_one();
- }
-
- private:
- vector<thread> threads;
- queue<function<void()>> tasks_;
- mutex mutex_;
- condition_variable condition_;
- bool stop;
- };
-
- // 线程池管理类
- class ThreadPoolManager {
- public:
- static ThreadPool& getThreadPool() {
- static ThreadPool threadPool(thread::hardware_concurrency());
- return threadPool;
- }
- };
-
- int main() {
- ThreadPool& threadPool = ThreadPoolManager::getThreadPool();
- for (int i = 0; i < 10; ++i) {
- threadPool.enqueue(Task([] { cout << "Hello, World!" << endl; }));
- }
- return 0;
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
参考:
C++实现线程池_蓬莱道人的博客-CSDN博客_c++ 线程池
C++实现线程池_晓枫寒叶的博客-CSDN博客_c++ 线程池
C++17future类+可变参模板实现线程池_刚入门的代码spa技师的博客-CSDN博客_c++17 可变参
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。