赞
踩
为什么要将这两者放在一起进行呢?
主要是因为生产消费与条件变量关系密切,正好相辅相成。
我们假设有一个自习室,这个自习室每次只能有一个人进入(使用挂在自习室门前钥匙),其他的人都要排队(排队之前都去试图找钥匙)。
今天小A起了个大早去拿钥匙,一直学到中午,此时饿得受不了了,于是想出去吃饭,他就走到门前打算归还钥匙,可是刚挂到门前就后悔了,因为她不想排队那么久才能继续学习,于是又拿着钥匙将门打开,但是又很饿,就这样循环往复,自己没有得到知识,外面的人也只能干排队等待。
与之对应:
自习室就相当于临界资源,钥匙就是锁,当其中一个线程拿到锁就可以访问临界资源,其他的线程都陷入阻塞状态。
由于线程A继续访问临界资源已经没有意义,但是竞争锁的能力
很强,因为距离自己很近,造成了其他线程饥饿问题
。
虽然线程A这样做合法,但是不道德。
我们此时需要制定规则,保证过程具有一定顺序性,也就是同步。
而条件变量就是确保这个顺序性的!
我们知道了条件变量的作用,但是还是并不清楚条件变量是什么。
我们再来一个例子进行解释。
假设有一个游戏,一个蒙眼拿盘子中的苹果,一个睁眼将苹果放入盘子。
现在我们就可以对应一下了。
铃铛就是条件变量,他是一个队列,可以让线程进行等待
他的唤醒有两种策略,全部唤醒与单个唤醒。
我们也就可以对应一下条件变量的接口了。
有些接口与锁很类似。
其中init是条件变量初始化的函数,与锁一致。
signal与broadcast就是通知接口,signal是一次通知一个,boardcast就是全部通知
wait就是去铃铛下等待,也就是去条件变量下等待。
timedwait我们不管。
desory就是销毁条件。
关于这里还有一个细节,为什么条件变量要把锁业传入?
后面会进行解释。
我们目前可以浅浅的使用一下条件变量,熟悉一下接口。
我们现在要实现一个场景,
主要逻辑是先创建一批线程并进行管理,每个创建好的线程都去执行各自的任务,任务是个死循环,每次进入在条件变量下等待被唤醒。
#include <iostream> #include <pthread.h> #include <unistd.h> const int N = 5; pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER; void *Run(void *args) { while (true) { pthread_mutex_lock(&g_mutex); pthread_cond_wait(&g_cond, &g_mutex); std::cout << "i am new thread-" << reinterpret_cast<long long>(args) << std::endl; pthread_mutex_unlock(&g_mutex); } } int main() { pthread_t thds[N]; for (int i = 0; i < N; i++) { pthread_create(&thds[i], nullptr, Run, reinterpret_cast<void*>(i)); } // 唤醒 while (true) { sleep(1); pthread_cond_signal(&g_cond); } for (auto &val : thds) { pthread_join(val, nullptr); } return 0; }
现象:
条件变量暂时说到这里,我们先来谈一谈这个模型。
我们依旧是拿具体场景进行解释:
这是最简单的一个图示
实际上,我们的生产者代表的就是供货商,超市是一块缓存,顾客就是消费者。
这个模型有3大优点:
高度提炼一下可以简记为:1 2 3 原则。
1:一个交易场所(一段内存空间)
2:两种角色(生产角色,消费角色)
3:三种关系(生产与生产,消费与消费,生产与消费)
其中这三种关系分别为互斥,互斥,互斥&&同步。
相信前两个都很好理解,第三个呢?
比如供应商生产好商品还没录入数据就肯定不能被顾客拿走,这就是互斥。
这就像你在写数据,还没写完就被读走了一样,他们之间也是需要互斥的。
同样,如果超市没货了,我们要要依靠超市通知供应商,等有货了在反过来通知顾客。这就是同步。
为什么写单对单?因为他简单!为什么不吃牛,因为他善!
不过为什么单对单简单,因为我们的1 2 3原则中有三个关系,生产与生产,消费与消费,生产与消费,单对单就不需要考虑前两种,自然而言的简单了许多。
这里我们要解决一个历史问题:为什么wait要传入锁?
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
那么我们现在就要有一个大概的轮廓:
生产者一直写,消费者一直读即可,很基础的框架,有了框架我们就知道如何下手实现主要功能了。
#include "BlockingQueue.hpp" void *Consumer(void *args) { BlockingQueue *bq = static_cast<BlockingQueue*>(args); while (true) { // 接收数据 bp.Pop(); // 处理数据 } } void *Producer(void *args) { BlockingQueue *bq = static_cast<BlockingQueue*>(args); while (true) { // 构建数据 bp.Push(); } } int main() { BlockingQueue pc; pthread_t t1, t2; pthread_create(&t1, nullptr, Producer, static_cast<void*>(&pc)); pthread_create(&t2, nullptr, Consumer, static_cast<void*>(&pc)); pthread_join(t1, nullptr); pthread_join(t2, nullptr); return 0; }
这是我们的阻塞队列,里面有对于一些点的详细注释,
比如cond为什么要传入锁?唤醒在哪个位置进行唤醒?
而又因为我们也不确定用户使用的是什么类型,设计为模板即可。
#pragma once #include <iostream> #include <pthread.h> #include <queue> #include <unistd.h> template <class T> class BlockingQueue { public: BlockingQueue(int cap = 5) : _cap(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond_p, nullptr); pthread_cond_init(&_cond_c, nullptr); } void Push(const T &val) { pthread_mutex_lock(&_mutex); if (IsFull()) { pthread_cond_wait(&_cond_p, &_mutex); // 所以这里的wait为什么要传锁就一目了然了 // 因为如果你带着锁去wait,那么别的线程只会一直阻塞,不会拿到锁 // 所以wait还要进行解锁,等被唤醒在继续抢锁。 } // 出来了就代表肯定不是满的,此时push数据即可。 _q.push(val); pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_cond_c); // 注意这里的唤醒放在解锁的上或下都可以,当放在下时,我们还是以单对单为例 // 假设我们此时队列中还没有数据,消费者在wait中解锁阻塞,生产者生产完数据进行解锁再唤醒消费者, // 而这时消费者会和生产者再次抢锁,因为消费者会从wait中被唤醒需要抢锁,而生产者在lock中抢锁。 // 如果消费者抢到了就罢了,直接进行写入数据;但是如果生产者抢到了,那就继续生产数据, // 我们假设生产者一直将队列写满,那么他就会在wait中进行阻塞,让消费者写入数据在解锁唤醒。 // 同理,实际上因为锁的钳制,在解锁前或后唤醒都是可以的 } void Pop(T *val) { pthread_mutex_lock(&_mutex); if (IsEmpty()) { pthread_cond_wait(&_cond_c, &_mutex); } // 出来了就代表此时数据肯定不为空,可以写给消费者了。 *val = _q.front(); _q.pop(); pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_cond_p); } ~BlockingQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond_p); pthread_cond_destroy(&_cond_c); } private: bool IsFull() { return _cap == _q.size(); } bool IsEmpty() { return _q.empty(); } private: int _cap; std::queue<T> _q; pthread_mutex_t _mutex; pthread_cond_t _cond_p; pthread_cond_t _cond_c; };
但是这里还有一个细节,我们判断空或满时使用了if,这在某些情况下是会出问题的。
比如当前队列中没有数据。
有一个生产者,两个消费者。
当生产者刚刚生产完,进行了broadcast唤醒,导致两个消费者线程都被唤醒我们假设生产者此时不会抢到锁,所以目前只会有一个消费者线程抢到锁,当其中一个A抢到后,进行写入数据并pop,这都是没有问题的,但是如果A解锁后被另一个消费者B抢到,那么B又会继续写入并进行pop。
问题就出现了,因为唯一的一个数据已经被A拿走了,此时就会出现err,所以我们要对if进行一下修改,改为while
,这样就避免了因为B抢到锁而继续pop造成的err。
```cpp #pragma once #include <iostream> #include <pthread.h> #include <queue> #include <unistd.h> template <class T> class BlockingQueue { public: BlockingQueue(int cap = 5) : _cap(cap) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond_p, nullptr); pthread_cond_init(&_cond_c, nullptr); } void Push(const T &val) { pthread_mutex_lock(&_mutex); while (IsFull()) { pthread_cond_wait(&_cond_p, &_mutex); // 所以这里的wait为什么要传锁就一目了然了 // 因为如果你带着锁去wait,那么别的线程只会一直阻塞,不会拿到锁 // 所以wait还要进行解锁,等被唤醒在继续抢锁。 } // 出来了就代表肯定不是满的,此时push数据即可。 _q.push(val); pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_cond_c); // 注意这里的唤醒放在解锁的上或下都可以,当放在下时,我们还是以单对单为例 // 假设我们此时队列中还没有数据,消费者在wait中解锁阻塞,生产者生产完数据进行解锁再唤醒消费者, // 而这时消费者会和生产者再次抢锁,因为消费者会从wait中被唤醒需要抢锁,而生产者在lock中抢锁。 // 如果消费者抢到了就罢了,直接进行写入数据;但是如果生产者抢到了,那就继续生产数据, // 我们假设生产者一直将队列写满,那么他就会在wait中进行阻塞,让消费者写入数据在解锁唤醒。 // 同理,实际上因为锁的钳制,在解锁前或后唤醒都是可以的 } void Pop(T *val) { pthread_mutex_lock(&_mutex); while (IsEmpty()) { pthread_cond_wait(&_cond_c, &_mutex); } // 出来了就代表此时数据肯定不为空,可以写给消费者了。 *val = _q.front(); _q.pop(); pthread_mutex_unlock(&_mutex); pthread_cond_signal(&_cond_p); } ~BlockingQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond_p); pthread_cond_destroy(&_cond_c); } private: bool IsFull() { return _cap == _q.size(); } bool IsEmpty() { return _q.empty(); } private: int _cap; std::queue<T> _q; pthread_mutex_t _mutex; pthread_cond_t _cond_p; pthread_cond_t _cond_c; };
main函数代码:
#include "BlockingQueue.hpp" #include <random> void *Consumer(void *args) { BlockingQueue<int> *bq = static_cast<BlockingQueue<int>*>(args); while (true) { sleep(2); // 接收数据 int val; bq->Pop(&val); // 处理数据 std::cout << "Consumer receive a data: " << val << std::endl; } } void *Producer(void *args) { BlockingQueue<int> *bq = static_cast<BlockingQueue<int>*>(args); while (true) { // 构建数据 int val = rand() % 10 + 1;// [1, 10] bq->Push(val); std::cout << "Producer produce a data: " << val << std::endl; } } int main() { srand(time(nullptr)); BlockingQueue<int> pc; pthread_t t1, t2; pthread_create(&t1, nullptr, Producer, static_cast<void*>(&pc)); pthread_create(&t2, nullptr, Consumer, static_cast<void*>(&pc)); pthread_join(t1, nullptr); pthread_join(t2, nullptr); return 0; }
观察到生产者生产顺序为1 9 6 6 6…而消费者也正好是1 9 6 6 6…
注意:我们的生产者消费者不仅仅只能传递最简单的内置类型,也可以进行传递自定义类型!可调用对象也是可以的!
我们的多生产多消费不需要修改代码,因为锁已经帮我们搞定了生产与生产,消费与消费之间的关系。
为什么明明一次只能有一个线程访问队列,但还是说生产消费模型效率高?
因为我们不能只关注生产与消费在缓存的时间!
我们的生产任务或数据需要时间,处理任务或数据需要时间。
也就是说:当其中一个生产者在放任务,其他的生产者在生产任务;
其中一个消费者在拿任务,其他消费者在处理任务。
这种并发才让我们的效率变高!
为什么要在锁之后再进行条件变量wait?
这是因为我们push或pop之前一定会临街资源,临界资源一定是被锁保护起来的,所以设计者才会这样设计接口,我们也才要这样使用。
完~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。