当前位置:   article > 正文

Linux之 线程池 | 单例模式的线程安全问题 | 其他锁

Linux之 线程池 | 单例模式的线程安全问题 | 其他锁

目录

一、线程池

1、线程池

2、线程池代码

3、线程池的应用场景

二、单例模式的线程安全问题

1、线程池的单例模式

2、线程安全问题

三、其他锁


一、线程池

1、线程池

线程池是一种线程使用模式。线程池里面可以维护一些线程。

为什么要有线程池?

因为在我们使用线程去处理各种任务的时候,尤其是一些执行时间短的任务,我们必须要先对线程进行创建然后再进行任务处理,最后再销毁线程,效率是比较低的。而且有的时候线程过多会带来调度开销,进而影响缓存局部性和整体性能。

于是,我们可以通过线程池预先创建出一批线程,线程池维护着这些线程,线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。

线程池不仅能够保证内核的充分利用,还能防止过分调度。

2、线程池代码

我们先对线程进行封装:Thread.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <cstdio>
  5. #include <pthread.h>
  6. using namespace std;
  7. typedef void *(*fun_t)(void *);
  8. class ThreadData
  9. {
  10. public:
  11. void *arg_;
  12. string name_;
  13. };
  14. class Thread
  15. {
  16. public:
  17. Thread(int num, fun_t callback, void *arg)
  18. : func_(callback)
  19. {
  20. char buffer[64];
  21. snprintf(buffer, sizeof(buffer), "Thread-%d", num);
  22. name_ = buffer;
  23. tdata_.name_ = name_;
  24. tdata_.arg_ = arg;
  25. }
  26. void start()
  27. {
  28. pthread_create(&tid_, nullptr, func_, (void *)&tdata_);
  29. }
  30. void join()
  31. {
  32. pthread_join(tid_, nullptr);
  33. }
  34. string &name()
  35. {
  36. return name_;
  37. }
  38. ~Thread()
  39. {
  40. }
  41. private:
  42. pthread_t tid_;
  43. string name_;
  44. fun_t func_;
  45. ThreadData tdata_;
  46. };

线程池代码:threadPool.hpp:

  1. #pragma once
  2. #include <vector>
  3. #include <queue>
  4. #include "thread.hpp"
  5. #define THREAD_NUM 3
  6. template <class T>
  7. class ThreadPool
  8. {
  9. public:
  10. bool Empty()
  11. {
  12. return task_queue_.empty();
  13. }
  14. pthread_mutex_t *getmutex()
  15. {
  16. return &lock;
  17. }
  18. void wait()
  19. {
  20. pthread_cond_wait(&cond, &lock);
  21. }
  22. T gettask()
  23. {
  24. T t = task_queue_.front();
  25. task_queue_.pop();
  26. return t;
  27. }
  28. public:
  29. ThreadPool(int num = THREAD_NUM) : num_(num)
  30. {
  31. for (int i = 0; i < num_; i++)
  32. {
  33. threads_.push_back(new Thread(i, routine, this));
  34. }
  35. pthread_mutex_init(&lock, nullptr);
  36. pthread_cond_init(&cond, nullptr);
  37. }
  38. static void *routine(void *arg)
  39. {
  40. ThreadData *td = (ThreadData *)arg;
  41. ThreadPool<T> *tp = (ThreadPool<T> *)td->arg_;
  42. while (true)
  43. {
  44. T task;
  45. {
  46. pthread_mutex_lock(tp->getmutex());
  47. while (tp->Empty())
  48. tp->wait();
  49. task = tp->gettask();
  50. pthread_mutex_unlock(tp->getmutex());
  51. }
  52. cout << "x+y=" << task() << " " << pthread_self() << endl;
  53. }
  54. }
  55. void run()
  56. {
  57. for (auto &iter : threads_)
  58. {
  59. iter->start();
  60. }
  61. }
  62. void PushTask(const T &task)
  63. {
  64. pthread_mutex_lock(&lock);
  65. task_queue_.push(task);
  66. pthread_mutex_unlock(&lock);
  67. pthread_cond_signal(&cond);
  68. }
  69. ~ThreadPool()
  70. {
  71. for (auto &iter : threads_)
  72. {
  73. iter->join();
  74. delete iter;
  75. }
  76. pthread_mutex_destroy(&lock);
  77. pthread_cond_destroy(&cond);
  78. }
  79. private:
  80. vector<Thread *> threads_;
  81. int num_;
  82. queue<T> task_queue_;
  83. pthread_mutex_t lock;
  84. pthread_cond_t cond;
  85. };

