当前位置:   article > 正文

Python —— 并发编程(多线程、多进程)_python concurrency

python concurrency

 
 
 
 
 
 
 
 

并行与并发


 
 

并行(parallel)

同一时刻有多个事情在同时进行(真同时并非时间切片),如多核CPU可以容纳多个工作同时进行

 

并发(concurrency)

并发着重于发,即发生。在某个时刻或者某段时间,同时发生了很多需要处理的请求

 

队列和缓冲区

队列即任务队列,任务按照任务队列的顺序进行执行;队列中存在优先队列,优先队列中的任务优先执行。
缓冲区即队列长度,允许排列的最长任务数;

 

对比

并行实际可以解决并发的问题,但反之不成立。

 
 
 
 
 
 
 
 

高并发解决理念


解决方式:
1、队列。以队列顺序执行,一些特殊的任务可以以优先队列的形式进行优先执行

 

2、争抢。各个任务以争抢的方式进行排列,当某个任务争抢到资源时,会生成排他性的独占锁 若没有锁可能还未获取到资源便为其他任务挤出

特点:时间分布不均,某些应用可能长时间无法抢到计算机资源

 

3、预处理。以一种提前加载用户所需的数据的思路,提前进行一些处理以减少任务的实际运行时间(减少排他锁存在的时间),常用的如缓存

 

4、并行(水平扩展):通过多开进程来线性的提高工作效率。如有10个任务,以5个进程运行则只需要运行两批即可

特点:对硬件需求较高(CPU核心数)、分布式、云服务等

 

5、提速(垂直提升):提高硬件处理速度,如扩展内存、提高内存频率、CPU频率、减少南北桥通信延迟等

特点:容易摸到天花板,且容错能力极差

 

6、消息中间件(外部的队列):常用的如RabbitMQ、ActiveMQ(Apache)、RocketMQ(阿里的Apache)、kafka(Apache )等。系统外的外部第三方服务,将消息(数据)放在这些队列中,对请求高峰进行平滑

 

7、分布式部署

 
 
 
 
 
 
 
 

进程和线程


 
 

进程(Process)

进程是计算机中某程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

程序和进程之间的关系,程序是源代码编译后的文件,当程序被操作系统加载到内存中,就是进程。进程中存放着指令和数据,也是线程的容器

Liunx中有父进程、子进程,Windows的进程是平等关系。

 
 
 
 

线程(LightWeight Process,LWP)

 

并非所有的操作系统都实现了线程,但在实现了线程的操作系统中线程是操作系统能够进行运算的最小操作单元。线程被包含在进程中,是进程中的实际运作单位。一个程序的运行实例就是一个进程。

 

一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
线程ID:用于表示当前线程;
当前指令指针(PC):用于记录、描述当前指令运行位置,用于不同线程间的切换;
寄存器集合:将CPU还未处理完的数据保存在寄存器集合中;
堆栈:栈—只允许一段进行插入和删除,有底无顶的罐子,先进后出。先进进栈底后出出栈顶(TOP),当TOP = -1 时栈为空。
栈类似于数组,有以下五种操作:push(压栈)、pop(弹栈)、判断是否为空、判断是否已满、返回栈顶的值。

 
 
 
 

进程与线程的理解

1、在操作系统中,每一个进程都认为自己独占所有的计算机资源。进程是一个独立的王国,进程间不可以随便的共享数据。多进程本来就是操作系统对程序的虚拟化。
2、线程是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。

 
 
 
 
 
 
 
 

线程


 

线程的状态

1、运行态(Ready):该时刻,该线程正在占用CPU

2、就绪态(Running):可随时转换为运行态,因为其他线程正在运行而暂停。目前该线程不占用CPU

3、阻塞态(Blocked):线程处于阻塞状态,等待某些外部条件的触发,否则线程不运行(不希望出现永久阻塞)

4、终止态(Terminated):线程完成后退出或者被取消
在这里插入图片描述

 
 
 
 

线程安全

在 python 中多线程会将某些非原子性的语句块打断,交叉执行。如 print() 在多线程调度时,很容易把最后的回车和前面的输出字符串做打断。这种情况就称作线程不安全
 
logging 是线程安全的,建议在多线程(日常调 bug 也建议替换)中使用 logging 来替换 print

 
 
 
 

daemon线程和non-daemon线程

daemon 是在 python 中为了简化程序员的工作,让他们不用管理那些只需要丢在后台,并且会随着主线程的退出而退出的线程。常用场景:发送心跳包、监控或者一些随时可以被终止的线程
 
此处的 daemon 并非 linux 中的 daemon守护线程,其含义更多的偏向于 “可以随时停止的线程” 而非 “后台线程”
 
daemon 线程会在主线程结束时,自动结束掉。而非 daemon 线程则不论主线程是否结束,都会执行完自身或者有异常抛出时才结束
 
