当前位置:   article > 正文

多线程并发执行 + 多进程并行执行_python并行计算

python并行计算

文章目录


在这里插入图片描述

一、多线程 + 多进程(必读)

# 线程(Thread)				:是进程中的一个执行单元(可以理解为执行流)
# 多线程(Multithreading)	:同时运行多个线程(在一个进程中)
#
# 进程(Process)			:是操作系统中的一个执行单元
# 多进程(Multiprocessing)	:同时运行多个独立的进程
  • 1
  • 2
  • 3
  • 4
  • 5
多线程(Multithreading)多进程(Multiprocessing)
适用领域并发任务并行任务
内存内存共享线程切换开销小,因为所有线程共享同一地址空间,不需要进行上下文切换。)内存独立进程切换开销大,因为每个进程都拥有独立的地址空间,导致数据不共享,故切换时需要保存和恢复整个进程的上下文。进程间需要通过通信(Inter-process Communication,IPC)来传递数据)
时耗创建和销毁线程的时间开销小创建和销毁进程的时间开销大
任务IO密集型任务(任务执行的主要时间都在读写操作上)CPU密集型任务(任务执行的主要时间都在数值计算上)
应用1、文件读写;2、网络通信(下载文件、发送与接收网络请求);3、数据库操作(查询数据库、读写数据);4、图像处理(加载图像、保存图像)1、数值计算(大数据分析、统计、计算);2、图像处理(渲染图像)

1.1、(时耗问题)线程或进程:创建 + 切换 + 销毁

(线程)创建和销毁线程的时间开销小,(进程)创建和销毁进程的时间开销大

  • 线程 - 创建时间:较快,尤其是在同一个进程内、内存需要共享时。【数毫秒,十几毫秒】
  • 进程 - 创建时间:较慢,尤其是当涉及内存复制、系统调用时。【几十毫秒,几百毫秒或更长时间】

1.2、(时耗问题)线程或进程:传递参数

内存共享(多线程)

  • 内存占用:多个线程之间的数据共享(同一个地址空间),不需要为每个线程复制数据,只需要传递指向数据的指针或引用。适用于需要数据量较大或传递频繁的情况。
  • 参数的传递时间:由于参数传递只涉及指针或引用的复制,传递时间很短,几乎可以忽略不计。这使得在多线程编程中,数据共享和通信成本较低,可以更高效地完成任务。

内存独立(多进程)

  • 内存占用:多个进程之间的数据独立(独立的地址空间),因此在参数传递时需要复制数据到新的内存地址,每个进程独立维护一份数据副本导致消耗更多的内存,特别是在数据量较大或传递频繁时。
  • 参数的传递时间:由于需要复制数据到新的内存空间,传递时间相比多线程会稍长一些。尤其是在数据量大或频繁传递时,会有显著的传递成本。

1.3、Python存在全局解释器锁(GIL)

Python 解释器(CPython)在执行 Python 代码时,会存在全局解释器锁(Global Interpreter Lock,GIL)。

  • GIL 是一种线程执行的控制机制。
  • 优点:简化了解释器的实现和内存管理,且减少了线程安全问题。
  • 缺点:限制了Python多线程的并行能力。

GIL(全局解释器锁)只存在于 CPython 解释器中,而其他一些 Python 解释器没有 GIL 的限制(如 Jython、IronPython 和 PyPy 等)。

  • Jython:Jython 是一个在 Java 平台上运行的 Python 解释器,它直接利用了 Java 的线程模型,因此不受 GIL 的限制。在 Jython 中,Python 代码被转换为 Java 字节码并在 Java 虚拟机(JVM)上执行,可以充分利用 JVM 的多线程机制。
  • IronPython:IronPython 是一个在 .NET 平台上运行的 Python 解释器,类似地,它利用了 .NET 平台的线程模型,而不受 GIL 的影响。
  • PyPy:PyPy 是一个高性能的 Python 解释器,它通过即时编译(JIT)等技术提高了性能。PyPy 采用了一种称为 STM(Software Transactional Memory)的机制来管理并发,与 GIL 不同,STM 允许多个线程同时访问共享数据,并通过事务来确保线程安全。

1.3.1、GIL限制了Python多线程的并行能力

深入浅出:Python内存管理机制

  • 为什么GIL限制了Python多线程的并行能力?
    • (1)Python 的内存管理机制是基于引用计数的,即每个对象都有一个计数器来跟踪对它的引用次数。当引用计数归零时,对象被销毁并释放内存。
    • (2)引用计数的增减需要在多线程环境下进行原子操作,以确保线程安全。 原子操作是一种不可中断的操作,它要么全部执行成功,要么全部不执行。
    • (3)若在增减引用计数的过程中发生了线程切换,可能会导致引用计数不正确,从而引发内存泄漏或释放错误的内存空间,导致程序出现异常行为。
    • (4)GIL 保证了在 Python 解释器中的任何时刻,只有一个线程能够执行 Python 字节码指令GIL确保了引用计数的正确性,但也导致了多线程程序无法充分利用多核处理器的并行计算能力。

