当前位置:   article > 正文

【Linux】简单的线程池

【Linux】简单的线程池

目录

线程池介绍

基本概念

定义

组成部分

线程池的优点

资源高效

响应迅速

可管理性

线程池的工作原理

线程池的使用场景

线程池的注意事项

实现简单的线程池

前置函数

Mutex 类介绍

LockGuard 类介绍

Log类的介绍

枚举定义

Log类

全局对象

Conf类

myThread类的介绍

类成员变量

构造函数

成员函数

使用场景

注意事项

Task类的介绍

成员变量

成员函数

错误处理

使用场景

注意事项

线程池的实现

ThreadPool 类

成员变量

成员函数


线程池介绍

基本概念

线程池是一种多线程处理形式,它用于高效地处理大量并发任务,通过重用已创建的线程来避免频繁地线程创建与销毁所带来的开销。

定义

  • 线程池:一种维护多个线程并等待执行任务的系统。

  • 任务:任何可以执行的代码片段,如函数或方法。

组成部分

  1. 任务队列:存放待执行的任务。

  2. 线程集合:一组可并行执行任务的线程。

  3. 线程池管理器:负责线程的创建、销毁、管理,以及任务的调度。

线程池的优点

资源高效

  • 通过重用线程,降低了线程创建和销毁的频率,从而节省了系统资源。

响应迅速

  • 预先创建的线程处于等待状态,任务到达时无需等待,可立即执行。

可管理性

  • 提供了统一的管理和监控接口,便于跟踪线程状态和行为。

线程池的工作原理

  1. 任务提交:当有新任务时,将其提交到线程池。

  2. 线程分配线程池管理器检查是否有空闲线程,若有,则分配任务;若无,则任务排队等待。

  3. 任务执行与调度:线程完成任务后,从队列中取出下一个任务继续执行。

线程池的使用场景

  • 适用于处理大量并发任务,特别是任务执行时间较短的情况。

  • 常用于服务器程序,如Web服务器和数据库服务器,以应对大量并发请求。

线程池的注意事项

  • 合理设置线程池大小:避免线程过多导致的资源竞争和性能下降。

  • 监控线程池状态:确保任务的正常执行,并及时发现潜在问题。

  • 同步与互斥:注意数据的一致性和线程安全性,避免数据竞争和死锁情况。

实现简单的线程池

前置函数

Mutex 类是对 POSIX 线程互斥锁的封装,而 LockGuard 类则试图利用 RAII(Resource Acquisition Is Initialization)原则来自动管理锁的生命周期。

Mutex 类介绍

  1. class Mutex
  2. {
  3. public:
  4. Mutex(pthread_mutex_t* pMutex)
  5. :_pMutex(pMutex)
  6. {}
  7. void Lock()
  8. {
  9.   pthread_mutex_lock(_pMutex);
  10. }
  11. void UnLock()
  12. {
  13.   pthread_mutex_unlock(_pMutex);
  14. }
  15. ~Mutex()
  16. {}
  17. private:
  18.   pthread_mutex_t* _pMutex;
  19. };
  1. 构造函数:接收一个指向 pthread_mutex_t 的指针,并将其存储在私有成员 _pMutex 中。这个指针应该指向一个有效的、已经初始化的互斥锁。

  2. Lock() 方法:调用 pthread_mutex_lock 函数来尝试锁定互斥锁。如果锁已经被其他线程持有,则当前线程会阻塞,直到锁变得可用。

  3. UnLock() 方法:调用 pthread_mutex_unlock 函数来解锁互斥锁。解锁后,其他等待该锁的线程可以获得锁并执行其临界区的代码。

  4. 析构函数:目前为空,不执行任何操作。在实际应用中,如果互斥锁在 Mutex 对象销毁时仍然锁定,可能会导致问题(如死锁)。因此,一些实现可能会在析构函数中检查锁的状态,并尝试解锁(尽管这种做法有争议,因为它可能隐藏了编程错误)。