开启方式,在 threading.Thread(daemon=True) 即代表新开线程为 daemon 线程
 
注:当子线程中既存在 daemon 线程又存在 non-daemon 线程时,会等待所有的 non-daemon 线程全部执行完毕后,再去执行 daemon 线程
 
daemon 必须在 start() 前进行设置,可以在构造时设置 daemon 参数,也可以对 thread 对象使用 setDaemon 同样必须在线程 start() 之前进行

 

如:

import threading
import time

from log import Log
from logging import DEBUG

logger = Log.getLogger(DEBUG)
debug = logger.debug


def worker():
    for x in range(100):
        debug(f"{threading.current_thread()} is running")
        threading.Thread(name="t2", target=worker1, daemon=True).start()

    debug(f"=== {threading.enumerate()} ===")


def worker1():
    for x in range(10000):
        debug(f"{threading.current_thread()} is running")

    debug(f"xxx {threading.enumerate()} xxx")


debug(f"*** {threading.current_thread()} is running ***")
debug(f"*** {threading.enumerate()} ***")
t1 = threading.Thread(name="t1", target=worker, daemon=True)
t1.start()
time.sleep(0.5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

 
 
 
 

join

写法: thread.join()
 
join() 的功能类似 ”使等待“,如在主线程中使用 t1.join(),即使当前线程(主线程)等待调用 join() 方法的线程,等到 t1 线程执行完毕后,当前线程(主线程)再继续向下执行(当前线程被阻塞在 join() 处)
在这里插入图片描述
 
join() —— 可以设置 timeout

import threading
import time

from log import Log
from logging import DEBUG

logger = Log.getLogger(DEBUG)
debug = logger.debug


def worker():
    for x in range(100):
        debug(f"{threading.current_thread()} is running")
        t2 = threading.Thread(name="t2", target=worker1, daemon=True)
        t2.start()
        t2.join()

    debug(f"=== {threading.enumerate()} ===")


def worker1():
    for x in range(100):
        debug(f"{threading.current_thread()} is running")

    debug(f"xxx {threading.enumerate()} xxx")


debug(f"*** {threading.current_thread()} is running ***")
debug(f"*** {threading.enumerate()} ***")
t1 = threading.Thread(name="t1", target=worker, daemon=True)
t1.start()
t1.join()
debug("w" * 30)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

 
 
 
 

thread.local 类

thread.local 是一个类(线程安全的类),虽然没有用大写,但是是一个类,并且是一个可能会被大量实例化的类(因为内部实现了__slots__)。
 
其实是实现了一个可以被多个函数调用,但是每个函数中调用的又是独立的一个实例对象,常用于挂载属性,如要开启多个线程对一个同名属性进行修改时。
 
特点:

  • 在全局变量中进行实现
  • 可以大胆的用在本地变量中,用在各个线程的各局部变量时,是被隔离开的

 
注:能用局部变量就用局部变量,如非必要不要使用全局变量。使用优先级 “局部变量 > thread.local() > 全局变量”

写法如下:

import logging
import threading
import time

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
logger.addHandler(handler)

a = threading.local()


def worker():
    a.x = 0
    for i in range(100):
        time.sleep(0.00001)
        a.x += 1

    logger.debug(f"{threading.current_thread()}, {a.x}")


for i in range(10):
    threading.Thread(target=worker).start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

在这里插入图片描述

 
 
 
 

Timer 定时器

threading.Timer() 开启一个延时启动的线程(是 thread 的子类),参数如下:

  • interval —— 延迟的时间,单位秒
  • function —— 要调用的函数
  • args —— 位置参数(非收集,需要传入元组或列表)
  • kwargs —— 关键字参数(非收集,需要传入字典)

 
Python 中的线程一旦执行则无法被取消,除非线程执行结束或抛出异常。但是在线程未执行前,是可以被取消的。但是其他非延迟启动线程,当调用 .start() 时线程即启动,则无法取消,延迟线程则在未达到延迟时间前都可被取消
 
取消方式:t1.cancel()
 
注: cancel() 并非一定要放到 start() 后面,也可放到 start() 前,同样可以在线程执行前取消掉线程的执行

 

import threading


def add(x, y):
    print(x, y)
    print(threading.enumerate())


t = threading.Timer(3, add, (4, 5)).start()
print("main")
print(threading.enumerate())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

 
 
 
 

线程池

线程的新建与终止需要回收资源,如果可以重用线程,则可以减少这部分的资源消耗,线程池就是为此而生
 
线程池工作的两部分:

  • 线程池 —— 提供固定数量的可用线程
  • 任务队列 —— 统一收集要处理的任务

 
适用场景:适合突发性大量请求 或者 需要大量线程完成的任务(单个任务的处理时间较短的任务)
 
主要执行方法:

  • pool.map(func, tasks) —— 直接将任务列表传入。直接返回 func 的运行结果
  • pool.submit(func, task) —— 将单个任务传入,一般通过列表生成器来批量完成。返回 futures 对象列表,相当于前端的 promise 对象
    1、若直接迭代列表,则会等待当前 future 状态为完成时,才进行 print,并继续下一个 future
    2、若使用 as_completed(futures) 进行迭代,则会打乱顺序,先执行完的先走循环中的 print
import time, random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def worker(s):
    time.sleep(s)
    return s


tasks = [random.uniform(0.1, 0.3) for i in range(100)]
# 方法一
start = time.time()
with ThreadPoolExecutor(10) as pool:
    results = pool.map(worker, tasks)
    for result in results:
        ...
        # print(result)

print(f"cost: {time.time()-start}")

# 方法二
start = time.time()
with ThreadPoolExecutor(10) as pool:
    futures = [pool.submit(worker, task) for task in tasks]

    # # 固定顺序获取结果
    # for future in futures:
    #     ...
        # print(future.result())

    # 任务完成顺序获取结果
    for future in as_completed(futures):
        ...

print(f"cost: {time.time()-start}")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

 

线程池中使用回调

from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def dispatch_work(work: list[str, Callable]):
    with ThreadPoolExecutor(max_workers=4) as pool:
        for task_id, func in work:
            future = pool.submit(func)
            # 给 future 添加回调
            future.add_done_callback(some_callback_func_here)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

 
 
 
 
 
 
 
 

Python中的进程和线程


 
 

python中的进程

python中的进程会启动一个解释器进程,线程共享一个解释器进程

 

Python的线程通过 Thread类 来完成,其主要参数如下

  • target —— 线程调用的对象,就是目标函数名
  • name —— 为线程起名
  • args —— 为目标函数传递实参,元组
  • kwargs —— 为目标函数传递关键字参数,字典
     

注意:
1、Python 中并没有线程退出的方法,只能等待线程完成或者抛出异常
 
2、Python 中的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了

 
 
 
 
 
 
 
 

threading


 

基础

threading.Thread 的参数

参数含义
group为未来留 ThreadGroup 扩展留的参数,目前只接受 None
target线程调用的对象,就是目标函数名
name为线程起名
args为目标函数传递实参,元组
kwargs为目标函数传递关键字参数,字典

 

threading 的属性和方法

名称含义
current_thread()返回当前线程对象
main_thread()返回主线程对象
active_count()当前处于 alive 状态的线程个数
enumerate()返回所有活着的线程列表,不包括已经终止的线程和未开始的线程
get_ident()返回当前线程的ID,非0整数

 

Thread实例的属性和方法

属性及方法含义
name线程的名称
ident线程的ID,非 0 整数,线程启动后才会有 ID,否则为 None。线程退出,此 ID 会被回收
is_alive()返回线程是否活着
getName()获取线程名称
setName()设置线程名称
start()启动线程。每一个线程必须且只能 start 一次,即使线程已经退出,也无法再调用 start
run()运行线程

 

注意:

1、线程在 start() 时,先运行 threading.start(),再运行 threading.run()

 

2、线程在 start() 时,会给 Thread 实例添加 _target、_args、_kwargs 等属性,并且会在线程退出时清空这些属性。由此在线程退出后,在调用 run() 方法时,会找不到这些属性,进而导致无法运行,如果人为补全,也是可以运行的

 

3、当在主线程内直接调用 t.run() 时,会直接使用主线程调用 threading.Thread() 传入的函数,此时主线程被阻塞,等待函数调用完成后,主线程继续执行

 

实例一:运行 start() 是先调用 start(),再调用 run()

import threading
import time


class MyThread(threading.Thread):

    def run(self) -> None:
        print("run")
        super().run()

    def start(self) -> None:
        print("start")
        super().start()


def add(x, y):
    for i in range(5):
        time.sleep(1)
        print("adding...")
    print("over")
    return x + y


t1 = MyThread(None, add, "t1", (1, 2), {})
t2 = MyThread(None, add, "t2", (3, 4), {})
t1.start()
t2.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

 

实例二:不使用 start() 直接使用 run(),会使用主线程执行 threading.Thread() 传入的函数

import threading
import time


class MyThread(threading.Thread):

    def run(self) -> None:
        print("run")
        super().run()

    def start(self) -> None:
        print("start")
        super().start()


def add(x, y):
    print(threading.current_thread())
    for i in range(3):
        time.sleep(1)
        print("adding...")
    print("over")
    print(threading.enumerate())
    return x + y


t1 = MyThread(None, add, "t1", (1, 2), {})
t1.run()
print(threading.enumerate())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

 
 
 
 
 
 
 
 

线程同步


线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作

基本方法:临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件Event

多个线程中,如果需要主动释放计算资源,则可以调用 time.sleep(0)

 
 
 
 

Event 事件

Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记 flag,通过 flag 的 True 或 False 的变化来操作

 

名称含义
set()标记设置为 True
clear()标记为 False
is_set()标记是否为 True
wait(timeout=None)设置等待标记为 True 的时长,None 为无限等待。等到返回 True,未等到超时了返回 False

 

如下示例,通过 Event 事件在不同线程间进行信息传递。在不同线程间检测 Event 实例的 flag。需要等待的线程,可以调用 wait() 来等待一定时间。

import threading
import time
import logging

logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s - %(asctime)s - %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

n = 0


# 给参数设置注解,可以帮助 IDE 搜索该对象的属性及方法
def boss(event: threading.Event):
    logger.info("老板在等待...")

    # 让该线程等待
    if event.wait(timeout=2):
        # wait 等到了 event 的 flag 变为 true
        logger.info("老板很开心")
    else:
        # wait 返回的 event 的 flag 为 false
        logger.info("老板生气了")


def create_cup(event: threading.Event):
    global n
    for i in range(10):
        time.sleep(0.1)
        n += 1
        logger.info(f"已经做了{n}个")

        logger.info(threading.enumerate())

    # 设置 flag 为 True 代表执行结束
    event.set()
    logger.info("制作杯子结束")


event = threading.Event()
logging.info(f"event is set: {event.is_set()}")
t1 = threading.Thread(target=boss, args=(event,))
t2 = threading.Thread(target=create_cup, args=(event,))
t1.start()
t2.start()
logger.info("主线程执行完毕")
logger.info(threading.enumerate())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

 
 
 
 

Lock

凡是存在资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源

可以使用 threading 提供的 Lock,也可以自己实现 lock
 
使用方法:

  • 实例化锁 —— lock = threading.Lock()
  • 加锁 —— lock.acquire(blocking=True, timeout=-1),成功拿到锁返回 True,否则返回 False。当 blocking=False(非阻塞态)时,timeout 会被禁用
  • 解锁 —— lock.release()

 
在锁开启阻塞状态下,通过多个线程对同一个锁的同时操作来完成数据的共享,当要修改某个共享数据时,要求线程先拿到锁,再修改,修改完再释放。由此可以避免多个线程对共享数据的同时修改
 
阻塞锁的注意事项:
 
1、当一个线程获取锁后,若线程内的代码执行失败,则无法正常对锁进行释放,由此所有的线程都无法继续运行。为避免这种情况,需要借助 try…except…finally 来将释放锁放到 finally 中,无论如何要将当前线程加的锁释放掉
 
2、避免线程调度不均衡。即避免在某个线程的死循环内,频繁的解锁、加锁,尤其是当两次操作间的代码执行的非常快的时候。有可能造成该线程刚释放锁,中间部分的代码就执行完毕,该线程又迅速加上了锁,由此会导致其他线程无法运行
 
3、多线程使用锁最重要的就是代码的逻辑,即线程的调度能力。如,尽量避免在 加锁后 到 解锁前 的代码中执行中断循环的操作。因为当任务目标达成时,完成最后一个任务的线程会中断循环,但是此时锁并没有释放,其他线程会持续阻塞,导致程序无法结束退出

 

适用场景

锁适用于访问和修改同一个共享资源时,即读写同一个资源的时候。若被访问的资源仅仅是读取,那么不需要加锁

 

注意事项

  • 少用锁,必要的时候用锁,多线程中访问被锁资源时,就成了串行,要么排队执行,要么争抢执行
  • 加锁的时间越短越好,不需要就立即释放
  • 一定要避免死锁
  • 避免在加锁后的操作无法正常执行完毕,从而导致锁不会被释放。最好使锁对象支持 with 语法,以保证无论如何锁一定会被释放(使用 try … except … finally … 次之)

 
 
 
 

非阻塞锁

在获取锁的时候,使用参数 blocking 来控制,若 blocking 为 True 则表示锁为阻塞锁,即当前线程拿到锁后,其他线程无法在当前线程释放前拿到锁,并且若其他线程的 blocking 为 True 则其他线程在此处阻塞,并争抢锁,拿到锁后再继续运行。若其他线程的 blocking 为 False 则其他线程的 .acquire(Flase) 返回 Flase 并继续向下执行。 即锁的阻塞与否,影响的是自身线程,控制自身线程是否在此处等待,

 
 
 
 

可重入锁

可重入锁是线程相关的锁。某个线程获得可重复锁,并且可以获取多个,但是有多少个 .acquire() 就要有多少个 release。对相同的 rlock 对象,在哪个线程内加是锁,就只能由这个线程操作。其他线程操作的都是自己的 RLock
 
语法:lock - threading.RLock()

 
 
 
 

Condition

构造方法 Condition(lock=None),可以传入一个 Lock 或 RLock 对象,默认是 RLock
 
Condition 用户生产者、消费者模型,为了解决生产者消费者速度匹配问题
 
Condition 在内部以 _waiters 属性(_deque() 的双端队列)存储需要等待(使线程进入休眠的状态)的线程,当达到某些情况时,Condition通过调用 notify() 和 notify_all() 来唤醒线程
 
设计模式中的广播模式就是类似情况

 

方法

方法含义
acquire()获取锁
wait(timeout=None)使当前线程处于等待状态,会加入到等待队列(每个 Condition 对象都有一个自己的 _waiters 的队列)。处于等待状态的线程需要别的线程(因为自己已经阻塞住了)调用 notify() 或者 notify_all() 来激活处于等待队列中的线程,使其继续执行
notify(n=1)唤醒之多 n 个等待的线程,没有等待的线程就没有任何操作
notify_all()唤醒所有等待线程

 

使用场景:生产者生产产品,消费者排队获取

在这里插入图片描述

 
 
 
 

Barrier

Barrier 常用于并发初始化,所有线程必须都初始化完成后,才能继续工作。当 barrier.wait() 的线程数(每个线程执行到 barrier.wait() 的时候都会阻塞住)达到预设的 parties 时,会将各线程放开(争抢)执行
 
Barrier 类似 MOBA 游戏的对战房,每个对战房有不同的人数限制(2人,6人,10人,20人等),当房间内人数不满时,所有人均在房间内等待。当房间内人数达到预设值时,游戏开始所有人(线程)自行行动
 
Barrier 没有次数限制,一波放行后,继续积攒下一波

名称含义
Barrier(parties, action=None, timeout=None)构建 Barrier 对象,指定参与方数目。timeout 是 wait 方法未指定超时时间时的默认值
n_waiting当前在屏障中等待的线程数
parties各方数,需要等待的个数
wait(timeout=None)等待通过屏障。返回 0 到线程数 -1 的整数,每个线程返回不通。如果 wait 方法设置了超时,并超时发送,屏障将处于 broken 状态
abort()打破屏障使 barrier 处于 broken 状态,现有的处于等待的线程会触发 threading.BrokenBarrierError。处于 broken 状态的 barrier 无法继续 wait(),碰到 wait() 直接触发 BrokenBarrierError

 
 
 
 

semaphore 信号量

信号量与 Lock 很像,信号量对象内部维护一个倒计数器,每一次 acquire 都会减 1,当 acquire 方法发现计数为 0 就阻塞请求的线程,直到其他线程对信号量 release 后,计数大于 0,恢复阻塞的线程
 
常见应用:连接池
 
若 Semaphore 没有 acquire() 则直接 release(),其信号值会增加(semaphore对象的_value属性)。若要生成有顶的 Semaphore 则使用 BoundedSemaphore(),当尝试 release() 超过上限时,会抛出 Value Error

名称方法
Semaphore(value=1)构造方法,value 小于 0,抛 ValueError 异常
acquire(blocking=True, timeout=None)获取信号量,计数器减1,获取成功返回 True
release()释放信号量,计数器加 1

 
 
 
 
 
 
 
 

进程


在 Python 中进行多进程开发,需要借助 multiprocessing 模块进行多进程编写,而在这个模块内部提供有一个 Process 类,利用这个类可以进行多进程的定义
 
进程的声明周期

  • 创建状态 —— 系统已经为其分配了 PCB(可以获取进程信息),但是所需要执行的进程的上下文环境并未分配,因此当前进程还无法被调度
  • 就绪状态 —— 该进程已经分配到了除 CPU 之外的全部资源(上下文),并等待 CPU 调度
  • 执行状态 —— 进程已获取了 CPU 资源,开始正常提供服务(执行进程)
  • 阻塞状态 —— 所有的进程不可能一直强占 CPU(共享资源),依据资源调度算法,每一个进程运行一段时间后,让出当前的 CPU 资源给他其他进程执行
  • 终止状态 —— 某一个进程达到了自然终止的状态,或者进行了强制性的停止,则当前进程进入到了终止态,不再继续执行

 
注意:
 
1、每个进程都认为自己独占所有计算机资源。由此多个进程中的线程地址可能存在相同值
 
2、任何一个进程都包含有各自独立的数据,各个进程间无法直接互相访问数据,若要进程通信需要特殊的方法(如“管道”)

 

process 类的方法

方法类型描述备注
pid属性获取进程ID
name属性获取进程名称
multiprocessing.Process()初始化方法参数如下:
- group:分组定义
- target: 进程处理对象(代替 run() 方法)
- name:进程名称,若不设置则自动分配一个名称
- args:进程间处理对象所需要的执行参数
- kwargs:关键字参数
- daemon:是否被设置为后台进程
start()方法启动进程,进入进程调度队列
run()方法处理进程,不指定 target 时起效

 

CPU密集型任务的 “单线程”、“多进程”、“多线程” 的运算速度对比

| | 单线程 | 多线程 | 多进程 |
|–|–|–|–|–|
| 三并发 | 1min 31sec | 1min 35sec | 34sec

 

cpu密集型任务(从1加到1亿)在多进程和多线程下的耗时情况如下图,我的机器是 6核 12线程
在这里插入图片描述

可以看到多进程任务大致有如下规律(不要看曲线斜率,x轴尺度变化不均匀,看斜率没用)

  • 在进程数达到物理核心数前,多进程并行处理任务,时间基本与单进程相同(考虑主要受操作系统和其他软件所占用资源的影响,所以时间略长一些)
  • 当进程数达到核心数后,任务耗时增加的比例与进程数与核心数的比例几乎相同。可以理解为物理核心数分 n 波执行了任务,每波都是并行的,前后两波间是串行的(只是这么理解,操作系统会把任务分割成时间片,也就是说每一波都是交叉执行的。实际代码执行时不会是先执行 6个,再执行 6个,会表现为每个进程之间耗时差不太多,几乎同步结束)

 

多线程有大致如下规律

  • 在 CPU 密集型任务下,多线程的任务耗时和串行执行几乎相同,甚至更久(线程切换、GIL加锁解锁的耗时),基本都是线性递增

 
 
 
 

写法

面相过程的写法

import multiprocessing
from datetime import datetime


def worker():
    sum = 0
    for i in range(1000000000):
        sum += 1

    print(sum)


if __name__ == '__main__':
    start = datetime.now()
    processes = []
    for i in range(6):
        process = multiprocessing.Process(target=worker)
        process.start()
        processes.append(process)
    for process in processes:
        process.join()

    end = datetime.now()
    print(end - start)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

 
 

面相对象的写法

自定义一个类,使其继承 multiprocessing.Process,并且重写 __init__、run() 方法,前者用来传参(如 name),后者用来将 target 指向的函数在 run() 的内部进行实现。并且经常会添加一个 “延迟启动” 的功能,通过传一个值给 __init__ 并且在 run() 内 sleep 给定时间即可

import multiprocessing
import time


class MyProcess(multiprocessing.Process):
    def __init__(self, name, delay, count):
        super(MyProcess, self).__init__(name=name)
        self.__delay = delay
        self.__count = count

    def run(self) -> None:
        for num in range(self.__count):
            print(f"{num}, 进程ID:{multiprocessing.current_process().pid}, 进程名称:{multiprocessing.current_process().name}")
            # 延迟运行
            time.sleep(self.__delay)


if __name__ == '__main__':
    # 查看当前活动的子进程
    print(multiprocessing.active_children())
    processes = []
    for i in range(3):
        process = MyProcess(name=f"Process-{i}", delay=1, count=10)
        process.start()
        processes.append(process)

    # 查看子进程全部启动后当前活跃的子进程
    print(multiprocessing.active_children())

    for process in processes:
        process.join()

    # 查看子进程全部执行完毕后,当前活跃的子进程
    print(multiprocessing.active_children())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

 
 
 
 

进程控制

主要方法

方法含义
terminate()关闭进程
is_alive()查看线程是否存活
join(timeout=None)同 thread,让当前线程等待执行这个方法的线程对象所在的线程执行完毕

 
 
 
 

daemon进程

类似 thread 中的 daemon 线程,当创建此线程的线程执行结束或者中断后,daemon 进程会被立即停止掉。常用于服务状态的检测、分发等非工作业务功能

import multiprocessing
import logging
import time

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)


