当前位置:   article > 正文

Pytorch基础|分布式数据并行 (DDP)以及其Pytorch实现_pytorch ddp

pytorch ddp

 一、引言

大规模分布式训练的目标是协调多台机器简单高效地训练大模型。然而,在这个过程中,存在着内存需求与GPU内存限制之间的矛盾。

问题分析:假设训练模型所需的内存的Require_memory,GPU的单卡内存为Device_memory,在模型训练过程中的矛盾为Require_memory>>Device_memory。

要解决这两个矛盾,有两个解决方案:“节流”、“开源”。

“节流”的核心思想是在保持模型训练精度的同时减少内存消耗;而“开源”的主要思路是利用各种形式的分布式训练,将模型训练的任务分布到多个GPU上,主要包括数据并行(Data Parallel),分布式数据并行(DistributedDataParallel),模型并行、流水线并行、3D并行等。

上一篇详细介绍了数据并行(Data Parallel)的原理以及其在Pytorch中的实现。

Pytorch基础|数据并行(DP)以及其Pytorch实现_pytorch 数据并行-CSDN博客文章浏览阅读1k次,点赞26次,收藏29次。这篇文章将深入数据并行(Data Parallel,即常说的DP)的原理,并解析数据并行在Pytorch中的实现。_pytorch 数据并行https://blog.csdn.net/weixin_49659123/article/details/136055562?spm=1001.2014.3001.5501

本篇文章进一步深入解析分布式数据并行(DistributedDataParallel, DDP)的原理,并解析分布式数据并行在Pytorch中的实现。

二、分布式数据并行原理

2.1 数据并行(DP)的缺点

在数据并行(DP)的模型训练中,训练任务被切分到多个进程(GPU)上,即每个 GPU 复制一份模型,将一批样本分为多份输入各个模型并行计算。通过这种并行计算的方式解决了数据样本很大的问题,但也有自身的不足:


(1)单进程多线程带来的问题:DataParallel 是单进程多线程的,无法在多个机器上工作 (仅适用于单机多卡,但不支持多机);同时它基于多线程的方式,确实方便了信息的交换,但受困于 GIL (Python 全局解释器锁),会带来性能开销 (GIL 的存在使得一个 Python 进程只能利用一个 CPU 核心,不适合用于计算密集型的任务)

Python中的GIL(全局解释器锁):多线程编程的隐患-腾讯云开发者社区-腾讯云Python作为一门强大而灵活的编程语言,吸引了大量的开发者。然而,对于多线程编程来说,Python引入了一个概念——全局解释器锁(Global Interpreter Lock,简称GIL),它在一定程度上影响了多线程程序的性能。本文将深入探讨GIL的概念,它对多线程编程的影响以及如何处理与绕过它。icon-default.png?t=N7T8https://cloud.tencent.com/developer/article/2336372
(2)存在效率问题:主卡性能和通信开销容易成为瓶颈,GPU 利用率通常很低——数据集需要先拷贝到主进程,然后再分配到每个设备上;权重参数只在主卡上更新,需要每次迭代前向所有设备做一次同步;每次迭代的网络输出需要聚合到主卡上


(3)不支持 Model Parallel

DistributedDataParallel就以上的问题提出了高效的解决方案,并且自此之后,不管是单机还是多机,都推荐使用 DDP 来代替 DP。

DP 和 DDP 的主要差异可以总结为以下几点:
(1)DP 是单进程多线程的,只用于单机情况,而 DDP 是多进程的,适用于单机和多机情况,真正实现分布式训练,并且因为每个进程都是独立的 Python 解释器,DDP 避免了 GIL 带来的性能开销
(2)DDP 的训练更高效,不存在 DP 中负载不均衡的问题;DDP 中每个 GPU 直接处理 mini-batch 数据,不需要由主卡分发;每个 GPU 独立计算gradient,不需要汇聚到主卡计算;每个 GPU 独立进行参数更新,不需要由主卡 broadcast 模型参数
(3)DDP 支持模型并行,而 DP 并不支持,这意味如果模型太大,单卡显存不足时只能使用DDP

2.2 分布式训练常见概念

2.2.1 分布式训练常见概念

