当前位置:   article > 正文

Python 生产者消费者模型_p操作生产者消费者模型

p操作生产者消费者模型

生产者消费者模型介绍

1、为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

2、什么是生产者和消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产者消费者模型是一种并发编程模型,主要用于解决多线程或多进程中的资源共享和通信问题。使用生产者消费者模型可以带来以下好处:

  1. 解耦生产者和消费者:生产者和消费者之间的通信通过消息队列或缓冲区进行,彼此之间不需要直接交互。这样使得生产者和消费者可以独立演化和调试,提高了代码的灵活性和可维护性。
  2. 并发和流量控制:生产者和消费者可以并发执行,提高系统的吞吐量和资源利用率。同时,可以通过控制生产者和消费者的速度,实现流量控制,防止资源过载。
  3. 缓冲和异步处理:通过引入缓冲区可以平衡生产者和消费者之间的速度差异,从而提高系统性能。生产者可以将数据放入缓冲区中,而消费者可以从缓冲区中取出数据进行处理。这种机制可以实现异步处理和解耦合。
  4. 分布式计算:生产者消费者模型可用于构建分布式系统中的任务调度、消息传递等机制。不同节点的生产者和消费者之间可以通过消息队列或共享存储进行通信,实现分布式计算和协同处理。

总的来说,生产者消费者模型提供了一种优雅和可扩展的方式来处理资源共享和通信问题,提高了系统的并发性、响应性和可伸缩性,使得程序设计更灵活和可维护。

生产者消费者模型实现

1. 使用线程和锁实现生产者消费者模型:

import threading
import time

buffer = []  # 共享的缓冲区
lock = threading.Lock()  # 锁对象

# 生产者函数
def producer():
    for i in range(10):
        item = f'商品{i}'
        print(f'生产了{item}')
        with lock:
            buffer.append(item)  # 加锁操作,向缓冲区添加商品
        time.sleep(1)

# 消费者函数
def consumer():
    while True:
        with lock:
            if buffer:
                item = buffer.pop(0)  # 加锁操作,从缓冲区取出商品
                print(f'消费了{item}')
        time.sleep(2)

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

在这个示例中,我们使用一个列表来作为共享的缓冲区,并使用一个锁对象来确保对缓冲区的访问是线程安全的。
在生产者函数中,我们使用with lock来加锁,然后向缓冲区添加商品。在消费者函数中,我们同样使用with lock来加锁,然后从缓冲区中取出商品进行消费。

通过使用线程和锁,我们实现了生产者消费者模式的功能。生产者线程负责生产商品并放入缓冲区,消费者线程负责从缓冲区中取出商品进行消费,而且保证了线程安全。

2. 使用队列实现生产者消费者模型:

import threading
import queue
import time

buffer = queue.Queue(5)  # 共享的缓冲区

# 生产者函数
def producer():
    for i in range(10):
        item = f'商品{i}'
        print(f'生产了{item}')
        buffer.put(item)  # 将商品放入缓冲区
        time.sleep(1)

# 消费者函数
def consumer():
    while True:
        item = buffer.get()  # 从缓冲区取出商品
        print(f'消费了{item}')
        time.sleep(2)

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

在这个示例中,我们使用了queue.Queue作为共享的缓冲区。

在生产者函数中,我们使用buffer.put(item)将商品放入缓冲区。在消费者函数中,我们使用buffer.get()从缓冲区中取出商品进行消费。

通过使用队列,我们不需要手动管理缓冲区的大小和同步,而是利用队列的线程安全性,生产者线程可以向队列中放入商品,而消费者线程可以从队列中取出商品,而不会造成冲突。

通过创建生产者和消费者线程并启动它们,我们实现了生产者消费者模式的功能。

3. 使用条件变量实现生产者消费者模型:

import threading
import time

buffer = []  # 共享的缓冲区
buffer_size = 5  # 缓冲区大小

# 条件变量
buffer_not_full = threading.Condition()  # 缓冲区非满条件
buffer_not_empty = threading.Condition()  # 缓冲区非空条件

# 生产者函数
def producer():
    for i in range(10):
        item = f'商品{i}'
        with buffer_not_full:
            while len(buffer) >= buffer_size:
                buffer_not_full.wait()  # 等待缓冲区非满
            buffer.append(item)  # 向缓冲区添加商品
            print(f'生产了{item}')
            buffer_not_empty.notify()  # 唤醒等待的消费者线程
        time.sleep(1)

# 消费者函数
def consumer():
    while True:
        with buffer_not_empty:
            while len(buffer) == 0:
                buffer_not_empty.wait()  # 等待缓冲区非空
            item = buffer.pop(0)  # 从缓冲区取出商品
            print(f'消费了{item}')
            buffer_not_full.notify()  # 唤醒等待的生产者线程
        time.sleep(2)

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

在这个示例中,我们使用了两个条件变量buffer_not_full和buffer_not_empty来控制生产者和消费者线程的执行。