# 守护进程
def status():
    while True:
        logger.info(
            f"守护进程ID:{multiprocessing.current_process().pid}, 守护进程name:{multiprocessing.current_process().name}")
        time.sleep(1)

# 工作进程
def worker():
    # 启动守护进程
    multiprocessing.Process(target=status, name="守护进程", daemon=True).start()
    for i in range(3):
        time.sleep(1)
        logger.info(
            f"工作进程ID: {multiprocessing.current_process().pid}, 工作进程name:{multiprocessing.current_process().name}"
            f", item: {i}")


def main():
    multiprocessing.Process(target=worker, name="工作进程").start()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

 
 
 
 

fork创建子进程(linux、unix)

使用 multiprocessing 创建的子进程都是通过 Python 创建的一个跨平台的子进程。除了使用 Python 创建外,也可以使用操作系统创建子进程(os.fork()),这种形式创建的子进程并不是跨平台的,仅在 linux 下可用
 
fork() 有三种结果 <0、=0、>0 分别对应 “创建失败”、“子进程获取数据”、“父进程获取数据” 三种状态

import os
import multiprocessing


def sub():
    print(f"sub() 子进程ID:{os.getpid()},父进程ID:{os.getppid()}")


def main():
    print(f"main() 进程ID:{multiprocessing.current_process().pid},进程名称:{multiprocessing.current_process().name}")
    newpid = os.fork()
    print(f"fork() 的新子进程状态码:{newpid}")
    # 给子进程分发任务
    if newpid == 0:
        sub()
    else:
        print(f"父进程执行,父进程ID:{os.getpid()}")


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