任务:task.hpp:

  1. #pragma once
  2. #include <iostream>
  3. #include <queue>
  4. #include <pthread.h>
  5. #include <unistd.h>
  6. class task
  7. {
  8. public:
  9. task()
  10. {
  11. }
  12. task(int x, int y)
  13. : x_(x), y_(y)
  14. {
  15. }
  16. int operator()()
  17. {
  18. return x_ + y_;
  19. }
  20. private:
  21. int x_;
  22. int y_;
  23. };

 测试代码:test.cc:

  1. #include "threadPool.hpp"
  2. #include "task.hpp"
  3. #include <iostream>
  4. #include <ctime>
  5. int main()
  6. {
  7. srand((unsigned int)time(nullptr) ^ getpid() ^ 12232);
  8. ThreadPool<task> *tp = new ThreadPool<task>();
  9. tp->run();
  10. while (true)
  11. {
  12. int x = rand() % 100 + 1;
  13. sleep(1);
  14. int y = rand() % 100 + 1;
  15. task t(x, y);
  16. tp->PushTask(t);
  17. cout << x << "+" << y << "=?" << endl;
  18. }
  19. return 0;
  20. }

运行结果: 

3、线程池的应用场景

1、需要大量的线程来完成任务,且完成任务的时间比较短。 
2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

二、单例模式的线程安全问题

1、线程池的单例模式

首先,我们要做的第一件事就是把构造函数私有,再把拷贝构造和赋值运算符重载函数delete:

  1. private:
  2. ThreadPool(int num = THREAD_NUM) : num_(num)
  3. {
  4. for (int i = 0; i < num_; i++)
  5. {
  6. threads_.push_back(new Thread(i, routine, this));
  7. }
  8. pthread_mutex_init(&lock, nullptr);
  9. pthread_cond_init(&cond, nullptr);
  10. }
  11. ThreadPool(const TreadPool &other) = delete;
  12. ThreadPool operator=(const TreadPool &other) = delete;

接下来就要在类中定义一个成员变量:静态指针,方便获取单例对象,并在类外初始化:

  1. //线程池中的成员变量
  2. private:
  3. vector<Thread *> threads_;
  4. int num_;
  5. queue<T> task_queue_;
  6. pthread_mutex_t lock;
  7. pthread_cond_t cond;
  8. static ThreadPool<T> *tp;
  9. //在类外初始化
  10. template <class T>
  11. ThreadPool<T> *ThreadPool<T>::tp = nullptr;

最后我们写一个函数可以获取单例对象,在设置获取单例对象的函数的时候,注意要设置成静态成员函数,因为在获取对象前根本没有对象,无法调用非静态成员函数(无this指针): 

  1. static ThreadPool<T> *getThreadPool()
  2. {
  3. if (tp == nullptr)
  4. {
  5. tp = new ThreadPool<T>();
  6. }
  7. return tp;
  8. }

2、线程安全问题

上面的线程池的单例模式,看起来没有什么问题。可是当我们有多个线程去调用 getThreadPool函数,去创建线程池的时候,可能会有多个线程同时进入判断,判断出线程池指针为空,然后创建线程池对象。这样就会创建出多个线程池对象,这就不符合我们单例模式的要求了,所以我们必须让在同一时刻只有一个线程能够进入判断,我们就要用到锁了。

定义一个静态锁,并初始化:

  1. private:
  2. vector<Thread *> threads_;
  3. int num_;
  4. queue<T> task_queue_;
  5. pthread_mutex_t lock;
  6. pthread_cond_t cond;
  7. static ThreadPool<T> *tp;
  8. static pthread_mutex_t lock;
  9. // 类外初始化
  10. template <class T>
  11. pthread_mutex_t ThreadPool<T>::lock = PTHREAD_MUTEX_INITIALIZER;

对 getThreadPool函数进行加锁:

  1. static ThreadPool<T> *getThreadPool()
  2. {
  3. if (tp == nullptr)
  4. {
  5. pthread_mutex_lock(&lock);
  6. if (tp == nullptr)
  7. {
  8. tp = new ThreadPool<T>();
  9. }
  10. pthread_mutex_unlock(&lock);
  11. }
  12. return tp;
  13. }

对于上面的代码:我们为什么要在获取锁之前还要再加一个判断指针为空的条件呢?

当已经有一个线程创建出来了线程池的单例模式后,在这之后的所有其他线程即使申请到锁,紧着着下一步就是去释放锁,它不会进入第二个 if 条件里面。其实这样是效率低下的,因为线程会频繁申请锁,然后就释放锁。所以我们在最外层再加一个if判断,就可以阻止后来的线程不用去申请锁创建线程池了,直接返回已经创建出来的线程池。

三、其他锁

1、悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。

2、乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
~ CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。

3、自旋锁:说到自旋锁,我们不得不说一说我们之前所用到的锁,我们之前所用的锁都是互斥锁,当线程没有竞争到互斥锁时,它会阻塞等待,只有等锁被释放了后,才能去重新申请锁。而对于自旋锁,当线程没有竞争到自旋锁的时候,线程会不断地循环检测去申请自旋锁,直到拿到锁。

一般来说,如果临界区的代码执行时间比较长的话,我们是使用互斥锁而不是自旋锁的,这样线程不会因为频繁地检测去申请锁而占用CPU资源。如果临界区的代码执行时间较短的话,我们一般就最好使用自旋锁,而不是互斥锁,因为互斥锁申请失败,是要阻塞等待,是需要发生上下文切换的,如果临界区执行的时间比较短,那可能上下文切换的时间会比临界区代码执行的时间还要长。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/403913
推荐阅读
相关标签
  

闽ICP备14008679号