1.3.2、GIL导致了多线程与多进程适用于不同任务

  • 多线程与多进程适用于不同任务的原因?
    • (1)在 CPU 密集型任务上,由于 GIL 的存在,多进程可以实现并行任务,但多线程不可以。
      • 在 CPU 密集型任务上,由于 GIL 的存在,多线程无法利用多核 CPU 实现真正的并行执行。虽然在操作系统层面上多个线程是并发执行的,但在 Python 解释器内部,即使在多核 CPU 上运行多个线程,同一时刻也只能利用单个 CPU 核心,即只能有一个线程在解释器中执行 Python 代码,其他线程需要等待 GIL 的释放才能执行,因此多线程无法充分利用多核 CPU 的计算能力。
      • 在 CPU 密集型任务上,由于 GIL 的存在,多进程可以利用多核 CPU 实现真正的并行执行。在多进程中,每个进程都有独立的 Python 解释器和内存空间,因此不存在 GIL 的限制,多个进程可以同时在多个 CPU 核心上执行任务,实现真正的并行执行。
    • (2)在 I/O 密集型任务上,多线程可以实现并发任务。 由于线程在等待 I/O 操作完成时会释放 GIL,因此 GIL 并不会对性能造成太大影响。

1.3.3、多线程【并发任务】:在同一时间段内,交替执行多个任务(不是同时执行)

  • 并发任务(Concurrent Tasks)在同一时间段内,交替执行多个任务(不是同时执行)。
    • 核心:任务之间的交替执行 + 内存共享
      • 单核处理器:通过操作系统的上下文切换机制,在不同任务之间快速切换执行,实现任务的并发执行
      • 多核处理器:在多个核心上同时执行不同的任务,从而实现更高的整体性能和资源利用率。
    • 优点:
      • 提高系统的响应能力:通过并发处理,系统可以同时处理多个任务,减少了任务等待的时间,从而提高了系统的响应速度。
      • 更高的资源利用率:在 I/O 密集型任务中,由于线程可以在等待 I/O 操作完成的同时执行其他任务,可以充分利用 CPU 的空闲时间,提高系统的资源利用率。

1.3.4、多进程【并行任务】:在同一时刻,同时执行多个任务(各自独立执行)

  • 并行任务(Parallel Tasks)在同一时刻,同时执行多个任务(各自独立执行)。
    • 核心:任务之间的同时执行 + 性能提升
      • 在多核处理器上,不同的任务可以在不同的核心上并行执行
    • 优点:
      • 提高计算速度:将任务分配给多个处理单元同时执行,从而加速任务的完成速度。特别是在处理 CPU 密集型任务时,利用多核处理器的并行计算能力可以大大提高计算速度。
      • 更高的吞吐量:通过并行处理多个任务,系统可以在单位时间内处理更多的工作量,提高了系统的整体吞吐量。

在这里插入图片描述

1.3.5、多线程【线程切换】 —— 不需要切换内存空间

线程切换(Thread Switching):是指操作系统将 CPU 执行权从一个线程转移到另一个线程的过程。在多线程环境中,一个进程可以包含多个线程,每个线程可以被认为是独立的执行流。线程切换可以在同一个进程内进行,也可以跨进程进行。

线程切换是上下文切换的一种特例。

线程切换的步骤:

  • 保存当前线程状态:将当前线程的状态(如寄存器、程序计数器等)保存到线程控制块(Thread Control Block, TCB)中。
  • 选择下一个线程:操作系统根据调度算法选择下一个要执行的线程。
  • 恢复下一个线程状态:从选定线程的线程控制块中恢复其状态。
  • 切换到下一个线程:将 CPU 执行权转移到选定的线程。

1.3.6、多进程【上下文切换】 —— 需要切换内存空间

上下文切换(Context Switching):是指操作系统将 CPU 执行权从一个进程转移到另一个进程的过程。上下文切换比线程切换开销更大,因为进程拥有独立的内存空间、资源和状态信息,而线程共享同一进程的资源。

上下文切换(进程级)的步骤:

  • 保存当前进程状态:保存当前进程的 CPU 寄存器、程序计数器和内存管理信息到进程控制块(PCB)。
  • 选择下一个进程:根据调度算法选择下一个要执行的进程。
  • 加载下一个进程状态:从下一个进程的 PCB 中恢复其状态,包括内存映射。
  • 切换到下一个进程:CPU 执行权转移到下一个进程。
  • 上下文切换
    • (1)在任务执行过程中,操作系统因为一些事件的发生(如:时间片用完、外部中断、任务主动让出CPU等)而暂停当前任务的执行,并切换到另一个任务执行。
    • (2)在任务切换过程中,操作系统将当前任务的执行状态(上下文)保存到内存中,并加载下一个任务的执行状态,使得下一个任务能够继续执行。

上下文切换的关键:上下文的保存和恢复。

  • 上下文(Context)当前任务执行时的环境和状态信息。
    在操作系统中,上下文通常指的是进程的执行状态,包括但不限于以下内容:
    • 寄存器状态:程序计数器(PC)、通用寄存器(如eax、ebx等)、堆栈指针(SP)等。这些寄存器存储了程序当前的执行位置、变量的值等信息。
    • 内存映像:程序的内存布局和相关的数据。这包括了程序的代码、堆、栈以及全局变量等数据。
    • 进程或线程的状态:进程或线程的优先级、状态(运行、就绪、阻塞等)、所属的进程组等。
    • 打开的文件描述符:如果程序正在访问文件或其他资源,上下文中可能包括打开的文件描述符以及相应的文件指针等信息。

