赞
踩
目前已经有很多分布式训练的方法,一直在用,这里详细学习总结一下各种分布式训练的原理,本文主要介绍DP以及DDP,后面会介绍流水线并行、张量并行以及Zero优化等,用于学习记录,错误请指出。
Pytorch老版本的数据并行DP已经被弃用,这里还是分别介绍一下两者的过程,下面的图需要仔细看一下,方面更好的理解。
DP采用多线程实现,速度很慢,只适合单机多卡,目前已经被弃用。这里作为对比还是说明一下整个流程。DP需要各个GPU之间的通信量很大,主卡性能和通信开销容易成为瓶颈。
DDP可适用于单机多卡和多机多卡的数据并行训练,本质是采用了多进程控制。通常是一个进程控制一个GPU,每一个GPU上都有一份相同的模型权重,且每个进程在训练时都有独自的优化器,执行自己的更新过程,DDP进程间通信只传递梯度,通信量较少。下面还是说明一下整个流程。
AllReduce就是将各个GPU上的信息进行广播汇总,如下图,假设有4块GPU,每块GPU上的数据也对应被切成4份。AllReduce的最终目标,就是让每块GPU上的数据都变成箭头右边汇总的样子。一般使用Ring-AllReduce算法实现。Ring-ALLReduce则分两大步骤实现该目标:Reduce-Scatter和All-Gather。
DDP进程间进行梯度通信是通过Ring AllReduce实现的,每个进程将梯度依次传递给下一个进程,之后再把从上一个进程拿到的梯度传递给下一个进程。循环 n 次 (进程数量) 之后,所有进程就可以得到全部的梯度了。
定义网络拓扑关系,使得每个GPU只和其相邻的两块GPU通讯。每次发送对应位置的数据进行累加。每一次累加更新都形成一个拓扑环。经过3次之后,每个GPU上都有完整的信息。
按照“相邻GPU对应位置进行通讯”的原则,但对应位置数据不再做相加,而是直接替换。All-Gather以红色块作为起点。
在数据集采样时,给 dataloader 加一DistributedSampler 就可以给不同进程分配数据集的不重叠、不交叉的部分,从而无缝对接 DDP 模式。下面以两个GPU为例讲解。
先将数据集打乱,然后将数据集数目/GPU数目,并向上取整,不够就用前面的数据进行补充,然后按照顺序分配给不同的GPU。在同一个epoch中,每个进程使用相同的种子数,保证数据shuffle一样,不同的epoch中将种子数设置为seed+epoch,保证每个GPU每次epoch使用的数据都不一样。
DP代码比较简单。
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
参数:
- module:需要多卡数据并行的模型;
- device_ids:可用的GPU号
- output_device:模型输出结果存放的卡号,默认为0
- dim:从哪一维度切分一个 batch 的数据,默认为 0
#完整代码 import torch gpus = [0, 1, 2, 3] torch.cuda.set_device('cuda:{}'.format(gpus[0])) train_dataset = ... train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...) model = ... model = nn.DataParallel(model.to(device), device_ids=gpus, output_device=gpus[0]) optimizer = optim.SGD(model.parameters()) for epoch in range(100): for batch_idx, (data, target) in enumerate(train_loader): images = images.cuda(non_blocking=True) target = target.cuda(non_blocking=True) ... output = model(images) loss = criterion(output, target) ... optimizer.zero_grad() loss.backward() optimizer.step()
DDP数据并行有多种启动方式,这里先对几个重要的参数加以解释。
- group: 进程组,一般保持默认为一个;
- rank:全局进程id;
- local_rank: 某个几点上的进程id;
- world_size:所有进程的数量;
一般一个进程对应一个GPU,例如:两台机器或者节点,每台机器上有8张显卡,则world_size=16, rank=[0,1…15],每台机器上的local_rank=[0,1…7].
这里介绍torch.distributed.launch通过命令行执行分布式训练。每一个进程执行的是同一份代码。其他方式不在介绍
python3 -m torch.distributed.launch --配置 train.py --args参数
- nnodes: 使用的机器数量,单机的话,就默认是1了
- nproc_per_node: 单机的进程数,即一个机器上使用多少显卡
- master_addr/port: 使用的主进程rank0的地址和端口
- node_rank: 当前的进程rank
在单机情况下, 只有nproc_per_node 是必须指定的,
master_addr/port和node_rank都是可以由launch通过环境自动配置
torch的distributed分布式训练首先需要对进程组进行初始化。
torch.distributed.init_process_group(backend, init_method=None, world_size=-1, rank=-1, store=None,...)
- backend:指定分布式的后端,GPU用NCCL, CPU用GLOO
- init_method:初始化方法,可以是TCP连接、File共享文件系统、ENV环境变量三种方式
- init_method='tcp://ip:port': 通过指定rank 0(即:MASTER进程)的IP和端口,各个进程进行信息交换。 需指定 rank 和 world_size 这两个参数。
- init_method='file://path':通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数。
- init_method=env://:从环境变量中读取分布式的信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。 其中,rank和world_size可以选择手动指定,否则从环境变量读取。
import torch import torch.distributed as dist import os dist.init_process_group('nccl', init_method='env://') rank = dist.get_rank() // 每个进程在执行同一份代码得到对应的rank local_rank = os.environ['LOCAL_RANK'] // // 每个进程在执行同一份代码得到对应的local_rank master_addr = os.environ['MASTER_ADDR'] master_port = os.environ['MASTER_PORT'] print(f"rank = {rank} is initialized in {master_addr}:{master_port}; local_rank = {local_rank}") torch.cuda.set_device(rank) tensor = torch.tensor([1, 2, 3, 4]).cuda() print(tensor) 执行:python -m torch.distributed.launch --nproc_per_node=4 demo.py /* rank = 2 is initialized in 127.0.0.1:29500; local_rank = 2 rank = 3 is initialized in 127.0.0.1:29500; local_rank = 3 rank = 1 is initialized in 127.0.0.1:29500; local_rank = 1 rank = 0 is initialized in 127.0.0.1:29500; local_rank = 0 tensor([1, 2, 3, 4], device='cuda:2') tensor([1, 2, 3, 4], device='cuda:3') tensor([1, 2, 3, 4], device='cuda:0') tensor([1, 2, 3, 4], device='cuda:1') */
完整代码: import torch import argparse import torch.distributed as dist parser = argparse.ArgumentParser() parser.add_argument('--local_rank', default=-1, type=int, help='node rank for distributed training') args = parser.parse_args() dist.init_process_group(backend='nccl') torch.cuda.set_device(args.local_rank) train_dataset = ... train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler) model = ... model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank]) optimizer = optim.SGD(model.parameters()) for epoch in range(100): for batch_idx, (data, target) in enumerate(train_loader): images = images.cuda(non_blocking=True) target = target.cuda(non_blocking=True) ... output = model(images) loss = criterion(output, target) ... optimizer.zero_grad() loss.backward() optimizer.step() 命令行执行:CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
1. 当代研究生应当掌握的并行训练方法(单机多卡)
2. Pytorch DDP分布式训练介绍
3. Pytorch 分布式数据 Distributed Data Parallal
4. PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
5. Pytorch 分布式训练 (DP, DDP)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。