LockGuard 类介绍

  1. class LockGuard
  2. {
  3. public:
  4.   LockGuard(pthread_mutex_t* mutex)
  5.   :_mutex(mutex)
  6.   {
  7.     _mutex.Lock();
  8.   }
  9.   ~LockGuard()
  10.   {
  11.     _mutex.UnLock();
  12.   }
  13.   private:
  14.   Mutex _mutex;
  15. };
  1. 构造函数:接收一个指向 pthread_mutex_t 的指针,并使用该指针构造一个 Mutex 对象 _mutex。然后立即调用 _mutex.Lock() 来锁定互斥锁。这种方式确保了当 LockGuard 对象被创建时,相关的互斥锁会被立即锁定。

  2. 析构函数:在 LockGuard 对象被销毁时(例如,离开其作用域时)自动调用。析构函数调用 _mutex.UnLock() 来解锁互斥锁。这确保了无论何种情况下(包括异常),互斥锁都会被正确解锁,从而防止了死锁和其他多线程同步问题。

  3. 私有成员_mutex 是一个 Mutex 类型的对象,它封装了对互斥锁的操作。由于 _mutex 是一个对象而非指针,我们不需要担心内存管理或空指针的问题。

Log类的介绍

  1. #pragma once
  2. #include <iostream>
  3. #include <fstream>
  4. #include <string>
  5. #include <cstdarg>
  6. #include <ctime>
  7. #include <unistd.h>
  8. #include <sys/types.h>
  9. #include <sys/stat.h>
  10. #include <fcntl.h>
  11. enum
  12. {
  13.   Debug = 0,
  14.   Info,
  15.   Warning,
  16.   Error,
  17.   Fatal
  18. };
  19. enum
  20. {
  21.   Screen = 10,
  22.   OneFile,
  23.   ClassFile
  24. };
  25. std::string LevelToString(int level)
  26. {
  27.   switch (level)
  28.   {
  29.   case Debug:
  30.     return "Debug";
  31.   case Info:
  32.     return "Info";
  33.   case Warning:
  34.     return "Warning";
  35.   case Error:
  36.     return "Error";
  37.   case Fatal:
  38.     return "Fatal";
  39.   default:
  40.     return "Unknown";
  41.   }
  42. }
  43. const int defaultstyle = Screen;
  44. const std::string default_filename = "log.";
  45. const std::string logdir = "log";
  46. class Log
  47. {
  48. public:
  49.   Log() : style(defaultstyle), filename(default_filename)
  50.   {
  51.     mkdir(logdir.c_str(), 0775);
  52.   }
  53.   void Enable(int sty) //
  54.   {
  55.     style = sty;
  56.   }
  57.   std::string TimeStampExLocalTime()
  58.   {
  59.     time_t currtime = time(nullptr);
  60.     struct tm *curr = localtime(&currtime);
  61.     char time_buffer[128];
  62.     snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
  63.              curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday,
  64.              curr->tm_hour, curr->tm_min, curr->tm_sec);
  65.     return time_buffer;
  66.   }
  67.   void WriteLogToOneFile(const std::string &logname, const std::string &message)
  68.   {
  69.     umask(0);
  70.     int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
  71.     if (fd < 0)
  72.       return;
  73.     write(fd, message.c_str(), message.size());
  74.     close(fd);
  75.   }
  76.   void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
  77.   {
  78.     std::string logname = logdir;
  79.     logname += "/";
  80.     logname += filename;
  81.     logname += levelstr;
  82.     WriteLogToOneFile(logname, message);
  83.   }
  84.   void WriteLog(const std::string &levelstr, const std::string &message)
  85.   {
  86.     switch (style)
  87.     {
  88.     case Screen:
  89.       std::cout << message;
  90.       break;
  91.     case OneFile:
  92.       WriteLogToClassFile("all", message);
  93.       break;
  94.     case ClassFile:
  95.       WriteLogToClassFile(levelstr, message);
  96.       break;
  97.     default:
  98.       break;
  99.     }
  100.   }
  101.   void LogMessage(int level, const char *format, ...) // 类C的一个日志接口
  102.   {
  103.     char leftbuffer[1024];
  104.     std::string levelstr = LevelToString(level);
  105.     std::string currtime = TimeStampExLocalTime();
  106.     std::string idstr = std::to_string(getpid());
  107.     char rightbuffer[1024];
  108.     va_list args; // char *, void *
  109.     va_start(args, format);
  110.     // args 指向了可变参数部分
  111.     vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);
  112.     va_end(args); // args = nullptr;
  113.     snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ",
  114.              levelstr.c_str(), currtime.c_str(), idstr.c_str());
  115.     std::string loginfo = leftbuffer;
  116.     loginfo += rightbuffer;
  117.     WriteLog(levelstr, loginfo);
  118.   }
  119.   ~Log() {}
  120. private:
  121.   int style;
  122.   std::string filename;
  123. };
  124. Log lg;
  125. class Conf
  126. {
  127. public:
  128.   Conf()
  129.   {
  130.     lg.Enable(ClassFile);
  131.   }
  132.   ~Conf()
  133.   {
  134.   }
  135. };
  136. Conf conf;

