当前位置:   article > 正文

Pytorch 分布式训练DDP(torch.distributed)详解-原理-代码_pytorch ddp

pytorch ddp

0. 引言

  目前已经有很多分布式训练的方法,一直在用,这里详细学习总结一下各种分布式训练的原理,本文主要介绍DP以及DDP,后面会介绍流水线并行、张量并行以及Zero优化等,用于学习记录,错误请指出。

1. DP与DPP原理

  Pytorch老版本的数据并行DP已经被弃用,这里还是分别介绍一下两者的过程,下面的图需要仔细看一下,方面更好的理解。
在这里插入图片描述

1.1 DP

  DP采用多线程实现,速度很慢,只适合单机多卡,目前已经被弃用。这里作为对比还是说明一下整个流程。DP需要各个GPU之间的通信量很大,主卡性能和通信开销容易成为瓶颈。
在这里插入图片描述

  • 将模型权重从主卡(默认GPU:0)broadcast到所有GPU上,输入的bacth大小的数据会scatter到各个GPU上(每张卡上输入batch / 4,假设有四张卡)。
  • 之后独自在各张卡上执行前向过程,每张卡的输出结果会被gather到主卡上,然后在主卡上计算总的损失,总损失是一个标量,一般可以求平均值,再通过损失函数求导,得到损失的梯度,分发到各个GPU上。
  • 在每个GPU根据链式法则继续计算各个参数的梯度,再将所有设备上的梯度回传到主卡上进行梯度累加并求梯度均值。在主卡上通过平均梯度更新模型权重。最后将更新后的模型参数 broadcast 到其余 GPU 中进行下一轮的前向传播。

1.2 DDP

  DDP可适用于单机多卡和多机多卡的数据并行训练,本质是采用了多进程控制。通常是一个进程控制一个GPU,每一个GPU上都有一份相同的模型权重,且每个进程在训练时都有独自的优化器,执行自己的更新过程,DDP进程间通信只传递梯度,通信量较少。下面还是说明一下整个流程。
在这里插入图片描述

  • 首先将 主卡的模型参数broadcast到其他GPU上,然后每个 DDP 进程都会创建一个 local Reducer 来负责梯度同步。每个进程从磁盘加载 batch 数据,并将它们传递到其 GPU,这一点在下面外面会展开讲解。
  • 每个 GPU 都有自己的前向过程,分别得到相应的预测结果。在DDP中,损失函数的计算是在每个设备上独立进行的,每个设备计算的损失都是基于其处理的数据子集和相应的预测结果。
  • 根据损失函数可以计算出参数梯度,之后将梯度信息在各个 GPUs 间进行 All-Reduce,每个 GPU 都收到其他 GPU 的梯度进行梯度累加后求平均值,然后可以独自进行反向传播和参数更新。
  • 由于初始权重参数相同,梯度相同,优化器状态相同,所以每个GPU上的参数更新之后的模型权重还是相同。All-Reduce主要是使用Ring AllReduce,下面会展开介绍。

  AllReduce就是将各个GPU上的信息进行广播汇总,如下图,假设有4块GPU,每块GPU上的数据也对应被切成4份。AllReduce的最终目标,就是让每块GPU上的数据都变成箭头右边汇总的样子。一般使用Ring-AllReduce算法实现。Ring-ALLReduce则分两大步骤实现该目标:Reduce-Scatter和All-Gather。
  DDP进程间进行梯度通信是通过Ring AllReduce实现的,每个进程将梯度依次传递给下一个进程,之后再把从上一个进程拿到的梯度传递给下一个进程。循环 n 次 (进程数量) 之后,所有进程就可以得到全部的梯度了。
在这里插入图片描述

  (a)Reduce-Scatter

  定义网络拓扑关系,使得每个GPU只和其相邻的两块GPU通讯。每次发送对应位置的数据进行累加。每一次累加更新都形成一个拓扑环。经过3次之后,每个GPU上都有完整的信息。
在这里插入图片描述

  (b)All-Gather

  按照“相邻GPU对应位置进行通讯”的原则,但对应位置数据不再做相加,而是直接替换。All-Gather以红色块作为起点。
在这里插入图片描述

2. 数据集分配

  在数据集采样时,给 dataloader 加一DistributedSampler 就可以给不同进程分配数据集的不重叠、不交叉的部分,从而无缝对接 DDP 模式。下面以两个GPU为例讲解。
  先将数据集打乱,然后将数据集数目/GPU数目,并向上取整,不够就用前面的数据进行补充,然后按照顺序分配给不同的GPU。在同一个epoch中,每个进程使用相同的种子数,保证数据shuffle一样,不同的epoch中将种子数设置为seed+epoch,保证每个GPU每次epoch使用的数据都不一样。
