当前位置:   article > 正文

利用python实现生产者消费者的并发模型_1.使用python面向的对象写法,编写多线程生产消费者并发模型

1.使用python面向的对象写法,编写多线程生产消费者并发模型

一、使用多线程实现生产者与消费者模型

1、Condition模型

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁,但是notify and notifyall本身是不会释放占有的Condition内部的锁,所以随后需要condition.release()来显示的释放锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

  1. import threading
  2. import time,random
  3. product=[];
  4. cond = threading.Condition()
  5. class Producer(threading.Thread):
  6. def __init__(self,speed):
  7. threading.Thread.__init__(self)
  8. self.speed = speed
  9. def run(self):
  10. while True:
  11. for i in range(self.speed):
  12. cond.acquire()
  13. while len(product)>=5:
  14. cond.wait();
  15. num=random.random();
  16. product.append(num)
  17. print("生产者"+str(self.speed)+"生产了:"+str(num))
  18. cond.notifyAll()
  19. cond.release()
  20. time.sleep(1)
  21. class Consumer(threading.Thread):
  22. def __init__(self):
  23. threading.Thread.__init__(self)
  24. def run(self):
  25. count=0
  26. time_start=time.clock()
  27. while True:
  28. speed=random.randint(1,5);
  29. for i in range(speed):
  30. cond.acquire()
  31. while len(product)<=0:
  32. cond.wait()
  33. num=product[0];
  34. del product[0]
  35. print("消费者,消费了"+str(num))
  36. count=count+1
  37. if(count==20):
  38. time_end=time.clock()
  39. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  40. cond.notify()
  41. cond.release()
  42. time.sleep(1)
  43. Producer(1).start();
  44. Producer(2).start();
  45. Consumer().start();

2、Queue模型

(1)创建一个 Queue.Queue() 的实例,来表示缓冲池。

(2)每次从使用生产者线程对队列中的数据进行填充,使用消费者线程取出队列中的数据。

  1. import threading
  2. import queue
  3. import time
  4. import random
  5. class Producer(threading.Thread):
  6. def __init__(self,speed,queue):
  7. threading.Thread.__init__(self)
  8. self.speed = speed
  9. self.queue = queue
  10. def run(self):
  11. while True:
  12. for i in range(self.speed):
  13. item = random.random()
  14. self.queue.put(item)
  15. print("生产者",self.speed,"生产了:",str(item))
  16. time.sleep(1)
  17. class Consumer(threading.Thread):
  18. def __init__(self,queue):
  19. threading.Thread.__init__(self)
  20. self.queue = queue
  21. def run(self):
  22. time_start=time.clock()
  23. count=0
  24. while True:
  25. speed=random.randint(1, 5)
  26. for i in range(speed):
  27. item = self.queue.get()
  28. print("消费者","消费:",item)
  29. count=count+1
  30. if(count==20):
  31. time_end=time.clock()
  32. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  33. time.sleep(1)
  34. q = queue.Queue(maxsize = 5)
  35. if __name__ == '__main__':
  36. Producer(1,q).start()
  37. Producer(2,q).start()
  38. Consumer(q).start()

3、信号量模型:

  1. import sys, time
  2. import random
  3. from threading import Thread, Semaphore
  4. product = []
  5. mutex = Semaphore(1)
  6. #也可以是mutex = RLock()
  7. full = Semaphore(0)
  8. empty = Semaphore(5)
  9. class Producer(Thread):
  10. def __init__(self, speed):
  11. Thread.__init__(self);
  12. self.speed=speed;
  13. def run(self):
  14. while True:
  15. for i in range(self.speed):
  16. ProductThing='';
  17. empty.acquire()
  18. mutex.acquire()
  19. num=random.random()
  20. ProductThing+=str(num)+' '
  21. product.append(str(num))
  22. print ('%s: count=%d' % ("producer"+str(self.speed), len(product)))
  23. print (' product things:'+ProductThing)
  24. mutex.release()
  25. full.release()
  26. time.sleep(1)
  27. class Consumer(Thread):
  28. def __init__(self):
  29. Thread.__init__(self);
  30. def run(self):
  31. count=0
  32. time_start=time.clock()
  33. while True:
  34. speed=random.randint(1, 5)
  35. for i in range(speed):
  36. consumeThing=""
  37. full.acquire()
  38. mutex.acquire()
  39. consumeThing+=str(product[0])+' '
  40. del product[0]
  41. print('%s: count=%d' % ("consumer", len(product)))
  42. print(' consume things:'+consumeThing)
  43. count=count+1
  44. if(count==20):
  45. time_end=time.clock()
  46. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  47. mutex.release()
  48. empty.release()
  49. time.sleep(1)
  50. if __name__ == '__main__':
  51. Producer(1).start()
  52. Producer(2).start()
  53. Consumer().start()

