赞
踩
线程池:一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。
testMain.cc
主线程任务逻辑启动线程,不断向任务队列中push任务就可以了,此时线程接收到任务就会进行处理:
#include <iostream> #include <ctime> #include <unistd.h> #include "threadPool.hpp" #include "Task.hpp" #include "log.hpp" int main() { srand((unsigned int)time(nullptr) ^ getpid()); ThreadPool<Task>* tp = new ThreadPool<Task>(); //启动线程 tp->run(); //主线程执行任务 while(true) { int x = rand() % 100 + 1; usleep(1000); int y = rand() % 50 + 1; Task t(x, y, [](int x, int y)->int{ return x + y; }); logMessage(DEBUG, "制作任务完成:%d+%d=?", x, y); // std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl; //将任务推送到线程池中 tp->pushTask(t); sleep(1); } return 0; }
thread.hpp
我们对创建线程进行封装,包含线程名,线程个数,回调函数,线程ID等;
#pragma once #include <iostream> #include <string> #include <functional> #include <cstdio> typedef void *(*func_t)(void *); class ThreadData { public: std::string name_; void *args_; }; class Thread { public: Thread(int num, func_t callback, void *args) : func_(callback) { char nameBuffer[64]; snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num); name_ = nameBuffer; tdata_.args_ = args; tdata_.name_ = name_; } void start() { pthread_create(&tid_, nullptr, func_, (void *)&tdata_); } void join() { pthread_join(tid_, nullptr); } std::string name() { return name_; } ~Thread() { } private: std::string name_; // 线程名 int num_; // 线程个数 func_t func_; // 回调函数 pthread_t tid_; // 线程ID ThreadData tdata_; };
threadPool.hpp
线程池中我们需要用注意的是:
- 使用pthread_create函数创建线程时,需要为创建的线程传入一个routine(执行例程),该routine只有一个参数类型为void的参数,以及返回类型为void的返回值。因为我们将线程池封装为一个类,此时routine函数就包含两个参数,第一个参数就是隐含的this指针,直接用来创建线程程序是会报错的;
- 静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将routine设置为静态方法,此时routine函数才真正只有一个参数类型为void*的参数。
- 但是在静态成员函数内部无法调用非静态成员函数,而我们需要在routine函数当中调用该类的某些非静态成员函数,比如pop。因此我们需要在创建线程时,向routine函数传入的当前对象的this指针,此时我们就能够通过该this指针在routine函数内部调用非静态成员函数了。
#pragma once #include <iostream> #include <vector> #include <queue> #include <unistd.h> #include "thread.hpp" #include "lockGuard.hpp" #include "log.hpp" #define NUM 3 template <class T> class ThreadPool { public: pthread_mutex_t *getMutex() { return &lock; } bool isEmpty() { return task_queue_.empty(); } void waitCond() { pthread_cond_wait(&cond, &lock); } T getTask() { T t = task_queue_.front(); task_queue_.pop(); return t; } public: ThreadPool(int thread_num = NUM) : num_(thread_num) { pthread_mutex_init(&lock, nullptr); pthread_cond_init(&cond, nullptr); for (int i = 1; i <= num_; i++) { threads_.push_back(new Thread(i, routine, this)); } } // 生产 void run() { for (auto &iter : threads_) { iter->start(); // std::cout << iter->name() << "启动成功" << std::endl; logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功"); } } static void *routine(void *args) { ThreadData *td = (ThreadData *)args; ThreadPool<T> *tp = (ThreadPool<T> *)td->args_; while (true) { T task; { LockGuard lockguard(tp->getMutex()); while (tp->isEmpty()) tp->waitCond(); task = tp->getTask(); } // 处理任务 task(td->name_); } } void pushTask(const T &task) { LockGuard lockguard(&lock); task_queue_.push(task); pthread_cond_signal(&cond); } ~ThreadPool() { for (auto &iter : threads_) { iter->join(); delete iter; } pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond); } private: std::vector<Thread *> threads_; // 线程组 int num_; std::queue<T> task_queue_; // 任务队列 pthread_mutex_t lock; // 互斥锁 pthread_cond_t cond; // 条件变量 };
lockGuard.hpp
为了代码更加的模块化,我们将互斥锁进行一个封装成一个RAII风格的锁,创建对象是调用构造函数加锁,出作用域调用析构函数解锁:
#pragma once #include <iostream> #include <pthread.h> class Mutex { public: Mutex(pthread_mutex_t *mtx) : pmtx_(mtx) { } void lock() { pthread_mutex_lock(pmtx_); } void unlock() { pthread_mutex_unlock(pmtx_); } ~Mutex() { } private: pthread_mutex_t *pmtx_; }; class LockGuard { public: LockGuard(pthread_mutex_t* mtx) : mtx_(mtx) { mtx_.lock(); } ~LockGuard() { mtx_.unlock(); } private: Mutex mtx_; };
Task.hpp
这是一个加法的计算任务:
#pragma once #include <iostream> #include <string> #include <functional> typedef std::function<int(int, int)> tfunc_t; class Task { public: Task() { } Task(int x, int y, tfunc_t func) : x_(x), y_(y), func_(func) { } void operator()(const std::string& name) { // std::cout << "线程 " << name << " 处理完成, 结果是: " << x_ << "+" << y_ << "=" << func_(x_, y_) << std::endl; logMessage(WARNING, "%s处理完成:%d+%d = %d | %s | %d", name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__); } private: int x_; int y_; tfunc_t func_; };
log.hpp
此处我们在设置一个日志文件,完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名);
#pragma once #include <iostream> #include <cstdio> #include <cstdarg> #include <ctime> #include <string> // 日志是有日志级别的 #define DEBUG 0 #define NORMAL 1 #define WARNING 2 #define ERROR 3 #define FATAL 4 const char *gLevelMap[] = { "DEBUG", "NORMAL", "WARNING", "ERROR", "FATAL" }; #define LOGFILE "./threadpool.log" // 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名) void logMessage(int level, const char *format, ...) { char stdBuffer[1024]; //标准部分 time_t timestamp = time(nullptr); snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp); char logBuffer[1024]; //自定义部分 va_list args; va_start(args, format); vsnprintf(logBuffer, sizeof logBuffer, format, args); va_end(args); printf("%s%s\n", stdBuffer, logBuffer); }
运行代码后,我们就会发现此时就有4个线程,其中1个为主线程:
并且我们会发现这3个线程在处理时会呈现出一定的顺序性,因为主线程是每秒push一个任务,这3个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这3个线程在处理任务时会呈现出一定的顺序性。
单例模式:指的就是一个类只能创建一个对象,该模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
接下来我们以懒汉模式为例,来实现我们的线程池:
通过上述三点就可以将我们的代码做出如下改变:
threadPool.hpp
#pragma once #include <iostream> #include <vector> #include <queue> #include <unistd.h> #include "thread.hpp" #include "lockGuard.hpp" #include "log.hpp" #define NUM 3 template <class T> class ThreadPool { public: pthread_mutex_t *getMutex() { return &lock; } bool isEmpty() { return task_queue_.empty(); } void waitCond() { pthread_cond_wait(&cond, &lock); } T getTask() { T t = task_queue_.front(); task_queue_.pop(); return t; } private: ThreadPool(int thread_num = NUM) : num_(thread_num) { pthread_mutex_init(&lock, nullptr); pthread_cond_init(&cond, nullptr); for (int i = 1; i <= num_; i++) { threads_.push_back(new Thread(i, routine, this)); } } ThreadPool(const ThreadPool<T> &other) = delete; const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete; public: static ThreadPool<T> *getThreadPool(int num = NUM) { if (thread_ptr == nullptr) { LockGuard lockguard(&mutex); if (thread_ptr == nullptr) { thread_ptr = new ThreadPool<T>(num); } } return thread_ptr; } // 生产 void run() { for (auto &iter : threads_) { iter->start(); // std::cout << iter->name() << "启动成功" << std::endl; logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功"); } } static void *routine(void *args) { ThreadData *td = (ThreadData *)args; ThreadPool<T> *tp = (ThreadPool<T> *)td->args_; while (true) { T task; { LockGuard lockguard(tp->getMutex()); while (tp->isEmpty()) tp->waitCond(); task = tp->getTask(); } // 处理任务 task(td->name_); } } void pushTask(const T &task) { LockGuard lockguard(&lock); task_queue_.push(task); pthread_cond_signal(&cond); } ~ThreadPool() { for (auto &iter : threads_) { iter->join(); delete iter; } pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond); } private: std::vector<Thread *> threads_; // 线程组 int num_; std::queue<T> task_queue_; // 任务队列 pthread_mutex_t lock; // 互斥锁 pthread_cond_t cond; // 条件变量 static ThreadPool<T> *thread_ptr; static pthread_mutex_t mutex; }; template <typename T> ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr; template <typename T> pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
我们需要注意的是getThreadPool函数在创建对象过程中需要双检查加锁,因为简单的在if语句前后进行加锁解锁操作的话,后续在获取创建的单例对象操作时就会进行大量无意义的加锁解锁操作,我们进行双检查操作以后,就会加锁之前在进行一次判断,不为空就直接返回,就避免了后序无意义的加锁解锁操作;
testMain.cc
#include <iostream> #include <ctime> #include <unistd.h> #include "threadPool.hpp" #include "Task.hpp" #include "log.hpp" int main() { srand((unsigned int)time(nullptr) ^ getpid()); // ThreadPool<Task>* tp = new ThreadPool<Task>(); //启动线程 ThreadPool<Task>::getThreadPool()->run(); //主线程执行任务 while(true) { int x = rand() % 100 + 1; usleep(1000); int y = rand() % 50 + 1; Task t(x, y, [](int x, int y)->int{ return x + y; }); logMessage(DEBUG, "制作任务完成:%d+%d=?", x, y); // std::cout << "制作任务完成: " << x << "+" << y << "=?" << std::endl; //将任务推送到线程池中 ThreadPool<Task>::getThreadPool()->pushTask(t); sleep(1); } return 0; }
STL中的容器是否是线程安全的?
不是。原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响,而且对于不同的容器,加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶),因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全。
智能指针是否是线程安全的?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。