枚举定义

  1. 日志级别枚举:定义了五种日志级别,分别是DebugInfoWarningErrorFatal。这些级别通常用于表示日志信息的重要性和紧急性。

  2. 日志输出方式枚举:定义了三种输出方式,Screen表示直接输出到屏幕,OneFile表示将所有日志写入同一个文件,ClassFile表示将不同级别的日志分别写入不同的文件。

Log类

这个类是日志系统的核心,它包含了以下方法和成员:

  • 私有成员

    • style:表示当前的日志输出方式。

    • filename:日志文件的默认名称。

  • 构造函数:初始化日志系统和创建一个名为"log"的目录。

  • Enable方法:设置日志的输出方式。

  • TimeStampExLocalTime方法:返回当前的本地时间戳字符串。

  • WriteLogToOneFile方法:将日志信息写入指定的单个文件。

  • WriteLogToClassFile方法:根据日志级别将日志信息写入到对应的文件。

  • WriteLog方法:根据当前的日志输出方式,将日志信息输出到屏幕或文件。

  • LogMessage方法:这是一个可变参数的函数,用于格式化并记录日志信息。它接受一个日志级别和一个格式字符串,后面可以跟随任意数量的参数。这些参数会被格式化到日志信息中。

  • 析构函数:目前为空,但可以在这里添加清理代码,如果需要的话。

全局对象

  • lg:是一个全局的Log对象,用于在整个程序中记录日志。

  • conf:是一个Conf对象,其构造函数中启用了ClassFile日志输出方式。这意味着,除非在程序的其他地方进行更改,否则日志将默认写入到分类的文件中。

Conf类

这个类目前非常简单,只包含一个构造函数和一个析构函数。构造函数中调用了lg.Enable(ClassFile)来设置日志的输出方式为分类文件输出。这个类可以用于未来扩展更多的配置选项。