二、CPU处理器

CPU处理器的参数解析:12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz

  • 12th Gen Intel(R) Core(TM) :表示英特尔第 12 代 Core 处理器
  • i7-12700:表示处理器属于i7系列,具体型号为12700
  • 2.10 GHz:表示处理器的时钟频率
  • 架构: 使用了Intel的Alder Lake架构。
  • 核心数: i7-12700有12个核心。
  • 线程数: 具有24个线程,支持超线程技术(每个核心可以同时执行两个线程)。

2.1、CPU处理器的核心:物理内核 + 逻辑内核

CPU核心数:由处理器和操作系统决定

  • 处理器(硬件)
    • 物理内核(Physical Cores)实际的处理器核心
      • 现代计算机通常配备有多核心的处理器,即在一个物理处理器芯片上集成了多个独立的处理器核心,可以并行地执行指令,从而提高处理器的整体性能和并发能力。
      • 每个物理核心都是一个独立的、实际存在的硬件单元,具有自己的执行单元、缓存和执行流水线。
    • 逻辑内核(Logical Cores)通过超线程技术(Hyper-Threading)实现的虚拟核心
      • 超线程允许一个物理内核模拟两个逻辑内核,实现在同一时间执行两个线程。
      • 逻辑内核共享物理内核的执行单元、缓存和执行流水线,但它们可以独立地执行指令流。

举例:计算机配备了四核处理器(4个物理内核),每个核心都支持超线程(8个逻辑内核)。因此,可以实现8个线程的并行处理。

  • 操作系统(软件)
    • 线程的分配和管理:决定哪些线程在哪个核心上运行
    • 支持的最大线程数:不同操作系统对支持的最大线程数有限制,这取决于其设计和内核管理能力。

2.2、获取CPU处理器的逻辑内核数:os.cpu_count()

import psutil

# Ture:显示逻辑内核、False:显示物理内核
print("CPU处理器的逻辑内核数量:", psutil.cpu_count(logical=True))  # CPU处理器的逻辑内核数量: 20
print("CPU处理器的物理内核数量:", psutil.cpu_count(logical=False))  # CPU处理器的物理内核数量: 12
  • 1
  • 2
  • 3
  • 4
  • 5
import os

print("CPU处理器的逻辑内核数量:", os.cpu_count())
# CPU处理器的逻辑内核数量: 20
  • 1
  • 2
  • 3
  • 4
import multiprocessing as mp

print("CPU处理器的逻辑内核数量:", mp.cpu_count())
# CPU处理器的逻辑内核数量: 20
  • 1
  • 2
  • 3
  • 4

2.3、设置CPU处理器的逻辑内核数:max_workers=os.cpu_count()

