赞
踩
大规模分布式训练的目标是协调多台机器简单高效地训练大模型。然而,在这个过程中,存在着内存需求与GPU内存限制之间的矛盾。
问题分析:假设训练模型所需的内存的Require_memory,GPU的单卡内存为Device_memory,在模型训练过程中的矛盾为Require_memory>>Device_memory。
要解决这两个矛盾,有两个解决方案:“节流”、“开源”。
“节流”的核心思想是在保持模型训练精度的同时减少内存消耗;而“开源”的主要思路是利用各种形式的分布式训练,将模型训练的任务分布到多个GPU上,主要包括数据并行(Data Parallel),分布式数据并行(DistributedDataParallel),模型并行、流水线并行、3D并行等。
上一篇详细介绍了数据并行(Data Parallel)的原理以及其在Pytorch中的实现。
本篇文章进一步深入解析分布式数据并行(DistributedDataParallel, DDP)的原理,并解析分布式数据并行在Pytorch中的实现。
在数据并行(DP)的模型训练中,训练任务被切分到多个进程(GPU)上,即每个 GPU 复制一份模型,将一批样本分为多份输入各个模型并行计算。通过这种并行计算的方式解决了数据样本很大的问题,但也有自身的不足:
(1)单进程多线程带来的问题:DataParallel 是单进程多线程的,无法在多个机器上工作 (仅适用于单机多卡,但不支持多机);同时它基于多线程的方式,确实方便了信息的交换,但受困于 GIL (Python 全局解释器锁),会带来性能开销 (GIL 的存在使得一个 Python 进程只能利用一个 CPU 核心,不适合用于计算密集型的任务)
Python中的GIL(全局解释器锁):多线程编程的隐患-腾讯云开发者社区-腾讯云Python作为一门强大而灵活的编程语言,吸引了大量的开发者。然而,对于多线程编程来说,Python引入了一个概念——全局解释器锁(Global Interpreter Lock,简称GIL),它在一定程度上影响了多线程程序的性能。本文将深入探讨GIL的概念,它对多线程编程的影响以及如何处理与绕过它。https://cloud.tencent.com/developer/article/2336372
(2)存在效率问题:主卡性能和通信开销容易成为瓶颈,GPU 利用率通常很低——数据集需要先拷贝到主进程,然后再分配到每个设备上;权重参数只在主卡上更新,需要每次迭代前向所有设备做一次同步;每次迭代的网络输出需要聚合到主卡上
(3)不支持 Model Parallel
DistributedDataParallel就以上的问题提出了高效的解决方案,并且自此之后,不管是单机还是多机,都推荐使用 DDP 来代替 DP。
DP 和 DDP 的主要差异可以总结为以下几点:
(1)DP 是单进程多线程的,只用于单机情况,而 DDP 是多进程的,适用于单机和多机情况,真正实现分布式训练,并且因为每个进程都是独立的 Python 解释器,DDP 避免了 GIL 带来的性能开销
(2)DDP 的训练更高效,不存在 DP 中负载不均衡的问题;DDP 中每个 GPU 直接处理 mini-batch 数据,不需要由主卡分发;每个 GPU 独立计算gradient,不需要汇聚到主卡计算;每个 GPU 独立进行参数更新,不需要由主卡 broadcast 模型参数
(3)DDP 支持模型并行,而 DP 并不支持,这意味如果模型太大,单卡显存不足时只能使用DDP
在深入DDP的原理之前,先了解 DDP 技术相关术语的含义十分重要。以下是一些常见术语的含义:
如果只使用一台机器,情况就会简单明了,因为 Local Rank 等同于 Global Rank。
可以用一张图片来说明这一点:
DDP 模式是基于分布式数据并行的,所以在模型训练过程中要进行大量通信及数据传输,DDP利用集合通信(Collective Communication)。
集合通信是一种通信模式,它能够根据进程组,实现方便地跨多进程间通信。
PyTorch 实现了 6 个集合通信函数:
在 PyTorch 中实现分布式通信,是直接绑定到设置的通信后端(Communication Backends)上,可以根据使用场景选择合适的通信后端,而每个通信后端实现可以使用集合通信提供的各种通信函数。如果愿意,甚至可以直接使用点对点通信方式来实现更加灵活的通信方式。下面我们了解一下 PyTorch 支持的 3 个通信后端,可以直接在创建 DDP 的时候进行配置并创建。
关于每个通信后端,支持在 CPU 及 GPU 上使用的集合通信函数的情况,如下表所示:
1. 环境准备(就是init_process_group
这一步):各个进程会在这一步,与master节点进行握手,建立连接。
如果连接上的进程数量不足约定的 word_size,进程会一直等待。也就是说,如果你约定了
world_size=64
,但是只开了6台8卡机器,那么程序会一直暂停在这个地方。
2. 复制模型:在创建 DDP 的时候会在 rank=0 进程上,将模型的副本同步到分布式训练集群中其它每个进程上,使每个进程持有的模型副本从开始具有相同的状态。
3.创建Local Reducer和Bucket:然后,每个 DDP 进程创建一个Local Reducer,Local Reducer 用来支持反向传播过程中的梯度同步。为了提高通信效率,Local Reducer采用基于 Bucket 的方式来组织和管理模型的参数(即在多个 Bucket 中将模型参数Model.parameters()以逆序的方式保存,这样可以方便在反向传播过程中直接对 Bucket 中的参数计算梯度,而不用在这个时候过多考虑和处理模型参数的顺序问题)
采用 Bucket 结构,会将模型参数映射到 Bucket 中,这样能够对 DDP 的速度产生重要的影响。在每一轮迭代中的反向传播阶段,会在所有模型参数与 Bucket 之间发生一次双向复制:先将模型参数复制到 Bucket 中,在执行 AllReduce 计算后再将平均梯度从 Bucket 中复制更新到模型参数上。
Bucket 的具体结构:
在这里还会为模型的每个参数注册一个 Autograd Hook,即为每个参数的梯度累加器(Gradient Accumulator)注册一个 Autograd Hook,当反向传播过程中参数的梯度更新完成后会触发 Autograd Hook 执行。
在前向传播过程中,每个进程从磁盘加载 batch 数据,并将它们传递到其 GPU。每个DDP进程将其拿到的 batch 数据传给本地的模型,通过计算得到输出output。
在反向传播过程中,通过调用 loss.backward() 执行反向传播计算,计算出梯度。
由于DDP 在创建的时候为每个参数注册了一个 Autograd Hook,DDP 会等待 Autograd Hook 被触发从而判断是否需要进行梯度的同步。
反向传播过程 Local Reducer 会遍历创建 DDP 时的多个存储模型参数的 Bucket,如果存在某个 Bucket 已经 ready 了(即 Bucket 中的所有参数对应的 Autograd Hook 都已经被触发),则会触发一个异步 AllReduce 信号;如果所有的 Bucket 中还有没 ready 的,Local Reducer 会一直阻塞等待直到所有的 AllReduce 操作完成。
当得到梯度均值结果以后,所有的进程中模型参数的梯度都要更新,对应的是每个参数的 .grad 字段,这样就保证训练集群中每个进程中的参数梯度是相同的。
每一层的梯度不依赖于前一层,所以梯度的 All-Reduce 和反向过程同时计算 (参数被分组为了多个 bucket,先计算、得到梯度的 bucket 会马上进行通讯,不必等到所有梯度计算结束才进行通讯)。
这种使用 Bucket 和注册 Autograd Hook 的机制,能够很好地实现的通信的 Overlapping,极大地降低了多次频繁通信带来的开销。
在 GPU backend NCCL 中,All-Reduce 的实现方法就是 Ring All-Reduce. 在 Ring All-reduce 中,所有设备组成一个环形(如下图所示),每个进程只跟自己上下游两个进程进行通讯,极大地缓解了参数服务器的通讯阻塞现象。
All-Reduce 阶段各个参数在计算过程的操作过程可以分为 2 个大的阶段:
在 Scatter-Reduce 阶段,在每个 GPU 中对一个 Mini-Batch 的数据执行模型训练,会经过一轮一轮迭代,以确定的顺序对各个 GPU 中参数梯度进行交换,最终的结果是每个 GPU 上都有模型一个参数的所有梯度,并且对其进行 Reduce 累加计算平均梯度,过程如下图所示:
经过上一个阶段,GPU 上的一些参数并没有获取到所有进程上计算得到的梯度,所以并不能进行累加求平均,还需要进行 All-Gather ,把所有 GPU 每个参数的所有梯度都完整收集过来,从而才能进行累加并计算梯度的平均值,过程如下图所示:
DDP有不同的使用模式。DDP的官方最佳实践是,每一张卡对应一个单独的GPU模型(也就是一个进程),在下面介绍中,都会默认遵循这个模式。
(E.g. 有两台机子,每台8张显卡,那就是2x8=16个进程,并行数是16)
但是,我们也是可以给每个进程分配多张卡的。总的来说,分为以下三种情况:
DDP的使用非常简单,其精髓只有一行代码:
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
原本的model就是你的PyTorch模型,新得到的模型,就是你的DDP模型。
但是,在套“model = DDP(model)”之前,我们还是需要定义好模型、处理好数据以及进程。
定义进程、模型,并把模型加载至不同的进程之中:
需要注意的是——定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前;如果要加载其他模型,也需要在调用`model=DDP(model)`前。
- ## main.py文件
- import torch
- import argparse
-
- # import依赖
- import torch.distributed as dist
- from torch.nn.parallel import DistributedDataParallel as DDP
-
- # 新增1:从外面得到local_rank参数,在调用DDP的时候,会自动分配这个参数
- parser = argparse.ArgumentParser()
- parser.add_argument("--local_rank", default=-1)
- FLAGS = parser.parse_args()
- local_rank = FLAGS.local_rank
-
- # 新增2:DDP backend初始化
- # a.根据local_rank来设定当前使用哪块GPU
- torch.cuda.set_device(local_rank)
- # b.初始化DDP,使用默认backend(nccl)就行。如果是CPU模型运行,需要选择其他后端。
- dist.init_process_group(backend='nccl')
-
- # 新增3:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前。
- # 如果要加载其他模型,也需要在调用`model=DDP(model)`前。
- device = torch.device("cuda", local_rank)
- model = nn.Linear(10, 10).to(device)
- # 可能的load模型...
-
- # 新增4:初始化DDP模型
- model = DDP(model, device_ids=[local_rank], output_device=local_rank)
我们知道,DDP同时起了很多个进程,但是他们用的是同一份数据,那么就会有数据上的冗余性。也就是说,你平时一个epoch如果是一万份数据,现在就要变成1*16=16万份数据了。
那么,我们需要使用一个特殊的sampler,来使得各个进程上的数据各不相同,进而让一个epoch还是1万份数据。
为了解决这个问题, Pytorch也已经提供了开箱即用的API——DistributedSampler:
- my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
- # 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
- # sampler的原理,后面也会介绍。
- train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
- # 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
- trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)
初始化优化器和Loss函数:
- # DDP: 要在构造DDP model之后,才能用model初始化optimizer。
- optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
-
- # 假设我们的loss是这个
- loss_func = nn.CrossEntropyLoss().to(local_rank)
准备工作做完之后,我们就开始正式训练了。
需要注意的是——DistributedSampler需要epoch数来作为随机种子,从而通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
- ### 3. 网络训练 ###
- model.train()
- iterator = tqdm(range(100))
- for epoch in iterator:
- # DDP:设置sampler的epoch,
- # DistributedSampler需要这个来指定shuffle方式,
- # 通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
- trainloader.sampler.set_epoch(epoch)
- # 后面这部分,则与原来的训练模式完全一致了:
- for data, label in trainloader:
- data, label = data.to(local_rank), label.to(local_rank)
- optimizer.zero_grad()
- prediction = model(data)
- loss = loss_func(prediction, label)
- loss.backward()
- iterator.desc = "loss = %0.3f" % loss
- optimizer.step()
- # DDP:
- # 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
- # 因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
- # 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。
- if dist.get_rank() == 0:
- torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
和DP调用一样,这里有个小trick,在使用nn.DataParallel(model) 操作之后,形成的模型对象实际上被封装在了DataParallel这个模块里,在保存模型的权重时,需要将模型对象预先提取出来。
# take the module model=model.module
源代码地址:
由于篇幅原因,手撕分布式数据并行源码会在另一篇内容中详细介绍
参考资料:
Pytorch 分布式训练 (DP, DDP)_if your script expects `--local-rank` argument to -CSDN博客
http://shiyanjun.cn/archives/2515.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。