myThread类的介绍

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. #include <pthread.h>
  6. #include <string>
  7. template <class T>
  8. using func_t = std::function<void(T&)>;
  9. template <class T>
  10. class myThread
  11. {
  12. public:
  13.   myThread(std::string threadName = "default", T data = T(), func_t<T> func = nullptr)//传入线程名、要操作的数据、要执行的函数
  14.       : _tid(0),
  15.         _threadName(threadName),
  16.         _func(func),
  17.         _data(data),
  18.         _isRunning(false)
  19.   {}
  20.   static void *ThreadRoutine(void *args)//内部的运行函数
  21.   {
  22.     myThread *pmt = static_cast<myThread *> (args);
  23.     pmt->_func(pmt->_data);
  24.     return nullptr;
  25.   }
  26.   bool Start()//创建进程
  27.   {
  28.     int ret = pthread_create(&_tid, NULL, ThreadRoutine, this);
  29.     if (ret != 0)
  30.     {
  31.       return false;
  32.     }
  33.     _isRunning = true;
  34.     return true;
  35.   }
  36.   bool Join()//等待进程
  37.   {
  38.     if (!_isRunning)
  39.     {
  40.       return true;
  41.     }
  42.     int ret = pthread_join(_tid, NULL);
  43.     if (ret == 0)
  44.     {
  45.       _isRunning = false;
  46.       return true;
  47.     }
  48.     return false;
  49.   }
  50.   bool IsRunning()//返回进程的运行状态
  51.   {
  52.     return _isRunning;
  53.   }
  54.   const std::string& ThreadName()//返回线程名
  55.   {
  56.     return _threadName;
  57.   }
  58.   ~myThread()
  59.   {}
  60. private:
  61.   pthread_t _tid;
  62.   std::string _threadName;
  63.   func_t<T> _func;
  64.   T _data;
  65.   bool _isRunning;
  66. };

这个myThread类是一个C++模板类,用于封装POSIX线程(也称为pthreads)的创建、运行和等待过程。通过使用这个类,用户可以更方便地管理线程,而不需要直接处理底层的pthread API。下面是对这个类的详细介绍:

类成员变量

  • _tid: 存储线程ID的变量,类型为pthread_t

  • _threadName: 一个字符串,用于存储线程的名称。

  • _func: 一个std::function对象,存储了要在线程中执行的函数。这个函数接受一个类型为T的引用作为参数。

  • _data: 一个类型为T的对象,它将被传递给_func函数。

  • _isRunning: 一个布尔变量,用于表示线程是否正在运行。

构造函数

构造函数myThread接受三个参数:线程名、要操作的数据以及要执行的函数。这些参数都有默认值,所以用户可以选择性地提供它们。

成员函数

  1. ThreadRoutine: 这是一个静态成员函数,作为线程的入口点。它接受一个void*类型的参数(这是pthread API的要求),然后将其转换为myThread类的指针。接着,它调用存储在_func中的函数,并将_data作为参数传递。

  2. Start: 这个函数用于创建并启动线程。它调用pthread_create函数,并传入ThreadRoutine作为线程的入口点。如果线程创建成功,它将_isRunning设置为true并返回true;否则返回false

  3. Join: 这个函数用于等待线程完成执行。如果线程已经在运行,它会调用pthread_join来等待线程结束。如果线程成功结束,它将_isRunning设置为false并返回true;否则返回false。如果线程没有运行,它直接返回true

  4. IsRunning: 这个函数返回一个布尔值,表示线程是否正在运行。

  5. ThreadName: 这个函数返回线程的名称。

  6. 析构函数: 目前为空,但可以在这里添加必要的清理代码(例如,确保线程已经正确结束)。

使用场景

这个类适用于需要并发执行某个任务,同时又不希望直接处理复杂的线程API的场景。用户只需提供一个函数和一个数据对象,然后调用Start来创建和启动线程。当需要等待线程完成时,可以调用Join函数。

注意事项

  • 由于这个类使用了C++11的特性(如std::function),因此需要确保编译器支持C++11或更高版本。

  • 当使用多线程时,需要注意线程安全问题,特别是当多个线程访问共享数据时。

  • 如果线程函数抛出异常,这个异常将不会被外部捕获,并可能导致程序崩溃。因此,需要确保线程函数不会抛出异常,或者在函数内部处理所有可能的异常。

