赞
踩
在C++ 并发编程中,对于线程池和队列我们是经常使用到的,本篇就简单的回顾一下,之前关于C++ 并发编程的总结。
一、有锁队列封装
队列分无琐队列和有锁队列,无锁队列依赖的是CAS原子性操作达到的,有锁队列如c++11 当中的std::queue 双端队列,其实是一个双端链表,链表这是数据结构在读取操作上还是有损性能的,毕竟链表的存储并非连续的一块内存,而是依赖前后指针进行寻址,跨度大,查找起来费时,对插入和删除有影响一定性能,具体多少不是很清楚,因为我也是道听途说的,毕竟在现在的多核时代,能慢到哪里去,如果不是要求实时性且高效率存储,普通的应用场景已是足以(就扯到这),上代码。
- namespace Dispatch
- {
-
- template <typename T>
- class Queue
- {
- public:
- inline Queue():is_task_finished_(true)
- {
- }
-
- inline T front()
- {
- std::lock_guard<std::mutex> lock(mutex_);
- return queue_.front();
- }
-
- inline void push(const T& value)
- {
- std::lock_guard<std::mutex> lock(mutex_);
- queue_.push(value);
- }
-
- inline void pop()
- {
- std::lock_guard<std::mutex> lock(mutex_);
- if (!queue_.empty())
- {
- queue_.pop();
- }
- }
-
- inline T popFront()
- {
- T value;
- std::lock_guard<std::mutex> lock(mutex_);
- if (!queue_.empty())
- {
- value = std::move(queue_.front());
- queue_.pop();
- }
- return value;
- }
-
- inline void clear()
- {
- std::queue<T> empty;
- std::lock_guard<std::mutex> lock(mutex_);
- swap(empty, queue_);
- }
-
- inline size_t size()
- {
- std::lock_guard<std::mutex> lock(mutex_);
- return queue_.size();
- }
-
- public:
- std::queue<T> queue_;
- std::atomic<bool> is_task_finished_;
-
- private:
- mutable std::mutex mutex_;
- };
- } // namespace Dispatch
整体来说,代码通俗易懂,要注意的一个点是 互斥量锁的使用,还有mutalbe的中文意思是“可变的,易变的” ,即使是不可变的const 修饰函数时,也是可以突破不变的,即可修改的。
- #include <iostream>
-
- class Person {
- public:
- Person();
- ~Person();
-
- int getAge() const; /*调用方法*/
- int getCallingTimes() const; /*获取上面的getAge()方法被调用了多少次*/
- private:
- int age;
- char *name;
- float score;
- mutable int m_nums; /*用于统计次数*/
- };
-
- Person::Person()
- {
- m_nums = 0;
- }
-
- Person::~Person(){}
-
- int Person::getAge() const
- {
- std::cout << "Calling the method" << std::endl;
- m_nums++;
- // age = 4; 仍然无法修改该成员变量
- return age;
- }
-
- int Person::getCallingTimes()const
- {
- return m_nums;
- }
-
- int main()
- {
- Person *person = new Person();
- for (int i = 0; i < 10; i++) {
- person->getAge();
- }
- std::cout << "getAge()方法被调用了" << person->getCallingTimes() << "次" << std::endl;
- delete person;
-
- getchar();
- return 0;
- }
二、线程池设计
上代码
-
-
- #pragma once
-
-
- namespace Dispatch
- {
-
- struct Thread
- {
- Thread() : start_(false)
- {
- }
- std::shared_ptr<std::thread> thread_;
- std::atomic<bool> start_;
- };
- class ThreadPool
- {
- public:
- typedef std::shared_ptr<ThreadPool> Ptr;
- inline ThreadPool(int thread) : stop_flag_(false), idl_thr_num_(thread)
- {
- for (int i = 0; i < idl_thr_num_; ++i)
- {
- pool_.emplace_back([this] {
- while (!this->stop_flag_)
- {
- std::function<void()> task;
- {
- std::unique_lock<std::mutex> lock{ this->mutex_ };
- this->cv_task_.wait(lock, [this] { return this->stop_flag_.load() || !this->tasks_.empty(); });
- if (this->stop_flag_ && this->tasks_.empty())
- return;
- task = std::move(this->tasks_.front());
- this->tasks_.pop();
- }
- idl_thr_num_--;
- task();
- idl_thr_num_++;
- }
- });
- }
- }
-
- inline ~ThreadPool()
- {
- stop_flag_.store(true);
- cv_task_.notify_all();
- for (std::thread& thread : pool_)
- {
- thread.join();
- }
- }
-
- public:
- template <class F, class... Args>
- inline void commit(F&& f, Args&&... args)
- {
- if (stop_flag_.load())
- throw std::runtime_error("Commit on LiDAR threadpool is stopped.");
- using RetType = decltype(f(args...));
- auto task =
- std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
- {
- std::lock_guard<std::mutex> lock{ mutex_ };
- tasks_.emplace([task]() { (*task)(); });
- }
- cv_task_.notify_one();
- }
-
- private:
- using Task = std::function<void()>;
- std::vector<std::thread> pool_;
- std::queue<Task> tasks_;
- std::mutex mutex_;
- std::condition_variable cv_task_;
- std::atomic<bool> stop_flag_;
- std::atomic<int> idl_thr_num_;
- };
- } // namespace Dispatch
如何配着使用呢?如下
- #include "lock_queue.h"
- #include "thread_pool.hpp"
- #include <thread>
- #include <mutex>
- #include <memory>
-
- using namespace Dispatch;
-
- /**
- * 请求数据结构体
- * */
- typedef struct re{
-
- /** 请求的url**/
- QString url;
- /** 请求任务编号**/
- QString data;
- /** 解析结果***/
- bool result=true;
- }re;
-
- class agreeMentServer{
-
- public:
- agreeMentServer(){
- std::unique_lock<std::mutex> lock (mu);
- if(thread_pool_ptr_==nullptr){
- thread_pool_ptr_ = std::make_shared<ThreadPool>(2);
- }
- };
- void addTask(re request){
- if (requst_pkt_queue_.size() > MAX_PACKETS_BUFFER_SIZE)
- {
- requst_pkt_queue_.clear();
- }
- requst_pkt_queue_.push(request);
- thread_pool_ptr_->commit([this]() { processRquest(); });
- }
- ~agreeMentServer(){
-
- };
- private:
- void processRquest(){
-
- re request;
- while (requst_pkt_queue_.size()) {
- request = requst_pkt_queue_.popFront();
- }
- }
- private:
- std::mutex mu;
- Queue<re> requst_pkt_queue_;
- std::shared_ptr<ThreadPool> thread_pool_ptr_;
- };
-
-
是不是很简单啊? 由于本人经验有限,如给错误,欢迎修正!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。