在深入DDP的原理之前,先了解 DDP 技术相关术语的含义十分重要。以下是一些常见术语的含义:

  • Node:可将 Node 视为一台配备了单个/多个 GPU 的高性能计算机。集群(cluster)并不是简单地将一堆 GPU 拼凑在一起。相反,它们被组织成 Groups 或 Nodes。例如,一个 Node 可以容纳 8 个 GPU。
  • Master Node:在 multi-node(多节点)环境中,通常需要有一个 Node 负责协调工作。这个“Master Node”处理诸如同步、启动模型复制、监控模型加载和管理日志条目等任务。 如果没有 Master Node ,每个 GPU 都会独立生成日志,从而导致混乱。
  • Local Rank:术语“ Rank ” 可以类比为 ID 或位置。Local Rank 指的是 GPU 在其特定 Node(或计算机)中的位置或 ID。 它是“ Local ”的,因为它仅限于这台特定的设备。
  • Global Rank:从全局角度来看,Global Rank 是指 GPU 在所有可用 Node 中的标识。 这是一个唯一的标识符,与设备无关。
  • World Size:所有 Node 上可用的所有 GPU 数量。简单来说,就是节点数和每个节点中GPU数量的乘积。

如果只使用一台机器,情况就会简单明了,因为 Local Rank 等同于 Global Rank。

可以用一张图片来说明这一点:

2.2.2 分布式训练中的集合通信

DDP 模式是基于分布式数据并行的,所以在模型训练过程中要进行大量通信及数据传输,DDP利用集合通信(Collective Communication)。

集合通信是一种通信模式,它能够根据进程组,实现方便地跨多进程间通信。
PyTorch 实现了 6 个集合通信函数:

  • broadcast:主进程将相同的tensor分发给组里的每一个其它进程
  • scatter:是主进程将tensor的每一小部分给组里的其它进程
  • gather:是将其它进程的tensor收集过来;
  • reduce:是将其它进程的tensor收集过来并应用某种操作 (e.g. SUM、PRODUCT、MAX、MIN)
  • all_gather:将所有进程的tensor复制到一个tensor_list 中
  • all_reduce:将某种操作应用到每个tensor并在所有进程保存结果

2.2.3 通信后端

在 PyTorch 中实现分布式通信,是直接绑定到设置的通信后端(Communication Backends)上,可以根据使用场景选择合适的通信后端,而每个通信后端实现可以使用集合通信提供的各种通信函数。如果愿意,甚至可以直接使用点对点通信方式来实现更加灵活的通信方式。下面我们了解一下 PyTorch 支持的 3 个通信后端,可以直接在创建 DDP 的时候进行配置并创建。

  • Gloo Backend:Gloo 通信后端在开发时使用,它被预编译进入 PyTorch 库。Gloo 通信后端在 CPU 上支持所有的点对通信函数,在 GPU 上支持所有的集合通信函数。
  • MPI Backend:MPI 后端既支持点对点通信,也支持集合通信。PyTorch 二进制发行包并没有包括 MPI 实现,如果使用它需要我们自己手动编译。
  • NCCL Backend:NCCL 通信后端是直接预编译进 PyTorch 二进制发行包以支持 CUDA,它提供了基于 CUDA Tensor 的集合通信的优化实现。如果使用 CUDA Tensor 进行集合通信,NCCL 能够带来最好的性能。

关于每个通信后端,支持在 CPU 及 GPU 上使用的集合通信函数的情况,如下表所示:

2.3 分布式数据并行实现

2.3.1 DDP 初始化

1. 环境准备(就是init_process_group这一步):各个进程会在这一步,与master节点进行握手,建立连接。

如果连接上的进程数量不足约定的 word_size,进程会一直等待。也就是说,如果你约定了world_size=64,但是只开了6台8卡机器,那么程序会一直暂停在这个地方。

2. 复制模型:在创建 DDP 的时候会在 rank=0 进程上,将模型的副本同步到分布式训练集群中其它每个进程上,使每个进程持有的模型副本从开始具有相同的状态。

3.创建Local Reducer和Bucket:然后,每个 DDP 进程创建一个Local Reducer,Local Reducer 用来支持反向传播过程中的梯度同步。为了提高通信效率,Local Reducer采用基于 Bucket 的方式来组织和管理模型的参数(即在多个 Bucket 中将模型参数Model.parameters()以逆序的方式保存,这样可以方便在反向传播过程中直接对 Bucket 中的参数计算梯度,而不用在这个时候过多考虑和处理模型参数的顺序问题)

采用 Bucket 结构,会将模型参数映射到 Bucket 中,这样能够对 DDP 的速度产生重要的影响。在每一轮迭代中的反向传播阶段,会在所有模型参数与 Bucket 之间发生一次双向复制:先将模型参数复制到 Bucket 中,在执行 AllReduce 计算后再将平均梯度从 Bucket 中复制更新到模型参数上。

