赞
踩
一、使用多线程实现生产者与消费者模型
1、Condition模型
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁,但是notify and notifyall本身是不会释放占有的Condition内部的锁,所以随后需要condition.release()来显示的释放锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。
- import threading
- import time,random
-
- product=[];
- cond = threading.Condition()
-
- class Producer(threading.Thread):
- def __init__(self,speed):
- threading.Thread.__init__(self)
- self.speed = speed
- def run(self):
- while True:
- for i in range(self.speed):
- cond.acquire()
- while len(product)>=5:
- cond.wait();
- num=random.random();
- product.append(num)
- print("生产者"+str(self.speed)+"生产了:"+str(num))
- cond.notifyAll()
- cond.release()
- time.sleep(1)
-
- class Consumer(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
-
- def run(self):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,5);
- for i in range(speed):
- cond.acquire()
- while len(product)<=0:
- cond.wait()
- num=product[0];
- del product[0]
- print("消费者,消费了"+str(num))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- cond.notify()
- cond.release()
- time.sleep(1)
-
-
-
- Producer(1).start();
- Producer(2).start();
- Consumer().start();

2、Queue模型
(1)创建一个 Queue.Queue() 的实例,来表示缓冲池。
(2)每次从使用生产者线程对队列中的数据进行填充,使用消费者线程取出队列中的数据。
- import threading
- import queue
- import time
- import random
-
-
- class Producer(threading.Thread):
- def __init__(self,speed,queue):
- threading.Thread.__init__(self)
- self.speed = speed
- self.queue = queue
-
- def run(self):
- while True:
- for i in range(self.speed):
- item = random.random()
- self.queue.put(item)
- print("生产者",self.speed,"生产了:",str(item))
- time.sleep(1)
-
- class Consumer(threading.Thread):
- def __init__(self,queue):
- threading.Thread.__init__(self)
- self.queue = queue
-
- def run(self):
- time_start=time.clock()
- count=0
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- item = self.queue.get()
- print("消费者","消费:",item)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- time.sleep(1)
-
- q = queue.Queue(maxsize = 5)
-
-
- if __name__ == '__main__':
-
- Producer(1,q).start()
-
- Producer(2,q).start()
-
- Consumer(q).start()

3、信号量模型:
- import sys, time
- import random
- from threading import Thread, Semaphore
-
- product = []
-
- mutex = Semaphore(1)
- #也可以是mutex = RLock()
- full = Semaphore(0)
- empty = Semaphore(5)
-
- class Producer(Thread):
- def __init__(self, speed):
- Thread.__init__(self);
- self.speed=speed;
-
- def run(self):
- while True:
- for i in range(self.speed):
- ProductThing='';
- empty.acquire()
- mutex.acquire()
- num=random.random()
- ProductThing+=str(num)+' '
- product.append(str(num))
- print ('%s: count=%d' % ("producer"+str(self.speed), len(product)))
- print (' product things:'+ProductThing)
- mutex.release()
- full.release()
- time.sleep(1)
-
-
- class Consumer(Thread):
- def __init__(self):
- Thread.__init__(self);
-
- def run(self):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- consumeThing=""
- full.acquire()
- mutex.acquire()
- consumeThing+=str(product[0])+' '
- del product[0]
- print('%s: count=%d' % ("consumer", len(product)))
- print(' consume things:'+consumeThing)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- mutex.release()
- empty.release()
- time.sleep(1)
-
-
- if __name__ == '__main__':
-
- Producer(1).start()
-
- Producer(2).start()
-
- Consumer().start()

4、Event模型
threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
- import threading
- import random
- import time
-
- def produce(speed,e_p1,e_p2,e_c,product):
- while True:
- for i in range(speed):
- if(speed==1):
- e_p1.wait();
- if(speed==2):
- e_p2.wait();
- num=random.random();
- product.append(num)
- print("生产者"+str(speed)+",生产了"+str(num))
- e_c.set();
- if(speed==1):
- e_p1.clear();
- if(speed==2):
- e_p2.clear();
- time.sleep(1)
-
- def consume(e_p1,e_p2,e_c,product):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,5);
- for i in range(speed):
- e_c.wait();
- num=product[0];
- del product[0]
- print("消费者,消费了"+str(num))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
- r=random.randint(1,2)
- if(r==1):
- e_p1.set()
- if(r==2):
- e_p2.set()
- e_c.clear()
- time.sleep(1)
-
- if __name__ == '__main__':
-
- e_p1= threading.Event();
-
- e_p2= threading.Event();
-
- e_c= threading.Event();
-
- e_p1.set()
-
- product=[]
-
- p1=threading.Thread(target=produce, args=(1,e_p1,e_p2,e_c,product))
-
- p1.start()
-
- p2=threading.Thread(target=produce, args=(2,e_p1,e_p2,e_c,product))
-
- p2.start()
-
- c1=threading.Thread(target=consume, args=(e_p1,e_p2,e_c,product))
-
- c1.start()

