赞
踩
多线程(Multithreading) | 多进程(Multiprocessing) | |
---|---|---|
任务 | IO密集型任务 (任务执行的主要时间都花费在读写操作上) | CPU密集型任务 (任务执行的主要时间都花费在数值计算上) |
功能 | 单线程 在等待IO时会被阻塞,而多线程 在等待IO期间可以切换到其他任务 | 利用多核处理器来实现真正的并行计算(一个进程的崩溃不会影响其他进程) |
内存 | 共享地址空间 (在同一进程内部共享相同的内存空间,因此它们可以方便地访问和修改共享的数据) | 独立地址空间 (每个进程有自己独立的地址空间,数据不共享,需要通过进程间通信(IPC)来传递数据) |
消耗 | 较小 (创建和销毁线程的开销较小) | 较大 (创建和销毁进程的开销较大) |
领域 | 1、文件读写;2、网络通信(下载文件、发送与接收网络请求);3、数据库操作(查询数据库、读写数据);4、图像处理(加载图像、保存图像) | 1、数值计算(大数据分析、统计、计算);2、图像处理(渲染图像) |
Python存在全局解释器锁(GIL) | 在部分的CPU密集型任务中,多线程的性能不如多进程。 | 每个进程都有自己的GIL,可以并行执行。 |
全局解释器锁(Global Interpreter Lock,简称GIL)
:是一种在解释器层面对多线程执行的控制机制。为了简化内存管理以及线程安全,但也引入了一些限制。一次只允许一个线程执行:GIL确保在解释器的任何时刻,即便在多核CPU上运行,同一时刻也只有一个线程在执行Python字节码。
CPU核心数:由处理器和操作系统决定
- 处理器(硬件)
物理内核(Physical Cores)
:是实际的处理器核心。每个物理内核都是独立的、实际存在的硬件单元。现代计算机通常配备有多核心的处理器。
- 在多核处理器中,物理内核的数量表示处理器上实际存在的独立处理单元的数量。每个物理内核可以独立地执行指令,具有自己的执行单元、缓存和执行流水线。
逻辑内核(Logical Cores)
:通过超线程技术(Hyper-Threading)实现的虚拟核心。超线程允许一个物理内核模拟两个逻辑内核,从而在同一时间执行两个线程。
- 逻辑内核共享物理内核的执行单元、缓存和执行流水线,但它们可以独立地执行指令流。
举例:计算机配备了四核处理器,每个核心都支持超线程,实现了4个物理内核和8个逻辑内核的处理能力,可同时并行处理8个线程。
- 操作系统(软件)
- 线程的分配和管理:决定哪些线程在哪个核心上运行
- 支持的最大线程数:不同操作系统对支持的最大线程数有限制,这取决于其设计和内核管理能力。
处理器:12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz
12th Gen Intel(R) Core(TM)
:表示英特尔第12代Core处理器i7-12700
:表示处理器属于i7
系列,具体型号为12700
2.10 GHz
:表示处理器的时钟频率
- 架构: 使用了Intel的Alder Lake架构。
- 核心数: i7-12700有12个核心。
- 线程数: 具有24个线程,支持超线程技术(每个核心可以同时执行两个线程)。
# 方法一
import psutil
print("CPU处理器的逻辑内核数量:", psutil.cpu_count(logical=True)) # Ture:显示逻辑内核、False:显示物理内核
print("CPU处理器的物理内核数量:", psutil.cpu_count(logical=False)) # Ture:显示逻辑内核、False:显示物理内核
# CPU处理器的逻辑内核数量: 20
# CPU处理器的物理内核数量: 12
# 方法二
import os
print("CPU处理器的逻辑内核数量:", os.cpu_count())
# CPU处理器的逻辑内核数量: 20
# 方法三
import multiprocessing as mp
print("CPU处理器的逻辑内核数量:", mp.cpu_count())
# CPU处理器的逻辑内核数量: 20
(1)手动指定CPU处理器的逻辑内核数量
(2)若不指定,则默认使用CPU处理器支持逻辑内核的最大数量(如:max_workers=20)
在不超过系统内存RAM的基础上,如何设置逻辑内核的最大数量?
以多线程为例
- 需求:循环计算函数100次(每次输入参数不同)
- 已知:系统内存RAM = 64GB,读取图像的内存损耗 = 6GB(只读取一次)计算图像的内存损耗 = 7.81GB(循环计算)
- 计算:
逻辑内核的最大数量 = (64 - 6 - 系统内存) / 7.81GB = 7.42。即每轮最多并行计算7个循环max_workers = 7,否则将显示内存分配不足问题。
备注:每个循环对应的内存损耗都是不同的(函数的输入参数不同导致函数内部计算所损耗的内存不同)。故在指定max_workers时,必须以100次循环中的最大内存损耗为基准线(如:13.81GB)。
"""
#############################################################################
函数说明: concurrent.futures.ThreadPoolExecutor(max_workers=None)
输入参数: max_workers 指定最大线程数(默认使用CPU处理器支持逻辑内核的最大数量)
#############################################################################
使用方式:
(1)executor.submit(): 单个任务并行计算
将所有参数只应用一次到函数,完成并行化计算。 返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
(2)executor.map(): 多个任务并行计算
将列表参数循环应用到函数,完成并行化计算。 返回一个map()迭代器,可以迭代获取每个任务的结果
(3)executor.shutdown(): 等待所有任务完成并关闭线程池
若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
#############################################################################
"""
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
# (1)使用executor.submit
future = executor.submit(my_fun, arg1, arg2) # 其中:my_function是执行函数,arg1和arg2是函数的参数。
result = future.result() # 获取单个任务的执行结果
for future in concurrent.futures.as_completed(futures): # 返回一个迭代器,其是在任务完成时按照完成的顺序生成 Future 对象。
result = future.result() # 在迭代过程中使用 future.result() 来获取每个已完成任务的结果。
print(result)
# 若是单个任务,两者没有区别。
# 若是多个任务,as_completed允许在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
##############################################################
# (2)使用executor.map
future = executor.map(my_fun, [arg1, arg2]) # 其中:my_function是执行函数,[arg1, arg2]是一个包含函数参数的列表。
results = list(future) # 使用list()将迭代器转换为列表
print(results)
##############################################################
# (3)关闭进程池
executor.shutdown()
"""
#############################################################################
函数说明: concurrent.futures.ProcessPoolExecutor(max_workers=None)
输入参数: max_workers 指定最大进程数(默认使用CPU处理器支持逻辑内核的最大数量)
#############################################################################
使用方式:
(1)executor.submit(): 单个任务并行计算
将所有参数只应用一次到函数,完成并行化计算。 返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
(2)executor.map(): 多个任务并行计算
将列表参数循环应用到函数,完成并行化计算。 返回一个map()迭代器,可以迭代获取每个任务的结果
(3)executor.shutdown(): 等待所有任务完成并关闭进程池
若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
#############################################################################
"""
import concurrent.futures
with concurrent.futures.ProcessPoolExecutor() as executor:
# (1)使用executor.submit
future = executor.submit(my_fun, arg1, arg2) # 其中:my_function是执行函数,arg1和arg2是函数的参数。
result = future.result() # 获取单个任务的执行结果
for future in concurrent.futures.as_completed(futures): # 返回一个迭代器,其是在任务完成时按照完成的顺序生成 Future 对象。
result = future.result() # 在迭代过程中使用 future.result() 来获取每个已完成任务的结果。
print(result)
# 若是单个任务,两者没有区别。
# 若是多个任务,as_completed允许在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
##############################################################
# (2)使用executor.map
future = executor.map(my_fun, [arg1, arg2]) # 其中:my_function是执行函数,[arg1, arg2]是一个包含函数参数的列表。
results = list(future) # 使用list()将迭代器转换为列表
print(results)
##############################################################
# (3)关闭进程池
executor.shutdown()
"""适用于:多进程 + 多线程"""
import concurrent.futures
def process_target_gray(value):
# 处理每个值的逻辑
return [value * 2, 1]
if __name__ == '__main__':
gray_values = [1, 2, 3, 4, 5]
results = [] # 初始化结果列表
# with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
# 使用 submit 方法逐个提交任务(futures是保存所有结果的对象)
futures = [executor.submit(process_target_gray, value) for value in gray_values]
# 获取每个任务的结果
for future in futures:
result = future.result()
results.append(result)
print(results)
"""[[2, 1], [4, 1], [6, 1], [8, 1], [10, 1]]"""
"""只适用于:多线程"""
import concurrent.futures
def process_target_gray(value):
# 处理每个值的逻辑
return [value * 2, 1]
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # 创建线程池
futures = [] # 用于存储每个任务的 Future 对象
results = [] # 用于存储每个任务的结果
gray_values = [1, 2, 3, 4, 5]
for i in gray_values:
future = executor.submit(process_target_gray, i)
futures.append(future)
# 获取每个任务的结果
for future in futures:
result = future.result()
results.append(result)
print(results)
"""[[2, 1], [4, 1], [6, 1], [8, 1], [10, 1]]"""
使用 concurrent.futures.as_completed 在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
import concurrent.futures
def process_target_gray(value):
return [value * 2, 1]
if __name__ == '__main__':
gray_values = [1, 2, 3, 4, 5]
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
# 使用 submit 方法逐个提交任务
futures = [executor.submit(process_target_gray, value) for value in gray_values]
# 使用 as_completed 在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(result)
"""
[2, 1]
[4, 1]
[8, 1]
[10, 1]
[6, 1]
"""
若需要循环100次,每次提交一个任务到线程池,指定线程数为20,且没有显式调用 executor.shutdown(),那么情况会如下:
线程重用
: 当一个线程完成一个任务后,它可以被线程池重用来执行下一个任务,而不是被销毁。该方法避免了频繁地创建和销毁线程,提高性能。任务并发执行
: 如果线程池中有多个线程,且任务的执行时间相对较短,那么在某个时刻可能有多个线程同时执行不同的任务。任务重复执行
: 如果线程池中的线程已经执行完任务,但主线程仍在循环中提交新的任务,那么这些线程可能会被重用来执行新的任务。这样,在循环的过程中,20个线程可能会不断地重复执行不同的任务。
调用executor.shutdown() 等待线程池中的所有任务完成后才会返回,然后再继续执行主线程。
- (在某一批次中)某些线程提前完成了任务,它们会等待其余线程完成任务后,然后再次投入到下一批次的任务。
- (最后一批次中)已经完成的线程会等待其余的线程一起结束,确保线程池的所有线程都完成了任务。
import numpy as np
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from scipy.ndimage import median_filter
def memory_usage():
import psutil
process = psutil.Process() # 创建一个进程对象
mem_info = process.memory_info() # 获取当前进程在RAM中的内存使用量
memory_usage_mb = mem_info.rss / (1024 ** 2) # 表示进程在当前时刻的实际内存使用情况(字节 - MB)
peak_memory_mb = mem_info.peak_wset / (1024 ** 2) # 表示进程在任意时间点的内存使用的峰值(字节 - MB)
return process, memory_usage_mb, peak_memory_mb
# 备注: 每次只打印一个进程的结果。
# (1)多进程(不同进程ID): 打印的峰值内存是不同进程ID对应的值而不是所有ID中的最大值
# (2)多线程(相同进程ID): 打印的峰值内存是多个线程中最大的峰值内存(多线程在一个进程里)
def median_filter_slice(slice):
print(memory_usage()[0], memory_usage()[1], memory_usage()[2])
return median_filter(slice, size=3, mode='reflect') # 中值滤波函数
def apply_median_filter_3d_array(data):
# with ProcessPoolExecutor() as executor: # 使用ProcessPoolExecutor创建进程池
with ThreadPoolExecutor() as executor: # 使用ThreadPoolExecutor创建线程池
# 将每个深度方向的切片提交给进程池
filtered_slices = list(executor.map(median_filter_slice, [data[d, :, :] for d in range(data.shape[0])]))
result = np.stack(filtered_slices, axis=0) # 将结果合并成一个新的三维数组
return result
if __name__ == "__main__":
"""遍历3D图像的每个slice,并分别进行中值滤波"""
test_array = np.random.randint(0, 100, size=(30, 10, 10))
filtered_array = apply_median_filter_3d_array(test_array)
import napari
viewer = napari.Viewer() # 创建napari视图
viewer.layers.clear() # 清空图层
viewer.add_image(test_array, name="test_array") # 添加图像
viewer.add_image(filtered_array, name="filtered_array") # 添加图像
napari.run() # 显示napari图形界面
import numpy as np
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from scipy.ndimage import median_filter
def memory_usage():
import psutil
process = psutil.Process() # 创建一个进程对象
mem_info = process.memory_info() # 获取当前进程在RAM中的内存使用量
memory_usage_mb = mem_info.rss / (1024 ** 2) # 表示进程在当前时刻的实际内存使用情况(字节 - MB)
peak_memory_mb = mem_info.peak_wset / (1024 ** 2) # 表示进程在任意时间点的内存使用的峰值(字节 - MB)
return process, memory_usage_mb, peak_memory_mb
# 备注: 每次只打印一个进程的结果。
# (1)多进程(不同进程ID): 打印的峰值内存是不同进程ID对应的值而不是所有ID中的最大值
# (2)多线程(相同进程ID): 打印的峰值内存是多个线程中最大的峰值内存(多线程在一个进程里)
def median_filter_slice(slice):
print(memory_usage()[0], memory_usage()[1], memory_usage()[2])
return median_filter(slice, size=3, mode='reflect') # 中值滤波函数
def apply_median_filter_3d_array(data):
with ProcessPoolExecutor() as executor: # 使用ProcessPoolExecutor创建进程池
# with ThreadPoolExecutor() as executor: # 使用ThreadPoolExecutor创建线程池
# 将每个深度方向的切片提交给进程池
filtered_slices = list(executor.map(median_filter_slice, [data[d, :, :] for d in range(data.shape[0])]))
result = np.stack(filtered_slices, axis=0) # 将结果合并成一个新的三维数组
return result
if __name__ == "__main__":
"""遍历3D图像的每个slice,并分别进行中值滤波"""
test_array = np.random.randint(0, 100, size=(30, 10, 10))
filtered_array = apply_median_filter_3d_array(test_array)
import napari
viewer = napari.Viewer() # 创建napari视图
viewer.layers.clear() # 清空图层
viewer.add_image(test_array, name="test_array") # 添加图像
viewer.add_image(filtered_array, name="filtered_array") # 添加图像
napari.run() # 显示napari图形界面
多线程 :适用于 I/O 密集型任务
,因为线程切换的开销较小,可以有效地并行执行多个 I/O 操作,如文件读写、网络请求等。但由于 Python 的全局解释器锁(GIL),多线程在 CPU 密集型任务上性能有限。
多进程 :适用于 CPU 密集型任务
,因为每个进程都有独立的 Python 解释器和内存空间,不受 GIL 限制,可以充分利用多核处理器。对于 CPU 密集型任务,多进程通常比多线程更快。
协程 :适用于高并发的 I/O 密集型任务
,协程允许在单线程中执行多个任务,避免了线程切换的开销,但需要合理地设计异步代码。协程可以实现非常高的并发性能,但在 CPU 密集型任务上性能可能较差。
并行计算库:如 concurrent.futures、joblib、dask
等可以提供简单的接口来管理并行任务,性能取决于底层的并行执行策略和硬件资源。
import threading
import time
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
if __name__ == "__main__":
# 创建两个线程
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print("All tasks are completed.")
"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""
import multiprocessing
import time
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
time.sleep(1) # 模拟耗时操作
if __name__ == "__main__":
# 创建两个进程
process1 = multiprocessing.Process(target=task1)
process2 = multiprocessing.Process(target=task2)
# 启动进程
process1.start()
process2.start()
# 等待进程完成
process1.join()
process2.join()
print("All tasks are completed.")
"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 2 - Step 3
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""
import asyncio
# 定义任务1
async def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
await asyncio.sleep(1) # 模拟异步操作
# 定义任务2
async def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
await asyncio.sleep(1) # 模拟异步操作
async def main():
# 并行执行 task1 和 task2
await asyncio.gather(task1(), task2())
if __name__ == "__main__":
asyncio.run(main())
"""
Task 1 - Step 1
Task 2 - Step 1
Task 1 - Step 2
Task 2 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
"""
import concurrent.futures
# 定义任务1
def task1():
for i in range(5):
print("Task 1 - Step", i + 1)
# 定义任务2
def task2():
for i in range(3):
print("Task 2 - Step", i + 1)
if __name__ == "__main__":
# 使用 ThreadPoolExecutor 创建线程池
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务1和任务2给线程池
future1 = executor.submit(task1)
future2 = executor.submit(task2)
# 获取任务1和任务2的结果
result1 = future1.result()
result2 = future2.result()
"""
Task 1 - Step 1
Task 1 - Step 2
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
Task 2 - Step 1
Task 2 - Step 2
Task 2 - Step 3
"""
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。