赞
踩
在多线程中,生产者与消费者模型是一个非常经典的问题。
假设有一个厨师做馒头,另外一个顾客吃厨师做的馒头,这样生产者(厨师)和消费者(顾客)两个元素就到位了。一般来讲,厨师做馒头的速度和顾客吃馒头的速度应该是不一样的,如果厨师做馒头的速度比顾客吃馒头的速度快,那么一定时间后一定是馒头堆积如山顾客永远也吃不完;如果厨师做馒头的速度比顾客吃馒头的速度慢,那么一定时间后一定是餐桌空空厨师永远不能停下工作。
显然这个结果一般是人们不愿意看到的,于是我们希望有一种机制,能够对厨师和顾客的行为进行一定的协调:当现有的馒头足够多的时候,厨师可以暂时停下手头的工作;当现在的馒头已经不存在的时候,顾客去休息,等厨师再把馒头做出来。
这种工作模式也称作同步工作模式,两个任务(生产者与消费者)在工作时需要一直通信协调,来调整对方与自己的工作进度。
源代码来自于我在b站学多线程时看的教程,来自于爱编程的大丙。
#include <iostream> #include <thread> #include <mutex> #include <list> #include <functional> #include <condition_variable> using namespace std; class SyncQueue { public: SyncQueue(int maxSize) : m_maxSize(maxSize) {} //构造函数,只初始化了任务数量最大值 void put(const int& x) //生产函数,向队列里塞数据 { // 上锁,自动解锁 unique_lock<mutex> locker(m_mutex); // 判断任务队列是不是已经满了 while (m_queue.size() == m_maxSize) { cout << "任务队列已满, 不继续生产了..." << endl; // 阻塞线程,等待消费函数发通知 m_notFull.wait(locker); } //如果可以继续工作了 // 将任务放入到任务队列中 m_queue.push_back(x); cout << x << " 被生产" << endl; // 通知消费者去消费 m_notEmpty.notify_one(); } int take() // 消费函数,从队列中取数据 { //上锁,自动解锁 unique_lock<mutex> locker(m_mutex); //如果队列为空,阻塞 while (m_queue.empty()) { cout << "任务队列已空,请耐心等待。。。" << endl; //等待生产函数发通知 m_notEmpty.wait(locker); } // 从任务队列中取出任务,即消费 int x = m_queue.front(); m_queue.pop_front(); // 通知生产者去生产 m_notFull.notify_one(); cout << x << " 被消费" << endl; return x; } bool empty() { lock_guard<mutex> locker(m_mutex); // 对线程公共数据的处理都需要上锁与解锁 return m_queue.empty(); } bool full() { lock_guard<mutex> locker(m_mutex); // 对线程公共数据的处理都需要上锁与解锁 return m_queue.size() == m_maxSize; // 如果长度到了最大个数就返回true,否则返回false } int size() { lock_guard<mutex> locker(m_mutex); // 对线程公共数据的处理都需要上锁与解锁 return m_queue.size(); } private: list<int> m_queue; // 存储数据的队列 mutex m_mutex; // 互斥锁 std::condition_variable m_notEmpty; // 不为空的条件变量,阻塞消费者 std::condition_variable m_notFull; // 没有满的条件变量,阻塞生产者 int m_maxSize; // 任务队列里面的最大任务个数 }; int main() { SyncQueue taskQ(50);//示例化一个对象 auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1); auto consume = bind(&SyncQueue::take, &taskQ); thread t1[3]; thread t2[3]; for (int i = 0; i < 3; ++i) { t1[i] = thread(produce, i + 100); t2[i] = thread(consume); } for (int i = 0; i < 3; ++i) { t1[i].join(); t2[i].join(); } return 0; }
解决生产者与消费者模型的核心机制是C++多线程中的条件变量condition_variable
。
condition_variable
分为两部分:等待机制与唤醒机制。
等待机制实现了一个线程的阻塞:如果该条件变量未能被满足,该线程就会一直阻塞,不会继续运行,并且释放公共资源的锁,等待其他线程拿到锁,并实现条件变量的满足,最后发出通知唤醒该线程。
唤醒机制实现了一个线程对另一个因条件变量未能被满足的阻塞变量的唤醒:实现该条件变量,释放锁并发出通知,将另一个阻塞变量唤醒。
我们用两个函数分布模拟生产者与消费者的行为。
先看看消费者函数。
int take() // 消费函数,从队列中取数据 { //上锁,自动解锁 unique_lock<mutex> locker(m_mutex); //如果队列为空,阻塞 while (m_queue.empty()) { cout << "任务队列已空,请耐心等待。。。" << endl; //等待生产函数发通知 m_notEmpty.wait(locker); } // 从任务队列中取出任务,即消费 int x = m_queue.front(); m_queue.pop_front(); // 通知生产者去生产 m_notFull.notify_one(); cout << x << " 被消费" << endl; return x; }
首先访问公共资源,上锁是不可少的:
unique_lock<mutex> locker(m_mutex);
这里我们设定,如果队列为空,就停止工作,令当前线程阻塞,并且使用条件变量的wait
函数等待生产函数的通知。
实际上这里还隐式地将互斥量的锁解锁,这样生产函数可以顺利拿到锁进行生产,并且发通知给消费函数。
//如果队列为空,阻塞
while (m_queue.empty())
{
cout << "任务队列已空,请耐心等待。。。" << endl;
//等待生产函数发通知
m_notEmpty.wait(locker);
}
如果生产函数发来通知该线程该如何工作呢?
如果正在阻塞的线程收到了唤醒通知,它会开始尝试拿到锁,如果拿到了锁就会执行后面的代码;如果没有拿到锁还是会继续阻塞。
// 从任务队列中取出任务,即消费
int x = m_queue.front();
m_queue.pop_front();
// 通知生产者去生产
m_notFull.notify_one();
cout << x << " 被消费" << endl;
return x;
可以看到,如果完成了一次消费动作,该线程就会唤醒生产者线程,这里引申出了另一个问题:你在这里每次都执行唤醒操作,每次都会有结果吗?
其实不然,当对方并未处在阻塞状态时,唤醒操作等于是什么也没干。
也就是说,当对方线程处在阻塞状态时,唤醒操作才真正被执行。
生产函数也可以用同样的方式去分析,区别主要在于:生产函数是为队列添加数值,并且在队列数值数量达到规定个数的时候停止生产,进入阻塞模式,等到消费者线程发出唤醒通知再继续生产。
void put(const int& x) //生产函数,向队列里塞数据 { // 上锁,自动解锁 unique_lock<mutex> locker(m_mutex); // 判断任务队列是不是已经满了 while (m_queue.size() == m_maxSize) { cout << "任务队列已满, 不继续生产了..." << endl; // 阻塞线程,等待消费函数发通知 m_notFull.wait(locker); } //如果可以继续工作了 // 将任务放入到任务队列中 m_queue.push_back(x); cout << x << " 被生产" << endl; // 通知消费者去消费 m_notEmpty.notify_one(); }
我们分析一下主函数的流程。
程序中创建线程的方式比较有意思:
SyncQueue taskQ(50);//示例化一个对象,规定队列最大元素数量是50
auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
auto consume = bind(&SyncQueue::take, &taskQ);
thread t1[3];
thread t2[3];
这里需要说一下bind
函数的用法:
bind
函数需要头文件functional
才能使用。bind
函数的本质将对象与自定义的变量进行一个绑定,属于延迟操作的思想。
以该句为例:
auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
在该句中produce
就等同于taskQ
对象的SyncQueue::put()
函数,placeholders::_1
表示该函数的第一个形参,如果有多个形参的话还可以添加placeholders::_2
、placeholders::_3
等等。
produce(1);
以上代码就等效于:
taskQ.put(1);
可以看到,用了绑定函数之后可以更方便地去调用对象的成员函数。
这里额外注意一下,对象的传入使用了引用的方式,这样在函数执行中就不会拷贝对象。我们的实验是在一个对象中进行的,如果出现了拷贝对象操作那么程序的运行结果一定不是预期的结果。
thread t1[3];
thread t2[3];
for (int i = 0; i < 3; ++i)
{
t1[i] = thread(produce, i + 100);
t2[i] = thread(consume);
}
for (int i = 0; i < 3; ++i)
{
t1[i].join();
t2[i].join();
}
接下来的代码创建了两个线程数组t1和t2。
然后循环对线程进行初始化与等待结束,通过这种循环控制方式实现了生产一个消费一个的机制。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。