二、多进程实现生产者与消费者模型
1、 信号量和共享内存
- '''
- @author: jqy
- '''
-
- from multiprocessing import Process
- import time
- import random
- import multiprocessing
-
- def produce(speed,mutex,full,empty,product,pindex,cindex):
- while True:
- for i in range(speed):
- ProductThing='';
- empty.acquire()
- mutex.acquire()
- num=random.random()
- ProductThing+=str(num)+' '
- product[pindex.value]=num
- pindex.value=(pindex.value+1)%len(product)
- print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
- mutex.release()
- full.release()
- time.sleep(1)
-
-
- def consume(mutex,full,empty,product,pindex,cindex):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- consumeThing=""
- full.acquire()
- with mutex:
- consumeThing+=str(product[cindex.value])+' '
- cindex.value=(cindex.value+1)%len(product)
- print('%s: consume things:%s' % ("consumer", consumeThing))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- empty.release()
- time.sleep(1)
-
-
- if __name__ == '__main__':
-
- product = multiprocessing.Array('d',range(5))
-
- pindex=multiprocessing.Value('i',0)
-
- cindex=multiprocessing.Value('i',0)
-
- mutex = multiprocessing.RLock()
-
- full = multiprocessing.Semaphore(0)
-
- empty = multiprocessing.Semaphore(5)
-
- Process(target=produce, args=(1,mutex,full,empty,product,pindex,cindex)).start()
-
- Process(target=produce, args=(2,mutex,full,empty,product,pindex,cindex)).start()
-
- Process(target=consume, args=(mutex,full,empty,product,pindex,cindex)).start()

2、服务进程管理器Manager
- '''
- @author: jqy
- '''
-
- from multiprocessing import Process
- import time
- import random
- import multiprocessing
-
- def produce(speed,mutex,full,empty,product,pindex,cindex):
- while True:
- for i in range(speed):
- ProductThing='';
- empty.acquire()
- mutex.acquire()
- num=random.random()
- ProductThing+=str(num)+' '
- product[pindex.value]=num
- pindex.value=(pindex.value+1)%len(product)
- print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
- mutex.release()
- full.release()
- time.sleep(1)
-
-
- def consume(mutex,full,empty,product,pindex,cindex):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- consumeThing=""
- full.acquire()
- with mutex:
- consumeThing+=str(product[cindex.value])+' '
- cindex.value=(cindex.value+1)%len(product)
- print('%s: consume things:%s' % ("consumer", consumeThing))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- empty.release()
- time.sleep(1)
-
-
- if __name__ == '__main__':
-
- mgr = multiprocessing.Manager()
-
- product = mgr.Array('d',range(5))
-
- pindex = mgr.Value('i',0)
-
- cindex = mgr.Value('i',0)
-
- mutex = mgr.RLock()
-
- full = mgr.Semaphore(0)
-
- empty = mgr.Semaphore(5)
-
- p1 = Process(target=produce, args=(1,mutex,full,empty,product,pindex,cindex))
-
- p2 = Process(target=produce, args=(2,mutex,full,empty,product,pindex,cindex))
-
- p3 = Process(target=consume, args=(mutex,full,empty,product,pindex,cindex))
-
- p1.start(), p2.start(), p3.start()
-
- p1.join(), p2.join(), p3.join()