Task类的介绍

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. enum ERROR
  5. {
  6.   Normal = 10,
  7.   Div_Zeor,
  8.   Mod_Zeor,
  9.   UnKonwOperator
  10. };
  11. std::string opers = "!@#$%^&*()_=+-*/";
  12. template <class T>
  13. class Task
  14. {
  15. public:
  16.   Task()
  17.   {
  18.   }
  19.   Task(const T &data_x, const T &data_y, char oper)
  20.       : _data_y(data_y),
  21.         _data_x(data_x),
  22.         _oper(oper),
  23.         _code(Normal),
  24.         _result(0)
  25.   {
  26.   }
  27.   Task(const Task<T> &task)
  28.       : _data_x(task._data_x),
  29.         _data_y(task._data_y),
  30.         _oper(task._oper),
  31.         _result(task._result),
  32.         _code(task._code)
  33.   {
  34.   }
  35.   void Run()
  36.   {
  37.     switch (_oper)
  38.     {
  39.     case '+':
  40.     {
  41.       _result = _data_x + _data_y;
  42.       break;
  43.     }
  44.     case '-':
  45.     {
  46.       _result = _data_x - _data_y;
  47.       break;
  48.     }
  49.     case '*':
  50.     {
  51.       _result = _data_x * _data_y;
  52.       break;
  53.     }
  54.     case '/':
  55.     {
  56.       if (_data_y == 0)
  57.       {
  58.         _code = Div_Zeor;
  59.         break;
  60.       }
  61.       _result = _data_x / _data_y;
  62.       break;
  63.     }
  64.     case '%':
  65.     {
  66.       if (_data_y == 0)
  67.       {
  68.         _code = Mod_Zeor;
  69.         break;
  70.       }
  71.       _result = _data_x % _data_y;
  72.       break;
  73.     }
  74.     default:
  75.     {
  76.       _code = UnKonwOperator;
  77.       break;
  78.     }
  79.     }
  80.   }
  81.   std::string Print()
  82.   {
  83.     std::string show;
  84.     if (_code == Normal)
  85.     {
  86.       show += "left[";
  87.       show += std::to_string(_data_x);
  88.       show += "]";
  89.       show += _oper;
  90.       show += "right[";
  91.       show += std::to_string(_data_y);
  92.       show += "]";
  93.       show += "==";
  94.       show += " ?";
  95.     }
  96.     else
  97.     {
  98.       show += "该任务非法";
  99.       switch (_code)
  100.       {
  101.       case Div_Zeor:
  102.         show += "Div_Zeor";
  103.         break;
  104.       case Mod_Zeor:
  105.         show += "Mod_Zeor";
  106.       case UnKonwOperator:
  107.         show += "UnKonwOperator";
  108.       default:
  109.         break;
  110.       }
  111.     }
  112.     return show;
  113.   }
  114.   std::string PrintResult()
  115.   {
  116.     return std::to_string(_result);
  117.   }
  118. private:
  119.   T _data_x;
  120.   T _data_y;
  121.   T _result;
  122.   char _oper;
  123.   int _code;
  124. };

这个Task类是一个模板类,设计用于执行两个同类型数据之间的基本算术运算。模板类型T允许这个类处理不同的数据类型,只要这些类型支持算术运算符。这个类的主要功能包括:

成员变量

  1. _data_x_data_y:存储要执行运算的两个操作数。

  2. _oper:存储要执行的运算符(如+, -, *, /, %)。

  3. _result:存储运算的结果。

  4. _code:存储错误码,用于表示运算过程中是否出现错误(如除以零错误或未知运算符)。

成员函数
  1. 构造函数:有三个构造函数,一个默认构造函数,一个接受两个数据和一个运算符作为参数的构造函数,以及一个拷贝构造函数。

  2. **Run**:执行实际的运算。根据_oper的值执行相应的算术运算,并将结果存储在_result中。如果运算过程中出现错误(如除以零),则更新_code以反映错误类型。

  3. **Print**:返回一个字符串,表示运算的详细信息和可能发生的错误。如果没有错误,它将返回一个表示运算的字符串(例如,"left[5]+right[3]== ?")。如果发生错误,它将包含错误类型(例如,"该任务非法Div_Zeor")。

  4. **PrintResult**:返回表示运算结果的字符串。