在生产者函数中,我们使用buffer_not_full.wait()来等待缓冲区非满,当缓冲区已满时,生产者线程会暂时挂起等待。当缓冲区非满时,生产者线程将商品添加到缓冲区,并使用buffer_not_empty.notify()唤醒等待的消费者线程。

在消费者函数中,我们使用buffer_not_empty.wait()来等待缓冲区非空,当缓冲区为空时,消费者线程会暂时挂起等待。当缓冲区非空时,消费者线程从缓冲区中取出商品,并使用buffer_not_full.notify()唤醒等待的生产者线程。

通过使用条件变量,我们可以精确控制生产者和消费者线程的执行,并在合适的时机进行等待和唤醒操作,以实现生产者消费者模式的功能。

4. 使用信号量实现生产者消费者模型:

import threading
import time

buffer = []  # 共享的缓冲区
max_size = 5  # 缓冲区最大容量

producer_sem = threading.Semaphore(max_size)  # 生产者信号量
consumer_sem = threading.Semaphore(0)  # 消费者信号量

# 生产者函数
def producer():
    for i in range(10):
        item = f'商品{i}'
        producer_sem.acquire()  # 获取生产者信号量
        buffer.append(item)  # 向缓冲区添加商品
        print(f'生产了{item}')
        consumer_sem.release()  # 释放消费者信号量
        time.sleep(1)

# 消费者函数
def consumer():
    while True:
        consumer_sem.acquire()  # 获取消费者信号量
        item = buffer.pop(0)  # 从缓冲区取出商品
        print(f'消费了{item}')
        producer_sem.release()  # 释放生产者信号量
        time.sleep(2)

# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这个示例中,我们使用了两个信号量producer_sem和consumer_sem来控制生产者和消费者线程的执行。

在生产者函数中,我们使用producer_sem.acquire()来获取生产者信号量,当缓冲区已满时,生产者线程会在此等待。当缓冲区有空位时,生产者线程将商品添加到缓冲区,并使用consumer_sem.release()释放消费者信号量,以唤醒一个等待的消费者线程。

在消费者函数中,我们使用consumer_sem.acquire()来获取消费者信号量,当缓冲区为空时,消费者线程会在此等待。当缓冲区有商品时,消费者线程从缓冲区中取出商品,并使用producer_sem.release()释放生产者信号量,以唤醒一个等待的生产者线程。

通过使用信号量,我们可以控制生产者和消费者线程的执行顺序和数量,以实现生产者消费者模式的功能。

5. 使用协程实现生产者消费者模型:

import asyncio
import time

buffer = []  # 共享的缓冲区

# 生产者协程
async def producer():
    for i in range(10):
        item = f'商品{i}'
        print(f'生产了{item}')
        buffer.append(item)  # 向缓冲区添加商品
        await asyncio.sleep(1)

# 消费者协程
async def consumer():
    while True:
        await asyncio.sleep(0)  # 让出CPU时间片
        if buffer:
            item = buffer.pop(0)  # 从缓冲区取出商品
            print(f'消费了{item}')
        await asyncio.sleep(2)

# 创建事件循环并运行协程
loop = asyncio.get_event_loop()
tasks = asyncio.gather(producer(), consumer())
loop.run_until_complete(tasks)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

在这个示例中,我们使用了两个协程producer和consumer来完成生产者消费者的功能。

在生产者协程中,我们使用await asyncio.sleep(1)来模拟生产商品的延迟,然后将商品添加到缓冲区中。每次添加商品后,我们打印出生产的商品信息。

在消费者协程中,我们使用await asyncio.sleep(0)来让出CPU时间片,以便其他协程有机会执行。然后,我们检查缓冲区是否有商品可消费,如果有,则从缓冲区中取出商品并打印消费的商品信息。每次消费商品后,我们再次等待一段时间。

通过使用asyncio库,我们可以方便地创建和管理协程,并使用事件循环来调度协程的执行。这种方式可以实现高效的异步编程,适用于处理IO密集型任务。

6、函数yield 方式:

import time

# 生产者生成商品
def producer():
    for i in range(10):
        item = f'商品{i}'
        print(f'生产了{item}')
        yield item
        time.sleep(1)

# 消费者消费商品
def consumer(products):
    for product in products:
        print(f'消费了{product}')
        time.sleep(2)

# 创建生产者生成器
products = producer()

# 消费商品
consumer(products)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在这个示例中,producer函数是一个生成器函数,它通过yield语句生成商品并返回。生成器函数可以在每次调用yield时暂停执行,并返回生成的值。在这个示例中,producer函数每次生成一个商品并暂停,直到被消费者消费。

消费者函数consumer接受生成的商品作为参数,并逐个消费。在这个示例中,我们直接将生成器对象传递给消费者函数,消费者会依次迭代生成器并消费每个商品。

通过使用生成器函数,我们可以实现异步的生产者消费者模式。生产者可以在生产商品后暂停执行,而消费者可以根据需要消费商品,这种方式避免了缓冲区可能带来的额外复杂性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/661591
推荐阅读
相关标签
  

闽ICP备14008679号