赞
踩
本文将通过posix信号量和一个环形队列一步一步实现生产者消费者模型
需要一个makefile,一个sem.hpp用于封装信号量,一个CQ.hpp用于封装环形队列,一个Task.hpp用于分发任务,一个test.cc用于主函数调用。
我们要实现一个多生产者多消费者的环形队列,main函数内创建多个线程,实现线程的回调方法,将环形队列作为参数传递到每一个回调方法中,从队列中取数据。
环形队列类的实现需要一个锁,两个信号量:_data_sem,_space_sem,前者用来表示数据的信号量,后者用来表示队列空间的信号量
举个例子,当data_sem为0时,消费者在等待队列中等待,因为队列中无数据可消费。同理当space_sem为0时,生产者等待消费者消费。
cq:test.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cq
1.对信号量封装起始就是对P,V操作的封装,调用semaphore.h中的库函数
2.P操作即对信号量–,注意它得等到信号量为非0时才能–,可以这么理解去电影院看电影要先买票,票数减1,说明你预定了位置,等电影开始了你才能进电影院。如果票卖完了(即信号量为0时),就不能进电影院,只能在外面阻塞等待有谁退票让信号量(票数)为正你才能买票进去。
3.V操作调用post实现对信号量++,还是上面那个例子,你看完电影以后离开座位,那么那个座位就能给下一场看电影的人预定,实现P操作。
#include<semaphore.h> #include<pthread.h> class Sem { public: Sem(int value) { sem_init(&_sem,0,value); } void P() { sem_wait(&_sem); } void V() { sem_post(&_sem); } ~Sem() { sem_destroy(&_sem); } private: sem_t _sem; };
1.类中成员变量:_cq用来模拟队列,_cindex消费者的下标,_pindex生产者的下标,_num记录队列中元素个数,_clock记录是用来多消费者访问队列时加的锁,_plock同理。
2.类中成员函数:Push和Pop
3.当进行push操作时,先申请信号量,再加锁,原因是信号量存在的意义就是可以不用访问临界区就能知道资源的使用情况,可以减小锁的粒度。对空间信号量进行P操作,把数据放入队列后,对数据信号量进行V操作,表明数据+1.同理pop
#include<iostream> #include<vector> #include<pthread.h> #include"sem.hpp" #include"Task.hpp" const int default_num=10; //信号量本质是一个计数器,意义在于可以不用进入临界区,就可以判断资源使用情况,降低锁的粒度? template<class T> class CircularQueue { public: CircularQueue(int num=default_num) :_num(num) ,_cq(num) ,_cindex(0) ,_pindex(0) ,_space_sem(num) ,_data_sem(0) { pthread_mutex_init(&_clock,nullptr); pthread_mutex_init(&_plock,nullptr); } ~CircularQueue() { pthread_mutex_destroy(&_clock); pthread_mutex_destroy(&_plock); } void Push(const T& in) { _space_sem.P(); //先加锁还是先申请信号量问题 pthread_mutex_lock(&_plock); _cq[_pindex++]=in; _pindex%=_num; pthread_mutex_unlock(&_plock); _data_sem.V(); } void Pop(T* out) { //确保每次只有一个消费者竞争成功 _data_sem.P(); pthread_mutex_lock(&_clock); *out=_cq[_cindex++]; _cindex%=_num; pthread_mutex_unlock(&_clock); _space_sem.V(); } void debug() { std::cerr<<"size: "<<_cq.size()<<std::endl; } private: std::vector<T> _cq; int _num; int _cindex; int _pindex; Sem _data_sem; Sem _space_sem; pthread_mutex_t _clock; pthread_mutex_t _plock; };
#include<iostream> #include<functional> typedef std::function<int(int,int)> func_t;//函数类型 class Task { public: Task()=default; Task(int x,int y,func_t func):_x(x),_y(y),_func(func){} int operator()()//仿函数 { return _func(_x,_y); } public: int _x; int _y; func_t _func; };
这里我们定义了一个函数类型,func_t,定义了一个仿函数,成员变量为x,y,方法是_func由用户自定义传入
int main() { srand((uint64_t)time(nullptr)^getpid()); CircularQueue<Task>* cq=new CircularQueue<Task>(5); pthread_t c[3],p[3];//生产者消费者线程 pthread_create(c,NULL,producer,cq); pthread_create(c+1,NULL,producer,cq); pthread_create(c+2,NULL,producer,cq); pthread_create(p,NULL,consumer,cq); pthread_create(p+1,NULL,consumer,cq); pthread_create(p+2,NULL,consumer,cq); for(int i=0;i<3;i++)pthread_join(c[i],nullptr); for(int i=0;i<3;i++)pthread_join(p[i],nullptr); return 0; }
这里我们创建了三个消费者和生产者线程,实例化了一个环形队列队列对象,并在线程调用时把该对象指针作为参数传递给了各自的回调函数
int add(int x,int y) { int sum=0; for(int i=0;i<x;i++) for(int j=0;j<y;j++) sum=i+j; return sum; } void *producer(void *args) { auto cq=(CircularQueue<Task>*)args; while(true) { int x=rand()%1000,y=rand()%1000; Task t(x,y,add); cq->Push(t); std::cout<<"produce "<<"["<<pthread_self()<<"]"<<std::endl; } } void *consumer(void *args) { auto cq=(CircularQueue<Task>*)args; while(true) { sleep(1); Task t; cq->Pop(&t); std::cout<<"consume "<<t()<<"["<<pthread_self()<<"]"<<std::endl; } }
引入了随机数种子,定义了队列中的任务方法(一般情况下生产者的任务是从网络中读取),这里我们简易实现了加法函数,接收了环形队列对象的指针,生产者把Task对象tPush到队列中,同时消费者从队列中取出Task对象,两者并发执行,实现了生产者和消费者能并发的执行任务,大大提高了效率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。