4、Event模型

threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。

  1. import threading
  2. import random
  3. import time
  4. def produce(speed,e_p1,e_p2,e_c,product):
  5. while True:
  6. for i in range(speed):
  7. if(speed==1):
  8. e_p1.wait();
  9. if(speed==2):
  10. e_p2.wait();
  11. num=random.random();
  12. product.append(num)
  13. print("生产者"+str(speed)+",生产了"+str(num))
  14. e_c.set();
  15. if(speed==1):
  16. e_p1.clear();
  17. if(speed==2):
  18. e_p2.clear();
  19. time.sleep(1)
  20. def consume(e_p1,e_p2,e_c,product):
  21. count=0
  22. time_start=time.clock()
  23. while True:
  24. speed=random.randint(1,5);
  25. for i in range(speed):
  26. e_c.wait();
  27. num=product[0];
  28. del product[0]
  29. print("消费者,消费了"+str(num))
  30. count=count+1
  31. if(count==20):
  32. time_end=time.clock()
  33. print("消费 "+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
  34. r=random.randint(1,2)
  35. if(r==1):
  36. e_p1.set()
  37. if(r==2):
  38. e_p2.set()
  39. e_c.clear()
  40. time.sleep(1)
  41. if __name__ == '__main__':
  42. e_p1= threading.Event();
  43. e_p2= threading.Event();
  44. e_c= threading.Event();
  45. e_p1.set()
  46. product=[]
  47. p1=threading.Thread(target=produce, args=(1,e_p1,e_p2,e_c,product))
  48. p1.start()
  49. p2=threading.Thread(target=produce, args=(2,e_p1,e_p2,e_c,product))
  50. p2.start()
  51. c1=threading.Thread(target=consume, args=(e_p1,e_p2,e_c,product))
  52. c1.start()

二、多进程实现生产者与消费者模型

1、  信号量和共享内存

  1. '''
  2. @author: jqy
  3. '''
  4. from multiprocessing import Process
  5. import time
  6. import random
  7. import multiprocessing
  8. def produce(speed,mutex,full,empty,product,pindex,cindex):
  9. while True:
  10. for i in range(speed):
  11. ProductThing='';
  12. empty.acquire()
  13. mutex.acquire()
  14. num=random.random()
  15. ProductThing+=str(num)+' '
  16. product[pindex.value]=num
  17. pindex.value=(pindex.value+1)%len(product)
  18. print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
  19. mutex.release()
  20. full.release()
  21. time.sleep(1)
  22. def consume(mutex,full,empty,product,pindex,cindex):
  23. count=0
  24. time_start=time.clock()
  25. while True:
  26. speed=random.randint(1, 5)
  27. for i in range(speed):
  28. consumeThing=""
  29. full.acquire()
  30. with mutex:
  31. consumeThing+=str(product[cindex.value])+' '
  32. cindex.value=(cindex.value+1)%len(product)
  33. print('%s: consume things:%s' % ("consumer", consumeThing))
  34. count=count+1
  35. if(count==20):
  36. time_end=time.clock()
  37. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  38. empty.release()
  39. time.sleep(1)
  40. if __name__ == '__main__':
  41. product = multiprocessing.Array('d',range(5))
  42. pindex=multiprocessing.Value('i',0)
  43. cindex=multiprocessing.Value('i',0)
  44. mutex = multiprocessing.RLock()
  45. full = multiprocessing.Semaphore(0)
  46. empty = multiprocessing.Semaphore(5)
  47. Process(target=produce, args=(1,mutex,full,empty,product,pindex,cindex)).start()
  48. Process(target=produce, args=(2,mutex,full,empty,product,pindex,cindex)).start()
  49. Process(target=consume, args=(mutex,full,empty,product,pindex,cindex)).start()

2、服务进程管理器Manager

  1. '''
  2. @author: jqy
  3. '''
  4. from multiprocessing import Process
  5. import time
  6. import random
  7. import multiprocessing
  8. def produce(speed,mutex,full,empty,product,pindex,cindex):
  9. while True:
  10. for i in range(speed):
  11. ProductThing='';
  12. empty.acquire()
  13. mutex.acquire()
  14. num=random.random()
  15. ProductThing+=str(num)+' '
  16. product[pindex.value]=num
  17. pindex.value=(pindex.value+1)%len(product)
  18. print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
  19. mutex.release()
  20. full.release()
  21. time.sleep(1)
  22. def consume(mutex,full,empty,product,pindex,cindex):
  23. count=0
  24. time_start=time.clock()
  25. while True:
  26. speed=random.randint(1, 5)
  27. for i in range(speed):
  28. consumeThing=""
  29. full.acquire()
  30. with mutex:
  31. consumeThing+=str(product[cindex.value])+' '
  32. cindex.value=(cindex.value+1)%len(product)
  33. print('%s: consume things:%s' % ("consumer", consumeThing))
  34. count=count+1
  35. if(count==20):
  36. time_end=time.clock()
  37. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  38. empty.release()
  39. time.sleep(1)
  40. if __name__ == '__main__':
  41. mgr = multiprocessing.Manager()
  42. product = mgr.Array('d',range(5))
  43. pindex = mgr.Value('i',0)
  44. cindex = mgr.Value('i',0)
  45. mutex = mgr.RLock()
  46. full = mgr.Semaphore(0)
  47. empty = mgr.Semaphore(5)
  48. p1 = Process(target=produce, args=(1,mutex,full,empty,product,pindex,cindex))
  49. p2 = Process(target=produce, args=(2,mutex,full,empty,product,pindex,cindex))
  50. p3 = Process(target=consume, args=(mutex,full,empty,product,pindex,cindex))
  51. p1.start(), p2.start(), p3.start()
  52. p1.join(), p2.join(), p3.join()

3、  Condition模型

  1. import multiprocessing
  2. import random
  3. import time
  4. def produce(speed,cond,product):
  5. while True:
  6. for i in range(speed):
  7. cond.acquire()
  8. while len(product)>=5:
  9. cond.wait();
  10. num=random.random();
  11. product.append(num)
  12. print("生产者"+str(speed)+",生产了:"+str(num))
  13. cond.notify()
  14. cond.release()
  15. time.sleep(1)
  16. def consume(cond,product):
  17. count=0
  18. time_start=time.clock()
  19. while True:
  20. speed=random.randint(1,5);
  21. for i in range(speed):
  22. cond.acquire()
  23. while len(product)<=0:
  24. cond.wait()
  25. num=product[0];
  26. del product[0]
  27. print("消费者,消费了"+str(num))
  28. count=count+1
  29. if(count==20):
  30. time_end=time.clock()
  31. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  32. cond.notify()
  33. cond.release()
  34. time.sleep(1)
  35. if __name__ == '__main__':
  36. c= multiprocessing.Condition();
  37. product=multiprocessing.Manager().list()
  38. p1=multiprocessing.Process(target=produce, args=(1,c,product))
  39. p1.start()
  40. p2=multiprocessing.Process(target=produce, args=(2,c,product))
  41. p2.start()
  42. c1=multiprocessing.Process(target=consume, args=(c,product))
  43. c1.start()
  44. p1.join()
  45. p2.join()
  46. c1.join()

4、  消息队列模型

  1. '''
  2. @author: jqy
  3. '''
  4. from multiprocessing import Process
  5. import random
  6. import time
  7. import multiprocessing
  8. def produce(speed,q):
  9. while True:
  10. for i in range(speed):
  11. item = random.random()
  12. q.put(item)
  13. print("生产者",speed,"生产了",str(item))
  14. time.sleep(1)
  15. def consume(num,q):
  16. count=0
  17. time_start=time.clock()
  18. while True:
  19. speed=random.randint(1, 5)
  20. for i in range(speed):
  21. item = q.get()
  22. print("消费者","消费了",item)
  23. count=count+1
  24. if(count==20):
  25. time_end=time.clock()
  26. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  27. time.sleep(1)
  28. if __name__ == '__main__':
  29. q = multiprocessing.Queue(maxsize = 5)
  30. Process(target=produce, args=(1,q)).start()
  31. Process(target=produce, args=(2,q)).start()
  32. Process(target=consume, args=(1,q)).start()

5、  管道模型:

  1. '''
  2. Created on 2016/11/11
  3. @author: jqy
  4. '''
  5. from multiprocessing import Process
  6. import random
  7. import time
  8. import multiprocessing
  9. def produce(speed,empty,p):
  10. while True:
  11. for i in range(speed):
  12. empty.acquire()
  13. item = random.random()
  14. p.send(item)
  15. print("生产者",speed,",生产了",str(item))
  16. time.sleep(1)
  17. def consume(num,empty,p):
  18. count=0
  19. time_start=time.clock()
  20. while True:
  21. speed=random.randint(1,5)
  22. for i in range(speed):
  23. item = p.recv()
  24. print("消费者"," 消费了",item)
  25. count=count+1
  26. if(count==20):
  27. time_end=time.clock()
  28. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  29. empty.release()
  30. time.sleep(1)
  31. if __name__ == '__main__':
  32. parent_conn, child_conn = multiprocessing.Pipe()
  33. empty = multiprocessing.Semaphore(5)
  34. Process(target=produce, args=(1,empty,child_conn)).start()
  35. Process(target=produce, args=(2,empty,child_conn)).start()
  36. Process(target=consume, args=(1,empty,parent_conn)).start()

6、  Event模型

生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。

  1. import multiprocessing
  2. import random
  3. import time
  4. def produce(speed,e_p1,e_p2,e_c,product):
  5. while True:
  6. for i in range(speed):
  7. if(speed==1):
  8. e_p1.wait();
  9. if(speed==2):
  10. e_p2.wait();
  11. num=random.random();
  12. product.append(num)
  13. print("生产者"+str(speed)+",生产了"+str(num))
  14. e_c.set();
  15. if(speed==1):
  16. e_p1.clear();
  17. if(speed==2):
  18. e_p2.clear();
  19. time.sleep(1)
  20. def consume(e_p1,e_p2,e_c,product):
  21. count=0
  22. time_start=time.clock()
  23. while True:
  24. speed=random.randint(1,5);
  25. for i in range(speed):
  26. e_c.wait();
  27. num=product[0];
  28. del product[0]
  29. print("消费者,消费了"+str(num))
  30. count=count+1
  31. if(count==20):
  32. time_end=time.clock()
  33. print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
  34. r=random.randint(1,2)
  35. if(r==1):
  36. e_p1.set()
  37. if(r==2):
  38. e_p2.set()
  39. e_c.clear()
  40. time.sleep(1)
  41. if __name__ == '__main__':
  42. e_p1= multiprocessing.Event();
  43. e_p2= multiprocessing.Event();
  44. e_c= multiprocessing.Event();
  45. e_p1.set()
  46. product=multiprocessing.Manager().list()
  47. p1=multiprocessing.Process(target=produce, args=(1,e_p1,e_p2,e_c,product))
  48. p1.start()
  49. p2=multiprocessing.Process(target=produce, args=(2,e_p1,e_p2,e_c,product))
  50. p2.start()
  51. c1=multiprocessing.Process(target=consume, args=(e_p1,e_p2,e_c,product))
  52. c1.start()
  53. p1.join()
  54. p2.join()
  55. c1.join()

三、不同主机上生产者消费者模型:

1、Manager:服务进程管理器可以通过网络由不同计算机上的进程共享

2、 socketTCP模型:

  1. from multiprocessing import Process
  2. import queue
  3. import threading
  4. import socket
  5. import random
  6. import time
  7. MaxSize=5
  8. def produce(speed,host,port):
  9. A=(host,port)
  10. s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  11. s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,MaxSize*3)
  12. s.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
  13. s.connect(A)
  14. item=1;
  15. while True:
  16. for i in range(speed):
  17. if(speed==1):
  18. print("生产者",speed,",生产了","{0:0=3}".format(item))
  19. s.send(bytes("{0:0=3}".format(item), encoding = "utf8"))
  20. else:
  21. print("生产者",speed,",生产了","{0:x=3}".format(item))
  22. s.send(bytes("{0:x=3}".format(item), encoding = "utf8"))
  23. item=item+1
  24. time.sleep(1)
  25. s.close()
  26. def consume(host1,port1,host2,port2):
  27. q=queue.Queue(maxsize=MaxSize);
  28. threading.Thread(target=getMessage, args=(q,host1,port1)).start()
  29. threading.Thread(target=getMessage, args=(q,host2,port2)).start()
  30. count=0
  31. time_start=time.clock()
  32. while True:
  33. speed=random.randint(1,MaxSize)
  34. for i in range(speed):
  35. item=q.get()
  36. print("消费者"," 消费了",item)
  37. count=count+1
  38. if(count==20):
  39. time_end=time.clock()
  40. print("消费"+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
  41. time.sleep(1)
  42. def getMessage(q,host,port):
  43. A=(host,port)
  44. sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  45. sock.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
  46. sock.setsockopt(socket.SOL_SOCKET,socket.SO_SNDBUF,MaxSize*3)
  47. sock.bind(A)
  48. sock.listen(0)
  49. tcpClientSock, addr=sock.accept()
  50. while True:
  51. try:
  52. data=tcpClientSock.recv(3)
  53. q.put(str(data,encoding = "utf8"))
  54. print("count="+str(q.qsize()))
  55. except:
  56. print("exception")
  57. tcpClientSock.close()
  58. sock.close()
  59. if __name__ == '__main__':
  60. Process(target=produce, args=(1,'localhost',8080)).start()
  61. Process(target=produce, args=(2,'localhost',8090)).start()
  62. Process(target=consume, args=('localhost',8080,'localhost',8090)).start()

3、  远程调用模型:

先在主进程中注册获取产品的方法,消费者在取用商品时调用取用商品的远程方法来获取。取用商品有一定的延迟,使得程序的整个运行速度比较慢。

  1. from multiprocessing import Process
  2. from xmlrpc.client import ServerProxy
  3. from xmlrpc.server import SimpleXMLRPCServer
  4. import random
  5. import time
  6. import multiprocessing
  7. q = multiprocessing.Queue(maxsize = 5)
  8. def produce(speed,q):
  9. while True:
  10. for i in range(speed):
  11. item = random.random()
  12. q.put(item)
  13. print("生产者",speed,"生产了",str(item))
  14. time.sleep(1)
  15. def getAProduct():
  16. global q
  17. return q.get()
  18. def consume(host,port):
  19. server = ServerProxy("http://"+host+":"+str(port))
  20. count=0
  21. time_start=time.clock()
  22. while True:
  23. speed=random.randint(1, 5)
  24. for i in range(speed):
  25. print("开始远程调用")
  26. item = server.getAProduct()
  27. print("消费者","消费了",item)
  28. count=count+1
  29. if(count==20):
  30. time_end=time.clock()
  31. print("消费"+str(count)+"个资源所需要的时间: %f s" % (time_end - time_start))
  32. time.sleep(1)
  33. if __name__ == '__main__':
  34. s = SimpleXMLRPCServer(('localhost', 8000))
  35. s.register_function(getAProduct)
  36. print('注册获取产品方法完成')
  37. Process(target=produce, args=(1,q)).start()
  38. Process(target=produce, args=(2,q)).start()
  39. Process(target=consume, args=('localhost',8000)).start()
  40. s.serve_forever()

4、使用Redis发布订阅模式:

  1. #encoding=utf-8
  2. '''
  3. @author: jqy
  4. '''
  5. from multiprocessing import Process
  6. import random
  7. import time
  8. import redis
  9. def produce(speed):
  10. client = redis.StrictRedis()
  11. while True:
  12. for i in range(speed):
  13. item = random.random()
  14. client.publish("hole",item)
  15. print("生产者",speed,"生产了",str(item))
  16. time.sleep(1)
  17. def consume(speed):
  18. count=0
  19. time_start=time.clock()
  20. client = redis.StrictRedis()
  21. p = client.pubsub()
  22. p.subscribe("hole")
  23. for i in range(speed):
  24. for msg in p.listen():
  25. if msg['type'] != 'message':
  26. continue
  27. print("消费者", "消费了", msg)
  28. count = count + 1
  29. if (count == 20):
  30. time_end = time.clock()
  31. print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
  32. time.sleep(1)
  33. if __name__ == '__main__':
  34. Process(target=consume, args=(2,)).start()
  35. Process(target=produce, args=(1,)).start()
  36. Process(target=produce, args=(2,)).start()

注:如果生产者存在,消费者不存在,那么消息直接会丢弃。生产者发送消息的时候,如果消费者突然断掉了,则消息丢失。

参考文献:

multiprocessing --- 基于进程的并行 — Python 3.10.1 文档

Python使用Manager对象实现不同机器上的进程跨网络传输数据 - 云+社区 - 腾讯云

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/153459
推荐阅读
相关标签
  

闽ICP备14008679号