当前位置:   article > 正文

Linux--实现线程池(万字详解)_linux 线程池

linux 线程池

目录

1.概念

2.封装原生线程方便使用

3.线程池工作日志

4.线程池需要处理的任务

5.进程池的实现 

6.线程池运行测试 

7.优化线程池(单例模式 )

单例模式概念

优化后的代码

8.测试单例模式


1.概念

线程池:* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:

        * 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
        * 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
        * 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

线程池包含:

  • 线程池:就是预先创建一定数量的线程,并将它们放入一个“池子”中。当程序需要执行新任务时,它会从线程池中取出一个空闲的线程来执行该任务,而不是每次都创建一个新的线程。
  • 任务队列:用于存放待执行的任务。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。

本质上就是一个生产者和消费者模型。


2.封装原生线程方便使用

Thread.hpp:

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. #include <pthread.h>
  6. namespace ThreadMoudle
  7. {
  8. // typedef std::function<void()> func_t;
  9. //using就是给该类型重命名,有参数接收线程名
  10. //便于知道是什么线程做了什么事情
  11. using func_t = std::function<void(const std::string&)>;//函数对象
  12. class Thread
  13. {
  14. public:
  15. void Excute()
  16. {
  17. _isrunning = true;
  18. _func(_name);
  19. _isrunning = false;
  20. }
  21. public:
  22. Thread(const std::string &name, func_t func):_name(name), _func(func)
  23. {
  24. }
  25. static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
  26. {
  27. Thread *self = static_cast<Thread*>(args); // 获得了当前对象
  28. self->Excute();
  29. return nullptr;
  30. }
  31. bool Start()
  32. {
  33. int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
  34. if(n != 0) return false;
  35. return true;
  36. }
  37. std::string Status()
  38. {
  39. if(_isrunning) return "running";
  40. else return "sleep";
  41. }
  42. void Stop()
  43. {
  44. if(_isrunning)
  45. {
  46. ::pthread_cancel(_tid);
  47. _isrunning = false;
  48. }
  49. }
  50. void Join()
  51. {
  52. ::pthread_join(_tid, nullptr);
  53. }
  54. std::string Name()
  55. {
  56. return _name;
  57. }
  58. ~Thread()
  59. {
  60. }
  61. private:
  62. std::string _name;
  63. pthread_t _tid;
  64. bool _isrunning;
  65. func_t _func; // 线程要执行的回调函数
  66. };
  67. }


3.线程池工作日志

关于日志的添加

在做大型项目的时候一般都要有日志

一般公司内部都有自己的日志库

日志是软件运行的记录信息,向显示器打印,向文件中打印。具有特定的格式

[日志等级][pid][filename][filenumber][time] 日志内容(支持可变参数)

日志等级:DEBUG INFO WANNING ERROR FATAL--致命的

这里我们自己写一个日志:

为了保证在多线程模式下,日志资源的安全,我们需要上锁,这是对锁的封装,方便调用:

 LockGuard.hpp

  1. #pragma once
  2. #include <pthread.h>
  3. class LockGuard
  4. {
  5. public:
  6. LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
  7. {
  8. pthread_mutex_lock(_mutex);
  9. }
  10. ~LockGuard()
  11. {
  12. pthread_mutex_unlock(_mutex);
  13. }
  14. private:
  15. pthread_mutex_t *_mutex;
  16. };

