当前位置:   article > 正文

C++ 基础12、线程池和有锁队列设计封装_std 有锁队列

std 有锁队列

    在C++ 并发编程中,对于线程池和队列我们是经常使用到的,本篇就简单的回顾一下,之前关于C++ 并发编程的总结。

一、有锁队列封装

队列分无琐队列和有锁队列,无锁队列依赖的是CAS原子性操作达到的,有锁队列如c++11 当中的std::queue 双端队列,其实是一个双端链表,链表这是数据结构在读取操作上还是有损性能的,毕竟链表的存储并非连续的一块内存,而是依赖前后指针进行寻址,跨度大,查找起来费时,对插入和删除有影响一定性能,具体多少不是很清楚,因为我也是道听途说的,毕竟在现在的多核时代,能慢到哪里去,如果不是要求实时性且高效率存储,普通的应用场景已是足以(就扯到这),上代码。

  1. namespace Dispatch
  2. {
  3. template <typename T>
  4. class Queue
  5. {
  6. public:
  7. inline Queue():is_task_finished_(true)
  8. {
  9. }
  10. inline T front()
  11. {
  12. std::lock_guard<std::mutex> lock(mutex_);
  13. return queue_.front();
  14. }
  15. inline void push(const T& value)
  16. {
  17. std::lock_guard<std::mutex> lock(mutex_);
  18. queue_.push(value);
  19. }
  20. inline void pop()
  21. {
  22. std::lock_guard<std::mutex> lock(mutex_);
  23. if (!queue_.empty())
  24. {
  25. queue_.pop();
  26. }
  27. }
  28. inline T popFront()
  29. {
  30. T value;
  31. std::lock_guard<std::mutex> lock(mutex_);
  32. if (!queue_.empty())
  33. {
  34. value = std::move(queue_.front());
  35. queue_.pop();
  36. }
  37. return value;
  38. }
  39. inline void clear()
  40. {
  41. std::queue<T> empty;
  42. std::lock_guard<std::mutex> lock(mutex_);
  43. swap(empty, queue_);
  44. }
  45. inline size_t size()
  46. {
  47. std::lock_guard<std::mutex> lock(mutex_);
  48. return queue_.size();
  49. }
  50. public:
  51. std::queue<T> queue_;
  52. std::atomic<bool> is_task_finished_;
  53. private:
  54. mutable std::mutex mutex_;
  55. };
  56. } // namespace Dispatch

整体来说,代码通俗易懂,要注意的一个点是 互斥量锁的使用,还有mutalbe的中文意思是“可变的,易变的” ,即使是不可变的const 修饰函数时,也是可以突破不变的,即可修改的。

  1. #include <iostream>
  2. class Person {
  3. public:
  4. Person();
  5. ~Person();
  6. int getAge() const; /*调用方法*/
  7. int getCallingTimes() const; /*获取上面的getAge()方法被调用了多少次*/
  8. private:
  9. int age;
  10. char *name;
  11. float score;
  12. mutable int m_nums; /*用于统计次数*/
  13. };
  14. Person::Person()
  15. {
  16. m_nums = 0;
  17. }
  18. Person::~Person(){}
  19. int Person::getAge() const
  20. {
  21. std::cout << "Calling the method" << std::endl;
  22. m_nums++;
  23. // age = 4; 仍然无法修改该成员变量
  24. return age;
  25. }
  26. int Person::getCallingTimes()const
  27. {
  28. return m_nums;
  29. }
  30. int main()
  31. {
  32. Person *person = new Person();
  33. for (int i = 0; i < 10; i++) {
  34. person->getAge();
  35. }
  36. std::cout << "getAge()方法被调用了" << person->getCallingTimes() << "次" << std::endl;
  37. delete person;
  38. getchar();
  39. return 0;
  40. }
二、线程池设计