3、 Condition模型
- import multiprocessing
- import random
- import time
-
- def produce(speed,cond,product):
- while True:
- for i in range(speed):
- cond.acquire()
- while len(product)>=5:
- cond.wait();
- num=random.random();
- product.append(num)
- print("生产者"+str(speed)+",生产了:"+str(num))
- cond.notify()
- cond.release()
- time.sleep(1)
-
- def consume(cond,product):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,5);
- for i in range(speed):
- cond.acquire()
- while len(product)<=0:
- cond.wait()
- num=product[0];
- del product[0]
- print("消费者,消费了"+str(num))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- cond.notify()
- cond.release()
- time.sleep(1)
-
- if __name__ == '__main__':
-
- c= multiprocessing.Condition();
-
- product=multiprocessing.Manager().list()
-
- p1=multiprocessing.Process(target=produce, args=(1,c,product))
-
- p1.start()
-
- p2=multiprocessing.Process(target=produce, args=(2,c,product))
-
- p2.start()
-
- c1=multiprocessing.Process(target=consume, args=(c,product))
-
- c1.start()
-
- p1.join()
-
- p2.join()
-
- c1.join()

4、 消息队列模型
- '''
- @author: jqy
- '''
- from multiprocessing import Process
- import random
- import time
- import multiprocessing
-
- def produce(speed,q):
- while True:
- for i in range(speed):
- item = random.random()
- q.put(item)
- print("生产者",speed,"生产了",str(item))
- time.sleep(1)
-
- def consume(num,q):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- item = q.get()
- print("消费者","消费了",item)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- time.sleep(1)
-
- if __name__ == '__main__':
-
- q = multiprocessing.Queue(maxsize = 5)
-
- Process(target=produce, args=(1,q)).start()
-
- Process(target=produce, args=(2,q)).start()
-
- Process(target=consume, args=(1,q)).start()

5、 管道模型:
- '''
- Created on 2016/11/11
- @author: jqy
- '''
- from multiprocessing import Process
- import random
- import time
- import multiprocessing
-
- def produce(speed,empty,p):
- while True:
- for i in range(speed):
- empty.acquire()
- item = random.random()
- p.send(item)
- print("生产者",speed,",生产了",str(item))
- time.sleep(1)
-
- def consume(num,empty,p):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,5)
- for i in range(speed):
- item = p.recv()
- print("消费者"," 消费了",item)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- empty.release()
- time.sleep(1)
-
- if __name__ == '__main__':
-
- parent_conn, child_conn = multiprocessing.Pipe()
-
- empty = multiprocessing.Semaphore(5)
-
- Process(target=produce, args=(1,empty,child_conn)).start()
-
- Process(target=produce, args=(2,empty,child_conn)).start()
-
- Process(target=consume, args=(1,empty,parent_conn)).start()

6、 Event模型
生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
- import multiprocessing
- import random
- import time
-
- def produce(speed,e_p1,e_p2,e_c,product):
- while True:
- for i in range(speed):
- if(speed==1):
- e_p1.wait();
- if(speed==2):
- e_p2.wait();
- num=random.random();
- product.append(num)
- print("生产者"+str(speed)+",生产了"+str(num))
- e_c.set();
- if(speed==1):
- e_p1.clear();
- if(speed==2):
- e_p2.clear();
- time.sleep(1)
-
- def consume(e_p1,e_p2,e_c,product):
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,5);
- for i in range(speed):
- e_c.wait();
- num=product[0];
- del product[0]
- print("消费者,消费了"+str(num))
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
- r=random.randint(1,2)
- if(r==1):
- e_p1.set()
- if(r==2):
- e_p2.set()
- e_c.clear()
- time.sleep(1)
-
- if __name__ == '__main__':
-
- e_p1= multiprocessing.Event();
-
- e_p2= multiprocessing.Event();
-
- e_c= multiprocessing.Event();
-
- e_p1.set()
-
- product=multiprocessing.Manager().list()
-
- p1=multiprocessing.Process(target=produce, args=(1,e_p1,e_p2,e_c,product))
-
- p1.start()
-
- p2=multiprocessing.Process(target=produce, args=(2,e_p1,e_p2,e_c,product))
-
- p2.start()
-
- c1=multiprocessing.Process(target=consume, args=(e_p1,e_p2,e_c,product))
-
- c1.start()
-
- p1.join()
-
- p2.join()
-
- c1.join()

