赞
踩
# 线程(Thread) :是进程中的一个执行单元(可以理解为执行流)
# 多线程(Multithreading) :同时运行多个线程(在一个进程中)
#
# 进程(Process) :是操作系统中的一个执行单元
# 多进程(Multiprocessing) :同时运行多个独立的进程
多线程(Multithreading) | 多进程(Multiprocessing) | |
---|---|---|
适用领域 | 并发任务 | 并行任务 |
内存 | 内存共享 (线程切换开销小,因为所有线程共享同一地址空间,不需要进行上下文切换。) | 内存独立 (进程切换开销大,因为每个进程都拥有独立的地址空间,导致数据不共享,故切换时需要保存和恢复整个进程的上下文。进程间需要通过通信(Inter-process Communication,IPC)来传递数据) |
时耗 | 创建和销毁线程的时间开销小 | 创建和销毁进程的时间开销大 |
任务 | IO密集型任务 (任务执行的主要时间都在读写操作上) | CPU密集型任务 (任务执行的主要时间都在数值计算上) |
应用 | 1、文件读写;2、网络通信(下载文件、发送与接收网络请求);3、数据库操作(查询数据库、读写数据);4、图像处理(加载图像、保存图像) | 1、数值计算(大数据分析、统计、计算);2、图像处理(渲染图像) |
(线程)创建和销毁线程的时间开销小,(进程)创建和销毁进程的时间开销大
- 线程 - 创建时间:
较快
,尤其是在同一个进程内、内存需要共享时。【数毫秒,十几毫秒】- 进程 - 创建时间:
较慢
,尤其是当涉及内存复制、系统调用时。【几十毫秒,几百毫秒或更长时间】
内存共享(多线程)
- 内存占用:多个线程之间的数据共享(同一个地址空间),不需要为每个线程复制数据,只需要传递指向数据的指针或引用。适用于需要数据量较大或传递频繁的情况。
- 参数的传递时间:由于参数传递只涉及指针或引用的复制,传递时间很短,几乎可以忽略不计。这使得在多线程编程中,数据共享和通信成本较低,可以更高效地完成任务。
内存独立(多进程)
- 内存占用:多个进程之间的数据独立(独立的地址空间),因此在参数传递时需要复制数据到新的内存地址,每个进程独立维护一份数据副本。导致消耗更多的内存,特别是在数据量较大或传递频繁时。
- 参数的传递时间:由于需要复制数据到新的内存空间,传递时间相比多线程会稍长一些。尤其是在数据量大或频繁传递时,会有显著的传递成本。
Python 解释器(CPython)在执行 Python 代码时,会存在全局解释器锁(Global Interpreter Lock,GIL)。
- GIL 是一种线程执行的控制机制。
- 优点:简化了解释器的实现和内存管理,且减少了线程安全问题。
- 缺点:限制了Python多线程的并行能力。
GIL(全局解释器锁)只存在于 CPython 解释器中,而其他一些 Python 解释器没有 GIL 的限制(如 Jython、IronPython 和 PyPy 等)。
- Jython:Jython 是一个在 Java 平台上运行的 Python 解释器,它直接利用了 Java 的线程模型,因此不受 GIL 的限制。在 Jython 中,Python 代码被转换为 Java 字节码并在 Java 虚拟机(JVM)上执行,可以充分利用 JVM 的多线程机制。
- IronPython:IronPython 是一个在 .NET 平台上运行的 Python 解释器,类似地,它利用了 .NET 平台的线程模型,而不受 GIL 的影响。
- PyPy:PyPy 是一个高性能的 Python 解释器,它通过即时编译(JIT)等技术提高了性能。PyPy 采用了一种称为 STM(Software Transactional Memory)的机制来管理并发,与 GIL 不同,STM 允许多个线程同时访问共享数据,并通过事务来确保线程安全。
- 为什么GIL限制了Python多线程的并行能力?
- (1)Python 的内存管理机制是基于引用计数的,即每个对象都有一个计数器来跟踪对它的引用次数。当引用计数归零时,对象被销毁并释放内存。
- (2)引用计数的增减需要在多线程环境下进行原子操作,以确保线程安全。 原子操作是一种不可中断的操作,它要么全部执行成功,要么全部不执行。
- (3)若在增减引用计数的过程中发生了线程切换,可能会导致引用计数不正确,从而引发内存泄漏或释放错误的内存空间,导致程序出现异常行为。
- (4)GIL 保证了在 Python 解释器中的任何时刻,只有一个线程能够执行 Python 字节码指令。GIL确保了引用计数的正确性,但也导致了多线程程序无法充分利用多核处理器的并行计算能力。
- 多线程与多进程适用于不同任务的原因?
- (1)在 CPU 密集型任务上,由于 GIL 的存在,多进程可以实现并行任务,但多线程不可以。
- 在 CPU 密集型任务上,由于 GIL 的存在,
多线程
无法利用多核 CPU 实现真正的并行执行。虽然在操作系统层面上多个线程是并发执行的,但在 Python 解释器内部,即使在多核 CPU 上运行多个线程,同一时刻也只能利用单个 CPU 核心,即只能有一个线程在解释器中执行 Python 代码,其他线程需要等待 GIL 的释放才能执行,因此多线程无法充分利用多核 CPU 的计算能力。- 在 CPU 密集型任务上,由于 GIL 的存在,
多进程
可以利用多核 CPU 实现真正的并行执行。在多进程中,每个进程都有独立的 Python 解释器和内存空间,因此不存在 GIL 的限制,多个进程可以同时在多个 CPU 核心上执行任务,实现真正的并行执行。- (2)在 I/O 密集型任务上,多线程可以实现并发任务。 由于线程在等待 I/O 操作完成时会释放 GIL,因此 GIL 并不会对性能造成太大影响。
并发任务(Concurrent Tasks)
:在同一时间段内,交替执行多个任务(不是同时执行)。
- 核心:任务之间的交替执行 + 内存共享
- 单核处理器:通过操作系统的上下文切换机制,在不同任务之间快速切换执行,实现任务的
并发执行
。- 多核处理器:在多个核心上
同时执行
不同的任务,从而实现更高的整体性能和资源利用率。- 优点:
- 提高系统的响应能力:通过并发处理,系统可以同时处理多个任务,减少了任务等待的时间,从而提高了系统的响应速度。
- 更高的资源利用率:在 I/O 密集型任务中,由于线程可以在等待 I/O 操作完成的同时执行其他任务,可以充分利用 CPU 的空闲时间,提高系统的资源利用率。
并行任务(Parallel Tasks)
:在同一时刻,同时执行多个任务(各自独立执行)。
- 核心:任务之间的同时执行 + 性能提升
- 在多核处理器上,不同的任务可以在不同的核心上
并行执行
。- 优点:
- 提高计算速度:将任务分配给多个处理单元同时执行,从而加速任务的完成速度。特别是在处理 CPU 密集型任务时,利用多核处理器的并行计算能力可以大大提高计算速度。
- 更高的吞吐量:通过并行处理多个任务,系统可以在单位时间内处理更多的工作量,提高了系统的整体吞吐量。
线程切换(Thread Switching):是指操作系统将 CPU 执行权从一个线程转移到另一个线程的过程。在多线程环境中,一个进程可以包含多个线程,每个线程可以被认为是独立的执行流。线程切换可以在同一个进程内进行,也可以跨进程进行。
线程切换是上下文切换的一种特例。
线程切换的步骤:
- 保存当前线程状态:将当前线程的状态(如寄存器、程序计数器等)保存到线程控制块(Thread Control Block, TCB)中。
- 选择下一个线程:操作系统根据调度算法选择下一个要执行的线程。
- 恢复下一个线程状态:从选定线程的线程控制块中恢复其状态。
- 切换到下一个线程:将 CPU 执行权转移到选定的线程。
上下文切换(Context Switching)
:是指操作系统将 CPU 执行权从一个进程转移到另一个进程的过程。上下文切换比线程切换开销更大,因为进程拥有独立的内存空间、资源和状态信息,而线程共享同一进程的资源。
上下文切换(进程级)的步骤:
- 保存当前进程状态:保存当前进程的 CPU 寄存器、程序计数器和内存管理信息到进程控制块(PCB)。
- 选择下一个进程:根据调度算法选择下一个要执行的进程。
- 加载下一个进程状态:从下一个进程的 PCB 中恢复其状态,包括内存映射。
- 切换到下一个进程:CPU 执行权转移到下一个进程。
上下文切换
:
- (1)在任务执行过程中,操作系统因为一些事件的发生(如:时间片用完、外部中断、任务主动让出CPU等)而暂停当前任务的执行,并切换到另一个任务执行。
- (2)在任务切换过程中,操作系统将当前任务的执行状态(上下文)保存到内存中,并加载下一个任务的执行状态,使得下一个任务能够继续执行。
上下文切换的关键:上下文的保存和恢复。
上下文(Context)
:当前任务执行时的环境和状态信息。
在操作系统中,上下文通常指的是进程的执行状态,包括但不限于以下内容:
- 寄存器状态:程序计数器(PC)、通用寄存器(如eax、ebx等)、堆栈指针(SP)等。这些寄存器存储了程序当前的执行位置、变量的值等信息。
- 内存映像:程序的内存布局和相关的数据。这包括了程序的代码、堆、栈以及全局变量等数据。
- 进程或线程的状态:进程或线程的优先级、状态(运行、就绪、阻塞等)、所属的进程组等。
- 打开的文件描述符:如果程序正在访问文件或其他资源,上下文中可能包括打开的文件描述符以及相应的文件指针等信息。
CPU处理器的参数解析:
12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz
12th Gen Intel(R) Core(TM)
:表示英特尔第 12 代 Core 处理器i7-12700
:表示处理器属于i7
系列,具体型号为12700
2.10 GHz
:表示处理器的时钟频率
- 架构: 使用了Intel的Alder Lake架构。
- 核心数: i7-12700有12个核心。
- 线程数: 具有24个线程,支持超线程技术(每个核心可以同时执行两个线程)。
CPU核心数:由处理器和操作系统决定
- 处理器(硬件)
物理内核(Physical Cores)
:实际的处理器核心
- 现代计算机通常配备有多核心的处理器,即在一个物理处理器芯片上集成了多个独立的处理器核心,可以并行地执行指令,从而提高处理器的整体性能和并发能力。
- 每个物理核心都是一个独立的、实际存在的硬件单元,具有自己的执行单元、缓存和执行流水线。。
逻辑内核(Logical Cores)
:通过超线程技术(Hyper-Threading)实现的虚拟核心
- 超线程允许一个物理内核模拟两个逻辑内核,实现在同一时间执行两个线程。
- 逻辑内核共享物理内核的执行单元、缓存和执行流水线,但它们可以独立地执行指令流。
举例:计算机配备了四核处理器(4个物理内核),每个核心都支持超线程(8个逻辑内核)。因此,可以实现8个线程的并行处理。
- 操作系统(软件)
- 线程的分配和管理:决定哪些线程在哪个核心上运行
- 支持的最大线程数:不同操作系统对支持的最大线程数有限制,这取决于其设计和内核管理能力。
import psutil
# Ture:显示逻辑内核、False:显示物理内核
print("CPU处理器的逻辑内核数量:", psutil.cpu_count(logical=True)) # CPU处理器的逻辑内核数量: 20
print("CPU处理器的物理内核数量:", psutil.cpu_count(logical=False)) # CPU处理器的物理内核数量: 12
import os
print("CPU处理器的逻辑内核数量:", os.cpu_count())
# CPU处理器的逻辑内核数量: 20
import multiprocessing as mp
print("CPU处理器的逻辑内核数量:", mp.cpu_count())
# CPU处理器的逻辑内核数量: 20
(1)若指定,则max_workers=20 / 40 / os.cpu_count() / 1000
(2)若不指定,则默认使用CPU处理器支持逻辑内核的最大数量(如:max_workers=os.cpu_count()
)
- 线程数可以无限指定,但逻辑内核数决定了最大可以同时处理的线程数量。
- 若线程数超过逻辑核心数,则通过
调度策略
来决定哪些线程能够运行。 选择硬件支持的最大逻辑内核数并行执行,而超出部分将处于等待状态(阻塞),直到有空闲的逻辑内核可供使用。这种情况将导致线程等待时间较长,影响系统的响应速度和效率。
(案例一)已知:若创建每个线程为毫秒级,创建每个进程为 3 ~ 5 秒,而每个线程的执行时间为 3 秒。则线程是并行执行,而进程是串行执行;
(案例二)已知:
当数据装载器 A 启用多线程时:for a in A,若(从 A 中获取 a 的耗时)小于(每个线程的执行时间)
,则多线程并行;反之串行执行。**
以多线程为例
- 需求:循环计算函数100次(每次输入参数不同)
- 已知:系统内存RAM = 64GB,读取图像的内存损耗 = 6GB(只读取一次),每次计算图像的内存损耗 = 7.81GB(循环计算)
- 计算:
逻辑内核的最大数量 = (64 - 6 - 系统内存) / 7.81GB = 7.42。即每轮最多并行计算7个循环max_workers = 7,否则将异常显示:内存分配不足。
备注:每个循环对应的内存损耗都是不同的(函数的输入参数不同导致函数内部计算所损耗的内存不同)。故在指定max_workers时,必须以100次循环中的最大内存损耗为基准线(如:13.81GB)。
已知:假设指定 100 个线程,但系统只有 20 个逻辑核心。
- 逻辑核心数表示处理器可以同时处理的线程数量
- 若线程数超过逻辑核心数,则通过
调度策略
来决定哪些线程能够运行。 选择硬件支持的最大逻辑内核数并行执行,而超出部分将处于等待状态(阻塞),直到有空闲的处理器可供使用。这种情况将导致线程等待时间较长,影响系统的响应速度和效率。
- 根据线程的优先级、状态(如:运行中、等待中)、以及核心的可用性等,动态地增加或减少线程数,或者重新分配线程到不同的物理核心上。
优先级
:高优先级的线程会在低优先级线程之前执行。- 操作系统可以在任何时候中断低优先级线程,并转而执行高优先级线程。
时间片轮转(Round Robin - Scheduling)是一种调度算法,主要用于操作系统的多任务处理。它通过将CPU时间划分为小的时间片,轮流分配给各个任务,确保系统的公平性和响应性。
公平性:每个任务都可以定期获得CPU时间,不会出现某个任务长期占用CPU的情况。
时间片轮转的工作流程:
- (1)
任务初始化 + 时间片分配(Time Slicing)
:将所有需要执行的任务(或线程)放入就绪队列,并分配固定长度的时间片(比如10毫秒或100毫秒)。
- 时间片长度:决定了每个任务(或线程)能在CPU上连续执行的时间。
- 时间片过短:将导致频繁的上下文切换,增加系统开销。
- 时间片过长:将导致响应时间变长,不适合实时系统。
- (2)
任务执行(核心分配)
:选择就绪队列中的前 N 个任务(或线程),并让他们同时在各自的时间片内执行。" 同时 " 是指每个任务(或线程)在一个逻辑核心上运行。- (3)
线程切换
:每个时间片结束后,操作系统会进行线程切换。
- 若线程的任务
执行结束,则 选择就绪序列中的下一个任务(或线程) 将占用这个核心。- 若线程的任务
执行未结束:
保存当前线程状态
:操作系统将中断当前执行的任务(或线程),并保存线程的状态(包括程序计数器、寄存器、和内存状态等)。调度下一个线程
:将当前任务放回就绪队列的末尾,然后选择队列中的下一个任务(或线程)。若任务具有不同的优先级,操作系统会优先选择高优先级的任务执行。恢复任务的状态
:恢复下一个任务的状态,让它在新的时间片内继续执行。- (4)
循环执行
:重复上述步骤,直到所有任务(或线程)执行完毕。
import threading
import time
import queue
# 定义任务函数
def task(task_id, duration):
print(f"Task {task_id} is running")
time.sleep(duration)
print(f"Task {task_id} is finished")
# 创建任务队列
task_queue = queue.Queue()
num_tasks = 100
time_slice = 0.1 # 时间片长度
# 向任务队列中添加任务
for i in range(num_tasks):
task_queue.put((i, time_slice))
# 初始化线程池
threads = []
num_cores = 20
# 定义调度函数
def scheduler():
while not task_queue.empty():
for _ in range(num_cores):
if not task_queue.empty():
task_id, duration = task_queue.get()
thread = threading.Thread(target=task, args=(task_id, duration))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
threads.clear()
# 启动调度器
scheduler()
"""
Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 5 is running
Task 6 is running
Task 7 is running
Task 8 is running
Task 9 is running
Task 7 is finished
Task 5 is finishedTask 4 is finished
Task 2 is finishedTask 1 is finished
Task 9 is finished
Task 3 is finished
Task 0 is finished
Task 8 is finished
Task 6 is finished
"""
优先级调度(Priority - Scheduling)是一种调度算法,其中每个任务(或线程)都分配了一个优先级。操作系统根据优先级来决定任务的执行顺序,优先级越高的任务会优先执行。
优先级调度的工作流程:
- (1)
任务初始化
:将所有任务放入就绪队列,并分配优先级。- (2)
任务执行
:从就绪队列中选择优先级最高的任务执行。若有多个任务具有相同的优先级,可以采用先来先服务(FCFS)的方式选择任务。选中的任务在一个时间片内执行。- (3)
线程切换
:每个时间片结束后,操作系统会进行线程切换。若任务未完成,则保存任务状态,并将其放回就绪队列。- (4)
循环执行
:重复上述步骤,直到所有任务(或线程)执行完毕。
优先级调度策略有两种方式:
- 抢占式调度(Preemptive):如果一个高优先级任务到达,它可以中断正在执行的低优先级任务。
- 非抢占式调度(Non-preemptive):一旦任务开始执行,它将一直执行到完成,即使有更高优先级的任务到达。
import threading
import queue
import time
class Task:
def __init__(self, task_id, priority, duration):
self.task_id = task_id
self.priority = priority
self.duration = duration
def run(self):
print(f"Task {self.task_id} with priority {self.priority} is running")
time.sleep(self.duration)
print(f"Task {self.task_id} with priority {self.priority} is finished")
# 创建优先级队列
task_queue = queue.PriorityQueue()
# 向优先级队列中添加任务
tasks = [
Task(1, 1, 2),
Task(2, 3, 1),
Task(3, 2, 1),
Task(4, 4, 1),
Task(5, 5, 2)
]
for task in tasks:
# 优先级队列按优先级排序,优先级值小的任务优先执行
task_queue.put((task.priority, task))
# 初始化线程池
threads = []
# 定义调度函数
def scheduler():
while not task_queue.empty():
priority, task = task_queue.get()
thread = threading.Thread(target=task.run)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
threads.clear()
# 启动调度器
scheduler()
"""
Task 1 with priority 1 is running
Task 1 with priority 1 is finished
Task 3 with priority 2 is running
Task 3 with priority 2 is finished
Task 2 with priority 3 is running
Task 2 with priority 3 is finished
Task 4 with priority 4 is running
Task 4 with priority 4 is finished
Task 5 with priority 5 is running
Task 5 with priority 5 is finished
"""
import psutil
def get_all_processes_thread_count():
"""获取系统中所有进程的线程数量"""
total_threads = 0
for proc in psutil.process_iter(['pid', 'name', 'num_threads']):
try:
info = proc.info
total_threads += info['num_threads']
print(f"PID={info['pid']}, Name={info['name']}, Threads={info['num_threads']}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return total_threads
def get_process_thread_count(pid):
"""获取特定PID的进程线程数量"""
try:
proc = psutil.Process(pid)
return proc.num_threads()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return None
if __name__ == '__main__':
# (1)获取系统中所有进程的总线程数量
total_threads = get_all_processes_thread_count()
print(f"Total threads in the system: {total_threads}")
# (2)获取特定PID的进程线程数量
pid = 34544
process_threads = get_process_thread_count(pid)
if process_threads is not None:
print(f"Process with PID={pid} has {process_threads} threads")
else:
print(f"Process with PID={pid} not found or access denied")
"""
部分打印结果(与任务管理器的详细信息中的数量一致)
PID=22840, Name=WeChatPlayer.exe, Threads=12
PID=22916, Name=WeChatAppEx.exe, Threads=11
PID=22952, Name=WeChatAppEx.exe, Threads=100
PID=23208, Name=WeChatUtility.exe, Threads=52
"""
- (1)
threading模块
、multiprocessing模块
- 作用:提供底层线程控制和丰富的同步原语,需手动管理线程
- 适合:需要精细控制的多线程 / 多进程编程,且需要手动处理线程 / 进程的创建和销毁。
- Python 标准库中没有名为 multithreading 的模块。多线程编程一般使用 threading 模块。
- Python 标准库中没有名为 processing 的模块。多进程编程一般使用 multiprocessing 模块。
- (2)
concurrent.futures.ThreadPoolExecutor
、concurrent.futures.ProcessPoolExecutor
- 作用:提供高级的线程池自动管理
- 适合:需要并行执行大量独立任务,且更关注任务的结果而不是线程/进程本身。
"""#############################################################################################
# 函数功能:线程池执行器,用于管理和调度线程池中的线程执行异步任务。
# 函数说明:executor = concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
# 参数说明:
# max_workers:可选参数,表示线程池中的最大工作线程数。默认为 None,表示根据系统情况自动决定线程数。
# thread_name_prefix:可选参数,表示线程名称的前缀。默认为空字符串。
# 返回值:
# executor:表示线程池执行器对象,用于管理和调度线程池中的线程执行异步任务。
#############################################################################################"""
使用方式:
"""(1)executor.submit(): 提供单个任务"""
作用:将所有参数只应用一次到函数,完成并行化计算。
返回:一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
"""常与 for () 连用执行并行任务,常与 concurrent.futures.as_completed() 连用获取结果。"""
"""(2)executor.map(): 多个任务并行计算"""
作用:将列表参数循环应用到函数,完成并行化计算。
返回:一个map()迭代器,可以用于迭代获取每个任务的结果。
"""(3)executor.shutdown(): 等待所有任务完成,并关闭进程池"""
若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
"""#############################################################################################
# 类功能:进程池执行器,用于管理和调度进程池中的进程执行异步任务。
# 类说明:executor = concurrent.futures.ProcessPoolExecutor(max_workers=None)
# 参数说明:
# max_workers:可选参数,表示进程池中的最大工作进程数。默认为 None,表示根据系统情况自动决定进程数。
# 返回值:
# executor:表示进程池执行器对象,用于管理和调度进程池中的进程执行异步任务。
#############################################################################################"""
使用方式:
"""(1)executor.submit(): 提供单个任务"""
作用:将所有参数只应用一次到函数,完成并行化计算。
返回:一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
"""常与 for () 连用执行并行任务,常与 concurrent.futures.as_completed() 连用获取结果。"""
"""(2)executor.map(): 多个任务并行计算"""
作用:将列表参数循环应用到函数,完成并行化计算。
返回:一个map()迭代器,可以用于迭代获取每个任务的结果。
"""(3)executor.shutdown(): 等待所有任务完成,并关闭进程池"""
若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
"""#############################################################################################
# 函数功能:在 Executor 对象中提交可调用对象以异步执行。
# 函数说明:future = executor.submit(fn, *args, **kwargs)
# 参数说明:
# executor:表示 Executor 对象,可以是 ThreadPoolExecutor 或 ProcessPoolExecutor。
# fn:可调用对象,表示要在异步任务中执行的函数或方法。
# *args:可选参数,表示要传递给 fn 函数的位置参数。
# **kwargs:可选参数,表示要传递给 fn 函数的关键字参数。
# 返回值:
# future:表示异步任务的 Future 对象,用于获取异步任务的执行结果或状态。
#############################################################################################"""
"""#############################################################################################
# 函数功能:在 Executor 对象中映射可调用对象以异步执行。
# 函数说明:results_iterator = executor.map(func, *iterables, timeout=None, chunksize=1)
# 参数说明:
# executor:表示 Executor 对象,可以是 ThreadPoolExecutor 或 ProcessPoolExecutor。
# func:可调用对象,表示要在异步任务中执行的函数或方法。
# *iterables:表示要迭代的可迭代对象,包含了传递给 func 函数的参数。如果有多个可迭代对象,则 func 将从每个可迭代对象中依次获取参数。
# timeout:可选参数,表示等待结果的超时时间(以秒为单位)。如果在指定的时间内没有返回结果,则会引发 TimeoutError。默认为 None,表示不设置超时时间。
# chunksize:可选参数,表示每个任务的数据块大小。默认为 1,表示一个任务处理一个迭代器中的一个元素。
# 返回值:
# results_iterator:表示异步任务结果的迭代器,用于获取异步任务的执行结果。
#############################################################################################"""
"""#############################################################################################
# 函数功能:在迭代器中异步返回已完成的 Future 对象,而不会等待所有任务都完成。
# 函数说明:concurrent.futures.as_completed(futures, timeout=None)
# 参数说明:
# futures:一个 Future 对象的可迭代容器,表示要等待的异步任务。
# timeout:可选参数,表示等待结果的超时时间(以秒为单位)。如果在指定的时间内没有返回结果,则会引发 TimeoutError。默认为 None,表示不设置超时时间。
# 返回值:
# 返回一个生成器,用于异步返回已完成的 Future 对象。
#############################################################################################"""
"""#############################################################################################
# 函数功能:优雅地关闭 Executor 对象,不再接受新的任务并等待已提交任务的完成。
# 函数说明:executor.shutdown(wait=True, cancel_futures=False)
# 参数说明:
# wait:可选参数,布尔值,默认为 True。表示是否等待所有已提交的任务完成后再返回。如果设置为 False,则立即返回,不等待已提交的任务完成。
# cancel_futures:可选参数,布尔值,默认为 False。表示是否取消尚未开始执行的任务。如果设置为 True,则取消所有尚未开始执行的任务。
# 返回值:
# 无返回值。
#############################################################################################"""
没有显示调用executor.shutdown()
已知:现需要循环N(N >> 20)次任务,每次提交(线程数 = 20)任务到线程池。则线程的使用情况如下:
线程重用
: 当一个线程完成一个任务后,将被线程池用来执行下一个任务,而不是被销毁。该方法避免了频繁地创建和销毁线程,提高性能。
任务并发执行
:若单个任务的执行时间很短,则在某个时刻内可能有多个线程同时执行多个不同的任务,且同时返回多个不同的结果。
任务重复执行
:若某些线程(20个线程中的部分线程)已完成任务,但主线程仍在循环中提交新的任务,则这些线程将被用来执行新的任务,直到任务结束。显示调用executor.shutdown():等待线程池中的所有线程都完成任务,然后再继续执行主线程。
- 在某一个批次中:某些线程提前完成了任务,它们会等待其余线程全部完成任务后,再一起投入到下一个批次的任务。
- 最后一个批次中:已完成任务的线程会等待其余线程全部结束;
- (1)
threading模块
、multiprocessing模块
- 作用:提供底层线程控制和丰富的同步原语,需手动管理线程
- 适合:需要精细控制的多线程 / 多进程编程,且需要手动处理线程 / 进程的创建和销毁。
- Python 标准库中没有名为 multithreading 的模块。多线程编程一般使用 threading 模块。
- Python 标准库中没有名为 processing 的模块。多进程编程一般使用 multiprocessing 模块。
- (2)
concurrent.futures.ThreadPoolExecutor
、concurrent.futures.ProcessPoolExecutor
- 作用:提供高级的线程池自动管理
- 适合:需要并行执行大量独立任务,且更关注任务的结果而不是线程/进程本身。
import threading
import time
def target_function():
print("线程开始执行...")
time.sleep(2)
print("线程执行结束.")
def main():
"""(1)Thread 类:用于创建和管理线程。"""
thread = threading.Thread(target=target_function, daemon=True) # 创建线程对象,并设置守护标志为 True
thread.start() # 启动线程
thread.join(timeout=None) # 等待线程结束。可选的 timeout 参数表示最长等待时间。
is_alive = thread.is_alive() # 断线程是否存活(即是否处于活动状态)。
name = thread.name # 获取线程的名称
ident = thread.ident # 获取线程的标识符
# thread.daemon = True # 线程的守护标志,设置为 True 时表示该线程是守护线程。
"""(2)同步原语"""
lock = threading.Lock() # 创建互斥锁,用于控制对共享资源的访问。
lock = threading.RLock() # 创建重入锁,允许同一线程多次请求锁。
lock.acquire() # 获取锁
lock.release() # 释放锁
condition = threading.Condition() # 创建条件变量,用于线程间的通信和同步。
semaphore = threading.Semaphore(value=1) # 创建信号量,用于控制对有限资源的访问。
event = threading.Event() # 创建事件,用于线程间的通信,一个线程发出信号,其他线程等待信号。
"""(3)其他方法"""
active_threads = threading.enumerate() # 返回当前活动的线程列表
active_thread_count = threading.active_count() # 返回当前活动的线程数量
threading.stack_size(1024 * 1024) # 设置线程堆栈大小(仅在创建新线程时有效)。
if __name__ == '__main__':
main()
import multiprocessing
import time
def target_function(index):
print("进程开始执行...")
time.sleep(2)
print("进程执行结束.")
return index
def process_function():
process = multiprocessing.Process(target=target_function, args=(0,), daemon=True) # 创建进程对象,并设置守护标志为 True
process.start() # 启动进程
process.join(timeout=None) # 等待进程结束。可选的 timeout 参数表示最长等待时间。
is_alive = process.is_alive() # 断进程是否存活(即是否处于活动状态)。
name = process.name # 获取进程的名称
ident = process.ident # 获取进程的标识符
print(f"进程名称: {name}, 进程ID: {ident}, 是否存活: {is_alive}")
def pool_function():
# Pool 类:用于管理进程池。
with multiprocessing.Pool(processes=4) as pool: # 创建进程池对象
results = pool.map(target_function, range(5)) # map(func, iterable[, chunksize]): 将函数应用于可迭代对象的每个元素,并返回结果列表。
result = pool.apply(target_function, (0,)) # apply(func[, args[, kwds]]): 在进程池中同步执行函数,并返回结果。
pool.close() # 关闭进程池,不再接受新的任务。
pool.join() # 等待所有进程完成
def main():
process_function()
pool_function()
"""
在主进程中,multiprocessing.Process 和 multiprocessing.Pool 这两种方式不兼容且是互斥的。
multiprocessing.Process 用于启动单个进程,
multiprocessing.Pool 用于管理一个进程池。
"""
# 同步原语
lock = multiprocessing.Lock() # 创建互斥锁,用于控制对共享资源的访问。
lock.acquire() # 获取锁
lock.release() # 释放锁
semaphore = multiprocessing.Semaphore(value=1) # 创建信号量。用于控制对有限资源的访问。
event = multiprocessing.Event() # 创建事件。用于进程间的通信,一个进程发出信号,其他进程等待信号。
# 其他方法
current_process = multiprocessing.current_process() # 返回当前进程对象
active_children = multiprocessing.active_children() # 返回当前活动的子进程列表
if __name__ == '__main__':
multiprocessing.freeze_support()
main()
import concurrent.futures
def add(args):
a, b = args
return a + b
def main():
flag = 0
futures = []
results = [] # 存储结果的列表
# with concurrent.futures.ThreadPoolExecutor() as executor:
with concurrent.futures.ProcessPoolExecutor() as executor:
if flag:
"""方法一:使用executor.submit()"""
for i in range(10):
future = executor.submit(add, args=(1, 2))
futures.append((future, i))
##########################################################################################################
# (1)futures 列表保存了所有提交的任务及其相关信息;
# (2)[fut[0] for fut in futures] 是一个列表生成式,从 futures 列表中提取出所有的 Future 对象;
# (3)concurrent.futures.as_completed 是一个生成器,返回已经完成的 Future 对象。使用 as_completed 可以按完成的顺序处理任务,而不是按提交的顺序。
##########################################################################################################
for future in concurrent.futures.as_completed([fut[0] for fut in futures]):
result = future.result() # 获取每个已完成任务的结果。
results.append(result) # 将结果添加到列表中
print(results)
else:
"""方法二:使用executor.map()"""
args = [(3, 4), (3, 4), (3, 4)] # 将参数打包成可迭代对象
future = executor.map(add, args)
results = list(future) # 使用list()将迭代器转换为列表
print(results)
"""executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。"""
# executor.shutdown() # 关闭进程池
if __name__ == '__main__':
main()
executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。
import concurrent.futures
def process_target_gray(gray_value, param2, param3):
return gray_value, param2+param3
def main(gray_value, param2_values, param3_values):
"""使用 ThreadPoolExecutor 创建线程池并并行执行任务"""
futures = []
# with concurrent.futures.ThreadPoolExecutor() as executor:
with concurrent.futures.ProcessPoolExecutor() as executor:
# 提交任务并将 Future 对象和输入值作为元组存储在 futures 列表中
for index in gray_value:
future = executor.submit(process_target_gray, index, param2_values, param3_values)
futures.append((future, index))
"""使用 as_completed 迭代 Future 对象,按任务完成的顺序获取结果"""
results = {}
for future in concurrent.futures.as_completed([fut[0] for fut in futures]):
value1, value2 = future.result()
# 根据任务在 futures 列表中的位置,将结果存储在 results 字典中
for fut in futures:
if fut[0] == future:
i = fut[1]
results[i] = (value1, value2)
break
"""按保存顺序打印结果(多线程:先完成先保存)"""
for key, value in results.items():
value1, value2 = value
print(f"Input: {key}, Square: {value1}, Cube: {value2}")
print("")
"""按输入顺序打印结果 ———— 需要在 futures 列表中保留任务的提交顺序,然后在所有任务完成后,按顺序打印结果。"""
for index in gray_value:
value1, value2 = results[index]
print(f"Input: {index}, Square: {value1}, Cube: {value2}")
if __name__ == '__main__':
"""使用 ThreadPoolExecutor 创建线程池 + 并行执行任务"""
gray_value = [1, 2, 3, 4, 5]
param2_values = 2
param3_values = 0.1
main(gray_value, param2_values, param3_values)
"""
Input: 2, Square: 2, Cube: 2.1
Input: 5, Square: 5, Cube: 2.1
Input: 3, Square: 3, Cube: 2.1
Input: 1, Square: 1, Cube: 2.1
Input: 4, Square: 4, Cube: 2.1
Input: 1, Square: 1, Cube: 2.1
Input: 2, Square: 2, Cube: 2.1
Input: 3, Square: 3, Cube: 2.1
Input: 4, Square: 4, Cube: 2.1
Input: 5, Square: 5, Cube: 2.1
"""
executor.shutdown()方法没有被调用,而是利用了with语句来自动管理进程池的生命周期,在with块结束时自动关闭进程池。
import concurrent.futures
def process_target_gray(gray_value, param2, param3):
return gray_value, param2+param3
def main(gray_value, param2_values, param3_values):
"""使用 ThreadPoolExecutor 创建线程池 + 并行执行任务"""
with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
results = list(executor.map(process_target_gray, gray_value, param2_values, param3_values))
"""使用 executor.map 函数来提交任务 + 根据输入顺序返回结果。"""
for index, (value1, value2) in enumerate(results, start=1):
print(f"Input: {index}, Square: {value1}, Cube: {value2}")
if __name__ == '__main__':
gray_value = [1, 2, 3, 4, 5]
param2_values = [10, 20, 30, 40, 50]
param3_values = [0.1, 0.2, 0.3, 0.4, 0.5]
main(gray_value, param2_values, param3_values)
"""
Input: 1, Square: 1, Cube: 10.1
Input: 2, Square: 2, Cube: 20.2
Input: 3, Square: 3, Cube: 30.3
Input: 4, Square: 4, Cube: 40.4
Input: 5, Square: 5, Cube: 50.5
"""
import threading
def thread_function():
print("这是一个子线程")
def main():
# 创建多个线程
threads = []
for _ in range(5):
thread = threading.Thread(target=thread_function)
threads.append(thread)
thread.start()
# 主线程会继续执行
print("这是主线程")
# 等待所有子线程执行完毕
for thread in threads:
thread.join()
print("所有子线程执行完毕,主线程继续执行")
if __name__ == '__main__':
main()
import multiprocessing
import time
def process_function():
print("这是一个子进程")
def main():
# 创建多个进程
processes = []
for _ in range(5):
process = multiprocessing.Process(target=process_function)
processes.append(process)
process.start()
# 主进程会继续执行
time.sleep(1) # 主线程是立即执行的,而子进程的启动需要一些时间。
print("这是主进程")
# 等待所有子进程执行完毕
for process in processes:
process.join()
print("所有子进程执行完毕,主进程继续执行")
if __name__ == '__main__':
main()
import concurrent.futures
def thread_function():
print("这是一个子线程")
def main():
# 创建 ThreadPoolExecutor 对象
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务给线程池
futures = [executor.submit(thread_function) for _ in range(5)]
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
future.result()
print("所有子线程执行完毕,主线程继续执行")
if __name__ == '__main__':
main()
import concurrent.futures
def process_function():
print("这是一个子进程")
def main():
# 创建 ProcessPoolExecutor 对象
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
# 提交任务给进程池
futures = [executor.submit(process_function) for _ in range(5)]
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
future.result()
print("所有子进程执行完毕,主进程继续执行")
if __name__ == '__main__':
main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。