#include #include #include #include #include #include #include using name._c++线程池">搜索查看编辑修改首页UNITYNODEJSPYTHONAIGITPHPGOCEF3JAVAHTMLCSS搜索很楠不爱3 这个屌丝很懒,什么也没留下! 关注作者热门标签jqueryHTMLCSSPHPASPPYTHONGOAICC++C#PHOTOSHOPUNITYiOSandroidvuexml爬虫SEOLINUXWINDOWSJAVAMFCCEF3CADNODEJSGITPyppeteerarticle热门文章1Android 静态广播注册流程(广播2)2PHP代码审计——任意文件写入漏洞(LvyeCms)_lvyecms漏洞3【Git】如何进行Git的配置、用户名密码缓存和清除(总结)_清除git config credential.helper store的数据4Numpy——numpy的基本运算_numpy运算5frm一级4个1大神复习经验分享系列(二)6Observability:介绍 OpenTelemetry Java 代理的 Elastic 发行版7安装docker版redis并挂载配置文件redis.conf8今天开始用swift写服务器(一)_swift zewo9C# 生成腾讯云 IM 之 TLSSigAPIv2 UserSig10如何打造一条全水平扩展的基础公链 - TOP Network CTO Taylor Wei分享当前位置: article > 正文 线程池 - C++_c++线程池 作者:很楠不爱3 | 2024-05-25 11:10:46 赞踩c++线程池 1. 基本概念 线程池(thread pool): 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。 2. 线程池的组成 2.1 线程池管理器 创建一定数量的线程,启动线程,调配任务,管理着线程池。 线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addTask). start()创建一定数量的线程池,进入线程循环. stop()停止所有的线程循环,回收所有资源. addTask()添加任务. 2.2 工作线程 线程池中线程,在线程池中等待并执行分配任务. 该文选用条件变量实现等待和通知机制 2.3 任务接口 添加任务接口,以供工作线程调度任务的执行. 2.4 任务队列 用于存放没有处理的任务,提供一种缓冲机制,同时任务队列具有调度功能,高级优先的任务放在任务队列前面;本文选用priority_queue与pair的结合用作任务优先队列结构. 3. 线程池工作的四种情况 假设我们的线程池大小为3,任务队列目前不做大小限制。 3.1 主程序当前没有任务要执行,线程池中任务队列为空闲状态 下面情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空. 3.2 主程序添加小于等于线程池中线程数量得任务 基于3.1情况,所有的工作线程已处于等待状态,主线程开始添加三个任务,添加后通知(notif())唤醒线程池中的线程开始取(take())任务执行。此时的任务缓冲队列还是空。 3.3 主程序添加任务数量大于当前线程池中线程数量的任务 基于3.2情况,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行。 3.4 主程序添加任务数量大于当前线程池中线程数量的任务,且任务缓冲队列已满 此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第N个任务,添加后发现线程池中线程已经用完了,任务缓冲队列已满,于是进入等待状态,等待任务缓冲队列中任务腾空通知。但是这种情形会阻塞主线程,本文不限制任务队列的大小,必要时再优化。 4. 线程池的C++实现 参考连接: Thread pool;ThreadPool 线程池的主要组成由三个部分构成: 任务队列(Task Quene)线程池(Thread Pool)完成队列(Completed Tasks) 等待通知机制通过条件变量实现,Logger和CurrentThread,用于调试,可以无视。 #ifndef _THREADPOOL_HH#define _THREADPOOL_HH #include <vector>#include <utility>#include <queue>#include <thread>#include <functional>#include <mutex> #include "Condition.hh" class ThreadPool{public: static const int kInitThreadsSize = 3; enum taskPriorityE { level0, level1, level2, }; typedef std::function<void()> Task; typedef std::pair<taskPriorityE, Task> TaskPair; ThreadPool(); ~ThreadPool(); void start(); void stop(); void addTask(const Task&); void addTask(const TaskPair&); private: ThreadPool(const ThreadPool&);//禁止复制拷贝. const ThreadPool& operator=(const ThreadPool&); struct TaskPriorityCmp { bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) { return p1.first > p2.first; //first的小值优先 } }; void threadLoop(); Task take(); typedef std::vector<std::thread*> Threads; typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; Threads m_threads; Tasks m_tasks; std::mutex m_mutex; Condition m_cond; bool m_isStarted;}; #endif //Cpp #include <assert.h> #include "Logger.hh" // debug#include "CurrentThread.hh" // debug#include "ThreadPool.hh" ThreadPool::ThreadPool() :m_mutex(), m_cond(m_mutex), m_isStarted(false){ } ThreadPool::~ThreadPool(){ if(m_isStarted) { stop(); }} void ThreadPool::start(){ assert(m_threads.empty()); m_isStarted = true; m_threads.reserve(kInitThreadsSize); for (int i = 0; i < kInitThreadsSize; ++i) { m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this))); } } void ThreadPool::stop(){ LOG_TRACE << "ThreadPool::stop() stop."; { std::unique_lock<std::mutex> lock(m_mutex); m_isStarted = false; m_cond.notifyAll(); LOG_TRACE << "ThreadPool::stop() notifyAll()."; } for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it) { (*it)->join(); delete *it; } m_threads.clear();} void ThreadPool::threadLoop(){ LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start."; while(m_isStarted) { Task task = take(); if(task) { task(); } } LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit.";} void ThreadPool::addTask(const Task& task){ std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ TaskPair taskPair(level2, task); m_tasks.push(taskPair); m_cond.notify();} void ThreadPool::addTask(const TaskPair& taskPair){ std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ m_tasks.push(taskPair); m_cond.notify();} ThreadPool::Task ThreadPool::take(){ std::unique_lock<std::mutex> lock(m_mutex); //always use a while-loop, due to spurious wakeup while(m_tasks.empty() && m_isStarted) { LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait."; m_cond.wait(lock); } LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup."; Task task; Tasks::size_type size = m_tasks.size(); if(!m_tasks.empty() && m_isStarted) { task = m_tasks.top().second; m_tasks.pop(); assert(size - 1 == m_tasks.size()); /*if (TaskQueueSize_ > 0) { cond2.notify(); }*/ } return task; } 4.1 队列 队列作为先进先出的数据结构,当有可用的工作时,线程从队列中获取工作并执行。如果两个线程同时执行相同的工作会出现程序崩溃。为了避免这种问题,需要再标准C++ Queue上实现一个包装器,使用mutex来限制并发访问。 void enqueue(T& t) { std::unique_lock<std::mutex> lock(m_mutex); m_queue.push(t);} 要排队做的第一件事情就是锁定互斥锁来确保没有其他人正在访问该资源。然后,将元素推送到队列当中。当锁超出范围时,它会自动释放,这样使Queue线程安全,因此不用担心许多线程在相同时间访问或者修改它。 4.2 提交函数 线程池最重要的方法是负责向队列添加任务。 5. 测试程序 5.1 start()、stop() 测试线程池基本的创建退出工作,以及检测资源是否回收正常。 int main(){ { ThreadPool threadPool; threadPool.start(); getchar();} getchar(); return 0;} ./test.out 2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start.2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait.2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start.2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait.2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start.2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait.2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start.2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait. 2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop.2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll().2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup.2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit.2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup.2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit.2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup.2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit.2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup.2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit. 5.2 addTask()、 PriorityTaskQueue 测试添加任务接口,以及优先级任务队列。主线程首先添加5个普通任务,1s后添加一个高优先级任务,当前3个线程中的最先一个空闲后,会最先执行后面添加的priorityFunc(). std::mutex g_mutex; void priorityFunc(){ for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } void testFunc(){ // loop to print character after a random period of time for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } int main(){ ThreadPool threadPool; threadPool.start(); for(int i = 0; i < 5 ; i++) threadPool.addTask(testFunc); std::this_thread::sleep_for(std::chrono::seconds(1)); threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc)); getchar(); return 0;} ./test.out 2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start.2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start.2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:20.887610 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4122 start.2018-11-25 18:24:20.887620 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:21.887779 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4120] output2018-11-25 18:24:21.887813 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output2018-11-25 18:24:21.888909 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output2018-11-25 18:24:22.888049 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4120] output2018-11-25 18:24:22.888288 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output2018-11-25 18:24:22.889978 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output2018-11-25 18:24:23.888467 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4120] output2018-11-25 18:24:23.888724 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:23.888778 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output2018-11-25 18:24:23.888806 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:23.890413 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output2018-11-25 18:24:23.890437 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:24.889247 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [1at thread [ 4120] output2018-11-25 18:24:24.891187 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output2018-11-25 18:24:24.893163 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output2018-11-25 18:24:25.889567 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [2at thread [ 4120] output2018-11-25 18:24:25.891477 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output2018-11-25 18:24:25.893450 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output2018-11-25 18:24:26.890295 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [3at thread [ 4120] output2018-11-25 18:24:26.890335 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4120 wait.2018-11-25 18:24:26.892265 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output2018-11-25 18:24:26.892294 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4121 wait.2018-11-25 18:24:26.894274 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output2018-11-25 18:24:26.894299 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4122 wait. 2018-11-25 18:24:35.359003 [TRACE] [ThreadPool.cpp:37] [stop] ThreadPool::stop() stop.2018-11-25 18:24:35.359043 [TRACE] [ThreadPool.cpp:42] [stop] ThreadPool::stop() notifyAll().2018-11-25 18:24:35.359061 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:35.359067 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4120 exit.2018-11-25 18:24:35.359080 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:35.359090 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4122 exit.2018-11-25 18:24:35.359123 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:35.359130 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4121 exit. 3. 课程的线程池 // project4.cpp: 定义控制台应用程序的入口点。// #include "stdafx.h"#include <iostream>#include <vector>#include <map>#include <string>#include <thread>#include <list>#include <mutex>#include <future> using namespace std; class A {public: atomic<int> atm; A() { atm = 0; //auto atm2 = atm; //这种定义时初始化操作不允许,显示 “尝试引用已删除的函数”编译器内部肯定把拷贝构造函数给干掉了用 = delete //atomic<int> atm3 = atm; //atomic<int> atm2; //atm2 = atm; //尝试引用已删除的函数,拷贝赋值运算符也不让用 //load():以原子方式读atomic对象的值 atomic<int> atm2(atm.load()); //读 auto atm3(atm.load()); //store()以原子方式写入内容 atm2.store(12); atm2 = 12; } void outMsgRecvQueue() { //int command = 0; //while (true) //{ // std::unique_lock<std::mutex> sbguard1(my_mutex1); //临界进去 // my_cond.wait(sbguard1, [this] { //this,可以参考 未归类知识点,第八节 // if (!msgRecvQueue.empty()) // return true; // 该lambda返回true,则wait就返回,流程走下来,互斥锁被本线程拿到。 // return false; //解锁并休眠,卡在wait等待被再次唤醒 // }); // //现在互斥锁锁着,流程走下来了,队列里有数据; // command = msgRecvQueue.front(); //返回第一个元素,但不检查元素是否存在; // msgRecvQueue.pop_front(); //移除第一个元素,但不返回; // sbguard1.unlock(); //因为unique_lock的灵活性,我们可以随时unlock解锁,以免锁住太长时间 // cout << "outMsgRecvQueue()执行,取出一个元素" << command << endl; //} //end while while (true) { cout << atm << endl; //读atm是个原子操作,但是整个这一行代码并不是个原子操作; } } void inMsgRecvQueue() //unlock() { //for (int i = 0; i < 100000; ++i) //{ // cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl; // std::unique_lock<std::mutex> sbguard1(my_mutex1); // msgRecvQueue.push_back(i); //假设这个数字i就是我收到的命令,我直接弄到消息队列里边来; // my_cond.notify_one(); //我们尝试把wait()的线程唤醒,其实现在outMsgRecvQueue()中的my_cond.wait()已经醒了,但光醒了没有用,你这里要是不把互斥量撒开,醒了他也要堵在另外一个线程的wait()那里; //} for (int i = 0; i < 1000000; ++i) { atm += 1; //原子操作 //atm = atm + 1; //不是原子操作 } return; } private: std::list<int> msgRecvQueue; //容器(消息队列),专门用于代表玩家给咱们发送过来的命令。 std::mutex my_mutex1; //创建了一个互斥量 (一把锁头) std::condition_variable my_cond; //生成一个条件对象 }; int main(){ //一:补充一些知识点 //(1.1)虚假唤醒:wait中要有第二参数(lambda)并且这个lambda中要正确判断要处理的公共数据是否存在; //wait(),notify_one(),notify_all() //(1.2)atomic ,10,11节都有介绍 //二:浅谈线程池 //(2.1)场景设想 //服务器程序,--》客户端, 每来 一个客户端,就创建 一个新线程为该客户提供服务。 //a)网络游戏,2万玩家不可能给每个玩家创建个新线程,此程序写法在这种场景下不通; //b)程序稳定性问题:编写的代码中,偶尔创建一个线程这种代码,这种写法,就让人感到不安; //线程池:把一堆线程弄到一起,统一管理。这种统一管理调度,循环利用线程的方式,就叫线程池; //(2.2)实现方式 //在程序启时,我一次性的创建好一定数量的线程。10,8,100-200,更让人放心,觉得程序代码更稳定; //三:线程创建数量谈 //(3.1)线程开的数量极限问题,2000个线程基本就是极限;再创建线程就崩溃; //(3.2)线程创建数量建议 //a)采用某些技术开发程序;api接口提供商建议你 创建线程数量 = cpu数量,cpu *2 ,cpu *2 +2,遵照专业建议和指示来,专业意见确保程序高效率执行 //b)创建多线程完成业务; 一个线程等于一条执行通路; 100要堵塞充值,我们这里开110个线程,那是很合适的; //c)1800个线程,建议,线程数量尽量不要超过500个,能控制在200个之内; //四:c++11多线程总结 //windows,linux; A myobja; std::thread myOutnMsgObj(&A::outMsgRecvQueue, &myobja); //第二个参数是 引用,才能保证线程里 用的是同一个对象。 std::thread myInMsgObj(&A::inMsgRecvQueue, &myobja); std::thread myInMsgObj2(&A::inMsgRecvQueue, &myobja); myInMsgObj.join(); myOutnMsgObj.join(); myInMsgObj2.join(); //int abc = 10'00; c++新标准允许这么写 return 0;} 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/621804推荐阅读article数据结构入门(C语言版)栈和队列之队列的介绍及实现_c语言中栈和队列有什么区别?如何实现...队列的介绍及实现_c语言中栈和队列有什么区别?如何实现c语言中栈和队列有什么区别?如何实现 ... 赞踩articlewether.html5.qq.com,人教版英语九年级全一册Unit 1单元测试卷及参考答案解析(...UNIT 1 达标测试卷时间:120分钟 满分:150分第一卷 听力部分(30分)一、关键词语选择(共5小题;每小题1分... 赞踩article二、二分和前缀和_二分 前缀和...二分和前缀和思想和例题_二分 前缀和二分 前缀和 目录 目录 二分: 一、二分步骤 二、二分类型... 赞踩articlepython中的&&及||_&&在python中...首先说明一下,在python中是没有&&及||这两个运算符的,取而代之的是英文and和or。其他运算符没有变动。 接着重... 赞踩article2022届毕业生达1076万人,平均月薪6507元,你达到了吗?_南京2022届毕业生工资平均水平...中新网北京4月27日电 (门睿)2022届我国高校毕业生预计1076万人,同比增加167万人,规模首次突破1000万人。... 赞踩article用Python 画出玫瑰图案 (Python经典编程案例)_python编程画玫瑰...用Python 画出玫瑰图案,代码如下:import turtle as t# 定义一个曲线绘制函数def Degree... 赞踩article彻底解决Flutter_downloader中出现的问题_flutterdownloader...解决Flutter_downloader中出现的问题由于之前已经发布过一篇文章Flutter内部更新遗留了一些问题就是关... 赞踩article微软发布Copilot+ PC:集成GPT-4o,史上最强、最快Windows!_copilot+p...数据爆炸时代,我们每天在电脑上做的事情实在太多,浏览了无数的网页,存储海量的文件夹,从数千封邮件中找出自己最想要的那一个... 赞踩article华为 HCIA-AI V3.0 认证人工智能工程师考试_华为hcia-ia3.0是如何考试的...华为 HCIA-AI V3.0 认证人工智能工程师考试_华为hcia-ia3.0是如何考试的华为hcia-ia3.0是如... 赞踩article郑州大学期末php试题,郑州大学英语期末考试试题及答案...卷一一、交际英语1.- What's the date today?- _________A.It's October.... 赞踩articleKafkaLagExporter安装部署_kafka exporter...KafkaLag计算,KafkaLag监控,KafkaLagExporter安装部署,Kafka-Lag-Exporte... 赞踩article二叉树详解(C++)_c++二叉树...树状图是一种数据结构,它是由 n(n>=1)个有限结点组成一个具有层次关系的集合。把它叫做“树”是因为它看起来像一棵倒挂... 赞踩articlepython程序设计基础课后习题答案(电子版,可复制)第二章_python程序设计基础钟雪灵电子书...本书全部答案请关注公众号【python数据分析之禅】,回复“课后答案”获取第二章答案2.1:实例1的改造。修改实例代码1... 赞踩articlePython123作业六:数据结构_输入一组数字,采用逗号分隔,输出其中的最大值。...获取以逗号分隔的多个数据输入(输入为一行),计算基本统计值(平均值、标准差、中位数)... 赞踩article【 FPGA 】序列检测器 11010 (mealy状态机,moore状态机)_状态机实验(实现序列...状态机是硬件电路设计的常用的描述工具,也是电路设计的重要思想。很早之前我就知道mealy状态机和moore状态机,但是对... 赞踩article15个经典面试问题及回答思路,,大厂面试必问_程序员面试问公司问题...包含最全MySQL、Redis、Java并发编程等等面试题和答案,用于参考~_程序员面试问公司问题程序员面试问公司问题 ... 赞踩articleRAG的10篇论文-2024Q1...大模型来了,论文都读不过来了。在大型模型的研究与工程应用领域,变化之迅猛令人瞠目,用“日新月异”来形容似乎都显得有些保守... 赞踩articlegit常用命令之Fetch_git fetch了之后执行什么代码...命令用于从远程仓库获取最新的提交历史和分支信息,但不会自动合并或修改本地代码。它是在进行Git协作开发时常用的命令之一,... 赞踩articleVue中组件之间的通信有哪些方法...以上就是在Vue中组件之间通信的一些常见方法。选择哪种方法取决于你的具体需求和项目的复杂性。Vue中组件之间的通信有哪些... 赞踩article在VMware中安装CentOS系统及部署DooTask平台操作笔记_dootask windows...此文章是作为小白安装CentOS 7系统及部署DooTask平台(包括安装Docker 和 Docker Compose... 赞踩相关标签数据结构c语言wether.html5.qq.com算法python编程语言C认证职场和发展python经典编程案例Flutter内部更新Flutter内部更新问题downloadermicrosoftcopilot华为HCIA-AI郑州大学期末php试题kafkajava大数据c++开发语言后端fpga开发
赞
踩
线程池(thread pool): 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。
创建一定数量的线程,启动线程,调配任务,管理着线程池。 线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addTask). start()创建一定数量的线程池,进入线程循环. stop()停止所有的线程循环,回收所有资源. addTask()添加任务.
线程池中线程,在线程池中等待并执行分配任务. 该文选用条件变量实现等待和通知机制
添加任务接口,以供工作线程调度任务的执行.
用于存放没有处理的任务,提供一种缓冲机制,同时任务队列具有调度功能,高级优先的任务放在任务队列前面;本文选用priority_queue与pair的结合用作任务优先队列结构.
假设我们的线程池大小为3,任务队列目前不做大小限制。
下面情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空.
基于3.1情况,所有的工作线程已处于等待状态,主线程开始添加三个任务,添加后通知(notif())唤醒线程池中的线程开始取(take())任务执行。此时的任务缓冲队列还是空。
基于3.2情况,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行。
此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第N个任务,添加后发现线程池中线程已经用完了,任务缓冲队列已满,于是进入等待状态,等待任务缓冲队列中任务腾空通知。但是这种情形会阻塞主线程,本文不限制任务队列的大小,必要时再优化。
参考连接: Thread pool;ThreadPool
线程池的主要组成由三个部分构成:
等待通知机制通过条件变量实现,Logger和CurrentThread,用于调试,可以无视。
#ifndef _THREADPOOL_HH#define _THREADPOOL_HH #include <vector>#include <utility>#include <queue>#include <thread>#include <functional>#include <mutex> #include "Condition.hh" class ThreadPool{public: static const int kInitThreadsSize = 3; enum taskPriorityE { level0, level1, level2, }; typedef std::function<void()> Task; typedef std::pair<taskPriorityE, Task> TaskPair; ThreadPool(); ~ThreadPool(); void start(); void stop(); void addTask(const Task&); void addTask(const TaskPair&); private: ThreadPool(const ThreadPool&);//禁止复制拷贝. const ThreadPool& operator=(const ThreadPool&); struct TaskPriorityCmp { bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2) { return p1.first > p2.first; //first的小值优先 } }; void threadLoop(); Task take(); typedef std::vector<std::thread*> Threads; typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; Threads m_threads; Tasks m_tasks; std::mutex m_mutex; Condition m_cond; bool m_isStarted;}; #endif //Cpp #include <assert.h> #include "Logger.hh" // debug#include "CurrentThread.hh" // debug#include "ThreadPool.hh" ThreadPool::ThreadPool() :m_mutex(), m_cond(m_mutex), m_isStarted(false){ } ThreadPool::~ThreadPool(){ if(m_isStarted) { stop(); }} void ThreadPool::start(){ assert(m_threads.empty()); m_isStarted = true; m_threads.reserve(kInitThreadsSize); for (int i = 0; i < kInitThreadsSize; ++i) { m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this))); } } void ThreadPool::stop(){ LOG_TRACE << "ThreadPool::stop() stop."; { std::unique_lock<std::mutex> lock(m_mutex); m_isStarted = false; m_cond.notifyAll(); LOG_TRACE << "ThreadPool::stop() notifyAll()."; } for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it) { (*it)->join(); delete *it; } m_threads.clear();} void ThreadPool::threadLoop(){ LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start."; while(m_isStarted) { Task task = take(); if(task) { task(); } } LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit.";} void ThreadPool::addTask(const Task& task){ std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ TaskPair taskPair(level2, task); m_tasks.push(taskPair); m_cond.notify();} void ThreadPool::addTask(const TaskPair& taskPair){ std::unique_lock<std::mutex> lock(m_mutex); /*while(m_tasks.isFull()) {//when m_tasks have maxsize cond2.wait(); } */ m_tasks.push(taskPair); m_cond.notify();} ThreadPool::Task ThreadPool::take(){ std::unique_lock<std::mutex> lock(m_mutex); //always use a while-loop, due to spurious wakeup while(m_tasks.empty() && m_isStarted) { LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait."; m_cond.wait(lock); } LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup."; Task task; Tasks::size_type size = m_tasks.size(); if(!m_tasks.empty() && m_isStarted) { task = m_tasks.top().second; m_tasks.pop(); assert(size - 1 == m_tasks.size()); /*if (TaskQueueSize_ > 0) { cond2.notify(); }*/ } return task; }
队列作为先进先出的数据结构,当有可用的工作时,线程从队列中获取工作并执行。如果两个线程同时执行相同的工作会出现程序崩溃。为了避免这种问题,需要再标准C++ Queue上实现一个包装器,使用mutex来限制并发访问。
void enqueue(T& t) { std::unique_lock<std::mutex> lock(m_mutex); m_queue.push(t);}
要排队做的第一件事情就是锁定互斥锁来确保没有其他人正在访问该资源。然后,将元素推送到队列当中。当锁超出范围时,它会自动释放,这样使Queue线程安全,因此不用担心许多线程在相同时间访问或者修改它。
线程池最重要的方法是负责向队列添加任务。
测试线程池基本的创建退出工作,以及检测资源是否回收正常。
int main(){ { ThreadPool threadPool; threadPool.start(); getchar();} getchar(); return 0;}
./test.out 2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start.2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait.2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start.2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait.2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start.2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait.2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start.2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait. 2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop.2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll().2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup.2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit.2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup.2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit.2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup.2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit.2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup.2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit.
测试添加任务接口,以及优先级任务队列。主线程首先添加5个普通任务,1s后添加一个高优先级任务,当前3个线程中的最先一个空闲后,会最先执行后面添加的priorityFunc().
std::mutex g_mutex; void priorityFunc(){ for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } void testFunc(){ // loop to print character after a random period of time for (int i = 1; i < 4; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(g_mutex); LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl; } } int main(){ ThreadPool threadPool; threadPool.start(); for(int i = 0; i < 5 ; i++) threadPool.addTask(testFunc); std::this_thread::sleep_for(std::chrono::seconds(1)); threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc)); getchar(); return 0;}
./test.out 2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start.2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start.2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:20.887610 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4122 start.2018-11-25 18:24:20.887620 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:21.887779 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4120] output2018-11-25 18:24:21.887813 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output2018-11-25 18:24:21.888909 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output2018-11-25 18:24:22.888049 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4120] output2018-11-25 18:24:22.888288 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output2018-11-25 18:24:22.889978 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output2018-11-25 18:24:23.888467 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4120] output2018-11-25 18:24:23.888724 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:23.888778 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output2018-11-25 18:24:23.888806 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:23.890413 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output2018-11-25 18:24:23.890437 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:24.889247 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [1at thread [ 4120] output2018-11-25 18:24:24.891187 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4121] output2018-11-25 18:24:24.893163 [DEBUG] [main.cpp:104] [testFunc] testFunc() [1] at thread [ 4122] output2018-11-25 18:24:25.889567 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [2at thread [ 4120] output2018-11-25 18:24:25.891477 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4121] output2018-11-25 18:24:25.893450 [DEBUG] [main.cpp:104] [testFunc] testFunc() [2] at thread [ 4122] output2018-11-25 18:24:26.890295 [DEBUG] [main.cpp:92] [priorityFunc] priorityFunc() [3at thread [ 4120] output2018-11-25 18:24:26.890335 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4120 wait.2018-11-25 18:24:26.892265 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4121] output2018-11-25 18:24:26.892294 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4121 wait.2018-11-25 18:24:26.894274 [DEBUG] [main.cpp:104] [testFunc] testFunc() [3] at thread [ 4122] output2018-11-25 18:24:26.894299 [TRACE] [ThreadPool.cpp:99] [take] ThreadPool::take() tid : 4122 wait. 2018-11-25 18:24:35.359003 [TRACE] [ThreadPool.cpp:37] [stop] ThreadPool::stop() stop.2018-11-25 18:24:35.359043 [TRACE] [ThreadPool.cpp:42] [stop] ThreadPool::stop() notifyAll().2018-11-25 18:24:35.359061 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4120 wakeup.2018-11-25 18:24:35.359067 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4120 exit.2018-11-25 18:24:35.359080 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4122 wakeup.2018-11-25 18:24:35.359090 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4122 exit.2018-11-25 18:24:35.359123 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.2018-11-25 18:24:35.359130 [TRACE] [ThreadPool.cpp:65] [threadLoop] ThreadPool::threadLoop() tid : 4121 exit.
3. 课程的线程池
// project4.cpp: 定义控制台应用程序的入口点。// #include "stdafx.h"#include <iostream>#include <vector>#include <map>#include <string>#include <thread>#include <list>#include <mutex>#include <future> using namespace std; class A {public: atomic<int> atm; A() { atm = 0; //auto atm2 = atm; //这种定义时初始化操作不允许,显示 “尝试引用已删除的函数”编译器内部肯定把拷贝构造函数给干掉了用 = delete //atomic<int> atm3 = atm; //atomic<int> atm2; //atm2 = atm; //尝试引用已删除的函数,拷贝赋值运算符也不让用 //load():以原子方式读atomic对象的值 atomic<int> atm2(atm.load()); //读 auto atm3(atm.load()); //store()以原子方式写入内容 atm2.store(12); atm2 = 12; } void outMsgRecvQueue() { //int command = 0; //while (true) //{ // std::unique_lock<std::mutex> sbguard1(my_mutex1); //临界进去 // my_cond.wait(sbguard1, [this] { //this,可以参考 未归类知识点,第八节 // if (!msgRecvQueue.empty()) // return true; // 该lambda返回true,则wait就返回,流程走下来,互斥锁被本线程拿到。 // return false; //解锁并休眠,卡在wait等待被再次唤醒 // }); // //现在互斥锁锁着,流程走下来了,队列里有数据; // command = msgRecvQueue.front(); //返回第一个元素,但不检查元素是否存在; // msgRecvQueue.pop_front(); //移除第一个元素,但不返回; // sbguard1.unlock(); //因为unique_lock的灵活性,我们可以随时unlock解锁,以免锁住太长时间 // cout << "outMsgRecvQueue()执行,取出一个元素" << command << endl; //} //end while while (true) { cout << atm << endl; //读atm是个原子操作,但是整个这一行代码并不是个原子操作; } } void inMsgRecvQueue() //unlock() { //for (int i = 0; i < 100000; ++i) //{ // cout << "inMsgRecvQueue()执行,插入一个元素" << i << endl; // std::unique_lock<std::mutex> sbguard1(my_mutex1); // msgRecvQueue.push_back(i); //假设这个数字i就是我收到的命令,我直接弄到消息队列里边来; // my_cond.notify_one(); //我们尝试把wait()的线程唤醒,其实现在outMsgRecvQueue()中的my_cond.wait()已经醒了,但光醒了没有用,你这里要是不把互斥量撒开,醒了他也要堵在另外一个线程的wait()那里; //} for (int i = 0; i < 1000000; ++i) { atm += 1; //原子操作 //atm = atm + 1; //不是原子操作 } return; } private: std::list<int> msgRecvQueue; //容器(消息队列),专门用于代表玩家给咱们发送过来的命令。 std::mutex my_mutex1; //创建了一个互斥量 (一把锁头) std::condition_variable my_cond; //生成一个条件对象 }; int main(){ //一:补充一些知识点 //(1.1)虚假唤醒:wait中要有第二参数(lambda)并且这个lambda中要正确判断要处理的公共数据是否存在; //wait(),notify_one(),notify_all() //(1.2)atomic ,10,11节都有介绍 //二:浅谈线程池 //(2.1)场景设想 //服务器程序,--》客户端, 每来 一个客户端,就创建 一个新线程为该客户提供服务。 //a)网络游戏,2万玩家不可能给每个玩家创建个新线程,此程序写法在这种场景下不通; //b)程序稳定性问题:编写的代码中,偶尔创建一个线程这种代码,这种写法,就让人感到不安; //线程池:把一堆线程弄到一起,统一管理。这种统一管理调度,循环利用线程的方式,就叫线程池; //(2.2)实现方式 //在程序启时,我一次性的创建好一定数量的线程。10,8,100-200,更让人放心,觉得程序代码更稳定; //三:线程创建数量谈 //(3.1)线程开的数量极限问题,2000个线程基本就是极限;再创建线程就崩溃; //(3.2)线程创建数量建议 //a)采用某些技术开发程序;api接口提供商建议你 创建线程数量 = cpu数量,cpu *2 ,cpu *2 +2,遵照专业建议和指示来,专业意见确保程序高效率执行 //b)创建多线程完成业务; 一个线程等于一条执行通路; 100要堵塞充值,我们这里开110个线程,那是很合适的; //c)1800个线程,建议,线程数量尽量不要超过500个,能控制在200个之内; //四:c++11多线程总结 //windows,linux; A myobja; std::thread myOutnMsgObj(&A::outMsgRecvQueue, &myobja); //第二个参数是 引用,才能保证线程里 用的是同一个对象。 std::thread myInMsgObj(&A::inMsgRecvQueue, &myobja); std::thread myInMsgObj2(&A::inMsgRecvQueue, &myobja); myInMsgObj.join(); myOutnMsgObj.join(); myInMsgObj2.join(); //int abc = 10'00; c++新标准允许这么写 return 0;}