日志代码:Log.hpp

  1. #include<iostream>
  2. #pragma once
  3. #include <iostream>
  4. #include <sys/types.h>
  5. #include <unistd.h>
  6. #include <ctime>
  7. #include <cstdarg>
  8. #include <fstream>
  9. #include <cstring>
  10. #include <pthread.h>
  11. #include "LockGuard.hpp"
  12. namespace log_ns
  13. {
  14. enum//日志等级
  15. {
  16. DEBUG = 1,
  17. INFO,
  18. WARNING,
  19. ERROR,
  20. FATAL
  21. };
  22. //将日志等级转为字符串
  23. std::string LevelToString(int level)
  24. {
  25. switch (level)
  26. {
  27. case DEBUG:
  28. return "DEBUG";
  29. case INFO:
  30. return "INFO";
  31. case WARNING:
  32. return "WARNING";
  33. case ERROR:
  34. return "ERROR";
  35. case FATAL:
  36. return "FATAL";
  37. default:
  38. return "UNKNOWN";
  39. }
  40. }
  41. //获取当前时间
  42. std::string GetCurrTime()
  43. {
  44. time_t now = time(nullptr);//该函数可获取此时刻时间戳
  45. //localtime通过时间戳可转化为当前的时间的年月日时分秒
  46. struct tm *curr_time = localtime(&now);
  47. char buffer[128];
  48. snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
  49. curr_time->tm_year + 1900,
  50. curr_time->tm_mon + 1,
  51. curr_time->tm_mday,
  52. curr_time->tm_hour,
  53. curr_time->tm_min,
  54. curr_time->tm_sec);
  55. return buffer;
  56. }
  57. class logmessage
  58. {
  59. public:
  60. std::string _level;//等级
  61. pid_t _id;//进程的id
  62. std::string _filename;//文件名
  63. int _filenumber;//文件编号
  64. std::string _curr_time;//时间
  65. std::string _message_info;//日志内容
  66. };
  67. #define SCREEN_TYPE 1 //向显示器打印
  68. #define FILE_TYPE 2 //往文件写
  69. //给个缺省的文件名方便测试
  70. const std::string glogfile = "./log.txt";
  71. //保证在多线程模式下,日志资源的安全(上锁)
  72. pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
  73. // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
  74. class Log
  75. {
  76. public:
  77. Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
  78. {
  79. }
  80. //调用该函数可以选择向哪里打印
  81. void Enable(int type)
  82. {
  83. _type = type;
  84. }
  85. void FlushLogToScreen(const logmessage &lg)
  86. {//向显示器打印的方法
  87. printf("[%s][%d][%s][%d][%s] %s",
  88. lg._level.c_str(),
  89. lg._id,
  90. lg._filename.c_str(),
  91. lg._filenumber,
  92. lg._curr_time.c_str(),
  93. lg._message_info.c_str());
  94. }
  95. void FlushLogToFile(const logmessage &lg)
  96. {//向文件打印的方法
  97. //文件操作的接口,std::ios::app表“append”模式
  98. //所有的输出都会被追加到文件的末尾,而不是覆盖文件的现有内容。
  99. std::ofstream out(_logfile, std::ios::app);
  100. if (!out.is_open())
  101. return;
  102. char logtxt[2048];
  103. snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
  104. lg._level.c_str(),
  105. lg._id,
  106. lg._filename.c_str(),
  107. lg._filenumber,
  108. lg._curr_time.c_str(),
  109. lg._message_info.c_str());
  110. out.write(logtxt, strlen(logtxt));
  111. out.close();
  112. }
  113. void FlushLog(const logmessage &lg)
  114. {
  115. //对日志做保护本质是对打印的资源做保护
  116. LockGuard lockguard(&glock);
  117. switch (_type)//选择向哪里打印
  118. {
  119. case SCREEN_TYPE:
  120. FlushLogToScreen(lg);
  121. break;
  122. case FILE_TYPE:
  123. FlushLogToFile(lg);
  124. break;
  125. }
  126. }
  127. //记录的日志信息
  128. void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
  129. {
  130. logmessage lg;
  131. lg._level = LevelToString(level);
  132. lg._id = getpid();//当前进程自己的pid
  133. lg._filename = filename;
  134. lg._filenumber = filenumber;
  135. lg._curr_time = GetCurrTime();
  136. //捕获可变参数c语言的处理方法
  137. va_list ap;
  138. //可变参初始化
  139. va_start(ap, format);
  140. //取出可变参数
  141. char log_info[1024];
  142. //把可变参变为转为字符串存放在log_info
  143. vsnprintf(log_info, sizeof(log_info), format, ap);
  144. //销毁
  145. va_end(ap);
  146. lg._message_info = log_info;
  147. // 打印出来日志
  148. FlushLog(lg);
  149. }
  150. ~Log()
  151. {
  152. }
  153. private:
  154. int _type;//表示向什么设备上打印
  155. std::string _logfile;//如果向文件里打印,那么就要接收文件路径
  156. };
  157. //包含Log.hpp就能直接使用了,对外直接使用下面的三个接口就行了
  158. Log lg;
  159. //使用宏封装调用接口,__FILE__ 和 __LINE__ 是两个预定义的宏,
  160. //它们分别用于在编译时提供当前源文件的名称和当前行号。
  161. #define LOG(Level, Format, ...) \
  162. do \
  163. { \
  164. lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
  165. } while (0)
  166. //向显示器打印
  167. #define EnableScreen() \
  168. do \
  169. { \
  170. lg.Enable(SCREEN_TYPE); \
  171. } while (0)
  172. //向文件打印
  173. #define EnableFILE() \
  174. do \
  175. { \
  176. lg.Enable(FILE_TYPE); \
  177. } while (0)
  178. };

        在这个LOG宏中,##__VA_ARGS__的用途主要是为了解决当LOG宏被调用时没有提供除了LevelFormat之外的额外参数时的问题。如果没有##操作符,当LOG宏以少于三个参数的形式被调用时(例如LOG(INFO, "Message");),编译器会收到一个关于__VA_ARGS__展开成空字符串后,逗号多余的错误,因为logMessage函数的调用会像这样:lg.logMessage(__FILE__, __LINE__, Level, Format, ,);,注意这里的两个逗号之间什么都没有。

        使用##操作符后,如果__VA_ARGS__为空,则逗号会被省略,从而避免了编译错误。因此,当LOG宏没有提供额外的参数时,logMessage函数的调用会正确地成为lg.logMessage(__FILE__, __LINE__, Level, Format);

        总结来说,虽然在这个特定的例子中##操作符并没有直接连接两个标记,但它通过允许__VA_ARGS__在宏展开时可能为空(从而省略了多余的逗号),确保了宏的灵活性和正确性。


