赞
踩
目录
if (FW.ConsumePersonNum== WarehouseSize):
if (FW.ProductPersonPosition== ProductNum):
生产者消费者模式是常用的多线程编程模式,用于解决生产者和消费者之间的数据交互问题。在该模式中,生产者负责产生数据,而消费者负责处理数据。通过多线程的方式,生产者将数据放入一个共享的缓冲区中,而消费者从缓冲区中获取数据进行处理。
1. 创建一个共享的缓冲区,用于存储生产者产生的数据。缓冲区可以是一个数组、队列或者其他数据结构。
2. 创建一个互斥锁(mutex)来保护对缓冲区的访问。互斥锁用于确保同时只有一个线程能够访问缓冲区。
3. 创建一个条件变量(condition variable)用于在生产者产生数据和消费者处理数据之间进行通信。条件变量用于阻塞线程,直到某个条件满足。
4. 创建一个生产者线程,用于产生数据。生产者线程将使用互斥锁来保护对缓冲区的访问,并在产生数据后通过条件变量通知消费者线程。
5. 创建一个或多个消费者线程,用于处理数据。消费者线程将使用互斥锁来保护对缓冲区的访问,并在缓冲区为空时通过条件变量等待通知。
6. 生产者线程产生数据后,获取互斥锁并将数据放入缓冲区。然后通过条件变量通知一个或多个消费者线程。
7. 消费者线程在处理数据前,获取互斥锁并从缓冲区中获取数据。如果缓冲区为空,则通过条件变量等待通知。
8. 当生产者线程产生数据并将数据放入缓冲区后,需要释放互斥锁,以便其他线程可以访问缓冲区。
9. 当消费者线程处理完数据后,需要释放互斥锁,以便其他线程可以访问缓冲区。
通过使用互斥锁和条件变量,可以实现线程间的同步和通信,确保生产者和消费者之间的正确交互。
- #include <iostream>
- #include <thread>
- #include <deque>//双向队列
- #include <mutex>
- #include<condition_variable>
- using namespace std;
- const int ProductNum = 100;//产品个数
- const int WarehouseSize = 10;//仓库大小
- //仓库类
- template<class T>
- class Warehouse
- {
- public:
- Warehouse()
- {
- size_t ProductPersonNum = 0;
- size_t ConsumePersonNum = 0;
- size_t ProductPersonPosition = 0;
- size_t ConsumePersonPosition = 0;
- };
- public:
- deque<T> SaveProduct; //存储产品
- mutex mtx; //生产者和消费者的互斥量
- mutex mtx_Product; //生产计数互斥量
- mutex mtx_Consume; //消费计数互斥量
- condition_variable Warehouse_noFull; //条件变量:仓库没满
- condition_variable Warehouse_noEmpty; //条件变量:仓库不是空的
-
- size_t ProductPersonNum;//生产者计数
- size_t ConsumePersonNum;//消费者计数
-
- //确定当前是哪个位置的生产者或消费者需要进行操作。
- size_t ProductPersonPosition;//生产者位置
- size_t ConsumePersonPosition;//消费者位置
- };
-
- //工厂类
- template<class T>
- class Factory
- {
- public:
- //任务派发,具体的实现在protected里面
- //生产者操作
- void ProducterTask()
- {
- bool bReadyExit = false;
- while (true)
- {
- unique_lock<mutex> lock(FactoryWarehouse.mtx_Product);//加锁产品互斥量
- //线程结束条件
- if (FactoryWarehouse.ProductPersonNum< ProductNum)//小于就继续做计数过程
- {
- FactoryWarehouse.ProductPersonNum++;
- //生产产品
- //this_thread::sleep_for(1s);//假设要1s生产产品
- T item = FactoryWarehouse.ProductPersonNum;
- cout << "生产者的ID:" << this_thread::get_id() << endl;
- cout << "货源号:" << item << endl;
- InputFWarehouse(FactoryWarehouse, item);
- }
- else//否则
- {
- bReadyExit = true;
- }
- lock.unlock();
- if (bReadyExit)
- {
- break;
- }
- }
- }
- //消费者操作
- void ConsumerTask()
- {
- bool bReadyExit = false;
- while (true)
- {
- unique_lock<mutex> lock(FactoryWarehouse.mtx_Consume);//加锁消费者互斥量
- if (FactoryWarehouse.ConsumePersonNum < ProductNum)
- {
- T item = GetProductInFWarehouse(FactoryWarehouse);
- //消费产品
- //this_thread::sleep_for(1s);
- cout << "消费者的ID:" << this_thread::get_id() << endl;
- cout << "消费的货源号:" << item << endl;
- FactoryWarehouse.ConsumePersonNum++;
- }
- else
- {
- bReadyExit = true;
- }
- lock.unlock();
- if (bReadyExit)
- {
- break;
- }
- }
- }
- protected:
- Warehouse<T> FactoryWarehouse;//工厂仓库
- //把产品放到仓库里面
- void InputFWarehouse(Warehouse<T>& FW ,T item)
- {
- unique_lock<mutex> lock(FW.mtx);//加锁
- FW.SaveProduct.push_back(item);
- //当ProductPersonPosition达到了ProductNum时,将ConsumePersonPosition重置为0。这样可以保证循环使用消费者线程来消费产品。
- if (FW.ProductPersonPosition== ProductNum)
- {
- FW.ConsumePersonPosition = 0;
- }
- FW.Warehouse_noEmpty.notify_all();//唤醒所有线程
- }
- //从仓库中取出产品
- T GetProductInFWarehouse(Warehouse<T>& FW)
- {
- unique_lock<mutex> lock(FW.mtx);
- while (FW.SaveProduct.empty())
- {
- cout << "无货物,请等待." << endl;
- FW.Warehouse_noEmpty.wait(lock);//没有货源,进行等待
- }
- T Data = FW.SaveProduct.front();
- FW.SaveProduct.pop_front();
- if (FW.ConsumePersonNum== WarehouseSize)
- {
- FW.ConsumePersonPosition = 0;
- }
- FW.Warehouse_noFull.notify_all();
- lock.unlock();
- return Data;
- }
- };
-
-
- int main()
- {
- //测试
- cout << "主线程ID:" << this_thread::get_id() << endl;
- Factory<int> myFactory;
- //4个生产者
- thread Producter1(&Factory<int>::ProducterTask, &myFactory);
- thread Producter2(&Factory<int>::ProducterTask, &myFactory);
- thread Producter3(&Factory<int>::ProducterTask, &myFactory);
- thread Producter4(&Factory<int>::ProducterTask, &myFactory);
- //5个消费者
- thread Consumer1(&Factory<int>::ConsumerTask, &myFactory);
- thread Consumer2(&Factory<int>::ConsumerTask, &myFactory);
- thread Consumer3(&Factory<int>::ConsumerTask, &myFactory);
- thread Consumer4(&Factory<int>::ConsumerTask, &myFactory);
- thread Consumer5(&Factory<int>::ConsumerTask, &myFactory);
-
- Producter1.join();
- Producter2.join();
- Producter3.join();
- Producter4.join();
- Consumer1.join();
- Consumer2.join();
- Consumer3.join();
- Consumer4.join();
- Consumer5.join();
- return 0;
- }
当消费者线程的数量达到了仓库的大小时,说明所有的消费者线程已经使用完了,需要重新开始计数。即将ConsumePersonPosition重置为0,下一个消费者线程从头开始使用。
当生产者线程的数量达到了产品的数量时,说明所有的产品已经生产完了,需要重新开始计数。即将ConsumePersonPosition重置为0,下一个生产者线程从头开始生产。
这样做的目的是实现循环使用生产者和消费者线程的效果,使得每个生产者和消费者线程都能循环使用,不会因为达到边界而停止工作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。