Ways to select the GPU device to use:
- Use torch.device('cuda:0') to specify the GPU device to use.
- Use the torch.cuda.device(1) context manager to change the default device.
- Use tensor.cuda(cuda2), tensor.to(device=cuda2), or torch.tensor([1., 2.], device=cuda2) to place a Tensor on a GPU.
- Use CUDA_VISIBLE_DEVICES=0,1 python train.py, or set os.environ['CUDA_VISIBLE_DEVICES'] = '0,1' in code, to control which GPUs are visible.
cuda = torch.device('cuda')     # Default CUDA device
cuda0 = torch.device('cuda:0')
cuda2 = torch.device('cuda:2')  # GPU 2 (these are 0-indexed)

x = torch.tensor([1., 2.], device=cuda0)
# x.device is device(type='cuda', index=0)
y = torch.tensor([1., 2.]).cuda()
# y.device is device(type='cuda', index=0)

with torch.cuda.device(1):
    # allocates a tensor on GPU 1
    a = torch.tensor([1., 2.], device=cuda)

    # transfers a tensor from CPU to GPU 1
    b = torch.tensor([1., 2.]).cuda()
    # a.device and b.device are device(type='cuda', index=1)

    # You can also use ``Tensor.to`` to transfer a tensor:
    b2 = torch.tensor([1., 2.]).to(device=cuda)
    # b.device and b2.device are device(type='cuda', index=1)

    c = a + b
    # c.device is device(type='cuda', index=1)

    z = x + y
    # z.device is device(type='cuda', index=0)

    # even within a context, you can specify the device
    # (or give a GPU index to the .cuda call)
    d = torch.randn(2, device=cuda2)
    e = torch.randn(2).to(cuda2)
    f = torch.randn(2).cuda(cuda2)
    # d.device, e.device, and f.device are all device(type='cuda', index=2)
tensor.to(*args, **kwargs)
module.to(*args, **kwargs)
x = torch.ones((3, 3))
x = x.to(torch.float64)
x = torch.ones((3, 3))
x = x.to("cuda") # Tensor.to is NOT executed in place; the result must be re-assigned. This x has a different id from the original x above: one tensor lives on the CPU, the other on the GPU
linear = nn.Linear(2, 2)
linear.to(torch.double)
gpu1 = torch.device("cuda") # Module.to IS executed in place; no re-assignment is needed
linear.to(gpu1)
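To make the in-place distinction concrete, here is a minimal sketch (assuming a CUDA device is available; the values in the comments are what one would expect, not captured output) that compares object identities before and after the call:

import torch
import torch.nn as nn

x = torch.ones((3, 3))
x_gpu = x.to("cuda")                     # returns a NEW tensor; x itself stays on the CPU
print(x.device, x_gpu.device)            # cpu cuda:0
print(id(x) == id(x_gpu))                # False: Tensor.to is not in place

linear = nn.Linear(2, 2)
ret = linear.to("cuda")                  # moves the parameters in place and returns the same module
print(id(linear) == id(ret))             # True: Module.to is in place
print(next(linear.parameters()).device)  # cuda:0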
Commonly used torch.cuda utilities (a short usage sketch follows this list):
- torch.cuda.device_count(): returns the number of visible, usable GPUs.
- torch.cuda.get_device_name(): returns the name of a GPU.
- torch.cuda.set_device(): sets which physical GPU acts as the primary GPU; using os.environ.setdefault("CUDA_VISIBLE_DEVICES", "2, 3") is recommended instead.
- torch.cuda.manual_seed(): sets the random seed for the current GPU.
- torch.cuda.manual_seed_all(): sets the random seed for all visible, usable GPUs.
In multi-GPU training, a copy of the model is loaded on every device and the data is split across the devices so that they compute in parallel, which speeds up training.
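A minimal sketch of these utilities (the seed value 42 and the chosen GPU indices are illustrative assumptions):

import os
import torch

# Restrict which GPUs are visible before CUDA is initialized (preferred over torch.cuda.set_device)
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "2,3")

print(torch.cuda.device_count())           # number of visible, usable GPUs, e.g. 2
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))   # name of the first visible GPU

torch.manual_seed(42)                      # CPU seed
torch.cuda.manual_seed(42)                 # seed for the current GPU
torch.cuda.manual_seed_all(42)             # seed for all visible GPUs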
When copying data from the CPU to the GPU, the data is first transferred from pageable memory to pinned (page-locked) memory on the host, and only then from pinned memory to the GPU; see the referenced blog post for more information on pageable and pinned memory. The PyTorch DataLoader can load data from disk with multiple worker processes (enabled via num_workers > 0) and can copy batches from pageable memory into pinned memory (enabled via pin_memory = True). In general, for large batches, if only one thread is used to load data, the data-loading time dominates, which means that no matter how much we speed up the computation, performance is still limited by data loading. Setting num_workers = 16 together with pin_memory = True lets multiple processes read non-overlapping data from disk and starts producer-consumer threads that move the data those processes read from pageable memory into pinned memory (see the sketch below).
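A minimal DataLoader sketch combining these settings (the random dataset and batch size are placeholder assumptions; non_blocking=True only overlaps the copy with computation when the source batch is already in pinned memory):

import torch
from torch.utils.data import DataLoader, TensorDataset

# Placeholder dataset; replace with your real Dataset.
dataset = TensorDataset(torch.randn(256, 3, 32, 32), torch.randint(0, 10, (256,)))

loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    num_workers=16,   # multiple worker processes read from disk in parallel
    pin_memory=True,  # copy batches into page-locked (pinned) host memory
)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
for inputs, labels in loader:
    # non_blocking=True lets the host-to-device copy overlap with GPU computation
    inputs = inputs.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)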
Point-to-point communication can be implemented with the send and recv functions, or with their immediate (non-blocking) counterparts isend and irecv. Collective operations support both synchronous and asynchronous modes; the main ones currently supported are Broadcast, Scatter, Gather, Reduce, All Gather, and All Reduce (a runnable sketch follows this list):
- All Gather: every server performs a Gather, so in the end every server holds the Gather result over all of the data.
- All Reduce: every server performs a Reduce, so in the end every server holds the Reduce result over all of the data.
- Reduce Scatter: a Reduce is first performed on one server, and the result is then Scattered to the other servers.
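A minimal sketch of point-to-point and collective calls (assuming the gloo backend so it also runs on CPU, and launched with torchrun --nproc_per_node=2 purely for illustration):

import torch
import torch.distributed as dist

def main():
    # torchrun/launch set RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT for us
    dist.init_process_group(backend="gloo")
    rank = dist.get_rank()

    # Point-to-point: rank 0 sends, rank 1 receives (blocking send/recv)
    t = torch.zeros(1)
    if rank == 0:
        t += 42
        dist.send(t, dst=1)
    elif rank == 1:
        dist.recv(t, src=0)

    # Collective: every rank contributes its rank id, every rank receives the sum
    x = torch.tensor([float(rank)])
    dist.all_reduce(x, op=dist.ReduceOp.SUM)
    print(f"rank {rank}: recv={t.item()}, all_reduce sum={x.item()}")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()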
# Single-GPU training: simply move the data, labels, and model onto the target GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Move the data and labels onto the GPU
inputs = inputs.to(device)
labels = labels.to(device)
# Move the model onto the GPU
model = ResNet()
model.to(device)
DataParallel: single-process, multi-threaded; it does not support multiple machines. The loss computation happens only on output_device and cannot be parallelized, which leads to load imbalance. The master GPU is responsible for scattering the data (not including the labels), the loss computation (the loss on each device is computed separately and then distributed to each GPU for the backward pass), gathering the per-layer gradients (reduce) and averaging them before the parameter update, and then broadcasting the updated parameters to every GPU.

# Wrap the model to enable data-parallel execution
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
>>> module: the model to wrap and distribute
>>> device_ids: the GPUs to distribute to; defaults to all visible, usable GPUs and usually does not need to be set, e.g. device_ids=[0,1,2]
>>> output_device: the device the results are gathered to; defaults to the first GPU, so the first GPU uses somewhat more memory than the others

# Another implementation: the functional form torch.nn.parallel.data_parallel
torch.nn.parallel.data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None)

# 1. Get multiple GPU devices for training.
n_gpu = torch.cuda.device_count()
device = torch.device('cuda:0' if n_gpu > 0 else 'cpu')
device_ids = list(range(n_gpu))

# 2. Move the data and labels to the GPU
inputs = inputs.to(device)
labels = labels.to(device)

# 3. Move the model to the primary GPU
model = ResNet()
model.to(device)

# Copy the model to multiple GPUs; the data is split across all visible GPUs, with batch_size / n samples per GPU
if n_gpu > 1:
    model = torch.nn.DataParallel(model, device_ids=device_ids)
    # After wrapping with DataParallel the model is replicated onto n_gpu GPUs.
    # During the forward pass, a batch is split evenly into n_gpu chunks and fed to the model replica on each GPU.
    # After the forward pass, the outputs from every GPU are gathered on the default GPU, where the loss and the backward pass are computed.

# 4. Save a model trained on multiple GPUs
# When running multi-GPU with DataParallel, every submodule gains a "module." prefix, so take care when saving
if isinstance(model, torch.nn.DataParallel):
    model_state_dict = model.module.state_dict()
else:
    model_state_dict = model.state_dict()
torch.save(model_state_dict, "model.pth")

# 5. Single-machine multi-GPU launch command
CUDA_VISIBLE_DEVICES=2,3 python single-machine-and-multi-GPU-DataParallel.py

# 6. Example data-parallel output when distributing across 2 GPUs
batch size in forward: 8
batch size in forward: 8
model outputs.size: torch.Size([16, 3])
CUDA_VISIBLE_DEVICES: 2,3
device_count: 2
Differences between DDP and DP:
- DDP no longer has a master GPU; every GPU performs the same work (so the load is better balanced): forward pass, loss computation, backward pass, gradient ring-all-reduce, and model parameter update (a conceptual sketch of this gradient averaging appears after the tuning list below).
- With DDP every GPU receives batch_size samples, whereas with DP every GPU receives batch_size // n_gpus samples.
- DDP is multi-process, whereas DP is single-process, so DDP is much faster.
- DDP uses a distributed data sampler to ensure that the data loaded by the different processes does not overlap.
DistributedDataParallel supports all-reduce, broadcast, send, receive, and so on. CPU communication is implemented via MPI and GPU communication via NCCL. It can be used for single-machine multi-GPU as well as multi-machine multi-GPU training.
Acceleration and tuning techniques:
- Data loading: read data in the DataLoader with multiple worker processes (num_workers=ncpu_cores, e.g. 32) and pin_memory=True; this is much faster.
- Data copying: data.to(device, non_blocking=True) and label.to(device, non_blocking=True).
- If the input size is fixed and the model contains convolutions, try setting torch.backends.cudnn.benchmark = True.
- Synchronize BatchNorm statistics across GPUs: model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).
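Conceptually, the gradient ring-all-reduce that DDP performs after the backward pass is equivalent to averaging every parameter's gradient across all processes. A hand-rolled sketch of that averaging step (for illustration only; DDP does this for you automatically and overlaps it with the backward pass using gradient buckets):

import torch
import torch.distributed as dist

def average_gradients(model: torch.nn.Module) -> None:
    """Average the .grad of every parameter across all processes in the default group."""
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size

# Usage inside a training step, after loss.backward() and before optimizer.step():
#     loss.backward()
#     average_gradients(model)   # what DDP achieves via ring-all-reduce
#     optimizer.step()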
DistributedDataParallel overview
# Wrap the model to enable distributed data-parallel execution
torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False)
>>> module: the model to wrap and distribute
>>> device_ids: the GPUs to distribute to; defaults to all visible, usable GPUs and usually does not need to be set, e.g. device_ids=[0,1,2]
>>> output_device: the device the results are gathered to; defaults to the first GPU, so the first GPU uses somewhat more memory than the others
torch.distributed.launch overview
torch.distributed.launch: module arguments

python -m torch.distributed.launch --help
>>> Package location: /miniconda3/lib/python3.8/site-packages/torch/distributed/launch.py
>>> Notes:
>>>>>> torch.distributed.launch will be deprecated in a future release; torchrun (== torch.distributed.run) is recommended instead.
>>>>>> The --use_env behavior is the default in torchrun, whereas with torch.distributed.launch it has to be specified manually.
>>>>>> The --local_rank value that used to be passed on the command line can instead be read from os.environ['LOCAL_RANK'].

# Usage: for single-machine multi-GPU training, only the number of workers needs to be specified
# python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
#        YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
#        arguments of your training script)
# CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 train.py

usage: launch.py [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE]
                 [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT]
                 [--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone]
                 [--max_restarts MAX_RESTARTS] [--monitor_interval MONITOR_INTERVAL]
                 [--start_method {spawn,fork,forkserver}] [--role ROLE] [-m]
                 [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
                 [-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR]
                 [--master_port MASTER_PORT] [--use_env]
                 training_script ...

# Argument explanations
positional arguments:
  training_script: Full path to the (single GPU) training program/script to be launched in parallel, followed by all the arguments for the training script.
  training_script_args

optional arguments:
  -h, --help: show this help message and exit
  -m, --module: Change each process to interpret the launch script as a Python module, executing with the same behavior as 'python -m'.
  --nnodes: Number of nodes; one node corresponds to one machine.
  --nproc_per_node: Number of workers per node; supported values: [auto, cpu, gpu, int]
                    >>> The number of GPUs in a node; since one process generally uses one GPU, this is also commonly described as the number of processes run on a node.
  --node_rank: Rank of the node for multi-node distributed training; node ranks start at 0.
  --master_addr: Address of the master node (rank 0). It should be either the IP address or the hostname of rank 0. For single node multi-proc training the --master_addr can simply be 127.0.0.1; IPv6 should have the pattern `[0:0:0:0:0:0:0:1]`.
                 >>> The IP address of the master node, i.e. the host with rank=0; this parameter lets the other nodes know where node 0 is, so they can pass the parameters they train to it.
  --master_port: Port on the master node (rank 0) to be used for communication during distributed training.
  --use_env: Use environment variable to pass 'local rank'. For legacy reasons, the default value is False. If set to True, the script will not pass --local_rank as argument, and will instead set LOCAL_RANK.
             >>> With use_env, PyTorch puts the local_rank used by the current process into the environment variable LOCAL_RANK instead of into args.local_rank.
  --rdzv_backend: Rendezvous backend.
  --rdzv_endpoint: Rendezvous backend endpoint; usually in form <host>:<port>.
  --rdzv_id: User-defined group id.
  --rdzv_conf: Additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...).
  --standalone: Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
  --max_restarts: Maximum number of worker group restarts before failing.
  --monitor_interval: Interval, in seconds, to monitor the state of workers.
  --start_method {spawn,fork,forkserver}: Multiprocessing start method to use when creating workers.
  --role: User-defined role for the workers.
  --no_python: Skip prepending the training script with 'python' - just execute it directly. Useful when the script is not a Python script.
  --run_path: Run the training script with runpy.run_path in the same interpreter. Script must be provided as an abs path (e.g. /abs/path/script.py). Takes precedence over --no_python.
  --log_dir: Base directory to use for log files (e.g. /var/log/torch/elastic). The same directory is re-used for multiple runs (a unique job-level sub-directory is created with rdzv_id as the prefix).
  -r, --redirects: Redirect std streams into a log file in the log directory (e.g. [-r 3] redirects both stdout+stderr for all workers, [-r 0:1,1:2] redirects stdout for local rank 0 and stderr for local rank 1).
  -t, --tee: Tee std streams into a log file and also to console (see --redirects for format).
torch.distributed.launch: environment variables set automatically at run time
The launcher exports MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, and WORLD_SIZE into the environment:
- RANK: read with os.environ["RANK"]; the global index of the process, where generally one GPU corresponds to one process. It starts at 0 and its maximum value is the total number of GPUs minus 1.
- LOCAL_RANK: read with os.environ["LOCAL_RANK"]; the index of the process within its host. It starts at 0 and its maximum value is the number of GPUs on that host minus 1.
- WORLD_SIZE: read with os.environ["WORLD_SIZE"]; the total number of processes launched (summed over all machines).
- nnodes, node_rank, and nproc_per_node: nnodes is the number of physical nodes (one node is one machine, which contains several GPUs), node_rank is the index of the physical node, and nproc_per_node is the number of processes on each physical node.
For example, with 2 nodes and 4 GPUs per node: RANK takes values in [0, 7], LOCAL_RANK on each machine takes values in [0, 3], and WORLD_SIZE is 8.
# 1. Contents of the train.py script used to read the environment variables
import os
import time
import torch.distributed as dist

print("before running dist.init_process_group()")

MASTER_ADDR = os.environ["MASTER_ADDR"]
MASTER_PORT = os.environ["MASTER_PORT"]
LOCAL_RANK = os.environ["LOCAL_RANK"]
RANK = os.environ["RANK"]
WORLD_SIZE = os.environ["WORLD_SIZE"]

print("MASTER_ADDR: {}\tMASTER_PORT: {}".format(MASTER_ADDR, MASTER_PORT))
print("LOCAL_RANK: {}\tRANK: {}\tWORLD_SIZE: {}".format(LOCAL_RANK, RANK, WORLD_SIZE))

dist.init_process_group('nccl')
print("after running dist.init_process_group()")
time.sleep(60)  # Sleep for a while to avoid exceptions that occur when some processes end too quickly.
dist.destroy_process_group()

# 2. Single-machine multi-GPU example: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 train.py

# 2.1 Single-machine multi-GPU output:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1    MASTER_PORT: 29500
LOCAL_RANK: 0    RANK: 0    WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1    MASTER_PORT: 29500
LOCAL_RANK: 2    RANK: 2    WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1    MASTER_PORT: 29500
LOCAL_RANK: 3    RANK: 3    WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1    MASTER_PORT: 29500
LOCAL_RANK: 1    RANK: 1    WORLD_SIZE: 4
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()

# 3. Multi-machine multi-GPU example: the command must be run on each machine
# 3.1 Multi-machine multi-GPU: run the following command on machine 0
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py

# 3.1 Multi-machine multi-GPU: output on machine 0
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 0    RANK: 0    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 1    RANK: 1    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 3    RANK: 3    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 2    RANK: 2    WORLD_SIZE: 8
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()

# 3.2 Multi-machine multi-GPU: run the following command on machine 1 (only the node rank differs from machine 0)
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py

# 3.2 Multi-machine multi-GPU: output on machine 1
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 0    RANK: 4    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 1    RANK: 5    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 3    RANK: 7    WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105    MASTER_PORT: 12345
LOCAL_RANK: 2    RANK: 6    WORLD_SIZE: 8
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()
torch.distributed.run overview: this is the current recommended way to launch multi-machine multi-GPU training.
torchrun provides a superset of the functionality of torch.distributed.launch, with the following additional functionalities:
- RANK and WORLD_SIZE are assigned automatically, because the --use_env behavior is the default in torchrun.
What needs to change in a torch.distributed.launch command line:
# 0. Simply replace "python -m torch.distributed.launch" with "torchrun"
# 1. Single-machine multi-GPU: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc_per_node=4 train.py
# 2. Multi-machine multi-GPU: the command must be run on each machine
# 2.1 Multi-machine multi-GPU: run the following command on machine 0
torchrun --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py
# 2.2 Multi-machine multi-GPU: run the following command on machine 1 (only the node rank differs from machine 0)
torchrun --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py
torch.distributed.run overview

/home/manzp/miniconda3/bin/torchrun
/home/manzp/miniconda3/pkgs/pytorch-1.13.1-py3.8_cuda11.6_cudnn8.3.2_0/bin/torchrun
python -m torch.distributed.run == torchrun

usage: torchrun [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE]
                [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT]
                [--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone]
                [--max_restarts MAX_RESTARTS] [--monitor_interval MONITOR_INTERVAL]
                [--start_method {spawn,fork,forkserver}] [--role ROLE] [-m]
                [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
                [-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR]
                [--master_port MASTER_PORT]
                training_script ...

Torch Distributed Elastic Training Launcher

positional arguments:
  training_script       Full path to the (single GPU) training program/script to be launched in parallel, followed by all the arguments for the training script.
  training_script_args

optional arguments:
  -h, --help            show this help message and exit
  --nnodes NNODES       Number of nodes, or the range of nodes in form <minimum_nodes>:<maximum_nodes>.
  --nproc_per_node NPROC_PER_NODE
                        Number of workers per node; supported values: [auto, cpu, gpu, int].
  --rdzv_backend RDZV_BACKEND
                        Rendezvous backend.
  --rdzv_endpoint RDZV_ENDPOINT
                        Rendezvous backend endpoint; usually in form <host>:<port>.
  --rdzv_id RDZV_ID     User-defined group id.
  --rdzv_conf RDZV_CONF
                        Additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...).
  --standalone          Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
  --max_restarts MAX_RESTARTS
                        Maximum number of worker group restarts before failing.
  --monitor_interval MONITOR_INTERVAL
                        Interval, in seconds, to monitor the state of workers.
  --start_method {spawn,fork,forkserver}
                        Multiprocessing start method to use when creating workers.
  --role ROLE           User-defined role for the workers.
  -m, --module          Change each process to interpret the launch script as a Python module, executing with the same behavior as 'python -m'.
  --no_python           Skip prepending the training script with 'python' - just execute it directly. Useful when the script is not a Python script.
  --run_path            Run the training script with runpy.run_path in the same interpreter. Script must be provided as an abs path (e.g. /abs/path/script.py). Takes precedence over --no_python.
  --log_dir LOG_DIR     Base directory to use for log files (e.g. /var/log/torch/elastic). The same directory is re-used for multiple runs (a unique job-level sub-directory is created with rdzv_id as the prefix).
  -r REDIRECTS, --redirects REDIRECTS
                        Redirect std streams into a log file in the log directory (e.g. [-r 3] redirects both stdout+stderr for all workers, [-r 0:1,1:2] redirects stdout for local rank 0 and stderr for local rank 1).
  -t TEE, --tee TEE     Tee std streams into a log file and also to console (see --redirects for format).
  --node_rank NODE_RANK
                        Rank of the node for multi-node distributed training.
  --master_addr MASTER_ADDR
                        Address of the master node (rank 0). It should be either the IP address or the hostname of rank 0. For single node multi-proc training the --master_addr can simply be 127.0.0.1; IPv6 should have the pattern `[0:0:0:0:0:0:0:1]`.
  --master_port MASTER_PORT
                        Port on the master node (rank 0) to be used for communication during distributed training.
# Initialize the distributed process group and distributed device
# Check whether NCCL is available
torch.distributed.is_nccl_available()

def setup_DDP(backend="nccl", verbose=False):
    """
    We don't set ADDR and PORT in here, like:
        # os.environ['MASTER_ADDR'] = 'localhost'
        # os.environ['MASTER_PORT'] = '12355'
    Because program's ADDR and PORT can be given automatically at startup.
    E.g. You can set ADDR and PORT by using:
        python -m torch.distributed.launch --master_addr="192.168.1.201" --master_port=23456 ...
    You don't set rank and world_size in dist.init_process_group() explicitly.

    :param backend:
    :param verbose:
    :return:
    """
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    # initialize the distributed process group
    torch.distributed.init_process_group(backend=backend)
    # set the distributed device
    device = torch.device("cuda:{}".format(local_rank))
    if verbose:
        print("Using device: {}".format(device))
        print(f"local rank: {local_rank}, global rank: {rank}, world size: {world_size}")
    return rank, local_rank, world_size, device

# Process synchronization and resource release
def init_dist_process_group(init_method_str, gpu_id, total_gpu_id, total_gpu_num):
    torch.cuda.set_device(gpu_id)
    dist.init_process_group(backend='nccl', init_method=init_method_str, rank=total_gpu_id, world_size=total_gpu_num)
    dist.barrier()  # block every process in the group until all of them have reached this call

def destroy_dist_process_group():
    dist.destroy_process_group()  # tear down the process group and release its resources

# init_process_group parameter description
def init_process_group(
    backend: Union[str, Backend],
    init_method: Optional[str] = None,
    timeout: timedelta = default_pg_timeout,
    world_size: int = -1,
    rank: int = -1,
    store: Optional[Store] = None,
    group_name: str = "",
    pg_options: Optional[Any] = None,
):
    """
    Initializes the default distributed process group, and this will also initialize the distributed package.

    There are 2 main ways to initialize a process group:
        1. Specify ``store``, ``rank``, and ``world_size`` explicitly.
        2. Specify ``init_method`` (a URL string) which indicates where/how to discover peers. Optionally specify ``rank`` and ``world_size``, or encode all required parameters in the URL and omit them.
    If neither is specified, ``init_method`` is assumed to be "env://".

    Args:
        backend (str or Backend): The backend to use. Depending on build-time configurations, valid values include ``mpi``, ``gloo``, ``nccl``, and ``ucc``. This field should be given as a lowercase string (e.g., ``"gloo"``), which can also be accessed via :class:`Backend` attributes (e.g., ``Backend.GLOO``). If using multiple processes per machine with ``nccl`` backend, each process must have exclusive access to every GPU it uses, as sharing GPUs between processes can result in deadlocks. ``ucc`` backend is experimental.
        init_method (str, optional): URL specifying how to initialize the process group. Default is "env://" if no ``init_method`` or ``store`` is specified. Mutually exclusive with ``store``.
        world_size (int, optional): Number of processes participating in the job. Required if ``store`` is specified.
        rank (int, optional): Rank of the current process (it should be a number between 0 and ``world_size``-1). Required if ``store`` is specified.
        store (Store, optional): Key/value store accessible to all workers, used to exchange connection/address information. Mutually exclusive with ``init_method``.
        timeout (timedelta, optional): Timeout for operations executed against the process group. Default value equals 30 minutes. This is applicable for the ``gloo`` backend. For ``nccl``, this is applicable only if the environment variable ``NCCL_BLOCKING_WAIT`` or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the process will block and wait for collectives to complete before throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set, this is the duration after which collectives will be aborted asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT`` will provide errors to the user which can be caught and handled, but due to its blocking nature, it has a performance overhead. On the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little performance overhead, but crashes the process on errors. This is done since CUDA execution is async and it is no longer safe to continue executing user code since failed async NCCL operations might result in subsequent CUDA operations running on corrupted data. Only one of these two environment variables should be set. For ``ucc``, blocking wait is supported similar to NCCL. However, async error handling is done differently since with UCC we have progress thread and not watch-dog thread.
        group_name (str, optional, deprecated): Group name.
        pg_options (ProcessGroupOptions, optional): process group options specifying what additional options need to be passed in during the construction of specific process groups. As of now, the only options we support is ``ProcessGroupNCCL.Options`` for the ``nccl`` backend, ``is_high_priority_stream`` can be specified so that the nccl backend can pick up high priority cuda streams when there're compute kernels waiting.

    .. note:: To enable ``backend == Backend.MPI``, PyTorch needs to be built from source on a system that supports MPI.
    """
# initialize the data loaders using DistributedSampler
batch_size = 64
train_sampler = DistributedSampler(training_data, shuffle=True)
test_sampler = DistributedSampler(test_data, shuffle=False)  # the test set does not strictly need a DistributedSampler
# pin_memory=True and num_workers can be set for extra speed; the sampler must be passed to the DataLoader
train_dataloader = DataLoader(training_data, batch_size=batch_size, sampler=train_sampler, pin_memory=True, num_workers=16)
test_dataloader = DataLoader(test_data, batch_size=batch_size, sampler=test_sampler, pin_memory=True, num_workers=16)

def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # copy data from the CPU to the GPU, using non-blocking transfers
        X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
from torch.nn.parallel import DistributedDataParallel as DDP
# initialize model
model = NeuralNetwork().to(device) # copy model from cpu to gpu
# Whether to use SyncBatchNorm
if args.syncBN:
# Training is slower after converting to SyncBatchNorm
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
# Wrap the model with torch.nn.parallel.DistributedDataParallel,
# explicitly specifying the devices the model uses (device_ids) and the device the outputs live on (output_device).
# DDP all-reduces the gradients computed on the different GPUs, i.e. it aggregates the gradients from every GPU and synchronizes the result.
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
for epoch_index in range(start_epoch, num_epoch):
num_batches = len(train_data_loader)
    train_sampler.set_epoch(epoch_index)  # so that each GPU sees a different random shuffle of the data every epoch
    test_sampler.set_epoch(epoch_index)   # so that each GPU sees a different random shuffle of the data every epoch
for batch_index, (data, label) in enumerate(train_data_loader):
step = num_batches*(epoch_index) + batch_index + 1
# Note: if no specific GPU is pinned, the saved model may turn out to be unusable; a possible cause is that some
# parameters end up on device0 while others end up on device1.
# save model on rank 0: to avoid saving the model repeatedly, we only save it on the master host
if torch.distributed.get_rank() == 0:
    # save model with multi-GPU
    if isinstance(model, torch.nn.DataParallel) or isinstance(model, torch.nn.parallel.DistributedDataParallel):
        model_state_dict = model.module.state_dict()
        print("DistributedDataParallel Model")
    else:
        model_state_dict = model.state_dict()
    torch.save(model_state_dict, "model.pth")
    print("Saved PyTorch Model State to model.pth")

# load the model (possibly on a different device)
state_dict_load = torch.load("model.pth", map_location=device)
model.load_state_dict(state_dict_load)

# print only on the master process
def print_only_rank0(log):
    if torch.distributed.get_rank() == 0:
        print(log)
# 1. Single-machine multi-GPU: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 train.py
# 2. Multi-machine multi-GPU: the command must be run on each machine
# 2.1 Multi-machine multi-GPU: run the following command on machine 0
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py
# 2.2 Multi-machine multi-GPU: run the following command on machine 1 (only the node rank differs from machine 0)
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py
def get_gpu_memory():
    import os
    os.system('nvidia-smi -q -d Memory | grep -A4 GPU | grep Free > tmp.txt')
    memory_gpu = [int(x.split()[2]) for x in open('tmp.txt', 'r').readlines()]
    os.system('rm tmp.txt')
    return memory_gpu

# example:
gpu_memory = get_gpu_memory()
gpu_list = np.argsort(gpu_memory)[::-1]
gpu_list_str = ','.join(map(str, gpu_list))
os.environ.setdefault("CUDA_VISIBLE_DEVICES", gpu_list_str)
print("\ngpu free memory: {}".format(gpu_memory))
print("CUDA_VISIBLE_DEVICES :{}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

>>> gpu free memory: [10362, 10058, 9990, 9990]
>>> CUDA_VISIBLE_DEVICES :0,1,3,2
Loading a multi-GPU-trained model on a machine without a GPU:
- Set map_location to the CPU when loading the checkpoint.
- Rewrite the state-dict keys to strip the module. prefix.

# In multi-GPU training the model is wrapped by DataParallel, so every submodule gains a "module." prefix;
# you must therefore either access net.module.linear or rewrite the keys.
from collections import OrderedDict

# load the model parameters
path_state_dict = "./model_in_multi_gpu.pkl"
state_dict_load = torch.load(path_state_dict, map_location="cpu")  # on a machine without a GPU, pass map_location to load on the CPU
print("state_dict_load:\n{}".format(state_dict_load))

# rewrite the keys of the state dict
new_state_dict = OrderedDict()
for k, v in state_dict_load.items():
    namekey = k[7:] if k.startswith('module.') else k  # remove the "module." prefix
    new_state_dict[namekey] = v
print("new_state_dict:\n{}".format(new_state_dict))

# load the new state dict
net.load_state_dict(new_state_dict)

# Method 2: save with torch.save(model.module.state_dict(), 'filename.pt') instead,
# so that the saved keys do not contain "module." in the first place.
1、PyTorch 多 GPU 训练 - 入门与实践 *****
2、Pytorch 分布式训练(DP/DDP)*****
3、DDP系列第三篇:实战与技巧 *****
4、training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups *****
5、GPU多卡并行训练总结(以pytorch为例)*****
6、pytorch(分布式)数据并行个人实践总结 ****
7、PyTorch分布式训练简明教程(2022更新版) ***
8、新生手册:PyTorch分布式训练 ****
9、https://github.com/hpcaitech/ColossalAI
10、https://github.com/microsoft/DeepSpeed
11、从零实现BERT、GPT及Diffusion类算法
12、https://github.com/pytorch/examples/tree/main/distributed/ddp-tutorial-series
13、https://wqw547243068.github.io/dist(分布式原理综述)