linux执行如下
在这里插入图片描述

 
 
 
 

psutil

psutil是一个第三方的进程管理模块,该模块可以跨平台进行进程管理
 
pip install -i https://pypi.douban.com/simple psutil

import psutil


def main():
    for process in psutil.process_iter():
        print(f"进程编号:{process.pid},进程名称:{process.name()},创建时间:{process.create_time()}")
        # 关闭记事本
        if process.name() == "notepad.exe":
            process.terminate()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

 
 
 
 

进程池

使用多进程的确可以充分的发挥出全部的硬件性能,但是随之有可能带来一些问题:例如,多进程过多的时候如何保证系统的性能平衡,以及对于这多个进程的管理问题
 
为了更好的保证多进程和操作系统性能之间的平衡问题,所以一般可以将多个进程放在一个对象池中进行统一管理,对象池本身是有大小限制的,这样就可以保证不会产生过多的进程从而影响到硬件的性能,这个对象池实际上就是进程池
 
实现:multiprocessing.Pool(processes=2) # 开启两个进程

def resample_jpgs_from_log(log_file: str):
    # 获取需要处理的图片
    unvalid_jpgs: list = _get_unvalid_jpg_from_log(log_file)
    # 创建进程池,进程数为系统逻辑处理器数减一
    pool = multiprocessing.Pool(os.cpu_count() - 1)
    pool.map(_resample_jpg, unvalid_jpgs)
    # 关闭进程池(停止接收新任务到进程池)
    pool.close()
    # 等待子进程结束
    pool.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

 
 
 
 