三、不同主机上生产者消费者模型:
1、Manager:服务进程管理器可以通过网络由不同计算机上的进程共享
2、 socketTCP模型:
- from multiprocessing import Process
- import queue
- import threading
- import socket
- import random
- import time
-
- MaxSize=5
-
- def produce(speed,host,port):
- A=(host,port)
- s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,MaxSize*3)
- s.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
- s.connect(A)
- item=1;
- while True:
- for i in range(speed):
- if(speed==1):
- print("生产者",speed,",生产了","{0:0=3}".format(item))
- s.send(bytes("{0:0=3}".format(item), encoding = "utf8"))
- else:
- print("生产者",speed,",生产了","{0:x=3}".format(item))
- s.send(bytes("{0:x=3}".format(item), encoding = "utf8"))
- item=item+1
- time.sleep(1)
- s.close()
-
- def consume(host1,port1,host2,port2):
- q=queue.Queue(maxsize=MaxSize);
- threading.Thread(target=getMessage, args=(q,host1,port1)).start()
- threading.Thread(target=getMessage, args=(q,host2,port2)).start()
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1,MaxSize)
- for i in range(speed):
- item=q.get()
- print("消费者"," 消费了",item)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费"+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
- time.sleep(1)
-
- def getMessage(q,host,port):
- A=(host,port)
- sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
- sock.setsockopt(socket.SOL_SOCKET,socket.SO_SNDBUF,MaxSize*3)
- sock.bind(A)
- sock.listen(0)
- tcpClientSock, addr=sock.accept()
- while True:
- try:
- data=tcpClientSock.recv(3)
- q.put(str(data,encoding = "utf8"))
- print("count="+str(q.qsize()))
- except:
- print("exception")
- tcpClientSock.close()
- sock.close()
-
-
- if __name__ == '__main__':
-
- Process(target=produce, args=(1,'localhost',8080)).start()
-
- Process(target=produce, args=(2,'localhost',8090)).start()
-
- Process(target=consume, args=('localhost',8080,'localhost',8090)).start()

3、 远程调用模型:
先在主进程中注册获取产品的方法,消费者在取用商品时调用取用商品的远程方法来获取。取用商品有一定的延迟,使得程序的整个运行速度比较慢。
- from multiprocessing import Process
- from xmlrpc.client import ServerProxy
- from xmlrpc.server import SimpleXMLRPCServer
- import random
- import time
- import multiprocessing
-
- q = multiprocessing.Queue(maxsize = 5)
-
- def produce(speed,q):
- while True:
- for i in range(speed):
- item = random.random()
- q.put(item)
- print("生产者",speed,"生产了",str(item))
- time.sleep(1)
-
- def getAProduct():
- global q
- return q.get()
-
- def consume(host,port):
- server = ServerProxy("http://"+host+":"+str(port))
- count=0
- time_start=time.clock()
- while True:
- speed=random.randint(1, 5)
- for i in range(speed):
- print("开始远程调用")
- item = server.getAProduct()
- print("消费者","消费了",item)
- count=count+1
- if(count==20):
- time_end=time.clock()
- print("消费"+str(count)+"个资源所需要的时间: %f s" % (time_end - time_start))
- time.sleep(1)
-
- if __name__ == '__main__':
-
- s = SimpleXMLRPCServer(('localhost', 8000))
-
- s.register_function(getAProduct)
-
- print('注册获取产品方法完成')
-
- Process(target=produce, args=(1,q)).start()
-
- Process(target=produce, args=(2,q)).start()
-
- Process(target=consume, args=('localhost',8000)).start()
-
- s.serve_forever()

4、使用Redis发布订阅模式:
- #encoding=utf-8
- '''
- @author: jqy
- '''
- from multiprocessing import Process
- import random
- import time
- import redis
-
- def produce(speed):
- client = redis.StrictRedis()
- while True:
- for i in range(speed):
- item = random.random()
- client.publish("hole",item)
- print("生产者",speed,"生产了",str(item))
- time.sleep(1)
-
- def consume(speed):
- count=0
- time_start=time.clock()
- client = redis.StrictRedis()
- p = client.pubsub()
- p.subscribe("hole")
- for i in range(speed):
- for msg in p.listen():
- if msg['type'] != 'message':
- continue
- print("消费者", "消费了", msg)
- count = count + 1
- if (count == 20):
- time_end = time.clock()
- print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
- time.sleep(1)
-
- if __name__ == '__main__':
-
- Process(target=consume, args=(2,)).start()
-
- Process(target=produce, args=(1,)).start()
-
- Process(target=produce, args=(2,)).start()

注:如果生产者存在,消费者不存在,那么消息直接会丢弃。生产者发送消息的时候,如果消费者突然断掉了,则消息丢失。
参考文献:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。