赞
踩
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
总的来说,生产者消费者模型提供了一种优雅和可扩展的方式来处理资源共享和通信问题,提高了系统的并发性、响应性和可伸缩性,使得程序设计更灵活和可维护。
import threading import time buffer = [] # 共享的缓冲区 lock = threading.Lock() # 锁对象 # 生产者函数 def producer(): for i in range(10): item = f'商品{i}' print(f'生产了{item}') with lock: buffer.append(item) # 加锁操作,向缓冲区添加商品 time.sleep(1) # 消费者函数 def consumer(): while True: with lock: if buffer: item = buffer.pop(0) # 加锁操作,从缓冲区取出商品 print(f'消费了{item}') time.sleep(2) # 创建生产者和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) # 启动线程 producer_thread.start() consumer_thread.start() # 等待线程结束 producer_thread.join() consumer_thread.join()
在这个示例中,我们使用一个列表来作为共享的缓冲区,并使用一个锁对象来确保对缓冲区的访问是线程安全的。
在生产者函数中,我们使用with lock来加锁,然后向缓冲区添加商品。在消费者函数中,我们同样使用with lock来加锁,然后从缓冲区中取出商品进行消费。
通过使用线程和锁,我们实现了生产者消费者模式的功能。生产者线程负责生产商品并放入缓冲区,消费者线程负责从缓冲区中取出商品进行消费,而且保证了线程安全。
import threading import queue import time buffer = queue.Queue(5) # 共享的缓冲区 # 生产者函数 def producer(): for i in range(10): item = f'商品{i}' print(f'生产了{item}') buffer.put(item) # 将商品放入缓冲区 time.sleep(1) # 消费者函数 def consumer(): while True: item = buffer.get() # 从缓冲区取出商品 print(f'消费了{item}') time.sleep(2) # 创建生产者和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) # 启动线程 producer_thread.start() consumer_thread.start() # 等待线程结束 producer_thread.join() consumer_thread.join()
在这个示例中,我们使用了queue.Queue作为共享的缓冲区。
在生产者函数中,我们使用buffer.put(item)将商品放入缓冲区。在消费者函数中,我们使用buffer.get()从缓冲区中取出商品进行消费。
通过使用队列,我们不需要手动管理缓冲区的大小和同步,而是利用队列的线程安全性,生产者线程可以向队列中放入商品,而消费者线程可以从队列中取出商品,而不会造成冲突。
通过创建生产者和消费者线程并启动它们,我们实现了生产者消费者模式的功能。
import threading import time buffer = [] # 共享的缓冲区 buffer_size = 5 # 缓冲区大小 # 条件变量 buffer_not_full = threading.Condition() # 缓冲区非满条件 buffer_not_empty = threading.Condition() # 缓冲区非空条件 # 生产者函数 def producer(): for i in range(10): item = f'商品{i}' with buffer_not_full: while len(buffer) >= buffer_size: buffer_not_full.wait() # 等待缓冲区非满 buffer.append(item) # 向缓冲区添加商品 print(f'生产了{item}') buffer_not_empty.notify() # 唤醒等待的消费者线程 time.sleep(1) # 消费者函数 def consumer(): while True: with buffer_not_empty: while len(buffer) == 0: buffer_not_empty.wait() # 等待缓冲区非空 item = buffer.pop(0) # 从缓冲区取出商品 print(f'消费了{item}') buffer_not_full.notify() # 唤醒等待的生产者线程 time.sleep(2) # 创建生产者和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) # 启动线程 producer_thread.start() consumer_thread.start() # 等待线程结束 producer_thread.join() consumer_thread.join()
在这个示例中,我们使用了两个条件变量buffer_not_full和buffer_not_empty来控制生产者和消费者线程的执行。
在生产者函数中,我们使用buffer_not_full.wait()来等待缓冲区非满,当缓冲区已满时,生产者线程会暂时挂起等待。当缓冲区非满时,生产者线程将商品添加到缓冲区,并使用buffer_not_empty.notify()唤醒等待的消费者线程。
在消费者函数中,我们使用buffer_not_empty.wait()来等待缓冲区非空,当缓冲区为空时,消费者线程会暂时挂起等待。当缓冲区非空时,消费者线程从缓冲区中取出商品,并使用buffer_not_full.notify()唤醒等待的生产者线程。
通过使用条件变量,我们可以精确控制生产者和消费者线程的执行,并在合适的时机进行等待和唤醒操作,以实现生产者消费者模式的功能。
import threading import time buffer = [] # 共享的缓冲区 max_size = 5 # 缓冲区最大容量 producer_sem = threading.Semaphore(max_size) # 生产者信号量 consumer_sem = threading.Semaphore(0) # 消费者信号量 # 生产者函数 def producer(): for i in range(10): item = f'商品{i}' producer_sem.acquire() # 获取生产者信号量 buffer.append(item) # 向缓冲区添加商品 print(f'生产了{item}') consumer_sem.release() # 释放消费者信号量 time.sleep(1) # 消费者函数 def consumer(): while True: consumer_sem.acquire() # 获取消费者信号量 item = buffer.pop(0) # 从缓冲区取出商品 print(f'消费了{item}') producer_sem.release() # 释放生产者信号量 time.sleep(2) # 创建生产者和消费者线程 producer_thread = threading.Thread(target=producer) consumer_thread = threading.Thread(target=consumer) # 启动线程 producer_thread.start() consumer_thread.start() # 等待线程结束 producer_thread.join() consumer_thread.join()
在这个示例中,我们使用了两个信号量producer_sem和consumer_sem来控制生产者和消费者线程的执行。
在生产者函数中,我们使用producer_sem.acquire()来获取生产者信号量,当缓冲区已满时,生产者线程会在此等待。当缓冲区有空位时,生产者线程将商品添加到缓冲区,并使用consumer_sem.release()释放消费者信号量,以唤醒一个等待的消费者线程。
在消费者函数中,我们使用consumer_sem.acquire()来获取消费者信号量,当缓冲区为空时,消费者线程会在此等待。当缓冲区有商品时,消费者线程从缓冲区中取出商品,并使用producer_sem.release()释放生产者信号量,以唤醒一个等待的生产者线程。
通过使用信号量,我们可以控制生产者和消费者线程的执行顺序和数量,以实现生产者消费者模式的功能。
import asyncio import time buffer = [] # 共享的缓冲区 # 生产者协程 async def producer(): for i in range(10): item = f'商品{i}' print(f'生产了{item}') buffer.append(item) # 向缓冲区添加商品 await asyncio.sleep(1) # 消费者协程 async def consumer(): while True: await asyncio.sleep(0) # 让出CPU时间片 if buffer: item = buffer.pop(0) # 从缓冲区取出商品 print(f'消费了{item}') await asyncio.sleep(2) # 创建事件循环并运行协程 loop = asyncio.get_event_loop() tasks = asyncio.gather(producer(), consumer()) loop.run_until_complete(tasks)
在这个示例中,我们使用了两个协程producer和consumer来完成生产者消费者的功能。
在生产者协程中,我们使用await asyncio.sleep(1)来模拟生产商品的延迟,然后将商品添加到缓冲区中。每次添加商品后,我们打印出生产的商品信息。
在消费者协程中,我们使用await asyncio.sleep(0)来让出CPU时间片,以便其他协程有机会执行。然后,我们检查缓冲区是否有商品可消费,如果有,则从缓冲区中取出商品并打印消费的商品信息。每次消费商品后,我们再次等待一段时间。
通过使用asyncio库,我们可以方便地创建和管理协程,并使用事件循环来调度协程的执行。这种方式可以实现高效的异步编程,适用于处理IO密集型任务。
import time # 生产者生成商品 def producer(): for i in range(10): item = f'商品{i}' print(f'生产了{item}') yield item time.sleep(1) # 消费者消费商品 def consumer(products): for product in products: print(f'消费了{product}') time.sleep(2) # 创建生产者生成器 products = producer() # 消费商品 consumer(products)
在这个示例中,producer函数是一个生成器函数,它通过yield语句生成商品并返回。生成器函数可以在每次调用yield时暂停执行,并返回生成的值。在这个示例中,producer函数每次生成一个商品并暂停,直到被消费者消费。
消费者函数consumer接受生成的商品作为参数,并逐个消费。在这个示例中,我们直接将生成器对象传递给消费者函数,消费者会依次迭代生成器并消费每个商品。
通过使用生成器函数,我们可以实现异步的生产者消费者模式。生产者可以在生产商品后暂停执行,而消费者可以根据需要消费商品,这种方式避免了缓冲区可能带来的额外复杂性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。