进程通信

Pipe

管道是由内核管理的一个缓冲区,用于从一个进程获取信息并传递给另一个线程,其工作模式类似广播,获取信息的进程在管道中没有信息时会被阻塞,直到有新信息进入管道或者超时。
 
管道的空间有限,当管道中信息填满时,向管道传递信息的进程会被阻塞,等待有空间后再放入
 
管道可用于具有亲缘关系进程间的通信,有名管道甚至还允许无亲缘关系的进程间通信
 
注意:

  • 管道既可以被发送者关闭,也可以被接收者关闭,二者均会触发 EOFError,要即使捕捉并处理
  • 当进程(send、receive)执行完毕时,管道会被关闭掉

在这里插入图片描述

import multiprocessing
import time
from multiprocessing.connection import PipeConnection


def send_data(pipe_send: PipeConnection):
    for i in range(10):
        pipe_send.send(f"message {i}")
        print(f"*** 已发送:message {i}")
        time.sleep(0.5)


def receive_data(pipe_recv: PipeConnection):
    for i in range(10):
        recv = pipe_recv.recv()
        print(f"=== 已接收:{recv}")


def main():
    pipe_send, pipe_recv = multiprocessing.Pipe()
    process_send = multiprocessing.Process(target=send_data, args=(pipe_send,))
    process_recv = multiprocessing.Process(target=receive_data, args=(pipe_recv,))
    process_send.start()
    process_recv.start()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在这里插入图片描述

 
 
 
 

