赞
踩
线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量
线程池(Thread Pool)是一种基于池化技术设计用于管理和复用线程的机制。
在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。
线程池的实现原理通常包括以下几个关键部分:
Thread.hpp
#ifndef __THREAD_HPP__ #define __THREAD_HPP__ #include<iostream> #include<string> #include<pthread.h> #include<functional> #include<unistd.h> using namespace std; namespace ThreadMdule { using func_t = std::function<void(string)>; class Thread { public: void Excute() { _func(_threadname); } Thread(func_t func, const std::string &name="none-name") : _func(func), _threadname(name), _stop(true) {} static void* threadroutine(void* args) { Thread* self=static_cast<Thread*>(args); self->Excute(); return nullptr; } bool start() { int n=pthread_create(&_tid,nullptr,threadroutine,this); if(!n) { _stop = false; return true; } else { return false; } } void Detach() { if(!_stop) { pthread_detach(_tid); } } void Join() { if(!_stop) { pthread_join(_tid,nullptr); } } string name() { return _threadname; } void Stop() { _stop = true; } ~Thread() {} private: pthread_t _tid; std::string _threadname; func_t _func; bool _stop; }; } #endif
ThreadPool.hpp
#pragma once #include<iostream> #include<vector> #include<queue> #include<pthread.h> #include"Thread.hpp" #include"Log.hpp" #include"LockGuard.hpp" using namespace ThreadMdule; using namespace std; const static int gdefaultthreadnum=3;//默认线程池的线程数 template <class T> class ThreadPool { public: ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_cond,nullptr); LOG(INFO,"ThreadPool COnstruct."); } //各个线程独立的任务函数 void HandlerTask(string name) { LOG(INFO,"%s is running...",name.c_str()); while(true) { LockQueue();//开启保护 //等到有任务时才退出循环执行下列语句 while(_task_queue.empty()&&_isrunning) { _waitnum++; ThreadSleep(); _waitnum--; } //当任务队列空并且线程池停止时线程退出 if(_task_queue.empty()&&!_isrunning) { UnlockQueue(); cout<<name<<" quit "<<endl; sleep(1); break; } //1.任务队列不为空&&线程池开启 //2.任务队列不为空&&线程池关闭,直到任务队列为空 //所以,只要有任务,就要处理任务 T t=_task_queue.front();//取出对应任务 _task_queue.pop(); UnlockQueue(); LOG(DEBUG,"%s get a task",name.c_str()); //处理任务 t(); LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str()); } } //线程池中线程的构建 void InitThreadPool() { for(int i=0;i<_threadnum;i++) { string name="thread-"+to_string(i+1); _threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name); LOG(INFO,"init thread %s done",name.c_str()); } _isrunning=true; } //线程池的启动 void Start() { for(auto& thread:_threads) { thread.start(); } } //线程池停止 void Stop() { LockQueue(); _isrunning=false; ThreadWakeupAll(); UnlockQueue(); } void Wait() { for(auto& thread:_threads) { thread.Join(); LOG(INFO,"%s is quit...",thread.name().c_str()); } } //将任务入队列 bool Enqueue(const T& t) { bool ret=false; LockQueue(); if(_isrunning) { _task_queue.push(t); //如果有空闲的线程,那么唤醒线程让其执行任务 if(_waitnum>0) { ThreadWakeup(); } LOG(DEBUG,"enqueue task success"); ret=true; } UnlockQueue(); return ret; } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: void LockQueue() { pthread_mutex_lock(&_mutex); } void UnlockQueue() { pthread_mutex_unlock(&_mutex); } void ThreadSleep() { pthread_cond_wait(&_cond, &_mutex); } void ThreadWakeup() { pthread_cond_signal(&_cond); } void ThreadWakeupAll() { pthread_cond_broadcast(&_cond); } int _threadnum;//线程数 vector<Thread> _threads;//存储线程的vector queue<T> _task_queue;//输入的任务队列 pthread_mutex_t _mutex;//互斥锁 pthread_cond_t _cond;//条件变量 int _waitnum;//空闲的线程数 bool _isrunning;//表示线程池是否启动 };
Task.hpp
#include<iostream> #include<string> #include<functional> using namespace std; class Task { public: Task(){} Task(int a,int b): _a(a),_b(b),_result(0) {} void Excute() { _result=_a+_b; } string ResultToString() { return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result); } string DebugToString() { return to_string(_a) + "+" + to_string(_b) + "= ?"; } void operator()() { Excute(); } private: int _a; int _b; int _result; };
main.cc
int main() { srand(time(nullptr)^getpid()^pthread_self()); //EnableScreen(); EnableFile(); unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5)); tp->InitThreadPool(); tp->Start(); int tasknum=10; while(tasknum) { sleep(1); int a=rand()%10+1; usleep(1024); int b=rand()%20+1; Task t(a,b); LOG(INFO,"main thread push task: %s",t.DebugToString().c_str()); tp->Enqueue(t); tasknum--; } tp->Stop(); tp->Wait(); }
Loh.hpp
#pragma once #include<iostream> #include<fstream> #include<ctime> #include<cstdarg> #include<string> #include<sys/types.h> #include<unistd.h> #include<cstdio> #include"LockGuard.hpp" using namespace std; bool gIsSave=false;//默认输出到屏幕 const string logname="log.txt"; //1.日志是有等级的 enum Level { DEBUG=0, INFO, WARNING, ERROR, FATAL }; void SaveFile(const string& filename,const string& messages) { ofstream out(filename,ios::app); if(!out.is_open()) { return; } out<<messages; out.close(); } //等级转化为字符串 string LevelToString(int level) { switch (level) { case DEBUG: return "Debug"; case INFO: return "Info"; case WARNING: return "Warning"; case ERROR: return "Error"; case FATAL: return "Fatal"; default: return "Unkonwn"; } } //获取当前时间 string GetTimeString() { time_t curr_time=time(nullptr);//时间戳 struct tm* format_time=localtime(&curr_time);//转化为时间结构 if(format_time==nullptr) return "None"; char time_buffer[1024]; snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d", format_time->tm_year + 1900, format_time->tm_mon + 1, format_time->tm_mday, format_time->tm_hour, format_time->tm_min, format_time->tm_sec); return time_buffer; } pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER; //获取日志信息 void LogMessage(string filename,int line,bool issave,int level,char* format,...) { string levelstr=LevelToString(level); string timestr=GetTimeString(); pid_t selfid=getpid(); char buffer[1024]; va_list arg; va_start(arg,format); vsnprintf(buffer,sizeof(buffer),format,arg); va_end(arg); string message= "[" + timestr + "]" + "[" + levelstr + "]" + "[" + std::to_string(selfid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n"; LockGuard lockguard(&lock); if(!issave) { cout<<message; } else { SaveFile(logname,message); } } #define LOG(level,format,...) \ do \ { \ LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \ } while (0) #define EnableFile() \ do \ { \ gIsSave=true; \ } while (0) #define EnableScreen() \ do \ { \ gIsSave=false; \ } while (0)
日志是系统或程序记录所有发生事件的文件:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。