错误处理

ERROR枚举用于定义可能的错误类型,如Div_Zeor(除数为零)、Mod_Zeor(取模运算的除数为零)和UnKonwOperator(未知运算符)。这些错误码在运算过程中被设置,并可以通过Print函数来查看。

使用场景

这个类可以用于创建一个简单的算术表达式求值器,能够处理基本的算术运算。由于它是一个模板类,因此可以很容易地处理不同类型的数据,如整数、浮点数等。

注意事项

  • 这个类没有进行复杂的错误处理,例如处理不支持的数据类型或检查运算符的有效性(除了基本的算术运算符外)。

  • 在多线程环境中使用这个类时需要注意线程安全,因为这个类不是线程安全的。

  • 如果需要处理更复杂的算术表达式(例如,包含括号、多个运算符和函数的表达式),则需要一个更复杂的解析器和求值器。

线程池的实现

  1. #pragma once
  2. #include <iostream>
  3. #include <queue>
  4. #include <vector>
  5. #include <string>
  6. #include "Task.hpp"
  7. #include "LockGuard.hpp"
  8. #include "myThread.hpp"
  9. #include "Log.hpp"
  10. #include <pthread.h>
  11. const int Deafult_Num = 10;
  12. struct ThreadData
  13. {
  14.   std::string _threaName;
  15.   ThreadData(const std::string &threadname)
  16.       : _threaName(threadname)
  17.   {
  18.   }
  19.   ~ThreadData()
  20.   {
  21.   }
  22. };
  23. template <class T>
  24. class ThreadPool
  25. {
  26. private:
  27.   ThreadPool(const int threadNum = Deafult_Num)
  28.       : _threadNum(threadNum)
  29.   {
  30.     pthread_cond_init(&_cond, nullptr);
  31.     pthread_mutex_init(&_mutex, nullptr);
  32.     for (int i = 0; i < _threadNum; i++)
  33.     {
  34.       std::string threadName = "thread-";
  35.       threadName += std::to_string(i + 1);
  36.       ThreadData tmp(threadName);
  37.       myThread<ThreadData> tmp_thread(threadName, tmp, std::bind(&ThreadPool::ThreadRun, this, std::placeholders::_1));
  38.       _threads.emplace_back(tmp_thread);
  39.       lg.LogMessage(Info, "%s is created...\n", threadName.c_str());
  40.     }
  41.   }
  42.   ThreadPool(const ThreadPool<T> &tp) = delete;
  43.     const ThreadPool<T> &operator=(const ThreadPool<T>= delete;
  44. public:
  45.     // 有线程安全问题的
  46.     static ThreadPool<T> *GetInstance()
  47.     {
  48.         if (instance == nullptr)
  49.         {
  50.             LockGuard lockguard(&sig_lock);
  51.             if (instance == nullptr)
  52.             {
  53.                 lg.LogMessage(Info, "创建单例成功...\n");
  54.                 instance = new ThreadPool<T>();
  55.             }
  56.         }
  57.         return instance;
  58.     }
  59.   bool Start()
  60.   {
  61.     // 启动
  62.     for (auto &thread : _threads)
  63.     {
  64.       thread.Start();
  65.       lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
  66.     }
  67.     return true;
  68.   }
  69.   void ThreadRun(ThreadData &td)
  70.   {
  71.     while (true)
  72.     {
  73.       // 取任务
  74.       T t;
  75.       {
  76.         LockGuard lockguard(&_mutex);
  77.         while (_queue.empty())
  78.         {
  79.           ThreadWait(td);
  80.           lg.LogMessage(Debug, "thread %s is wakeup\n", td._threaName.c_str());
  81.         }
  82.         t = _queue.front();
  83.         _queue.pop();
  84.       }
  85.       // 处理任务
  86.       t.Run();
  87.       lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
  88.                     td._threaName.c_str(), t.Print().c_str(), t.PrintResult().c_str());
  89.     }
  90.   }
  91.   void ThreadWait(const ThreadData &td)
  92.   {
  93.     lg.LogMessage(Debug, "no task, %s is sleeping...\n", td._threaName.c_str());
  94.     pthread_cond_wait(&_cond, &_mutex);
  95.   }
  96.   void ThreadWakeup()
  97.   {
  98.     pthread_cond_signal(&_cond);
  99.   }
  100.   void Push(T &in)
  101.   {
  102.     lg.LogMessage(Debug, "other thread push a task, task is : %s\n"in.Print().c_str());
  103.     LockGuard lockguard(&_mutex);
  104.     _queue.push(in);
  105.     ThreadWakeup();
  106.   }
  107.   void Wait()
  108.   {
  109.     for (auto &thread : _threads)
  110.     {
  111.       thread.Join();
  112.     }
  113.   }
  114.     ~ThreadPool()
  115.     {
  116.         pthread_mutex_destroy(&_mutex);
  117.         pthread_cond_destroy(&_cond);
  118.     }
  119. private:
  120.   std::queue<T> _queue;
  121.   std::vector<myThread<ThreadData>> _threads;
  122.   int _threadNum;
  123.   pthread_cond_t _cond;
  124.   pthread_mutex_t _mutex;
  125.     static ThreadPool<T> *instance;
  126.     static pthread_mutex_t sig_lock;
  127. };
  128. template <class T>
  129. ThreadPool<T> *ThreadPool<T>::instance = nullptr;
  130. template <class T>
  131. pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;