(1)若指定,则max_workers=20 / 40 / os.cpu_count() / 1000
(2)若不指定,则默认使用CPU处理器支持逻辑内核的最大数量(如:max_workers=os.cpu_count()

  • 线程数可以无限指定,但逻辑内核数决定了最大可以同时处理的线程数量
  • 若线程数超过逻辑核心数,则通过调度策略来决定哪些线程能够运行。 选择硬件支持的最大逻辑内核数并行执行,而超出部分将处于等待状态(阻塞),直到有空闲的逻辑内核可供使用这种情况将导致线程等待时间较长,影响系统的响应速度和效率。

三、经验分享(必读)

3.1、(错误)使用案例

  • (案例一)已知:若创建每个线程为毫秒级,创建每个进程为 3 ~ 5 秒,而每个线程的执行时间为 3 秒。则线程是并行执行,而进程是串行执行;

  • (案例二)已知:当数据装载器 A 启用多线程时:for a in A,若(从 A 中获取 a 的耗时)小于(每个线程的执行时间),则多线程并行;反之串行执行。**

3.2、在不超过RAM的基础上,如何设置逻辑内核的最大值?

以多线程为例

  • 需求:循环计算函数100次(每次输入参数不同)
  • 已知:系统内存RAM = 64GB,读取图像的内存损耗 = 6GB(只读取一次),每次计算图像的内存损耗 = 7.81GB(循环计算)
  • 计算逻辑内核的最大数量 = (64 - 6 - 系统内存) / 7.81GB = 7.42。即每轮最多并行计算7个循环max_workers = 7,否则将异常显示:内存分配不足。

备注:每个循环对应的内存损耗都是不同的(函数的输入参数不同导致函数内部计算所损耗的内存不同)。故在指定max_workers时,必须以100次循环中的最大内存损耗为基准线(如:13.81GB)。

3.3、当线程数大于逻辑核心数,使用调度策略来决定哪些线程能够运行

已知:假设指定 100 个线程,但系统只有 20 个逻辑核心。

  • 逻辑核心数表示处理器可以同时处理的线程数量
  • 若线程数超过逻辑核心数,则通过调度策略来决定哪些线程能够运行。 选择硬件支持的最大逻辑内核数并行执行,而超出部分将处于等待状态(阻塞),直到有空闲的处理器可供使用这种情况将导致线程等待时间较长,影响系统的响应速度和效率。
  • 根据线程的优先级、状态(如:运行中、等待中)、以及核心的可用性等,动态地增加或减少线程数,或者重新分配线程到不同的物理核心上。
  • 优先级:高优先级的线程会在低优先级线程之前执行。
  • 操作系统可以在任何时候中断低优先级线程,并转而执行高优先级线程。

3.3.1、时间片轮转 - 调度(Round Robin - Scheduling)

时间片轮转(Round Robin - Scheduling)是一种调度算法,主要用于操作系统的多任务处理。它通过将CPU时间划分为小的时间片,轮流分配给各个任务,确保系统的公平性和响应性。

公平性:每个任务都可以定期获得CPU时间,不会出现某个任务长期占用CPU的情况。

时间片轮转的工作流程:

  • (1)任务初始化 + 时间片分配(Time Slicing):将所有需要执行的任务(或线程)放入就绪队列,并分配固定长度的时间片(比如10毫秒或100毫秒)。
    • 时间片长度:决定了每个任务(或线程)能在CPU上连续执行的时间。
    • 时间片过短:将导致频繁的上下文切换,增加系统开销。
    • 时间片过长:将导致响应时间变长,不适合实时系统。
  • (2)任务执行(核心分配)选择就绪队列中的前 N 个任务(或线程),并让他们同时在各自的时间片内执行。" 同时 " 是指每个任务(或线程)在一个逻辑核心上运行。
  • (3)线程切换:每个时间片结束后,操作系统会进行线程切换。
    • 若线程的任务执行结束 ,则 选择就绪序列中的下一个任务(或线程) 将占用这个核心。
    • 若线程的任务执行未结束
      • 保存当前线程状态操作系统将中断当前执行的任务(或线程),并保存线程的状态(包括程序计数器、寄存器、和内存状态等)
      • 调度下一个线程将当前任务放回就绪队列的末尾,然后选择队列中的下一个任务(或线程)若任务具有不同的优先级,操作系统会优先选择高优先级的任务执行。
      • 恢复任务的状态:恢复下一个任务的状态,让它在新的时间片内继续执行。
  • (4)循环执行:重复上述步骤,直到所有任务(或线程)执行完毕。
import threading
import time
import queue


# 定义任务函数
def task(task_id, duration):
    print(f"Task {task_id} is running")
    time.sleep(duration)
    print(f"Task {task_id} is finished")


# 创建任务队列
task_queue = queue.Queue()
num_tasks = 100
time_slice = 0.1  # 时间片长度

# 向任务队列中添加任务
for i in range(num_tasks):
    task_queue.put((i, time_slice))

# 初始化线程池
threads = []
num_cores = 20


# 定义调度函数
def scheduler():
    while not task_queue.empty():
        for _ in range(num_cores):
            if not task_queue.empty():
                task_id, duration = task_queue.get()
                thread = threading.Thread(target=task, args=(task_id, duration))
                threads.append(thread)
                thread.start()

        # 等待所有线程完成
        for thread in threads:
            thread.join()
        threads.clear()


# 启动调度器
scheduler()

"""
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running
Task 7 is finished
Task 5 is finishedTask 4 is finished
Task 2 is finishedTask 1 is finished
Task 9 is finished
Task 3 is finished
Task 0 is finished

Task 8 is finished
Task 6 is finished
"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

3.3.2、优先级调度(Priority - Scheduling)

优先级调度(Priority - Scheduling)是一种调度算法,其中每个任务(或线程)都分配了一个优先级。操作系统根据优先级来决定任务的执行顺序,优先级越高的任务会优先执行。

优先级调度的工作流程:

  • (1)任务初始化:将所有任务放入就绪队列,并分配优先级。
  • (2)任务执行:从就绪队列中选择优先级最高的任务执行。若有多个任务具有相同的优先级,可以采用先来先服务(FCFS)的方式选择任务。选中的任务在一个时间片内执行。
  • (3)线程切换:每个时间片结束后,操作系统会进行线程切换。若任务未完成,则保存任务状态,并将其放回就绪队列。
  • (4)循环执行:重复上述步骤,直到所有任务(或线程)执行完毕。

优先级调度策略有两种方式:

  • 抢占式调度(Preemptive):如果一个高优先级任务到达,它可以中断正在执行的低优先级任务。
  • 非抢占式调度(Non-preemptive):一旦任务开始执行,它将一直执行到完成,即使有更高优先级的任务到达。
import threading
import queue
import time

class Task:
    def __init__(self, task_id, priority, duration):
        self.task_id = task_id
        self.priority = priority
        self.duration = duration

    def run(self):
        print(f"Task {self.task_id} with priority {self.priority} is running")
        time.sleep(self.duration)
        print(f"Task {self.task_id} with priority {self.priority} is finished")

# 创建优先级队列
task_queue = queue.PriorityQueue()

# 向优先级队列中添加任务
tasks = [
    Task(1, 1, 2),
    Task(2, 3, 1),
    Task(3, 2, 1),
    Task(4, 4, 1),
    Task(5, 5, 2)
]

for task in tasks:
    # 优先级队列按优先级排序,优先级值小的任务优先执行
    task_queue.put((task.priority, task))

# 初始化线程池
threads = []

# 定义调度函数
def scheduler():
    while not task_queue.empty():
        priority, task = task_queue.get()
        thread = threading.Thread(target=task.run)
        threads.append(thread)
        thread.start()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        threads.clear()

# 启动调度器
scheduler()

"""
Task 1 with priority 1 is running
Task 1 with priority 1 is finished
Task 3 with priority 2 is running
Task 3 with priority 2 is finished
Task 2 with priority 3 is running
Task 2 with priority 3 is finished
Task 4 with priority 4 is running
Task 4 with priority 4 is finished
Task 5 with priority 5 is running
Task 5 with priority 5 is finished
"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

3.4、获取系统中所有进程的线程数量(任务管理器中当前正在执行的进程)

在这里插入图片描述

import psutil


def get_all_processes_thread_count():
    """获取系统中所有进程的线程数量"""
    total_threads = 0
    for proc in psutil.process_iter(['pid', 'name', 'num_threads']):
        try:
            info = proc.info
            total_threads += info['num_threads']
            print(f"PID={info['pid']}, Name={info['name']}, Threads={info['num_threads']}")
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return total_threads


def get_process_thread_count(pid):
    """获取特定PID的进程线程数量"""
    try:
        proc = psutil.Process(pid)
        return proc.num_threads()
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        return None


if __name__ == '__main__':
    # (1)获取系统中所有进程的总线程数量
    total_threads = get_all_processes_thread_count()
    print(f"Total threads in the system: {total_threads}")

    # (2)获取特定PID的进程线程数量
    pid = 34544
    process_threads = get_process_thread_count(pid)
    if process_threads is not None:
        print(f"Process with PID={pid} has {process_threads} threads")
    else:
        print(f"Process with PID={pid} not found or access denied")
"""
部分打印结果(与任务管理器的详细信息中的数量一致)

PID=22840, Name=WeChatPlayer.exe, Threads=12
PID=22916, Name=WeChatAppEx.exe, Threads=11
PID=22952, Name=WeChatAppEx.exe, Threads=100
PID=23208, Name=WeChatUtility.exe, Threads=52
"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

四、函数详解

4.1、线程 - 自动管理

  • (1)threading模块multiprocessing模块
    • 作用:提供底层线程控制和丰富的同步原语,需手动管理线程
    • 适合:需要精细控制的多线程 / 多进程编程,且需要手动处理线程 / 进程的创建和销毁。
      • Python 标准库中没有名为 multithreading 的模块。多线程编程一般使用 threading 模块。
      • Python 标准库中没有名为 processing 的模块。多进程编程一般使用 multiprocessing 模块。
  • (2)concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor
    • 作用:提供高级的线程池自动管理
    • 适合:需要并行执行大量独立任务,且更关注任务的结果而不是线程/进程本身。

4.1.1、线程池执行器:concurrent.futures.ThreadPoolExecutor()

"""#############################################################################################
# 函数功能:线程池执行器,用于管理和调度线程池中的线程执行异步任务。
# 函数说明:executor = concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
# 参数说明:
#         max_workers:可选参数,表示线程池中的最大工作线程数。默认为 None,表示根据系统情况自动决定线程数。
#         thread_name_prefix:可选参数,表示线程名称的前缀。默认为空字符串。
# 返回值:
#         executor:表示线程池执行器对象,用于管理和调度线程池中的线程执行异步任务。
#############################################################################################"""

使用方式:
    """(1)executor.submit(): 提供单个任务"""
            作用:将所有参数只应用一次到函数,完成并行化计算。
            返回:一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
            """常与 for () 连用执行并行任务,常与 concurrent.futures.as_completed() 连用获取结果。"""
            
    """(2)executor.map(): 多个任务并行计算"""
            作用:将列表参数循环应用到函数,完成并行化计算。
            返回:一个map()迭代器,可以用于迭代获取每个任务的结果。

    """(3)executor.shutdown(): 等待所有任务完成,并关闭进程池"""
    若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4.1.2、进程池执行器:concurrent.futures.ProcessPoolExecutor()

  • 多线程共享内存,多进程独立内存:若需要在多个进程之间共享数据,确保使用适当的同步机制,以防止数据竞争和其他并发问题。
  • 性能测试: 在使用多进程之前,建议对单次循环进行性能测试,以确定是否有优化的空间。有时候,优化循环内的算法比并行计算更有效。
"""#############################################################################################
# 类功能:进程池执行器,用于管理和调度进程池中的进程执行异步任务。
# 类说明:executor = concurrent.futures.ProcessPoolExecutor(max_workers=None)
# 参数说明:
#         max_workers:可选参数,表示进程池中的最大工作进程数。默认为 None,表示根据系统情况自动决定进程数。
# 返回值:
#         executor:表示进程池执行器对象,用于管理和调度进程池中的进程执行异步任务。
#############################################################################################"""

使用方式:
    """(1)executor.submit(): 提供单个任务"""
            作用:将所有参数只应用一次到函数,完成并行化计算。
            返回:一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
            """常与 for () 连用执行并行任务,常与 concurrent.futures.as_completed() 连用获取结果。"""
            
    """(2)executor.map(): 多个任务并行计算"""
            作用:将列表参数循环应用到函数,完成并行化计算。
            返回:一个map()迭代器,可以用于迭代获取每个任务的结果。

    """(3)executor.shutdown(): 等待所有任务完成,并关闭进程池"""
    若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4.1.3、异步执行:executor.submit()

"""#############################################################################################
# 函数功能:在 Executor 对象中提交可调用对象以异步执行。
# 函数说明:future = executor.submit(fn, *args, **kwargs)
# 参数说明:
#         executor:表示 Executor 对象,可以是 ThreadPoolExecutor 或 ProcessPoolExecutor。
#         fn:可调用对象,表示要在异步任务中执行的函数或方法。
#         *args:可选参数,表示要传递给 fn 函数的位置参数。
#         **kwargs:可选参数,表示要传递给 fn 函数的关键字参数。
# 返回值:
#         future:表示异步任务的 Future 对象,用于获取异步任务的执行结果或状态。
#############################################################################################"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4.1.4、异步执行:executor.map()

"""#############################################################################################
# 函数功能:在 Executor 对象中映射可调用对象以异步执行。
# 函数说明:results_iterator = executor.map(func, *iterables, timeout=None, chunksize=1)
# 参数说明:
#         executor:表示 Executor 对象,可以是 ThreadPoolExecutor 或 ProcessPoolExecutor。
#         func:可调用对象,表示要在异步任务中执行的函数或方法。
#         *iterables:表示要迭代的可迭代对象,包含了传递给 func 函数的参数。如果有多个可迭代对象,则 func 将从每个可迭代对象中依次获取参数。
#         timeout:可选参数,表示等待结果的超时时间(以秒为单位)。如果在指定的时间内没有返回结果,则会引发 TimeoutError。默认为 None,表示不设置超时时间。
#         chunksize:可选参数,表示每个任务的数据块大小。默认为 1,表示一个任务处理一个迭代器中的一个元素。
# 返回值:
#         results_iterator:表示异步任务结果的迭代器,用于获取异步任务的执行结果。
#############################################################################################"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4.1.5、异步返回结果:concurrent.futures.as_completed()

  • 在异步编程中,通常会创建多个异步任务(比如使用asyncio模块或者其他异步框架),这些任务会在后台执行,不会阻塞主程序的执行。
  • as_completed 函数接受一组 Future 对象(表示异步任务的结果)作为输入,然后在这些任务完成时返回结果(先完成,先返回),而不会等待所有任务都完成。
"""#############################################################################################
# 函数功能:在迭代器中异步返回已完成的 Future 对象,而不会等待所有任务都完成。
# 函数说明:concurrent.futures.as_completed(futures, timeout=None)
# 参数说明:
#         futures:一个 Future 对象的可迭代容器,表示要等待的异步任务。
#         timeout:可选参数,表示等待结果的超时时间(以秒为单位)。如果在指定的时间内没有返回结果,则会引发 TimeoutError。默认为 None,表示不设置超时时间。
# 返回值:
#         返回一个生成器,用于异步返回已完成的 Future 对象。
#############################################################################################"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4.1.6、关闭进程池:executor.shutdown()

"""#############################################################################################
# 函数功能:优雅地关闭 Executor 对象,不再接受新的任务并等待已提交任务的完成。
# 函数说明:executor.shutdown(wait=True, cancel_futures=False)
# 参数说明:
#         wait:可选参数,布尔值,默认为 True。表示是否等待所有已提交的任务完成后再返回。如果设置为 False,则立即返回,不等待已提交的任务完成。
#         cancel_futures:可选参数,布尔值,默认为 False。表示是否取消尚未开始执行的任务。如果设置为 True,则取消所有尚未开始执行的任务。
# 返回值:
#         无返回值。
#############################################################################################"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4.1.7、多个线程是如何并发执行?

  • 没有显示调用executor.shutdown()

  • 已知:现需要循环N(N >> 20)次任务,每次提交(线程数 = 20)任务到线程池。则线程的使用情况如下:

    • 线程重用: 当一个线程完成一个任务后,将被线程池用来执行下一个任务,而不是被销毁。该方法避免了频繁地创建和销毁线程,提高性能。

    • 任务并发执行:若单个任务的执行时间很短,则在某个时刻内可能有多个线程同时执行多个不同的任务,且同时返回多个不同的结果。

    • 任务重复执行:若某些线程(20个线程中的部分线程)已完成任务,但主线程仍在循环中提交新的任务,则这些线程将被用来执行新的任务,直到任务结束。

  • 显示调用executor.shutdown():等待线程池中的所有线程都完成任务,然后再继续执行主线程。

    • 在某一个批次中:某些线程提前完成了任务,它们会等待其余线程全部完成任务后,再一起投入到下一个批次的任务。
    • 最后一个批次中:已完成任务的线程会等待其余线程全部结束;

4.2、线程 - 手动管理

  • (1)threading模块multiprocessing模块
    • 作用:提供底层线程控制和丰富的同步原语,需手动管理线程
    • 适合:需要精细控制的多线程 / 多进程编程,且需要手动处理线程 / 进程的创建和销毁。
      • Python 标准库中没有名为 multithreading 的模块。多线程编程一般使用 threading 模块。
      • Python 标准库中没有名为 processing 的模块。多进程编程一般使用 multiprocessing 模块。
  • (2)concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor
    • 作用:提供高级的线程池自动管理
    • 适合:需要并行执行大量独立任务,且更关注任务的结果而不是线程/进程本身。

4.2.1、多线程:使用 threading 模块

import threading
import time


def target_function():
    print("线程开始执行...")
    time.sleep(2)
    print("线程执行结束.")


def main():
    """(1)Thread 类:用于创建和管理线程。"""
    thread = threading.Thread(target=target_function, daemon=True)  # 创建线程对象,并设置守护标志为 True
    thread.start()  # 启动线程
    thread.join(timeout=None)  # 等待线程结束。可选的 timeout 参数表示最长等待时间。
    is_alive = thread.is_alive()  # 断线程是否存活(即是否处于活动状态)。
    name = thread.name  # 获取线程的名称
    ident = thread.ident  # 获取线程的标识符
    # thread.daemon = True  # 线程的守护标志,设置为 True 时表示该线程是守护线程。

    """(2)同步原语"""
    lock = threading.Lock()  # 创建互斥锁,用于控制对共享资源的访问。
    lock = threading.RLock()  # 创建重入锁,允许同一线程多次请求锁。
    lock.acquire()  # 获取锁
    lock.release()  # 释放锁
    condition = threading.Condition()  # 创建条件变量,用于线程间的通信和同步。
    semaphore = threading.Semaphore(value=1)  # 创建信号量,用于控制对有限资源的访问。
    event = threading.Event()  # 创建事件,用于线程间的通信,一个线程发出信号,其他线程等待信号。

    """(3)其他方法"""
    active_threads = threading.enumerate()  # 返回当前活动的线程列表
    active_thread_count = threading.active_count()  # 返回当前活动的线程数量
    threading.stack_size(1024 * 1024)  # 设置线程堆栈大小(仅在创建新线程时有效)。


if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

4.2.2、多进程:使用 multiprocessing 模块

import multiprocessing
import time


def target_function(index):
    print("进程开始执行...")
    time.sleep(2)
    print("进程执行结束.")
    return index


def process_function():
    process = multiprocessing.Process(target=target_function, args=(0,), daemon=True)  # 创建进程对象,并设置守护标志为 True
    process.start()  # 启动进程
    process.join(timeout=None)  # 等待进程结束。可选的 timeout 参数表示最长等待时间。
    is_alive = process.is_alive()  # 断进程是否存活(即是否处于活动状态)。
    name = process.name  # 获取进程的名称
    ident = process.ident  # 获取进程的标识符
    print(f"进程名称: {name}, 进程ID: {ident}, 是否存活: {is_alive}")


def pool_function():
    # Pool 类:用于管理进程池。
    with multiprocessing.Pool(processes=4) as pool:  # 创建进程池对象
        results = pool.map(target_function, range(5))  # map(func, iterable[, chunksize]): 将函数应用于可迭代对象的每个元素,并返回结果列表。
        result = pool.apply(target_function, (0,))  # apply(func[, args[, kwds]]): 在进程池中同步执行函数,并返回结果。
        pool.close()  # 关闭进程池,不再接受新的任务。
        pool.join()  # 等待所有进程完成


def main():
    process_function()
    pool_function()
    """
    在主进程中,multiprocessing.Process 和 multiprocessing.Pool 这两种方式不兼容且是互斥的。
            multiprocessing.Process 用于启动单个进程,
            multiprocessing.Pool 用于管理一个进程池。
    """
    
    # 同步原语
    lock = multiprocessing.Lock()  # 创建互斥锁,用于控制对共享资源的访问。
    lock.acquire()  # 获取锁
    lock.release()  # 释放锁
    semaphore = multiprocessing.Semaphore(value=1)  # 创建信号量。用于控制对有限资源的访问。
    event = multiprocessing.Event()  # 创建事件。用于进程间的通信,一个进程发出信号,其他进程等待信号。

    # 其他方法
    current_process = multiprocessing.current_process()  # 返回当前进程对象
    active_children = multiprocessing.active_children()  # 返回当前活动的子进程列表


if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

五、项目实战

5.1、线程 - 自动管理

5.1.1、基本使用:executor.submit() + executor.map()

import concurrent.futures


def add(args):
    a, b = args
    return a + b


def main():
    flag = 0
    futures = []
    results = []  # 存储结果的列表
    # with concurrent.futures.ThreadPoolExecutor() as executor:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        if flag:
            """方法一:使用executor.submit()"""
            for i in range(10):
                future = executor.submit(add, args=(1, 2))
                futures.append((future, i))
            ##########################################################################################################
            # (1)futures 列表保存了所有提交的任务及其相关信息;
            # (2)[fut[0] for fut in futures] 是一个列表生成式,从 futures 列表中提取出所有的 Future 对象;
            # (3)concurrent.futures.as_completed 是一个生成器,返回已经完成的 Future 对象。使用 as_completed 可以按完成的顺序处理任务,而不是按提交的顺序。
            ##########################################################################################################
            for future in concurrent.futures.as_completed([fut[0] for fut in futures]):
                result = future.result()  # 获取每个已完成任务的结果。
                results.append(result)  # 将结果添加到列表中
            print(results)
        else:
            """方法二:使用executor.map()"""
            args = [(3, 4), (3, 4), (3, 4)]  # 将参数打包成可迭代对象
            future = executor.map(add, args)
            results = list(future)  # 使用list()将迭代器转换为列表
            print(results)

        """executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。"""
        # executor.shutdown()  # 关闭进程池


if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

5.1.2、异步执行 + 异步返回结果 + 同步返回结果:executor.submit() + concurrent.futures.as_completed()

executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。

import concurrent.futures


def process_target_gray(gray_value, param2, param3):
    return gray_value, param2+param3


def main(gray_value, param2_values, param3_values):
    """使用 ThreadPoolExecutor 创建线程池并并行执行任务"""
    futures = []
    # with concurrent.futures.ThreadPoolExecutor() as executor:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # 提交任务并将 Future 对象和输入值作为元组存储在 futures 列表中
        for index in gray_value:
            future = executor.submit(process_target_gray, index, param2_values, param3_values)
            futures.append((future, index))

    """使用 as_completed 迭代 Future 对象,按任务完成的顺序获取结果"""
    results = {}
    for future in concurrent.futures.as_completed([fut[0] for fut in futures]):
        value1, value2 = future.result()
        # 根据任务在 futures 列表中的位置,将结果存储在 results 字典中
        for fut in futures:
            if fut[0] == future:
                i = fut[1]
                results[i] = (value1, value2)
                break

    """按保存顺序打印结果(多线程:先完成先保存)"""
    for key, value in results.items():
        value1, value2 = value
        print(f"Input: {key}, Square: {value1}, Cube: {value2}")

    print("")

    """按输入顺序打印结果 ———— 需要在 futures 列表中保留任务的提交顺序,然后在所有任务完成后,按顺序打印结果。"""
    for index in gray_value:
        value1, value2 = results[index]
        print(f"Input: {index}, Square: {value1}, Cube: {value2}")


if __name__ == '__main__':
    """使用 ThreadPoolExecutor 创建线程池 + 并行执行任务"""
    gray_value = [1, 2, 3, 4, 5]
    param2_values = 2
    param3_values = 0.1
    main(gray_value, param2_values, param3_values)

"""
Input: 2, Square: 2, Cube: 2.1
Input: 5, Square: 5, Cube: 2.1
Input: 3, Square: 3, Cube: 2.1
Input: 1, Square: 1, Cube: 2.1
Input: 4, Square: 4, Cube: 2.1

Input: 1, Square: 1, Cube: 2.1
Input: 2, Square: 2, Cube: 2.1
Input: 3, Square: 3, Cube: 2.1
Input: 4, Square: 4, Cube: 2.1
Input: 5, Square: 5, Cube: 2.1
"""

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

5.1.3、异步执行 + 同步返回结果:executor.map()

executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。

import concurrent.futures


def process_target_gray(gray_value, param2, param3):
    return gray_value, param2+param3


def main(gray_value, param2_values, param3_values):
    """使用 ThreadPoolExecutor 创建线程池 + 并行执行任务"""
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        results = list(executor.map(process_target_gray, gray_value, param2_values, param3_values))

    """使用 executor.map 函数来提交任务 + 根据输入顺序返回结果。"""
    for index, (value1, value2) in enumerate(results, start=1):
        print(f"Input: {index}, Square: {value1}, Cube: {value2}")


if __name__ == '__main__':
    gray_value = [1, 2, 3, 4, 5]
    param2_values = [10, 20, 30, 40, 50]
    param3_values = [0.1, 0.2, 0.3, 0.4, 0.5]
    main(gray_value, param2_values, param3_values)

"""
Input: 1, Square: 1, Cube: 10.1
Input: 2, Square: 2, Cube: 20.2
Input: 3, Square: 3, Cube: 30.3
Input: 4, Square: 4, Cube: 40.4
Input: 5, Square: 5, Cube: 50.5
"""

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

5.2、线程 - 手动管理

5.2.1、使用 threading 模块示例

import threading

def thread_function():
    print("这是一个子线程")

def main():
    # 创建多个线程
    threads = []
    for _ in range(5):
        thread = threading.Thread(target=thread_function)
        threads.append(thread)
        thread.start()

    # 主线程会继续执行
    print("这是主线程")

    # 等待所有子线程执行完毕
    for thread in threads:
        thread.join()

    print("所有子线程执行完毕,主线程继续执行")

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

5.2.2、使用 multiprocessing 模块示例

import multiprocessing
import time


def process_function():
    print("这是一个子进程")

def main():
    # 创建多个进程
    processes = []
    for _ in range(5):
        process = multiprocessing.Process(target=process_function)
        processes.append(process)
        process.start()

    # 主进程会继续执行
    time.sleep(1)  # 主线程是立即执行的,而子进程的启动需要一些时间。
    print("这是主进程")

    # 等待所有子进程执行完毕
    for process in processes:
        process.join()

    print("所有子进程执行完毕,主进程继续执行")

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

5.2.3、使用 concurrent.futures.ThreadPoolExecutor 示例

import concurrent.futures


def thread_function():
    print("这是一个子线程")


def main():
    # 创建 ThreadPoolExecutor 对象
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 提交任务给线程池
        futures = [executor.submit(thread_function) for _ in range(5)]

        # 等待所有任务完成
        for future in concurrent.futures.as_completed(futures):
            future.result()

    print("所有子线程执行完毕,主线程继续执行")


if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

5.2.4、使用 concurrent.futures.ProcessPoolExecutor 示例

import concurrent.futures

def process_function():
    print("这是一个子进程")

def main():
    # 创建 ProcessPoolExecutor 对象
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # 提交任务给进程池
        futures = [executor.submit(process_function) for _ in range(5)]

        # 等待所有任务完成
        for future in concurrent.futures.as_completed(futures):
            future.result()

    print("所有子进程执行完毕,主进程继续执行")

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/793824
推荐阅读
相关标签
  

闽ICP备14008679号