当前位置:   article > 正文

c++ 带超时的线程安全队列 ThreadSafeQueue

threadsafequeue

1、使用场景

当有一个队列在多线程使用时,必须保证资源同步,需要实现线程安全。根据pop实现策略,通常有几种实现方式:

  • 方法一:使用互斥锁, push、pop都尝试加锁互斥锁
    这种方式下,当队里为空时,需要不断加锁、解锁查询是否有最新的数据。
  • 方法二:使用条件变量和互斥锁
    在互斥锁的基础上,当队列为空时,pop睡眠,等待队列不为通过条件变量唤醒;相对上一种方式减少了反复查询过程中加解锁消耗资源的情况。
  • 方法三:增加等待时间
    在上一个基础上,增加一个超时时间,若pop睡眠等待超时,就立即返回

方法一适用于pop频率比push频率低的情况。
方法二适用于pop频率比push频率高的情况,同时不在乎pop的等待导致线程阻塞的情况。
方法三适用于pop频率比push频率高或相当的情况,同时也在意pop的等待时间。

2、代码实现

这里的队列数据可以是任意类型,因此是模板实现。当类中要使用模板数据类型。typedef和传参须使用typename, 另外简单方式是使用 using定义简化使用

主要代码结构如下:

template<typename T>
class ThreadSafeQueue final
{
public:
	using value_type = T;
    //typedef typename std::queue<T>::size_type size_type;
    using size_type = typename std::queue<T>::size_type; 

   	// explicit ThreadSafeQueue(typename /* 必须使用typename*/ std::queue<T>::size_type queueSizeMax 
   	//              = std::numeric_limits<typename /* 必须使用typename*/ std::queue<T>::size_type>::max()):
   	//   queueSizeMax_(queueSizeMax) { }  
   	explicit ThreadSafeQueue(size_type queueSizeMax = std::numeric_limits<size_type>::max()):
        queueSizeMax_(queueSizeMax) { }  

   	ThreadSafeQueue(const ThreadSafeQueue& src) = delete;
   	ThreadSafeQueue& operator=(const ThreadSafeQueue& rhs) = delete;

   enum TQueueResult
   {
       qrNoError = 0,//returned successfully.
       qrFull,  // currently full
       qrLocked // currently locked
   };

   	size_type getMaxSize() const { return queueSizeMax_; }
   	size_type getCurrentSize() const {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       return queue_.size();
   	}

   	bool isFull() const  {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       return queue_.size() == queueSizeMax_;
   	}
 
   	bool isEmpty() const  {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       return queue_.empty();
   	}
   	
   // pData为空,仅判断是都为空,效果同bool isEmpty();
   // pData不为空, 取队列中的第一个元素
   bool front(T* pData = nullptr)
   {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       const bool boResult = !queue_.empty();
       if(boResult && pData) { 
           *pData = queue_.front();
       }
       return boResult;
   }
   
    TQueueResult push(const T& t);
    TQueueResult push(T&& t);
   
	bool pop(  T* pData = nullptr);
	
private:
    std::queue<T> queue_;
   // typename /* 必须使用typename*/ std::queue<T>::size_type queueSizeMax_;
    size_type queueSizeMax_;

    std::mutex mutex_;
    std::condition_variable conditionVariable_;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

queueSizeMax_在类构造时已经确定,后面使用仅对变量进行读操作,不需要加锁。
其他在多线程使用的函数,对数据访问读写时必须加锁。

下面主要三种实现方式说明push、pop的函数实现。

2.1 互斥锁

push时,先加锁,查询队列是否已满。已满返回qrFull,否则将数据加如队列并返回qrNoError状态。

   /// push
   TQueueResult push(const T& t)
   {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       if(queue_.size() >= queueSizeMax_) {
           printf("queue full\n");
           return qrFull;
       }

       queue_.push(t);
       printf("push %d \n", t);
       return qrNoError;
   }
	
	// 后续不再说明
   TQueueResult push(T&& t)
   {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       if(queue_.size() >= queueSizeMax_) {
           printf("queue full\n");
           return qrFull;
       }

       queue_.emplace(t);
       printf("push %d \n", t);
       return qrNoError;
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

pop时,先加锁,查看队列是否为空,不为空时,返回队头的数据(若需要)。

   bool pop(  T* pData = nullptr)
   {
       	std::lock_guard<std::mutex> scopeLock(mutex_);
       	if(!queue_.empty()) 
		{
          	if(pData)  {
          		*pData = std::move(queue_.front());
          	}
            queue_.pop();
            return true;
       	}
       	return false;
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.2 互斥锁+条件变量

pop时若队里为空,进行等待,直到push唤醒。不为空时,直接返回。

在pop中,先加锁。若conditionVariable_.wait(scopeLock, [this]{ return !queue_.empty(); });的匿名函数判断不为空,则直接进行extractData。 否则解锁mutex_,等待直到push中通知唤醒,再加锁进而extractData。此时,pop只可能返回true。

   bool pop(  T* pData = nullptr)
   {
       std::unique_lock<std::mutex> scopeLock(mutex_);
       printf("wait for date...\n");
       conditionVariable_.wait(scopeLock, [this]{ return !queue_.empty(); });
       printf("extractData...\n");
       
      	if(pData)  {
          	*pData = std::move(queue_.front());
        }
        queue_.pop();
        return true;
   }


   TQueueResult push(const T& t)
   {
       std::lock_guard<std::mutex> scopeLock(mutex_);
       if(queue_.size() >= queueSizeMax_) {
           printf("queue full\n");
           return qrFull;
       }

       queue_.push(t);
       conditionVariable_.notify_one();
       printf("push %d \n", t);
       return qrNoError;
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.3 互斥锁+条件变量超时

在2.2的基础上,添加pop的睡眠的等待时间,若超过等待时间未唤醒,直接pop失败返回,不继续阻塞。

   bool pop(unsigned int timeout_ms, T* pData = nullptr)
   {
   	   	std::unique_lock<std::mutex> scopeLock(mutex_);
       	auto rel_time = std::chrono::milliseconds(timeout_ms);
		
		// 是否等待超时
		bool boWaitResult = conditionVariable_.wait_for(scopeLock, rel_time, [this] {  return !queue_.empty(); });

       	if(boWaitResult) { // 未超时
           	if(!queue_.empty()) {
               	if(pData)  *pData = std::move(queue_.front());
           	}
           	queue_.pop();
           	return true;
       	}
       	return false;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/777997
推荐阅读
相关标签
  

闽ICP备14008679号