赞
踩
当有一个队列在多线程使用时,必须保证资源同步,需要实现线程安全。根据pop实现策略,通常有几种实现方式:
方法一适用于pop频率比push频率低的情况。
方法二适用于pop频率比push频率高的情况,同时不在乎pop的等待导致线程阻塞的情况。
方法三适用于pop频率比push频率高或相当的情况,同时也在意pop的等待时间。
这里的队列数据可以是任意类型,因此是模板实现。当类中要使用模板数据类型。typedef和传参须使用typename, 另外简单方式是使用 using定义简化使用。
主要代码结构如下:
template<typename T> class ThreadSafeQueue final { public: using value_type = T; //typedef typename std::queue<T>::size_type size_type; using size_type = typename std::queue<T>::size_type; // explicit ThreadSafeQueue(typename /* 必须使用typename*/ std::queue<T>::size_type queueSizeMax // = std::numeric_limits<typename /* 必须使用typename*/ std::queue<T>::size_type>::max()): // queueSizeMax_(queueSizeMax) { } explicit ThreadSafeQueue(size_type queueSizeMax = std::numeric_limits<size_type>::max()): queueSizeMax_(queueSizeMax) { } ThreadSafeQueue(const ThreadSafeQueue& src) = delete; ThreadSafeQueue& operator=(const ThreadSafeQueue& rhs) = delete; enum TQueueResult { qrNoError = 0,//returned successfully. qrFull, // currently full qrLocked // currently locked }; size_type getMaxSize() const { return queueSizeMax_; } size_type getCurrentSize() const { std::lock_guard<std::mutex> scopeLock(mutex_); return queue_.size(); } bool isFull() const { std::lock_guard<std::mutex> scopeLock(mutex_); return queue_.size() == queueSizeMax_; } bool isEmpty() const { std::lock_guard<std::mutex> scopeLock(mutex_); return queue_.empty(); } // pData为空,仅判断是都为空,效果同bool isEmpty(); // pData不为空, 取队列中的第一个元素 bool front(T* pData = nullptr) { std::lock_guard<std::mutex> scopeLock(mutex_); const bool boResult = !queue_.empty(); if(boResult && pData) { *pData = queue_.front(); } return boResult; } TQueueResult push(const T& t); TQueueResult push(T&& t); bool pop( T* pData = nullptr); private: std::queue<T> queue_; // typename /* 必须使用typename*/ std::queue<T>::size_type queueSizeMax_; size_type queueSizeMax_; std::mutex mutex_; std::condition_variable conditionVariable_; };
queueSizeMax_在类构造时已经确定,后面使用仅对变量进行读操作,不需要加锁。
其他在多线程使用的函数,对数据访问读写时必须加锁。
下面主要三种实现方式说明push、pop的函数实现。
push时,先加锁,查询队列是否已满。已满返回qrFull,否则将数据加如队列并返回qrNoError状态。
/// push TQueueResult push(const T& t) { std::lock_guard<std::mutex> scopeLock(mutex_); if(queue_.size() >= queueSizeMax_) { printf("queue full\n"); return qrFull; } queue_.push(t); printf("push %d \n", t); return qrNoError; } // 后续不再说明 TQueueResult push(T&& t) { std::lock_guard<std::mutex> scopeLock(mutex_); if(queue_.size() >= queueSizeMax_) { printf("queue full\n"); return qrFull; } queue_.emplace(t); printf("push %d \n", t); return qrNoError; }
pop时,先加锁,查看队列是否为空,不为空时,返回队头的数据(若需要)。
bool pop( T* pData = nullptr)
{
std::lock_guard<std::mutex> scopeLock(mutex_);
if(!queue_.empty())
{
if(pData) {
*pData = std::move(queue_.front());
}
queue_.pop();
return true;
}
return false;
}
pop时若队里为空,进行等待,直到push唤醒。不为空时,直接返回。
在pop中,先加锁。若conditionVariable_.wait(scopeLock, [this]{ return !queue_.empty(); });
的匿名函数判断不为空,则直接进行extractData。 否则解锁mutex_,等待直到push中通知唤醒,再加锁进而extractData。此时,pop只可能返回true。
bool pop( T* pData = nullptr) { std::unique_lock<std::mutex> scopeLock(mutex_); printf("wait for date...\n"); conditionVariable_.wait(scopeLock, [this]{ return !queue_.empty(); }); printf("extractData...\n"); if(pData) { *pData = std::move(queue_.front()); } queue_.pop(); return true; } TQueueResult push(const T& t) { std::lock_guard<std::mutex> scopeLock(mutex_); if(queue_.size() >= queueSizeMax_) { printf("queue full\n"); return qrFull; } queue_.push(t); conditionVariable_.notify_one(); printf("push %d \n", t); return qrNoError; }
在2.2的基础上,添加pop的睡眠的等待时间,若超过等待时间未唤醒,直接pop失败返回,不继续阻塞。
bool pop(unsigned int timeout_ms, T* pData = nullptr) { std::unique_lock<std::mutex> scopeLock(mutex_); auto rel_time = std::chrono::milliseconds(timeout_ms); // 是否等待超时 bool boWaitResult = conditionVariable_.wait_for(scopeLock, rel_time, [this] { return !queue_.empty(); }); if(boWaitResult) { // 未超时 if(!queue_.empty()) { if(pData) *pData = std::move(queue_.front()); } queue_.pop(); return true; } return false; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。