进程队列

不通的进程彼此之间可以利用管道实现数据的发送和接受,但是如果说现在发送的数据过多并且接收处理缓慢的时候,那么这种情况下就需要引入队列的形式来进行缓冲的操作实现
 
使用队列对象
from multiprocessing import Queue

 

队列的方法

方法描述
put(obj, block=True, timeout=None)将数据推入队列,若 block 为 False 则当无法推入数据时候(队列满或者队列不可用)直接抛出异常
get(block=True, timeout=None)从队列获取数据,若 block 为 False 则当无法获取数据时候(空队列或者队列不可用)直接抛出异常
qsize()获取队列保存数据个数
empty()bool,是否为空队列
full()bool,是否为满队列

 
 
 
 

subprocess

采用管道的形式去启动操作系统中的另外一个进程,并且还可以获取此进程的相关信息
 
主要方法 subprocess.Popen(),参数如下

  • args —— 要执行的 shell 命令或 命令列表
  • bufsize —— 缓冲区大小
  • stdin —— 标准输入
  • stdout —— 标准输出
  • stderr —— 错误信息
  • shell —— 是否直接执行命令
  • cwd —— 当前的工作目录
  • env —— 子进程的环境变量

 
subprocess 可以通过管道来对进程间进行通信,包括从一个进程中执行另外一个进程

 