Bucket 的具体结构:

在这里还会为模型的每个参数注册一个 Autograd Hook,即为每个参数的梯度累加器(Gradient Accumulator)注册一个 Autograd Hook,当反向传播过程中参数的梯度更新完成后会触发 Autograd Hook 执行。

2.3.2 前向传播过程

在前向传播过程中,每个进程从磁盘加载 batch 数据,并将它们传递到其 GPU。每个DDP进程将其拿到的 batch 数据传给本地的模型,通过计算得到输出output。

2.3.3 反向传播过程

在反向传播过程中,通过调用 loss.backward() 执行反向传播计算,计算出梯度。

由于DDP 在创建的时候为每个参数注册了一个 Autograd Hook,DDP 会等待 Autograd Hook 被触发从而判断是否需要进行梯度的同步。

反向传播过程 Local Reducer 会遍历创建 DDP 时的多个存储模型参数的 Bucket,如果存在某个 Bucket 已经 ready 了(即 Bucket 中的所有参数对应的 Autograd Hook 都已经被触发),则会触发一个异步 AllReduce 信号;如果所有的 Bucket 中还有没 ready 的,Local Reducer 会一直阻塞等待直到所有的 AllReduce 操作完成。

当得到梯度均值结果以后,所有的进程中模型参数的梯度都要更新,对应的是每个参数的 .grad 字段,这样就保证训练集群中每个进程中的参数梯度是相同的。

每一层的梯度不依赖于前一层,所以梯度的 All-Reduce 和反向过程同时计算 (参数被分组为了多个 bucket,先计算、得到梯度的 bucket 会马上进行通讯,不必等到所有梯度计算结束才进行通讯)。

这种使用 Bucket 和注册 Autograd Hook 的机制,能够很好地实现的通信的 Overlapping,极大地降低了多次频繁通信带来的开销。

2.3.4 分布式梯度 Ring All-Reduce

在 GPU backend NCCL 中,All-Reduce 的实现方法就是 Ring All-Reduce. 在 Ring All-reduce 中,所有设备组成一个环形(如下图所示),每个进程只跟自己上下游两个进程进行通讯,极大地缓解了参数服务器的通讯阻塞现象。

 All-Reduce 阶段各个参数在计算过程的操作过程可以分为 2 个大的阶段:

  • Scatter-Reduce 阶段

在 Scatter-Reduce 阶段,在每个 GPU 中对一个 Mini-Batch 的数据执行模型训练,会经过一轮一轮迭代,以确定的顺序对各个 GPU 中参数梯度进行交换,最终的结果是每个 GPU 上都有模型一个参数的所有梯度,并且对其进行 Reduce 累加计算平均梯度,过程如下图所示:

  • All-Gather 阶段

经过上一个阶段,GPU 上的一些参数并没有获取到所有进程上计算得到的梯度,所以并不能进行累加求平均,还需要进行 All-Gather ,把所有 GPU 每个参数的所有梯度都完整收集过来,从而才能进行累加并计算梯度的平均值,过程如下图所示:

三、分布式数据并行的代码实现

3.1 DDP模式

DDP有不同的使用模式。DDP的官方最佳实践是,每一张卡对应一个单独的GPU模型(也就是一个进程),在下面介绍中,都会默认遵循这个模式。

(E.g. 有两台机子,每台8张显卡,那就是2x8=16个进程,并行数是16)

但是,我们也是可以给每个进程分配多张卡的。总的来说,分为以下三种情况:

  1. 每个进程一张卡。这是DDP的最佳使用方法。
  2. 每个进程多张卡,复制模式。一个模型复制在不同卡上面,每个进程都实质等同于DP模式。这样做是能跑得通的,但是,速度不如上一种方法,一般不采用。
  3. 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上面。例如,网络的前半部分在0号卡上,后半部分在1号卡上。这种场景,一般是因为我们的模型非常大,大到一张卡都塞不下batch size = 1的一个模型。

3.2 DDP的Pytorch实现

DDP的使用非常简单,其精髓只有一行代码:

model = DDP(model, device_ids=[local_rank], output_device=local_rank)

原本的model就是你的PyTorch模型,新得到的模型,就是你的DDP模型。

但是,在套“model = DDP(model)”之前,我们还是需要定义好模型、处理好数据以及进程。

3.2.1 准备工作

定义进程、模型,并把模型加载至不同的进程之中:

需要注意的是——定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前;如果要加载其他模型,也需要在调用`model=DDP(model)`前。

  1. ## main.py文件
  2. import torch
  3. import argparse
  4. # import依赖
  5. import torch.distributed as dist
  6. from torch.nn.parallel import DistributedDataParallel as DDP
  7. # 新增1:从外面得到local_rank参数,在调用DDP的时候,会自动分配这个参数
  8. parser = argparse.ArgumentParser()
  9. parser.add_argument("--local_rank", default=-1)
  10. FLAGS = parser.parse_args()
  11. local_rank = FLAGS.local_rank
  12. # 新增2:DDP backend初始化
  13. # a.根据local_rank来设定当前使用哪块GPU
  14. torch.cuda.set_device(local_rank)
  15. # b.初始化DDP,使用默认backend(nccl)就行。如果是CPU模型运行,需要选择其他后端。
  16. dist.init_process_group(backend='nccl')
  17. # 新增3:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前。
  18. # 如果要加载其他模型,也需要在调用`model=DDP(model)`前。
  19. device = torch.device("cuda", local_rank)
  20. model = nn.Linear(10, 10).to(device)
  21. # 可能的load模型...
  22. # 新增4:初始化DDP模型
  23. model = DDP(model, device_ids=[local_rank], output_device=local_rank)

3.2.2 数据的并行化

我们知道,DDP同时起了很多个进程,但是他们用的是同一份数据,那么就会有数据上的冗余性。也就是说,你平时一个epoch如果是一万份数据,现在就要变成1*16=16万份数据了。
那么,我们需要使用一个特殊的sampler,来使得各个进程上的数据各不相同,进而让一个epoch还是1万份数据。

为了解决这个问题, Pytorch也已经提供了开箱即用的API——DistributedSampler:

  1. my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
  2. # 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
  3. # sampler的原理,后面也会介绍。
  4. train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
  5. # 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
  6. trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)

3.2.3 优化器初始化

初始化优化器和Loss函数:

  1. # DDP: 要在构造DDP model之后,才能用model初始化optimizer。
  2. optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
  3. # 假设我们的loss是这个
  4. loss_func = nn.CrossEntropyLoss().to(local_rank)

3.2.4 前向和反向传播

准备工作做完之后,我们就开始正式训练了。

需要注意的是——DistributedSampler需要epoch数来作为随机种子,从而通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。

  1. ### 3. 网络训练 ###
  2. model.train()
  3. iterator = tqdm(range(100))
  4. for epoch in iterator:
  5. # DDP:设置sampler的epoch,
  6. # DistributedSampler需要这个来指定shuffle方式,
  7. # 通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
  8. trainloader.sampler.set_epoch(epoch)
  9. # 后面这部分,则与原来的训练模式完全一致了:
  10. for data, label in trainloader:
  11. data, label = data.to(local_rank), label.to(local_rank)
  12. optimizer.zero_grad()
  13. prediction = model(data)
  14. loss = loss_func(prediction, label)
  15. loss.backward()
  16. iterator.desc = "loss = %0.3f" % loss
  17. optimizer.step()
  18. # DDP:
  19. # 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
  20. # 因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
  21. # 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。
  22. if dist.get_rank() == 0:
  23. torch.save(model.module.state_dict(), "%d.ckpt" % epoch)

和DP调用一样,这里有个小trick,在使用nn.DataParallel(model) 操作之后,形成的模型对象实际上被封装在了DataParallel这个模块里,在保存模型的权重时,需要将模型对象预先提取出来。

  1. # take the module
  2. model=model.module

四、手撕分布式数据并行源码

源代码地址:

https://github.com/pytorch/pytorch/blob/main/torch/nn/parallel/distributed.pyTensors and Dynamic neural networks in Python with strong GPU acceleration - pytorch/torch/nn/parallel/distributed.py at main · pytorch/pytorchicon-default.png?t=N7T8https://github.com/pytorch/pytorch/blob/main/torch/nn/parallel/distributed.py


由于篇幅原因,手撕分布式数据并行源码会在另一篇内容中详细介绍

参考资料: 

Pytorch 分布式训练 (DP, DDP)_if your script expects `--local-rank` argument to -CSDN博客

http://shiyanjun.cn/archives/2515.html

https://zhuanlan.zhihu.com/p/178402798

https://zhuanlan.zhihu.com/p/187610959

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/1011869
推荐阅读
相关标签
  

闽ICP备14008679号