上代码

  1. #pragma once
  2. namespace Dispatch
  3. {
  4. struct Thread
  5. {
  6. Thread() : start_(false)
  7. {
  8. }
  9. std::shared_ptr<std::thread> thread_;
  10. std::atomic<bool> start_;
  11. };
  12. class ThreadPool
  13. {
  14. public:
  15. typedef std::shared_ptr<ThreadPool> Ptr;
  16. inline ThreadPool(int thread) : stop_flag_(false), idl_thr_num_(thread)
  17. {
  18. for (int i = 0; i < idl_thr_num_; ++i)
  19. {
  20. pool_.emplace_back([this] {
  21. while (!this->stop_flag_)
  22. {
  23. std::function<void()> task;
  24. {
  25. std::unique_lock<std::mutex> lock{ this->mutex_ };
  26. this->cv_task_.wait(lock, [this] { return this->stop_flag_.load() || !this->tasks_.empty(); });
  27. if (this->stop_flag_ && this->tasks_.empty())
  28. return;
  29. task = std::move(this->tasks_.front());
  30. this->tasks_.pop();
  31. }
  32. idl_thr_num_--;
  33. task();
  34. idl_thr_num_++;
  35. }
  36. });
  37. }
  38. }
  39. inline ~ThreadPool()
  40. {
  41. stop_flag_.store(true);
  42. cv_task_.notify_all();
  43. for (std::thread& thread : pool_)
  44. {
  45. thread.join();
  46. }
  47. }
  48. public:
  49. template <class F, class... Args>
  50. inline void commit(F&& f, Args&&... args)
  51. {
  52. if (stop_flag_.load())
  53. throw std::runtime_error("Commit on LiDAR threadpool is stopped.");
  54. using RetType = decltype(f(args...));
  55. auto task =
  56. std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  57. {
  58. std::lock_guard<std::mutex> lock{ mutex_ };
  59. tasks_.emplace([task]() { (*task)(); });
  60. }
  61. cv_task_.notify_one();
  62. }
  63. private:
  64. using Task = std::function<void()>;
  65. std::vector<std::thread> pool_;
  66. std::queue<Task> tasks_;
  67. std::mutex mutex_;
  68. std::condition_variable cv_task_;
  69. std::atomic<bool> stop_flag_;
  70. std::atomic<int> idl_thr_num_;
  71. };
  72. } // namespace Dispatch

如何配着使用呢?如下

  1. #include "lock_queue.h"
  2. #include "thread_pool.hpp"
  3. #include <thread>
  4. #include <mutex>
  5. #include <memory>
  6. using namespace Dispatch;
  7. /**
  8. * 请求数据结构体
  9. * */
  10. typedef struct re{
  11. /** 请求的url**/
  12. QString url;
  13. /** 请求任务编号**/
  14. QString data;
  15. /** 解析结果***/
  16. bool result=true;
  17. }re;
  18. class agreeMentServer{
  19. public:
  20. agreeMentServer(){
  21. std::unique_lock<std::mutex> lock (mu);
  22. if(thread_pool_ptr_==nullptr){
  23. thread_pool_ptr_ = std::make_shared<ThreadPool>(2);
  24. }
  25. };
  26. void addTask(re request){
  27. if (requst_pkt_queue_.size() > MAX_PACKETS_BUFFER_SIZE)
  28. {
  29. requst_pkt_queue_.clear();
  30. }
  31. requst_pkt_queue_.push(request);
  32. thread_pool_ptr_->commit([this]() { processRquest(); });
  33. }
  34. ~agreeMentServer(){
  35. };
  36. private:
  37. void processRquest(){
  38. re request;
  39. while (requst_pkt_queue_.size()) {
  40. request = requst_pkt_queue_.popFront();
  41. }
  42. }
  43. private:
  44. std::mutex mu;
  45. Queue<re> requst_pkt_queue_;
  46. std::shared_ptr<ThreadPool> thread_pool_ptr_;
  47. };

是不是很简单啊? 由于本人经验有限,如给错误,欢迎修正!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/778004
推荐阅读
相关标签
  

闽ICP备14008679号