4.线程池需要处理的任务

Task.hpp:

  1. #pragma once
  2. #include<iostream>
  3. #include<functional>
  4. class Task
  5. {
  6. public:
  7. Task()
  8. {
  9. }
  10. Task(int x, int y) : _x(x), _y(y)
  11. {
  12. }
  13. void Excute()
  14. {
  15. _result = _x + _y;
  16. }
  17. void operator ()()
  18. {
  19. Excute();
  20. }
  21. std::string debug()
  22. {
  23. std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
  24. return msg;
  25. }
  26. std::string result()
  27. {
  28. std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
  29. return msg;
  30. }
  31. private:
  32. int _x;
  33. int _y;
  34. int _result;
  35. };

5.线程池的实现 

TheadPool.hpp:线程池的实现还是相对简单的,需要注意的地方在代码中已经做了注释

  1. #pragma once
  2. #include <iostream>
  3. #include <unistd.h>
  4. #include <string>
  5. #include <vector>
  6. #include <queue>
  7. #include <functional>
  8. #include "Thread.hpp"
  9. #include "Log.hpp"
  10. #include "LockGuard.hpp"
  11. using namespace ThreadMoudle;
  12. using namespace log_ns;//日志
  13. static const int gdefaultnum = 5;
  14. void test()
  15. {
  16. while (true)
  17. {
  18. std::cout << "hello world" << std::endl;
  19. sleep(1);
  20. }
  21. }
  22. template <typename T>
  23. class ThreadPool
  24. {
  25. public:
  26. void LockQueue()
  27. {
  28. pthread_mutex_lock(&_mutex);//锁住队列
  29. }
  30. void UnlockQueue()
  31. {
  32. pthread_mutex_unlock(&_mutex);//解锁
  33. }
  34. void Wakeup()//唤醒操作
  35. {
  36. pthread_cond_signal(&_cond);
  37. }
  38. void WakeupAll()//全部唤醒
  39. {
  40. pthread_cond_broadcast(&_cond);
  41. }
  42. void Sleep()//阻塞等待
  43. {
  44. //线程被挂起,锁被归还
  45. pthread_cond_wait(&_cond, &_mutex);
  46. }
  47. bool IsEmpty()//判断队列是否为空
  48. {
  49. return _task_queue.empty();
  50. }
  51. //处理任务 (处理任务队列中的任务
  52. void HandlerTask(const std::string &name) // this
  53. {
  54. while (true)//线程不退出
  55. {
  56. // 取任务
  57. LockQueue(); //保护临界资源
  58. //任务为空且线程处于运行状态,就该去休眠
  59. while (IsEmpty() && _isrunning)
  60. {
  61. _sleep_thread_num++;
  62. LOG(INFO, "%s thread sleep begin!\n", name.c_str());
  63. Sleep();
  64. LOG(INFO, "%s thread wakeup!\n", name.c_str());
  65. _sleep_thread_num--;
  66. }
  67. // 任务为空线程没有运行,那么就该退出了
  68. if (IsEmpty() && !_isrunning)
  69. {
  70. UnlockQueue();
  71. LOG(INFO, "%s thread quit\n", name.c_str());
  72. break;
  73. }
  74. // 有任务
  75. //T为任务类型,取任务
  76. T t = _task_queue.front();
  77. _task_queue.pop();
  78. UnlockQueue();
  79. // 处理任务
  80. t(); // 处理任务,此处不用也不能在临界区中处理
  81. //任务被取出来从任务队列中移走,放在一个临时空间中
  82. //此处的任务只属于该线程,处理任务和临界资源的访问是两件事
  83. //这样做提高了效率,不然处理任务就成了串行执行了
  84. // std::cout << name << ": " << t.result() << std::endl;
  85. LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
  86. }
  87. }
  88. void Init()//初始化,给封装的原生线程的pthread_create进行传参
  89. {
  90. //HandlerTask有隐含的this指针,func_t只是单参数的,所以不能接收
  91. //bind 被用来创建一个可调用对象 func,封装该类HandlerTask 成员函数
  92. //和对该成员函数所属对象的引用(即 this 指针)。
  93. //然后,这个可调用对象被传递给 th 类的构造函数,并存储在 std::vector<th> 中。
  94. func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
  95. //构建线程对象
  96. for (int i = 0; i < _thread_num; i++)
  97. {
  98. std::string threadname = "thread-" + std::to_string(i + 1);
  99. //将任务也构造到线程中
  100. _threads.emplace_back(threadname, func);
  101. //日志
  102. LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
  103. }
  104. }
  105. void Start()//启动线程
  106. {
  107. _isrunning = true;//true则启动
  108. for (auto &thread : _threads)
  109. {
  110. LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
  111. thread.Start();//这是封装的原生线程中的成员函数用于
  112. //该成员函数封装了pthread_create,用于创建且运行线程
  113. }
  114. }
  115. //构造
  116. ThreadPool(int thread_num = gdefaultnum)
  117. : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
  118. {
  119. pthread_mutex_init(&_mutex, nullptr);
  120. pthread_cond_init(&_cond, nullptr);
  121. }
  122. public:
  123. void Stop()
  124. {
  125. LockQueue();
  126. _isrunning = false;//false表示每运行则停止
  127. WakeupAll();//如果设为false后有线程在休眠那么就退出不了了
  128. UnlockQueue();//所以执行停止要将所有线程 全都唤醒
  129. LOG(INFO, "Thread Pool Stop Success!\n");
  130. }
  131. void Equeue(const T &in)//把任务放到任务队列中
  132. {
  133. LockQueue();
  134. //生产工作
  135. if (_isrunning)//保证线程池是运行状态才执行生产工作
  136. {
  137. _task_queue.push(in);
  138. if (_sleep_thread_num > 0)//有线程休眠才进行唤醒
  139. Wakeup();
  140. }
  141. UnlockQueue();
  142. }
  143. ~ThreadPool()
  144. {
  145. pthread_mutex_destroy(&_mutex);
  146. pthread_cond_destroy(&_cond);
  147. }
  148. private:
  149. int _thread_num;//线程数量
  150. std::vector<Thread> _threads;//组织多个线程
  151. std::queue<T> _task_queue;//任务队列
  152. bool _isrunning;//线程池是否在运行
  153. int _sleep_thread_num;//在休眠的线程便于唤醒
  154. pthread_mutex_t _mutex;//互斥锁
  155. pthread_cond_t _cond;//条件变量
  156. };