在这里插入图片描述

3. 具体代码

3.1 DP

  DP代码比较简单。

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
参数:
 - module:需要多卡数据并行的模型;
 - device_ids:可用的GPU号
 - output_device:模型输出结果存放的卡号,默认为0
 - dim:从哪一维度切分一个 batch 的数据,默认为 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
#完整代码
import torch
gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))

train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)

model = ...
model = nn.DataParallel(model.to(device), device_ids=gpus, output_device=gpus[0])

optimizer = optim.SGD(model.parameters())

for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
      images = images.cuda(non_blocking=True)
      target = target.cuda(non_blocking=True)
      ...
      output = model(images)
      loss = criterion(output, target)
      ...
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.2 DDP

  DDP数据并行有多种启动方式,这里先对几个重要的参数加以解释。


 - group: 进程组,一般保持默认为一个;
 - rank:全局进程id;
 - local_rank: 某个几点上的进程id;
 - world_size:所有进程的数量;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  一般一个进程对应一个GPU,例如:两台机器或者节点,每台机器上有8张显卡,则world_size=16, rank=[0,1…15],每台机器上的local_rank=[0,1…7].

3.2.1 torch.distributed.launch

  这里介绍torch.distributed.launch通过命令行执行分布式训练。每一个进程执行的是同一份代码。其他方式不在介绍

python3 -m torch.distributed.launch --配置 train.py --args参数

 - nnodes: 使用的机器数量,单机的话,就默认是1了
 - nproc_per_node: 单机的进程数,即一个机器上使用多少显卡
 - master_addr/port: 使用的主进程rank0的地址和端口
 - node_rank: 当前的进程rank
 
在单机情况下, 只有nproc_per_node 是必须指定的,
master_addr/port和node_rank都是可以由launch通过环境自动配置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

  torch的distributed分布式训练首先需要对进程组进行初始化。

torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None,...)

 - backend:指定分布式的后端,GPU用NCCL, CPU用GLOO
 - init_method:初始化方法,可以是TCP连接、File共享文件系统、ENV环境变量三种方式
 - init_method='tcp://ip:port': 通过指定rank 0(即:MASTER进程)的IP和端口,各个进程进行信息交换。 需指定 rank 和 world_size 这两个参数。
 - init_method='file://path':通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数。
 - init_method=env://:从环境变量中读取分布式的信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。 其中,rank和world_size可以选择手动指定,否则从环境变量读取。

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
import torch
import torch.distributed as dist
import os

dist.init_process_group('nccl', init_method='env://')
rank = dist.get_rank()   // 每个进程在执行同一份代码得到对应的rank
local_rank = os.environ['LOCAL_RANK']  // // 每个进程在执行同一份代码得到对应的local_rank 
master_addr = os.environ['MASTER_ADDR']  
master_port = os.environ['MASTER_PORT']  
print(f"rank = {rank} is initialized in {master_addr}:{master_port}; local_rank = {local_rank}")
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)


执行:python -m torch.distributed.launch --nproc_per_node=4 demo.py
/*
rank = 2 is initialized in 127.0.0.1:29500; local_rank = 2
rank = 3 is initialized in 127.0.0.1:29500; local_rank = 3
rank = 1 is initialized in 127.0.0.1:29500; local_rank = 1
rank = 0 is initialized in 127.0.0.1:29500; local_rank = 0
tensor([1, 2, 3, 4], device='cuda:2')
tensor([1, 2, 3, 4], device='cuda:3')
tensor([1, 2, 3, 4], device='cuda:0')
tensor([1, 2, 3, 4], device='cuda:1')
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
完整代码:
import torch
import argparse
import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', default=-1, type=int,
                    help='node rank for distributed training')
args = parser.parse_args()

dist.init_process_group(backend='nccl')
torch.cuda.set_device(args.local_rank)

train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

model = ...
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])

optimizer = optim.SGD(model.parameters())

for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
      images = images.cuda(non_blocking=True)
      target = target.cuda(non_blocking=True)
      ...
      output = model(images)
      loss = criterion(output, target)
      ...
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()


命令行执行:CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

34. 参考

1. 当代研究生应当掌握的并行训练方法(单机多卡)
2. Pytorch DDP分布式训练介绍
3. Pytorch 分布式数据 Distributed Data Parallal
4. PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
5. Pytorch 分布式训练 (DP, DDP)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/1011798
推荐阅读
相关标签
  

闽ICP备14008679号