ThreadPool 类

ThreadPool是一个线程池模板类,它允许你并行地处理一系列的任务。线程池中的线程数量可以在创建时指定,或者使用默认值(在这个例子中是Deafult_Num,尽管这个单词应该是Default_Num的拼写错误)。这个类使用了POSIX线程库(pthread)来实现线程同步。

成员变量
  1. private:
  2.   std::queue<T> _queue;
  3.   std::vector<myThread<ThreadData>> _threads;
  4.   int _threadNum;
  5.   pthread_cond_t _cond;
  6.   pthread_mutex_t _mutex;
  7.   static ThreadPool<T> *instance;
  8.   static pthread_mutex_t sig_lock;
  • _queue:一个存储Task类型对象的队列。

  • _threads:一个包含myThread<ThreadData>对象的向量,表示线程池中的线程。

  • _threadNum:线程池中的线程数量。

  • _cond:一个条件变量,用于线程的等待和唤醒。

  • _mutex:一个互斥锁,用于保护共享资源(如任务队列)。

  • instance:单例模式的实例指针。

  • sig_lock:保护单例实例化的互斥锁。

成员函数
  1. 构造函数 (ThreadPool)

  1. class ThreadPool
  2. {
  3. private:
  4.   ThreadPool(const int threadNum = Deafult_Num)
  5.       : _threadNum(threadNum)
  6.   {
  7.     pthread_cond_init(&_cond, nullptr);
  8.     pthread_mutex_init(&_mutex, nullptr);
  9.     for (int i = 0; i < _threadNum; i++)
  10.     {
  11.       std::string threadName = "thread-";
  12.       threadName += std::to_string(i + 1);
  13.       ThreadData tmp(threadName);
  14.       myThread<ThreadData> tmp_thread(threadName, tmp, std::bind(&ThreadPool::ThreadRun, this, std::placeholders::_1));
  15.       _threads.emplace_back(tmp_thread);
  16.       lg.LogMessage(Info, "%s is created...\n", threadName.c_str());
  17.     }
  18.   }
  1. * 初始化线程池,设置线程数量,初始化条件变量和互斥锁。
  2. * 根据线程数量创建相应数量的线程,并存储在`_threads`向量中。
  3. * 线程名通过`ThreadData`结构传递,并记录日志。
  1. GetInstance

  1.   static ThreadPool<T> *GetInstance()
  2.   {
  3.     if (instance == nullptr)
  4.     {
  5.       LockGuard lockguard(&sig_lock);
  6.       if (instance == nullptr)
  7.       {
  8.         lg.LogMessage(Info, "创建单例成功...\n");
  9.         instance = new ThreadPool<T>();
  10.       }
  11.     }
  12.     return instance;
  13.   }
  1. * 实现单例模式,确保整个程序中只有一个`ThreadPool`实例。
  2. * 使用双重检查锁定(Double-Checked Locking)来确保线程安全。
  1. Start

  1.   bool Start()
  2.   {
  3.     // 启动
  4.     for (auto &thread : _threads)
  5.     {
  6.       thread.Start();
  7.       lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
  8.     }
  9.     return true;
  10.   }
  1. * 启动线程池中的所有线程。
  2. * 记录每个线程启动的日志。
  1. ThreadRun

  1.   void ThreadRun(ThreadData &td)
  2.   {
  3.     while (true)
  4.     {
  5.       // 取任务
  6.       T t;
  7.       {
  8.         LockGuard lockguard(&_mutex);
  9.         while (_queue.empty())
  10.         {
  11.           ThreadWait(td);
  12.           lg.LogMessage(Debug, "thread %s is wakeup\n", td._threaName.c_str());
  13.         }
  14.         t = _queue.front();
  15.         _queue.pop();
  16.       }
  17.       // 处理任务
  18.       t.Run();
  19.       lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
  20.                     td._threaName.c_str(), t.Print().c_str(), t.PrintResult().c_str());
  21.     }
  22.   }
  1. * 线程的工作函数,每个线程都会执行这个函数。
  2. * 在一个无限循环中,线程从任务队列中取出一个任务,执行它,并记录日志。
  3. * 如果任务队列为空,线程会等待,直到有新的任务被推入队列。
  1. ThreadWait

  1.   void ThreadWait(const ThreadData &td)
  2.   {
  3.     lg.LogMessage(Debug, "no task, %s is sleeping...\n", td._threaName.c_str());
  4.     pthread_cond_wait(&_cond, &_mutex);
  5.   }
  1. * 当任务队列为空时,线程会调用这个函数来等待。
  2. * 使用条件变量来挂起线程,直到有新的任务被推入队列。
  1. ThreadWakeup

  1.   void ThreadWakeup()
  2.   {
  3.     pthread_cond_signal(&_cond);
  4.   }
  1. * 当有新任务被推入队列时,调用此函数来唤醒一个等待的线程。
  2. * 使用条件变量的信号功能来实现。
  1. Push

  1.   void Push(T &in)
  2.   {
  3.     lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.Print().c_str());
  4.     LockGuard lockguard(&_mutex);
  5.     _queue.push(in);
  6.     ThreadWakeup();
  7.   }
  1. * 允许其他线程将一个任务推入线程池的任务队列。
  2. * 使用互斥锁来保护任务队列的线程安全。
  3. * 唤醒一个等待的线程来处理新任务。
  1. Wait

  1.   void Wait()
  2.   {
  3.     for (auto &thread : _threads)
  4.     {
  5.       thread.Join();
  6.     }
  7.   }
  1. * 等待线程池中的所有线程完成它们当前的任务并退出。
  2. * 通常用于程序的结束阶段,确保所有任务都被处理完。
  1. **析构函数

  1.   ~ThreadPool()
  2.   {
  3.     pthread_mutex_destroy(&_mutex);
  4.     pthread_cond_destroy(&_cond);
  5.   }
  1. * 析构函数应该负责清理资源,如销毁线程、销毁互斥锁和条件变量等。
  2. * 还需要注意线程安全地停止和销毁线程。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/453058
推荐阅读
相关标签
  

闽ICP备14008679号