
11. PyTorch Distributed Training

1. CUDA Semantics: How to Choose the GPU for Computation

  • Use torch.device('cuda:0') to specify which GPU device to use
  • Use the context manager torch.cuda.device(1) to change the default device
  • Move a Tensor onto the GPU with tensor.cuda(cuda2), tensor.to(device=cuda2), or torch.tensor([1., 2.], device=cuda2)
  • Restrict a program to specific GPUs: CUDA_VISIBLE_DEVICES=0,1 python train.py, or set os.environ['CUDA_VISIBLE_DEVICES'] = '0,1' in the code
cuda = torch.device('cuda')     # Default CUDA device
cuda0 = torch.device('cuda:0')
cuda2 = torch.device('cuda:2')  # GPU 2 (these are 0-indexed)

x = torch.tensor([1., 2.], device=cuda0)
# x.device is device(type='cuda', index=0)
y = torch.tensor([1., 2.]).cuda()
# y.device is device(type='cuda', index=0)

with torch.cuda.device(1):
    # allocates a tensor on GPU 1
    a = torch.tensor([1., 2.], device=cuda)

    # transfers a tensor from CPU to GPU 1
    b = torch.tensor([1., 2.]).cuda()
    # a.device and b.device are device(type='cuda', index=1)

    # You can also use ``Tensor.to`` to transfer a tensor:
    b2 = torch.tensor([1., 2.]).to(device=cuda)
    # b.device and b2.device are device(type='cuda', index=1)

    c = a + b
    # c.device is device(type='cuda', index=1)

    z = x + y
    # z.device is device(type='cuda', index=0)

    # even within a context, you can specify the device
    # (or give a GPU index to the .cuda call)
    d = torch.randn(2, device=cuda2)
    e = torch.randn(2).to(cuda2)
    f = torch.randn(2).cuda(cuda2)
    # d.device, e.device, and f.device are all device(type='cuda', index=2)
  • Using the to function: converting data type / device
    • tensor.to(*args, **kwargs)
    • module.to(*args, **kwargs)
    • Difference: for tensors, to is not in-place, so the result must be reassigned; for modules, to works in place, so no reassignment is needed
x = torch.ones((3, 3)) 
x = x.to(torch.float64)

x = torch.ones((3, 3)) 
x = x.to("cuda")  # to is not in-place for tensors: reassignment is required; this x has a different id from the original x above (one lives on the CPU, the other on the GPU)

linear = nn.Linear(2, 2) 
linear.to(torch.double)

gpu1 = torch.device("cuda") # modules are moved in place, no reassignment needed
linear.to(gpu1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • Commonly used torch.cuda helpers (a short sketch follows this list):
    • torch.cuda.device_count(): number of currently visible and available GPUs
    • torch.cuda.get_device_name(): name of a GPU
    • torch.cuda.set_device(): choose which physical GPU is the primary one; using os.environ.setdefault("CUDA_VISIBLE_DEVICES", "2, 3") is recommended instead
    • torch.cuda.manual_seed(): set the random seed for the current GPU
    • torch.cuda.manual_seed_all(): set the random seed for all visible and available GPUs
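A minimal sketch of the helpers above (the device indices are illustrative; CUDA_VISIBLE_DEVICES must be set before the first CUDA call):

import os
import torch

os.environ.setdefault("CUDA_VISIBLE_DEVICES", "2,3")   # restrict which GPUs are visible

print(torch.cuda.device_count())        # number of visible GPUs
print(torch.cuda.get_device_name(0))    # name of the first visible GPU

torch.cuda.manual_seed(42)              # seed the current GPU
torch.cuda.manual_seed_all(42)          # seed every visible GPU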

2. Parallel Training and Collective Communication

2.1 Introduction to Parallel Training

2.1.1 Model Parallelism
  • Model parallelism: used mainly when the model is too large for the memory of a single device. The model is split into several parts that are loaded onto different devices. For example, the early AlexNet was split across two GPUs because of the limited GPU memory at the time (a minimal two-GPU sketch follows below).
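A minimal model-parallel sketch, assuming at least two visible GPUs; the two sub-modules live on different devices and the forward pass moves the intermediate activation between them (module names and sizes are illustrative):

import torch
import torch.nn as nn

class TwoGPUModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.part1 = nn.Linear(1024, 512).to("cuda:0")   # first half on GPU 0
        self.part2 = nn.Linear(512, 10).to("cuda:1")     # second half on GPU 1

    def forward(self, x):
        x = self.part1(x.to("cuda:0"))
        x = self.part2(x.to("cuda:1"))   # move the activation to the second GPU
        return x

model = TwoGPUModel()
out = model(torch.randn(8, 1024))        # the output lives on cuda:1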
2.1.2 Data Parallelism
  • This is the case encountered most often in practice. A copy of the model is loaded on every device and the data is split across the devices so they compute in parallel, which speeds up training.
  • Broadly speaking, starting from reading the input data off disk, loading a batch involves four steps:
    • load the data from disk into host memory
    • transfer the data from pageable memory to pinned memory on the host (see this blog for more on pageable vs. pinned memory)
    • transfer the data from pinned memory to the GPU
    • run the forward and backward passes on the GPU


  • PyTorch's DataLoader can load data from disk with multiple worker processes (num_workers > 0) and copy batches from pageable memory into pinned memory (pin_memory = True). In general, for large batches, if only a single worker loads the data then data-loading time dominates: no matter how much the computation is sped up, performance stays bounded by data loading. With num_workers = 16 and pin_memory = True, several processes read non-overlapping data from disk, and a producer-consumer thread moves the data they read from pageable into pinned memory (a minimal sketch follows below).

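A minimal sketch of such a loader, assuming training_data is an existing Dataset (names and sizes are illustrative):

from torch.utils.data import DataLoader

train_loader = DataLoader(
    training_data,          # assumed to be an existing Dataset
    batch_size=256,
    shuffle=True,
    num_workers=16,         # multiple worker processes read from disk in parallel
    pin_memory=True,        # batches are staged in pinned (page-locked) host memory
)

for inputs, labels in train_loader:
    # non_blocking=True lets the host-to-device copy overlap with computation
    inputs = inputs.to("cuda", non_blocking=True)
    labels = labels.to("cuda", non_blocking=True)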


2.2 Point-to-Point and Collective Communication

2.2.1 Point-to-Point Communication
  • In point-to-point communication, data is moved from one process to another over the same IP and port, using the send and recv functions, or their immediate (non-blocking) counterparts isend and irecv (a sketch follows below).
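A minimal sketch, assuming the default process group has already been initialized (e.g. via torchrun with at least 2 processes and the gloo backend):

import torch
import torch.distributed as dist

def point_to_point_demo():
    rank = dist.get_rank()
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor, dst=1)              # blocking send to rank 1
        # req = dist.isend(tensor, dst=1); req.wait()   # non-blocking variant
    elif rank == 1:
        dist.recv(tensor, src=0)              # blocking receive from rank 0
    print(f"rank {rank} has tensor {tensor.item()}")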
2.2.2 Collective Communication
  • Reference: https://pytorch.org/tutorials/intermediate/dist_tuto.html
  • Every collective operation supports synchronous and asynchronous modes. The main primitives currently supported are Broadcast, Scatter, Gather, Reduce, All Gather and All Reduce (a minimal all_reduce sketch follows this list):
    • All Gather: every server performs a Gather, so in the end every server holds the gathered result of all the data
    • All Reduce: every server performs a Reduce, so in the end every server holds the reduced result of all the data
    • Reduce Scatter: a Reduce is performed first, and the result is then scattered across the servers

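A minimal all-reduce sketch, assuming the default process group has already been initialized (e.g. via torchrun --nproc_per_node=N demo.py):

import torch
import torch.distributed as dist

def all_reduce_demo():
    rank = dist.get_rank()
    tensor = torch.tensor([float(rank)])
    # every process contributes its tensor; afterwards every process
    # holds the element-wise sum 0 + 1 + ... + (N - 1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print(f"rank {rank}: {tensor.item()}")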

2.3 Collective Communication Algorithms: Parameter Server and Ring AllReduce

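As an illustration of the Ring AllReduce idea only (not PyTorch's internal implementation), here is a small self-contained simulation of the reduce-scatter and all-gather phases over NumPy arrays:

import numpy as np

def ring_all_reduce(tensors):
    """Simulate ring all-reduce over one 1-D array per (simulated) worker."""
    n = len(tensors)
    # each worker splits its tensor into n chunks
    chunks = [np.array_split(t.astype(float), n) for t in tensors]

    # phase 1: reduce-scatter -- after n-1 steps, worker i holds the
    # fully reduced chunk (i + 1) % n
    for step in range(n - 1):
        sends = [(i, (i - step) % n, chunks[i][(i - step) % n].copy()) for i in range(n)]
        for i, c, payload in sends:
            chunks[(i + 1) % n][c] += payload

    # phase 2: all-gather -- circulate the completed chunks so that
    # every worker ends up with the full reduced tensor
    for step in range(n - 1):
        sends = [(i, (i + 1 - step) % n, chunks[i][(i + 1 - step) % n].copy()) for i in range(n)]
        for i, c, payload in sends:
            chunks[(i + 1) % n][c] = payload

    return [np.concatenate(c) for c in chunks]

# each "worker" starts with its own gradient vector; all workers end with the sum
workers = [np.arange(8) * (rank + 1) for rank in range(4)]
print(ring_all_reduce(workers)[0])   # [ 0. 10. 20. 30. 40. 50. 60. 70.]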

3. How to Train with GPUs

3.1 Training on a Single GPU

# Single-GPU training: just move the data, labels and model onto the GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# move the data and labels onto the GPU
inputs = inputs.to(device)
labels = labels.to(device)


# move the model onto the GPU
model = ResNet()
model.to(device)

3.2 Multi-GPU Training: DataParallel

  • Official tutorial: https://pytorch.org/tutorials/beginner/blitz/data_parallel_tutorial.html
  • DataParallel is single-process, multi-threaded and does not support multiple machines
    • It automatically splits the data and loads each slice onto the corresponding GPU, replicates the model on each GPU, runs the forward pass, and gathers the gradients onto output_device
    • Note that nn.DataParallel does not change the model's inputs or outputs, so no other code has to change, which is very convenient. The downside is that the subsequent loss computation runs only on output_device and cannot be parallelized, which causes load imbalance
    • master GPU: scatters the data (not the labels), computes the loss (the loss for each device is computed separately and then sent back to each GPU for the backward pass), reduces the per-layer gradients, averages them, updates the parameters, and finally distributes the updated parameters to the other GPUs
# Wrap the model to enable scatter/parallel execution
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
>>> module: the model to wrap and parallelize
>>> device_ids: GPUs to scatter to; defaults to all visible GPUs, so it usually does not need to be set; e.g. device_ids=[0,1,2]
>>> output_device: device on which results are gathered; defaults to the first GPU, which is why the first GPU uses more memory than the others

# Functional alternative: torch.nn.parallel.data_parallel
torch.nn.parallel.data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None)

# 1、Get multiple GPU device for training.
n_gpu = torch.cuda.device_count()
device = torch.device('cuda:0' if n_gpu > 0 else 'cpu')
device_ids = list(range(n_gpu))

# 2. move the data and labels onto the GPU
inputs = inputs.to(device)
labels = labels.to(device)

# 3. move the model onto the primary GPU
model = ResNet()
model.to(device) 
# copy model to multi-GPU: the data is scattered to all visible GPUs, each GPU receiving batch_size / n samples
if n_gpu > 1:
	model = torch.nn.DataParallel(model, device_ids=device_ids)  
# After wrapping with DataParallel, the model is replicated onto the n_gpu GPUs
# During the forward pass, a batch is split evenly into n_gpu chunks, one per GPU
# After the forward pass, the outputs from every GPU are gathered on the default GPU, where the loss and the backward pass are computed


# 4. save a model trained with multiple GPUs
# when training with DataParallel the model is wrapped, so every key gains a 'module.' prefix; be careful when saving
if isinstance(model, torch.nn.DataParallel):
    model_state_dict = model.module.state_dict()
else:
    model_state_dict = model.state_dict()
torch.save(model_state_dict, "model.pth")


# 5. single-machine multi-GPU launch command
CUDA_VISIBLE_DEVICES=2,3 python single-machine-and-multi-GPU-DataParallel.py

# 6. example data-parallel output, scattered to 2 GPUs
batch size in forward: 8
batch size in forward: 8
model outputs.size: torch.Size([16, 3])
CUDA_VISIBLE_DEVICES: 2,3
device_count: 2

3.3 Multi-GPU Training: DistributedDataParallel

Differences between DDP and DP:

  • DDP no longer has a primary GPU; every GPU performs the same work (better load balance): forward pass, loss computation, backward pass, ring-all-reduce of the gradients, and parameter update

  • With DDP each GPU receives batch_size samples, whereas with DP each GPU receives batch_size // n_gpus samples

  • DDP is multi-process while DP is single-process, so DDP is much faster

  • DDP uses a distributed data sampler to ensure that the data loaded by different processes does not overlap

  • DistributedDataParallel supports all-reduce, broadcast, send, receive, and so on; CPU communication goes through MPI and GPU communication through NCCL. It can be used for single-machine multi-GPU as well as multi-machine multi-GPU training

  • Speed-up and tuning tips (a consolidated sketch follows below):

    • Data loading: read data in the DataLoader with multiple worker processes (num_workers = number of CPU cores, e.g. 32) and pin_memory=True, which is much faster
    • Data copying: data.to(device, non_blocking=True) and label.to(device, non_blocking=True)
    • If input sizes are fixed and the model contains convolutions, try torch.backends.cudnn.benchmark = True
    • Synchronize BN statistics across GPUs: model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)

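A consolidated sketch of the knobs above; dataset, model, sampler (a DistributedSampler) and local_rank are assumed to exist already:

import torch
from torch.utils.data import DataLoader

torch.backends.cudnn.benchmark = True          # fixed input sizes + convolutions

loader = DataLoader(dataset, batch_size=64, sampler=sampler,
                    num_workers=16, pin_memory=True)

model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)   # sync BN statistics across ranks
model = model.to(local_rank)

for data, label in loader:
    data = data.to(local_rank, non_blocking=True)    # asynchronous host-to-device copy
    label = label.to(local_rank, non_blocking=True)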

3.3.1 Introduction to DistributedDataParallel
  • Distributed data parallel (DDP) is implemented with multiple processes
  • Multiple processes are started up front (one per GPU); each process owns one GPU and executes the code independently. Each process therefore initializes the model and trains on its own; in every iteration the gradients are shared and aggregated across processes via inter-process communication, after which each process updates its parameters independently
# Wrap the model to enable distributed data parallelism
torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False)
>>> module: the model to wrap and parallelize
>>> device_ids: GPUs to scatter to; defaults to all visible GPUs, so it usually does not need to be set; e.g. device_ids=[0,1,2]
>>> output_device: device on which results are gathered; defaults to the first GPU, which is why the first GPU uses more memory than the others

3.3.2 The torch.distributed.launch Module
  • torch.distributed.launch: module arguments
python -m torch.distributed.launch --help
>>> Package location: /miniconda3/lib/python3.8/site-packages/torch/distributed/launch.py
>>> Notes:
>>>>>> torch.distributed.launch will be deprecated in a future release; torchrun (== torch.distributed.run) is recommended instead
>>>>>> --use_env is the default behavior in torchrun, whereas with torch.distributed.launch it must be passed explicitly
>>>>>> the --local_rank value that used to be passed on the command line can then be read from os.environ['LOCAL_RANK']

# Usage: for single-machine multi-GPU training, only the number of workers needs to be specified
# python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
#           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
#           arguments of your training script)
# CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 train.py
usage: launch.py [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE]
                 [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT]
                 [--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone]
                 [--max_restarts MAX_RESTARTS]
                 [--monitor_interval MONITOR_INTERVAL]
                 [--start_method {spawn,fork,forkserver}] [--role ROLE] [-m]
                 [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
                 [-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR]
                 [--master_port MASTER_PORT] [--use_env]
                 training_script ...

# Argument descriptions
positional arguments:
  training_script       Full path to the (single GPU) training program/script
                        to be launched in parallel, followed by all the
                        arguments for the training script.
  training_script_args

optional arguments:
  -h, --help: show this help message and exit
  -m, --module: Change each process to interpret the launch script as a Python module, 
                executing with the same behavior as 'python -m'.
                
  --nnodes: Number of nodes; one node corresponds to one machine
  --nproc_per_node: Number of workers per node; supported values: [auto, cpu, gpu, int]
                    >>> the number of GPUs in a node; usually one process drives one GPU, so this is often described as the number of processes run per node
  --node_rank: Rank of the node for multi-node distributed training; node index, starting from 0
  --master_addr: Address of the master node (rank 0). It should be either the IP address or the
                 hostname of rank 0. For single node multi-proc training the --master_addr can 
                 simply be 127.0.0.1; IPv6 should have the pattern `[0:0:0:0:0:0:0:1]`.
                 >>> IP address of the master node, i.e. the host with rank 0; this parameter lets the
                 >>> other nodes know where node 0 is so they can send it their training parameters
  --master_port: Port on the master node (rank 0) to be used for communication during distributed 
                 training.
  --use_env: Use environment variable to pass 'local rank'. For legacy reasons, the default value
             is False. If set to True, the script will not pass --local_rank as argument, and will
             instead set LOCAL_RANK.
             >>> with use_env, PyTorch puts the local rank of the current process into the environment
             >>> variable LOCAL_RANK instead of into args.local_rank

  --rdzv_backend: Rendezvous backend.
  --rdzv_endpoint: Rendezvous backend endpoint; usually in form <host>:<port>.
  --rdzv_id: User-defined group id.
  --rdzv_conf: Additional rendezvous configuration(<key1>=<value1>,<key2>=<value2>,...).
  --standalone: Start a local standalone rendezvous backend that is represented by a C10d TCP 
                store on port 29400. Useful when launching single-node, multi-worker job. 
                If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned;
                any explicitly set values are ignored.
  --max_restarts: Maximum number of worker group restarts before failing.
  --monitor_interval: Interval, in seconds, to monitor the state of workers.
  --start_method {spawn,fork,forkserver}: Multiprocessing start method to use when creating workers.
  --role: User-defined role for the workers.
  --no_python: Skip prepending the training script with 'python' - just execute it directly. 
               Useful when the script is not a Python script.
  --run_path: Run the training script with runpy.run_path in the same interpreter. 
              Script must be provided as an abs path (e.g. /abs/path/script.py). 
              Takes precedence over --no_python.
  --log_dir: Base directory to use for log files (e.g. /var/log/torch/elastic). 
             The same directory is re-used for multiple runs (a unique job-level 
             sub-directory is created with rdzv_id as the prefix).
  -r, --redirects: Redirect std streams into a log file in the log directory (e.g. [-r 3] 
                   redirects both stdout+stderr for all workers, [-r 0:1,1:2] redirects stdout
                   for local rank 0 and stderr for local rank 1).
  -t, --tee: Tee std streams into a log file and also to console (see --redirects for format).
  • torch.distributed.launch: environment variables set automatically at runtime
    • After the code is launched with torch.distributed.launch, five variables (MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK and WORLD_SIZE) are set in each process's environment:
      • RANK: read with os.environ["RANK"]; the global index of the process (usually one process per GPU), starting at 0 with a maximum of the total number of GPUs minus 1
      • LOCAL_RANK: read with os.environ["LOCAL_RANK"]; the index of the process within its own machine, from 0 up to the number of GPUs on that machine minus 1
      • WORLD_SIZE: read with os.environ["WORLD_SIZE"]; the total number of processes launched (summed over all machines)
      • nnodes, node_rank and nproc_per_node: nnodes is the number of physical nodes (a node is one machine containing several GPUs), node_rank is the index of the physical node, and nproc_per_node is the number of processes on each physical node
    • A concrete example: suppose we use 2 machines with 4 GPUs each; then RANK takes values in [0, 7], LOCAL_RANK on each machine takes values in [0, 3], and WORLD_SIZE is 8


  • Examples of reading these environment variables for single-machine multi-GPU and multi-machine multi-GPU runs:
# 1. The script train.py below reads the environment variables
import os
import time
import torch.distributed as dist

print("before running dist.init_process_group()")
MASTER_ADDR = os.environ["MASTER_ADDR"]
MASTER_PORT = os.environ["MASTER_PORT"]
LOCAL_RANK = os.environ["LOCAL_RANK"]
RANK = os.environ["RANK"]
WORLD_SIZE = os.environ["WORLD_SIZE"]

print("MASTER_ADDR: {}\tMASTER_PORT: {}".format(MASTER_ADDR, MASTER_PORT))
print("LOCAL_RANK: {}\tRANK: {}\tWORLD_SIZE: {}".format(LOCAL_RANK, RANK, WORLD_SIZE))

dist.init_process_group('nccl')
print("after running dist.init_process_group()")
time.sleep(60)  # Sleep for a while to avoid exceptions that occur when some processes end too quickly.
dist.destroy_process_group()



# 2. Single-machine multi-GPU example: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 train.py
# 2.1 Single-machine multi-GPU output:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1	MASTER_PORT: 29500
LOCAL_RANK: 0	RANK: 0	WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1	MASTER_PORT: 29500
LOCAL_RANK: 2	RANK: 2	WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1	MASTER_PORT: 29500
LOCAL_RANK: 3	RANK: 3	WORLD_SIZE: 4
before running dist.init_process_group()
MASTER_ADDR: 127.0.0.1	MASTER_PORT: 29500
LOCAL_RANK: 1	RANK: 1	WORLD_SIZE: 4
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()


# 3. Multi-machine multi-GPU example: the commands are run on the different machines
# 3.1 Multi-machine multi-GPU: run the following on machine 0
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py

# 3.1 Multi-machine multi-GPU: output on machine 0
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 0	RANK: 0	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 1	RANK: 1	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 3	RANK: 3	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 2	RANK: 2	WORLD_SIZE: 8
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()


# 3.2 Multi-machine multi-GPU: run the following on machine 1 (only the node rank differs from machine 0)
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py


# 3.2 Multi-machine multi-GPU: output on machine 1
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 0	RANK: 4	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 1	RANK: 5	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 3	RANK: 7	WORLD_SIZE: 8
before running dist.init_process_group()
MASTER_ADDR: 192.168.1.105	MASTER_PORT: 12345
LOCAL_RANK: 2	RANK: 6	WORLD_SIZE: 8
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()
after running dist.init_process_group()

3.3.3 torch.distributed.run (torchrun): the recommended way to launch multi-machine multi-GPU jobs
  • torchrun provides a superset of the functionality of torch.distributed.launch, with the following additional features:
    • Worker failures are handled gracefully by restarting all workers.
    • Worker RANK and WORLD_SIZE are assigned automatically, because --use_env became the default in torchrun.
    • The number of nodes is allowed to change between a minimum and maximum size (elasticity).
  • Command-line changes compared with torch.distributed.launch:
# 0. Simply replace python -m torch.distributed.launch with torchrun

# 1. Single-machine multi-GPU: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc_per_node=4 train.py

# 2. Multi-machine multi-GPU: the commands are run on the different machines
# 2.1 Multi-machine multi-GPU: run the following on machine 0
torchrun --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py

# 2.2 Multi-machine multi-GPU: run the following on machine 1 (only the node rank differs from machine 0)
torchrun --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py
  • torch.distributed.run / torchrun help output
/home/manzp/miniconda3/bin/torchrun
/home/manzp/miniconda3/pkgs/pytorch-1.13.1-py3.8_cuda11.6_cudnn8.3.2_0/bin/torchrun
python -m torch.distributed.run == torchrun

usage: torchrun [-h] [--nnodes NNODES] [--nproc_per_node NPROC_PER_NODE] [--rdzv_backend RDZV_BACKEND] [--rdzv_endpoint RDZV_ENDPOINT] [--rdzv_id RDZV_ID] [--rdzv_conf RDZV_CONF] [--standalone]
                [--max_restarts MAX_RESTARTS] [--monitor_interval MONITOR_INTERVAL] [--start_method {spawn,fork,forkserver}] [--role ROLE] [-m] [--no_python] [--run_path] [--log_dir LOG_DIR] [-r REDIRECTS]
                [-t TEE] [--node_rank NODE_RANK] [--master_addr MASTER_ADDR] [--master_port MASTER_PORT]
                training_script ...

Torch Distributed Elastic Training Launcher

positional arguments:
  training_script       Full path to the (single GPU) training program/script to be launched in parallel, followed by all the arguments for the training script.
  training_script_args

optional arguments:
  -h, --help            show this help message and exit
  --nnodes NNODES       Number of nodes, or the range of nodes in form <minimum_nodes>:<maximum_nodes>.
  --nproc_per_node NPROC_PER_NODE
                        Number of workers per node; supported values: [auto, cpu, gpu, int].
  --rdzv_backend RDZV_BACKEND
                        Rendezvous backend.
  --rdzv_endpoint RDZV_ENDPOINT
                        Rendezvous backend endpoint; usually in form <host>:<port>.
  --rdzv_id RDZV_ID     User-defined group id.
  --rdzv_conf RDZV_CONF
                        Additional rendezvous configuration (<key1>=<value1>,<key2>=<value2>,...).
  --standalone          Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend,
                        --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.
  --max_restarts MAX_RESTARTS
                        Maximum number of worker group restarts before failing.
  --monitor_interval MONITOR_INTERVAL
                        Interval, in seconds, to monitor the state of workers.
  --start_method {spawn,fork,forkserver}
                        Multiprocessing start method to use when creating workers.
  --role ROLE           User-defined role for the workers.
  -m, --module          Change each process to interpret the launch script as a Python module, executing with the same behavior as 'python -m'.
  --no_python           Skip prepending the training script with 'python' - just execute it directly. Useful when the script is not a Python script.
  --run_path            Run the training script with runpy.run_path in the same interpreter. Script must be provided as an abs path (e.g. /abs/path/script.py). Takes precedence over --no_python.
  --log_dir LOG_DIR     Base directory to use for log files (e.g. /var/log/torch/elastic). The same directory is re-used for multiple runs (a unique job-level sub-directory is created with rdzv_id as the
                        prefix).
  -r REDIRECTS, --redirects REDIRECTS
                        Redirect std streams into a log file in the log directory (e.g. [-r 3] redirects both stdout+stderr for all workers, [-r 0:1,1:2] redirects stdout for local rank 0 and stderr for local
                        rank 1).
  -t TEE, --tee TEE     Tee std streams into a log file and also to console (see --redirects for format).
  --node_rank NODE_RANK
                        Rank of the node for multi-node distributed training.
  --master_addr MASTER_ADDR
                        Address of the master node (rank 0). It should be either the IP address or the hostname of rank 0. For single node multi-proc training the --master_addr can simply be 127.0.0.1; IPv6
                        should have the pattern `[0:0:0:0:0:0:0:1]`.
  --master_port MASTER_PORT
                        Port on the master node (rank 0) to be used for communication during distributed training.

3.3.4 Distributed Code in Practice: from Single-Machine Single-GPU to Multi-Machine Multi-GPU
  • Change 1: initialize the distributed process group and the distributed device
# Initialize the distributed process group and distributed device
# Check whether NCCL is available with torch.distributed.is_nccl_available()
import os
import torch
import torch.distributed as dist

def setup_DDP(backend="nccl", verbose=False):
    """
    We don't set ADDR and PORT in here, like:
        # os.environ['MASTER_ADDR'] = 'localhost'
        # os.environ['MASTER_PORT'] = '12355'
    Because program's ADDR and PORT can be given automatically at startup.
    E.g. You can set ADDR and PORT by using:
        python -m torch.distributed.launch --master_addr="192.168.1.201" --master_port=23456 ...

    You don't need to set rank and world_size explicitly in dist.init_process_group().

    :param backend:
    :param verbose:
    :return:
    """
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    
    # initialize the distributed process group
    torch.distributed.init_process_group(backend=backend)
    
    # set distributed device
    device = torch.device("cuda:{}".format(local_rank))
    
    if verbose:
        print("Using device: {}".format(device))
        print(f"local rank: {local_rank}, global rank: {rank}, world size: {world_size}")
    return rank, local_rank, world_size, device


# Process synchronization and resource release
def init_dist_process_group(init_method_str, gpu_id, total_gpu_id, total_gpu_num):
    torch.cuda.set_device(gpu_id)
    dist.init_process_group(backend='nccl', init_method=init_method_str, rank=total_gpu_id, world_size=total_gpu_num)
    dist.barrier()  # block every process in the group until all of them have reached this call
    
def destroy_dist_process_group():
    dist.destroy_process_group()  # tear down the process group and release its resources


# init_process_group signature and docstring, for reference
def init_process_group(
    backend: Union[str, Backend],
    init_method: Optional[str] = None,
    timeout: timedelta = default_pg_timeout,
    world_size: int = -1,
    rank: int = -1,
    store: Optional[Store] = None,
    group_name: str = "",
    pg_options: Optional[Any] = None,
):
    """
    Initializes the default distributed process group, and this will also
    initialize the distributed package.

    There are 2 main ways to initialize a process group:
        1. Specify ``store``, ``rank``, and ``world_size`` explicitly.
        2. Specify ``init_method`` (a URL string) which indicates where/how
           to discover peers. Optionally specify ``rank`` and ``world_size``,
           or encode all required parameters in the URL and omit them.

    If neither is specified, ``init_method`` is assumed to be "env://".


    Args:
        backend (str or Backend): The backend to use. Depending on
            build-time configurations, valid values include ``mpi``, ``gloo``,
            ``nccl``, and ``ucc``. This field should be given as a lowercase
            string (e.g., ``"gloo"``), which can also be accessed via
            :class:`Backend` attributes (e.g., ``Backend.GLOO``). If using
            multiple processes per machine with ``nccl`` backend, each process
            must have exclusive access to every GPU it uses, as sharing GPUs
            between processes can result in deadlocks. ``ucc`` backend is
            experimental.
        init_method (str, optional): URL specifying how to initialize the
                                     process group. Default is "env://" if no
                                     ``init_method`` or ``store`` is specified.
                                     Mutually exclusive with ``store``.
        world_size (int, optional): Number of processes participating in
                                    the job. Required if ``store`` is specified.
        rank (int, optional): Rank of the current process (it should be a
                              number between 0 and ``world_size``-1).
                              Required if ``store`` is specified.
        store(Store, optional): Key/value store accessible to all workers, used
                                to exchange connection/address information.
                                Mutually exclusive with ``init_method``.
        timeout (timedelta, optional): Timeout for operations executed against
            the process group. Default value equals 30 minutes.
            This is applicable for the ``gloo`` backend. For ``nccl``, this is
            applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
            or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
            ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
            process will block and wait for collectives to complete before
            throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
            this is the duration after which collectives will be aborted
            asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
            will provide errors to the user which can be caught and handled,
            but due to its blocking nature, it has a performance overhead. On
            the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
            performance overhead, but crashes the process on errors. This is
            done since CUDA execution is async and it is no longer safe to
            continue executing user code since failed async NCCL operations
            might result in subsequent CUDA operations running on corrupted
            data. Only one of these two environment variables should be set.
            For ``ucc``, blocking wait is supported similar to NCCL. However,
            async error handling is done differently since with UCC we have
            progress thread and not watch-dog thread.
        group_name (str, optional, deprecated): Group name.
        pg_options (ProcessGroupOptions, optional): process group options
            specifying what additional options need to be passed in during
            the construction of specific process groups. As of now, the only
            options we support is ``ProcessGroupNCCL.Options`` for the ``nccl``
            backend, ``is_high_priority_stream`` can be specified so that
            the nccl backend can pick up high priority cuda streams when
            there're compute kernels waiting.

    .. note:: To enable ``backend == Backend.MPI``, PyTorch needs to be built from source
        on a system that supports MPI.

    """


  • Change 2: build the DataLoader with a DistributedSampler
    • batch_size: this is the batch size per process (per GPU), not the total across all GPUs
    • create the DistributedSampler
    • create the DataLoader: pass the sampler argument; in that case shuffle must not be set on the DataLoader
# initialize data loaders using a DistributedSampler
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

batch_size = 64  # per-process (per-GPU) batch size

train_sampler = DistributedSampler(training_data, shuffle=True)  
test_sampler = DistributedSampler(test_data, shuffle=False)  # the test set does not strictly need a DistributedSampler

# pin_memory=True and num_workers speed up loading; pass the samplers created above instead of shuffle
train_dataloader = DataLoader(training_data, batch_size=batch_size, sampler=train_sampler, pin_memory=True, num_workers=16)
test_dataloader = DataLoader(test_data, batch_size=batch_size, sampler=test_sampler, pin_memory=True, num_workers=16)

def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # copy data from CPU to GPU with a non-blocking transfer
        X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True) 
  • Change 3: wrap the model with DistributedDataParallel
from torch.nn.parallel import DistributedDataParallel as DDP

# initialize model
model = NeuralNetwork().to(device)  # copy model from cpu to gpu

# optionally use SyncBatchNorm
if args.syncBN:
    # SyncBatchNorm makes training slower but synchronizes BN statistics across GPUs
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)

# wrap the model with torch.nn.parallel.DistributedDataParallel,
# explicitly specifying the device the model uses (device_ids) and where its output lives (output_device);
# DDP then all-reduces the gradients computed on the different GPUs (i.e. aggregates and synchronizes them)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

  • Change 4: set the sampler's epoch
for epoch_index in range(start_epoch, num_epoch):
    num_batches = len(train_data_loader)
    train_sampler.set_epoch(epoch_index)  # so that each GPU sees a different random shuffle every epoch
    test_sampler.set_epoch(epoch_index)

    for batch_index, (data, label) in enumerate(train_data_loader):
        step = num_batches * epoch_index + batch_index + 1
  • Change 5: save the model, print logs, etc. only on rank 0
# Note: if no specific GPU is designated, the saved model may be unusable, e.g. because some parameters end up on device 0 and others on device 1
# save model on rank 0: to avoid saving the model repeatedly, save only on the master process
if torch.distributed.get_rank() == 0:
    # save model with multi-GPU
    if isinstance(model, torch.nn.DataParallel) or isinstance(model, torch.nn.parallel.DistributedDataParallel):
        model_state_dict = model.module.state_dict()
        print("DistributedDataParallel Model")
    else:
        model_state_dict = model.state_dict()
    torch.save(model_state_dict, "model.pth")
    print("Saved PyTorch Model State to model.pth")


# load the model (possibly onto a different device)
state_dict_load = torch.load("model.pth", map_location=device)
model.load_state_dict(state_dict_load)

# print only on the master process
def print_only_rank0(log):
    if torch.distributed.get_rank() == 0:
        print(log)
  • Change 6: launch commands
# 1. Single-machine multi-GPU: only the number of workers needs to be specified
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 train.py

# 2. Multi-machine multi-GPU: the commands are run on the different machines
# 2.1 Multi-machine multi-GPU: run the following on machine 0
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 0 --master_addr="192.168.1.105" --master_port=12345 train.py

# 2.2 Multi-machine multi-GPU: run the following on machine 1 (only the node rank differs from machine 0)
python -m torch.distributed.launch --nproc_per_node 4 --nnodes 2 --node_rank 1 --master_addr="192.168.1.105" --master_port=12345 train.py

3.4 Querying Free GPU Memory

import os
import numpy as np

def get_gpu_memory():
    """Return the free memory (in MiB) of every GPU by parsing nvidia-smi."""
    os.system('nvidia-smi -q -d Memory | grep -A4 GPU | grep Free > tmp.txt')
    memory_gpu = [int(x.split()[2]) for x in open('tmp.txt', 'r').readlines()]
    os.system('rm tmp.txt')
    return memory_gpu

# example: put the GPUs with the most free memory first
gpu_memory = get_gpu_memory()
gpu_list = np.argsort(gpu_memory)[::-1]
gpu_list_str = ','.join(map(str, gpu_list))
os.environ.setdefault("CUDA_VISIBLE_DEVICES", gpu_list_str)

print("\ngpu free memory: {}".format(gpu_memory)) 
print("CUDA_VISIBLE_DEVICES :{}".format(os.environ["CUDA_VISIBLE_DEVICES"]))
>>> gpu free memory: [10362, 10058, 9990, 9990] 
>>> CUDA_VISIBLE_DEVICES :0,1,3,2

3.5 Loading a Model Trained on GPUs

  • Loading on a machine without a GPU: set map_location to the CPU
  • Loading a model trained with multiple GPUs: the keys of the state dict must be modified to strip the module prefix
# When training with DataParallel, the model is wrapped and every key gains a 'module.' prefix, so either access layers via net.module.linear or rename the keys
from collections import OrderedDict

# load the saved state dict
path_state_dict = "./model_in_multi_gpu.pkl"
state_dict_load = torch.load(path_state_dict, map_location="cpu")  # on a machine without GPUs, map_location is needed to map the tensors to the CPU
print("state_dict_load:\n{}".format(state_dict_load))

# rename the keys
new_state_dict = OrderedDict()
for k, v in state_dict_load.items():
    namekey = k[7:] if k.startswith('module.') else k  # remove module.
    new_state_dict[namekey] = v
print("new_state_dict:\n{}".format(new_state_dict))

# load the renamed state dict
net.load_state_dict(new_state_dict)


# Alternative: save with torch.save(model.module.state_dict(), 'filename.pt')
# so that the saved keys never contain the module prefix.

4. References

1. PyTorch 多 GPU 训练 - 入门与实践 *****
2. Pytorch 分布式训练(DP/DDP)*****
3. DDP系列第三篇:实战与技巧 *****
4. training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups *****
5. GPU多卡并行训练总结(以pytorch为例)*****
6. pytorch(分布式)数据并行个人实践总结 ****
7. PyTorch分布式训练简明教程(2022更新版) ***
8. 新生手册:PyTorch分布式训练 ****
9. https://github.com/hpcaitech/ColossalAI
10. https://github.com/microsoft/DeepSpeed
11. 从零实现BERT、GPT及Diffusion类算法
12. https://github.com/pytorch/examples/tree/main/distributed/ddp-tutorial-series
13. https://wqw547243068.github.io/dist (overview of distributed training principles)