实例:通过 subprocess 启动 python 命令行,并且执行命令

import subprocess
import time

# 启动并进入 python 解释器
py = subprocess.Popen(["python.exe"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# 在 python 解释器中以输入流输入
py.stdin.write("import time\n".encode())
py.stdin.write("print(time.localtime())\n".encode())
py.stdin.write("print('123')\n".encode())
py.stdin.write("1/0\n".encode())
py.stdin.close()

cmd_out = py.stdout.read().decode()
py.stdout.close()
print(cmd_out)

cmd_error = py.stderr.read().decode()
py.stderr.close()
print(cmd_error)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述

 

实例二、自动装包

import subprocess
import sys

subprocess.call(
    [sys.executable, '-m', 'pip', 'install', '-i', 'http://pypi.douban.com/simple', '--trusted-host', 'pypi.douban.com',
     'pandas'])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

 
 
 
 

Manager

不同进程之间虽然可以通过管道的形式实现数据的传输,但是如果每一次操作都是通过管道进行处理,那么实际上就会非常麻烦,所以为了进一步简化不同进程之间的数据共享,提供了一个 Manager 类
 
Manager类中提供 dict、list 两种数据类型,供多进程进行数据共享

 

在这里插入图片描述

 

实例:多进程共同修改

import multiprocessing


def worker(list, item):
    list.append((f"item: {item}", (multiprocessing.current_process().name, item)))


def main():
    manager = multiprocessing.Manager()
    main_item = f"Manager类: {multiprocessing.current_process().name}"
    mgr_list = manager.list()

    # 开启三个进程,每个进程向 Manager 的 list 中插入一条数据
    processes = [multiprocessing.Process(target=worker, args=(mgr_list, item), name=f"name-{item}") for item in
                 range(3)]
    # 启动所有进程
    [process.start() for process in processes]
    # 阻塞主进程
    [process.join() for process in processes]

    # 在主进程中读取数据,查看是否成功被各个子进程修改
    print(mgr_list)


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

 
 
 
 

进程同步

Lock

当多进程加上进程锁后,被锁住的部分就成了单进程执行。只有在需要保证数据安全的前提下,才建议使用锁,否则尽量避免锁的使用
 
借助:multiprocessing.Lock()

主要方法

方法含义
acquire(blocking=False, timeout=None)请求锁
release()释放锁
import time
from multiprocessing.synchronize import Lock
import multiprocessing
import sys


def worker(ticket_dict: dict, lock: Lock):
    while True:
        # 在获取数据前就要加锁,否则依旧会有多个进程共同修改数据
        lock.acquire()
        number = ticket_dict.get("ticket")
        if number > 0:
            time.sleep(1)
            print(f"{multiprocessing.current_process().name} --- {number} ticket")
            number -= 1
            ticket_dict.update({"ticket": number})
            lock.release()
        else:
            lock.release()
            break


def main():
    manager = multiprocessing.Manager()
    mgr_dict = manager.dict({"ticket": 5})
    lock = multiprocessing.Lock()
    print(type(lock))
    # sys.exit()
    processes = [multiprocessing.Process(target=worker, args=(mgr_dict, lock), name=f"售票员:{i}") for i in range(10)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    print(f"最终剩余票数:{mgr_dict.get('ticket')}")


if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

 
 

Semaphore

信号量,原理与多线程的信号量相同,当信号量归零前一直可以进行 acquire(),但是归零后则无法 acquire() 只有当之前的信号被 release() 之后,后续的进程才可以 acquire()

主要方法

方法含义
acquire(blocking=False, timeout=None)请求锁
release()释放锁
import time
from multiprocessing.synchronize import Lock
import multiprocessing


def worker(ticket_dict: dict, sema: Lock):
    while True:
        # 在获取数据前就要加锁,否则依旧会有多个进程共同修改数据
        sema.acquire()

        number = ticket_dict.get("ticket")
        number -= 1

        if number < 0:
            sema.release()
            break
        ticket_dict.update({"ticket": number})
        time.sleep(5)
        print(f"{multiprocessing.current_process().name} --- {number} ticket")

        sema.release()


def main():
    manager = multiprocessing.Manager()
    mgr_dict = manager.dict({"ticket": 5})
    sema = multiprocessing.Semaphore(3)
    print(type(sema))
    # sys.exit()
    processes = [multiprocessing.Process(target=worker, args=(mgr_dict, sema), name=f"售票员:{i}") for i in range(10)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    print(f"最终剩余票数:{mgr_dict.get('ticket')}")


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

 
 

Event

通过 Event 对象来进行多进程之间的数据同步,各个线程通过判断 flag 来进行程序控制。一般用于有前后承接关系的场景,前一个任务完成后,之后的任务才能进行

 

import multiprocessing
import time


def customer(event):
    print(f"正在点餐")
    time.sleep(1)
    print("点餐结束")
    event.set()


def waiter(event):
    event.wait()
    print("正在服务")
    time.sleep(1)
    print("服务完成")


def main():
    event = multiprocessing.Event()
    process_customer = multiprocessing.Process(target=customer, args=(event,))
    process_waiter = multiprocessing.Process(target=waiter, args=(event,))
    process_customer.start()
    process_waiter.start()


if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

 
 

Barrier

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/181289?site
推荐阅读
相关标签
  

闽ICP备14008679号