6.线程池运行测试 

test.cc:

  1. #include "ThreadPool.hpp"
  2. #include "Task.hpp"
  3. #include "Log.hpp"
  4. using namespace log_ns;//日志
  5. int main()
  6. {
  7. EnableScreen();//向显示器打印日志
  8. ThreadPool<Task> *tp = new ThreadPool<Task>();
  9. tp->Init();//线程初始化
  10. tp->Start();//线程创建好了,开始运行
  11. int cnt = 10;
  12. while(cnt)
  13. {
  14. // 不断地向线程池推送任务
  15. sleep(1);
  16. Task t(1,1);
  17. tp->Equeue(t);//把任务放到任务队列中
  18. LOG(INFO, "equeue a task, %s\n", t.debug().c_str());
  19. sleep(1);
  20. cnt--;
  21. }
  22. tp->Stop();
  23. LOG(INFO, "thread pool stop!\n");
  24. return 0;
  25. }

运行效果:


7.优化线程池(单例模式

单例模式概念

什么是设计模式?
        IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
饿汉实现方式和懒汉实现方式
[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度

饿汉方式实现单例模式:

  1. template <typename T>
  2. class Singleton {
  3. static T data;
  4. public:
  5. static T* GetInstance() {
  6. return &data;
  7. }
  8. };

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例
懒汉方式实现单例模式:

  1. template <typename T>
  2. class Singleton {
  3. static T* inst;
  4. public:
  5. static T* GetInstance() {
  6. if (inst == NULL) {
  7. inst = new T();
  8. }
  9. return inst;
  10. }
  11. };

        在多线程环境中,如果多个线程同时调用 GetInstance() 方法,并且此时 inst 还未被初始化(即 inst == NULL),那么这些线程可能会同时进入 if 语句块,从而导致 inst 被多次初始化。这不仅违反了单例模式的原则(即确保一个类仅有一个实例),还可能引发资源泄露或其他不可预测的行为。

懒汉方式实现单例模式(线程安全版本)
 

  1. // 懒汉模式, 线程安全
  2. template <typename T>
  3. class Singleton {
  4. volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
  5. static std::mutex lock;
  6. public:
  7. static T* GetInstance() {
  8. if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
  9. lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
  10. if (inst == NULL) {
  11. inst = new T();
  12. }
  13. lock.unlock();
  14. }
  15. return inst;
  16. }
  17. };

注意事项:
1. 加锁解锁的位置
2. 双重 if 判定, 避免不必要的锁竞争
3. volatile关键字防止过度优化
   volatile 关键字告诉编译器,被修饰的变量可能会在程序控制之外被修改,因此编译器在编译过程中不能对该变量的访问进行优化,比如不能将其缓存到寄存器中,从而每次访问都直接从内存中读取。这在某些嵌入式系统或硬件编程中很有用,因为硬件状态可能会随时改变。


优化后的代码

这里我们使用懒汉模式:

        在单例模式中,禁止拷贝构造(以及拷贝赋值操作)是确保类的唯一实例不被意外复制,在多线程下引发资源泄露、竞争条件或数据不一致等问题。

变化部分:

创建线程池,只能通过获取单例的方法执行:

直接设置创建单例的指针,指向创建的线程池。需要对单例上锁。

完整代码:

  1. #pragma once
  2. #include <iostream>
  3. #include <unistd.h>
  4. #include <string>
  5. #include <vector>
  6. #include <queue>
  7. #include <functional>
  8. #include "Thread.hpp"
  9. #include "Log.hpp"
  10. #include "LockGuard.hpp"
  11. using namespace ThreadMoudle;
  12. using namespace log_ns;//日志
  13. static const int gdefaultnum = 5;
  14. void test()
  15. {
  16. while (true)
  17. {
  18. std::cout << "hello world" << std::endl;
  19. sleep(1);
  20. }
  21. }
  22. template <typename T>
  23. class ThreadPool
  24. {
  25. private:
  26. void LockQueue()
  27. {
  28. pthread_mutex_lock(&_mutex);//锁住队列
  29. }
  30. void UnlockQueue()
  31. {
  32. pthread_mutex_unlock(&_mutex);//解锁
  33. }
  34. void Wakeup()//唤醒操作
  35. {
  36. pthread_cond_signal(&_cond);
  37. }
  38. void WakeupAll()//全部唤醒
  39. {
  40. pthread_cond_broadcast(&_cond);
  41. }
  42. void Sleep()//阻塞等待
  43. {
  44. //线程被挂起,锁被归还
  45. pthread_cond_wait(&_cond, &_mutex);
  46. }
  47. bool IsEmpty()//判断队列是否为空
  48. {
  49. return _task_queue.empty();
  50. }
  51. //处理任务 (处理任务队列中的任务
  52. void HandlerTask(const std::string &name) // this
  53. {
  54. while (true)//线程不退出
  55. {
  56. // 取任务
  57. LockQueue(); //保护临界资源
  58. //任务为空且线程处于运行状态,就该去休眠
  59. while (IsEmpty() && _isrunning)
  60. {
  61. _sleep_thread_num++;
  62. LOG(INFO, "%s thread sleep begin!\n", name.c_str());
  63. Sleep();
  64. LOG(INFO, "%s thread wakeup!\n", name.c_str());
  65. _sleep_thread_num--;
  66. }
  67. // 任务为空线程没有运行,那么就该退出了
  68. if (IsEmpty() && !_isrunning)
  69. {
  70. UnlockQueue();
  71. LOG(INFO, "%s thread quit\n", name.c_str());
  72. break;
  73. }
  74. // 有任务
  75. //T为任务类型,取任务
  76. T t = _task_queue.front();
  77. _task_queue.pop();
  78. UnlockQueue();
  79. // 处理任务
  80. t(); // 处理任务,此处不用也不能在临界区中处理
  81. //任务被取出来从任务队列中移走,放在一个临时空间中
  82. //此处的任务只属于该线程,处理任务和临界资源的访问是两件事
  83. //这样做提高了效率,不然处理任务就成了串行执行了
  84. // std::cout << name << ": " << t.result() << std::endl;
  85. LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
  86. }
  87. }
  88. void Init()//初始化,给封装的原生线程的pthread_create进行传参
  89. {
  90. //HandlerTask有隐含的this指针,func_t只是单参数的,所以不能接收
  91. //bind 被用来创建一个可调用对象 func,封装该类HandlerTask 成员函数
  92. //和对该成员函数所属对象的引用(即 this 指针)。
  93. //然后,这个可调用对象被传递给 th 类的构造函数,并存储在 std::vector<th> 中。
  94. func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
  95. //构建线程对象
  96. for (int i = 0; i < _thread_num; i++)
  97. {
  98. std::string threadname = "thread-" + std::to_string(i + 1);
  99. //将任务也构造到线程中
  100. _threads.emplace_back(threadname, func);
  101. //日志
  102. LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
  103. }
  104. }
  105. void Start()//启动线程
  106. {
  107. _isrunning = true;//true则启动
  108. for (auto &thread : _threads)
  109. {
  110. LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
  111. thread.Start();//这是封装的原生线程中的成员函数用于
  112. //该成员函数封装了pthread_create,用于创建且运行线程
  113. }
  114. }
  115. //构造函数私有
  116. ThreadPool(int thread_num = gdefaultnum)
  117. : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
  118. {
  119. pthread_mutex_init(&_mutex, nullptr);
  120. pthread_cond_init(&_cond, nullptr);
  121. }
  122. ThreadPool(const ThreadPool<T> &) = delete;//单例模式下禁止拷贝构造
  123. //不然会有线程安全问题
  124. void operator=(const ThreadPool<T> &) = delete;//同理赋值操作也是不能有的
  125. public:
  126. void Stop()
  127. {
  128. LockQueue();
  129. _isrunning = false;//false表示每运行则停止
  130. WakeupAll();//如果设为false后有线程在休眠那么就退出不了了
  131. UnlockQueue();//所以执行停止要将所有线程 全都唤醒
  132. LOG(INFO, "Thread Pool Stop Success!\n");
  133. }
  134. // 多线程获取单例的方法
  135. static ThreadPool<T> *GetInstance()//引用了静态成员变量,该函数也得是静态的
  136. {
  137. if (_tp == nullptr)//为空才能创建该对象,线程池只需要创建一次
  138. {
  139. //创建线程池的过程必须是串行的,上锁!
  140. LockGuard lockguard(&_sig_mutex);
  141. if (_tp == nullptr)
  142. {
  143. LOG(INFO, "create threadpool\n");
  144. // thread-1 thread-2 thread-3....
  145. _tp = new ThreadPool();
  146. _tp->Init();
  147. _tp->Start();
  148. }
  149. else
  150. {
  151. LOG(INFO, "get threadpool\n");
  152. }
  153. }
  154. return _tp;
  155. }
  156. void Equeue(const T &in)//把任务放到任务队列中
  157. {
  158. LockQueue();
  159. //生产工作
  160. if (_isrunning)//保证线程池是运行状态才执行生产工作
  161. {
  162. _task_queue.push(in);
  163. if (_sleep_thread_num > 0)//有线程休眠才进行唤醒
  164. Wakeup();
  165. }
  166. UnlockQueue();
  167. }
  168. ~ThreadPool()
  169. {
  170. pthread_mutex_destroy(&_mutex);
  171. pthread_cond_destroy(&_cond);
  172. }
  173. private:
  174. int _thread_num;//线程数量
  175. std::vector<Thread> _threads;//组织多个线程
  176. std::queue<T> _task_queue;//任务队列
  177. bool _isrunning;//线程池是否在运行
  178. int _sleep_thread_num;//在休眠的线程便于唤醒
  179. pthread_mutex_t _mutex;//互斥锁
  180. pthread_cond_t _cond;//条件变量
  181. // 单例模式
  182. // volatile static ThreadPool<T> *_tp;
  183. static ThreadPool<T> *_tp;//线程池所对应的指针,静态成员只能在类外完成初始化(单例)
  184. static pthread_mutex_t _sig_mutex;//单例的锁
  185. };
  186. //静态成员只能在类外完成初始化
  187. template <typename T>
  188. ThreadPool<T>* ThreadPool<T>::_tp = nullptr;
  189. template <typename T>
  190. pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;

8.测试单例模式

  1. #include "ThreadPool.hpp"
  2. #include "Task.hpp"
  3. #include "Log.hpp"
  4. using namespace log_ns;
  5. int main()
  6. {
  7. EnableFILE();//向文件打印
  8. int cnt =10;
  9. while(cnt)
  10. {
  11. // 不断地向线程池推送任务
  12. sleep(1);
  13. Task t(1,1);
  14. ThreadPool<Task>::GetInstance()->Equeue(t);//单例模式下的创建
  15. LOG(INFO, "equeue a task, %s\n", t.debug().c_str());
  16. sleep(1);
  17. cnt--;
  18. }
  19. ThreadPool<Task>::GetInstance()->Stop();
  20. LOG(INFO, "thread pool stop!\n");
  21. return 0;
  22. }

 运行效果:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/944852
推荐阅读
相关标签
  

闽ICP备14008679号