赞
踩
目录
5.注意:“shuffle=True” 和 “sampler”不能同时存在
神经网络训练加速的最简单方法是使用GPU,对弈神经网络中常规操作(矩阵乘法和加法)GPU运算速度要倍超于CPU。随着模型或数据集越来越大,一个GPU很快就会变得不足。例如,BERT和GPT-2等大型语言模型是在数百个GPU上训练的。对于多GPU训练,需要一种在不同GPU之间对模型和数据进行切分和调度的方法。
PyTorch是非常流行的深度学习框架,它在主流框架中对于灵活性和易用性的平衡最好。Pytorch有两种方法可以在多个GPU上切分模型和数据:nn.DataParallel 和nn.distributedataparallel 。DataParallel更易于使用(只需简单包装单GPU模型)。然而,由于它使用一个进程来计算模型权重,然后在每个批处理期间将分发到每个GPU,因此通信很快成为一个瓶颈,GPU利用率通常很低。而且,nn.DataParallel 要求所有的GPU都在同一个节点上(不支持分布式),而且不能使用Apex进行混合精度训练。nn.DataParallel 和nn.distributedataparallel 的主要差异可以总结为以下几点(译者注):
总的来说,Pytorch文档是相当完备和清晰的,尤其是在1.0x版本后。但是关于 DistributedDataParallel 的介绍却较少,主要的文档有以下三个:
这篇教程将通过一个MNISI例子讲述如何使用PyTorch的分布式训练,这里将一段段代码进行解释,而且也包括任何使用apex进行混合精度训练。
DistributedDataParallel 通过多进程在多个GPUs间复制模型,每个GPU都由一个进程控制(当然可以让每个进程控制多个GPU,但这显然比每个进程有一个GPU要慢;也可以多个进程在一个GPU上运行)。GPU可以都在同一个节点上,也可以分布在多个节点上。每个进程都执行相同的任务,并且每个进程都与所有其他进程通信。进程或者说GPU之间只传递梯度,这样网络通信就不再是瓶颈。
在训练过程中,每个进程从磁盘加载batch数据,并将它们传递到其GPU。每一个GPU都有自己的前向过程,然后梯度在各个GPUs间进行All-Reduce。每一层的梯度不依赖于前一层,所以梯度的All-Reduce和后向过程同时计算,以进一步缓解网络瓶颈。在后向过程的最后,每个节点都得到了平均梯度,这样模型参数保持同步。
这都要求多个进程(可能在多个节点上)同步并通信。Pytorch通过 distributed.init_process_group 函数来实现这一点。他需要知道进程0位置以便所有进程都可以同步,以及预期的进程总数。每个进程都需要知道进程总数及其在进程中的顺序,以及使用哪个GPU。通常将进程总数称为 world_size。Pytorch 提供了nn.utils.data.DistributedSampler来为各个进程切分数据,以保证训练数据不重叠。
首先,导入所需要的库:
- import os
- from datetime import datetime
- import argparse
- import torch.multiprocessing as mp
- import torchvision
- import torchvision.transforms as transforms
- import torch
- import torch.nn as nn
- import torch.distributed as dist
- from apex.parallel import DistributedDataParallel as DDP
- from apex import amp
然后我们定义一个简单的 CNN 模型处理 MNIST 数据:
- class ConvNet(nn.Module):
- def __init__(self, num_classes=10):
- super(ConvNet, self).__init__()
- self.layer1 = nn.Sequential(
- nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
- nn.BatchNorm2d(16),
- nn.ReLU(),
- nn.MaxPool2d(kernel_size=2, stride=2))
- self.layer2 = nn.Sequential(
- nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
- nn.BatchNorm2d(32),
- nn.ReLU(),
- nn.MaxPool2d(kernel_size=2, stride=2))
- self.fc = nn.Linear(7*7*32, num_classes)
-
- def forward(self, x):
- out = self.layer1(x)
- out = self.layer2(out)
- out = out.reshape(out.size(0), -1)
- out = self.fc(out)
- return out
主函数 main() 接受参数,执行训练:
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
- parser.add_argument('-g', '--gpus', default=1, type=int,
- help='number of gpus per node')
- parser.add_argument('-nr', '--nr', default=0, type=int,
- help='ranking within the nodes')
- parser.add_argument('--epochs', default=2, type=int, metavar='N',
- help='number of total epochs to run')
- args = parser.parse_args()
- train(0, args)
其中训练部分主函数为:
- def train(gpu, args):
- torch.manual_seed(0)
- model = ConvNet()
- torch.cuda.set_device(gpu)
- model.cuda(gpu)
- batch_size = 100
- # define loss function (criterion) and optimizer
- criterion = nn.CrossEntropyLoss().cuda(gpu)
- optimizer = torch.optim.SGD(model.parameters(), 1e-4)
- # Data loading code
- train_dataset = torchvision.datasets.MNIST(root='./data',
- train=True,
- transform=transforms.ToTensor(),
- download=True)
- train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
- batch_size=batch_size,
- shuffle=True,
- num_workers=0,
- pin_memory=True)
-
- start = datetime.now()
- total_step = len(train_loader)
- for epoch in range(args.epochs):
- for i, (images, labels) in enumerate(train_loader):
- images = images.cuda(non_blocking=True)
- labels = labels.cuda(non_blocking=True)
- # Forward pass
- outputs = model(images)
- loss = criterion(outputs, labels)
-
- # Backward and optimize
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- if (i + 1) % 100 == 0 and gpu == 0:
- print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
- epoch + 1,
- args.epochs,
- i + 1,
- total_step,
- loss.item())
- )
- if gpu == 0:
- print("Training complete in: " + str(datetime.now() - start))
通过启动主函数来开始训练:
- if __name__ == '__main__':
- main()
你可能注意到有些参数是多余的,但是对后面的分布式训练是有用的。我们通过执行以下语句就可以在单机单卡上训练:
python src/mnist.py -n 1 -g 1 -nr 0
使用多进程进行分布式训练,我们需要为每个GPU启动一个进程。每个进程需要知道自己运行在哪个GPU上,以及自身在所有进程中的序号。对于多节点,我们需要在每个节点启动脚本。
首先,我们要配置基本的参数:
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--nodes', default=1,
- type=int, metavar='N')
- parser.add_argument('-g', '--gpus', default=1, type=int,
- help='number of gpus per node')
- parser.add_argument('-nr', '--nr', default=0, type=int,
- help='ranking within the nodes')
- parser.add_argument('--epochs', default=2, type=int,
- metavar='N',
- help='number of total epochs to run')
- args = parser.parse_args()
- #########################################################
- args.world_size = args.gpus * args.nodes #
- os.environ['MASTER_ADDR'] = '10.57.23.164' #
- os.environ['MASTER_PORT'] = '8888' #
- mp.spawn(train, nprocs=args.gpus, args=(args,)) #
- #########################################################
-
其中 args.nodes 是节点总数,而 args.gpus 是每个节点的GPU总数(每个节点GPU数是一样的),而 args.nr 是当前节点在所有节点的序号。节点总数乘以每个节点的GPU数可以得到 world_size ,也即进程总数。所有的进程需要知道进程0的IP地址以及端口,这样所有进程可以在开始时同步,一般情况下称进程0是master进程,比如我们会在进程0中打印信息或者保存模型。PyTorch提供了 mp.spawn 来在一个节点启动该节点所有进程,每个进程运行 train(i, args) ,其中 i 从 0 到 args.gpus - 1 。
同样,我们要修改训练函数:
- def train(gpu, args):
- ############################################################
- rank = args.nr * args.gpus + gpu
- dist.init_process_group(
- backend='nccl',
- init_method='env://',
- world_size=args.world_size,
- rank=rank
- )
- ############################################################
-
- torch.manual_seed(0)
- model = ConvNet()
- torch.cuda.set_device(gpu)
- model.cuda(gpu)
- batch_size = 100
- # define loss function (criterion) and optimizer
- criterion = nn.CrossEntropyLoss().cuda(gpu)
- optimizer = torch.optim.SGD(model.parameters(), 1e-4)
-
- ###############################################################
- # Wrap the model
- model = nn.parallel.DistributedDataParallel(model,
- device_ids=[gpu])
- ###############################################################
-
- # Data loading code
- train_dataset = torchvision.datasets.MNIST(
- root='./data',
- train=True,
- transform=transforms.ToTensor(),
- download=True
- )
- ################################################################
- train_sampler = torch.utils.data.distributed.DistributedSampler(
- train_dataset,
- num_replicas=args.world_size,
- rank=rank
- )
- ################################################################
-
- train_loader = torch.utils.data.DataLoader(
- dataset=train_dataset,
- batch_size=batch_size,
- ##############################
- shuffle=False, #
- ##############################
- num_workers=0,
- pin_memory=True,
- #############################
- sampler=train_sampler) #
- #############################
- ...
这里我们首先计算出当前进程序号:rank = args.nr * args.gpus + gpu,然后就是通过 dist.init_process_group 初始化分布式环境,其中 backend 参数指定通信后端,包括 mpi, gloo, nccl ,这里选择 nccl ,这是Nvidia提供的官方多卡通信框架,相对比较高效。mpi 也是高性能计算常用的通信协议,不过你需要自己安装MPI实现框架,比如OpenMPI。 gloo 倒是内置通信后端,但是不够高效。init_method 指的是如何初始化,以完成刚开始的进程同步;这里我们设置的是 env:// ,指的是环境变量初始化方式,需要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,前面两个参数我们已经配置,后面两个参数也可以通过 dist.init_process_group 函数中 world_size 和 rank 参数配置。其它的初始化方式还包括共享文件系统以及TCP,比如 init_method='tcp://10.1.1.20:23456' ,其实也是要提供master的IP地址和端口。注意这个调用是阻塞的,必须等待所有进程来同步,如果任何一个进程出错,就会失败。
对于模型侧,我们只需要用 DistributedDataParallel 包装一下原来的model即可,在背后它会支持梯度的 All-Reduce 操作。对于数据侧,我们 nn.utils.data.DistributedSampler 来给各个进程切分数据,只需要在 dataloader 中使用这个 sampler 就好,值得注意的一点是你要训练循环过程的每个epoch开始时调用 train_sampler.set_epoch(epoch) ,(主要是为了保证每个epoch的划分是不同的)其它的训练代码都保持不变。
最后就可以执行代码了,比如我们是4节点,每个节点是8卡,那么需要在4个节点分别执行:
python src/mnist-distributed.py -n 4 -g 8 -nr i
要注意的是,此时的有效batch_size其实是 batch_size_per_gpu * world_size
,对于有BN的模型还可以采用同步BN获取更好的效果:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
上述讲述的是分布式训练过程,其实同样适用于评估或者测试过程,比如我们把数据划分到不同的进程中进行预测,这样可以加速预测过程。实现代码和上述过程完全一样,不过我们想计算某个指标,那就需要从各个进程的统计结果进行 All-Reduce,因为每个进程仅是计算的部分数据的内容。比如我们要计算分类准确度,我们可以统计每个进程的数据总数 total 和分类正确的数量 count ,然后进行聚合。这里要提的一点,当用 dist.init_process_group 初始化分布式环境时,其实就是建立一个默认的分布式进程组( distributed process group ),这个 group 同时会初始化Pytorch的 torch.distributed 包。这样我们可以直接用 torch.distributed 的API就可以进行分布式基本操作了,下面是具体实现:
- # define tensor on GPU, count and total is the result at each GPU
- t = torch.tensor([count, total], dtype=torch.float64, device='cuda')
- dist.barrier() # synchronizes all processes
- dist.all_reduce(t, op=torch.distributed.ReduceOp.SUM,) # Reduces the tensor data across all machines in such a way that all get the final result.
- t = t.tolist()
- all_count = int(t[0])
- all_total = int(t[1])
- acc = all_count / all_total
混合精度训练(混合FP32和FP16训练)可以适用更大的batch_size,而且可以利用NVIDIA Tensor Cores加速计算。采用NVIDIA的apex进行混合精度训练非常简单,只需要修改部分代码
- rank = args.nr * args.gpus + gpu
- dist.init_process_group(
- backend='nccl',
- init_method='env://',
- world_size=args.world_size,
- rank=rank)
-
- torch.manual_seed(0)
- model = ConvNet()
- torch.cuda.set_device(gpu)
- model.cuda(gpu)
- batch_size = 100
- # define loss function (criterion) and optimizer
- criterion = nn.CrossEntropyLoss().cuda(gpu)
- optimizer = torch.optim.SGD(model.parameters(), 1e-4)
- # Wrap the model
- ##############################################################
- model, optimizer = amp.initialize(model, optimizer,
- opt_level='O2')
- model = DDP(model)
- ##############################################################
- # Data loading code
- ...
- start = datetime.now()
- total_step = len(train_loader)
- for epoch in range(args.epochs):
- for i, (images, labels) in enumerate(train_loader):
- images = images.cuda(non_blocking=True)
- labels = labels.cuda(non_blocking=True)
- # Forward pass
- outputs = model(images)
- loss = criterion(outputs, labels)
-
- # Backward and optimize
- optimizer.zero_grad()
- ##############################################################
- with amp.scale_loss(loss, optimizer) as scaled_loss:
- scaled_loss.backward()
- ##############################################################
- optimizer.step()
- ...
-
其实就两处变化,首先是采用 amp.initialize 来包装 model 和 optimizer 以支持混合精度训练,其中 opt_level 指的是优化级别,如果为O0或者O3不是真正的混合精度,但是可以用来确定模型效果和速度的 baseline ,而 O1 和 O2 是混合精度的两种设置,可以选择某个进行混合精度训练。另外一处是在进行根据梯度更新参数前,要先通过 amp.scale_loss 对梯度进行scale以防止梯度下溢(underflowing)。此外,你还可以用 apex.parallel.DistributedDataParallel 替换 nn.DistributedDataParallel。
RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:1 and cuda:0!
原因:
解决:检查代码,将数据移到 cuda 上 , non_blocking 表示非阻塞
ValueError: Dataloader with Iterableataset: expected unspecified sampler option, but got sampler-torchutils data.distributed.Distributedsampler obiect
at 0x7ffoafe6da30>
原因:dataloader 接收了一个经过 sampler包装的 Iterableataset
解决: 如果train_data本身就是个可迭代对象,则不需要用 sampler 包装,注释掉 “train_sampler” 这一行。但是在对其他项目做分布式训练时 ,不设置 DistributedSampler 无法实现真正的并行(运行效率没有提高)。
RuntimeError: NCCL error in: ../torch/csrc/distributed/c10d/ProcessGroupiNcCl.cpp:957, invalid usage, NCCL version 21.0.3
ncclInvalidlsage: This usually reflects invalid usage of wol library (such as too mary async ops, to many collectives at once, mixing streams in a group,
etc)
原因:nccl 安装成功,版本也没有问题,问题是没有被正确使用,从报错位置上我们可以看出,问题出在 model = DistributedDataParallel(model,device_ids=[rank])
解决:这里的问题比较多样,我的问题是 “在 torch.cuda.set_device(rank) 之前,模型多次被重复放到 cuda 上”,所以我检查了一下,在将模型放到 cuda 上之前是否有重复操作而引起冲突。所以我的处理方式是删掉了 torch.cuda.set_device(rank) 之前的 “to.device()”
-- Process 1 terminated with the following error:
Traceback (most recent call last):
File "/root/anaconda3/envs/diffusion1/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 59, in _wrap
fn(i, *args)
File "/root/train.py", line 425, in train
model = tDistributedDataParallel(model, device_ids=[rank])
File "/root/anaconda3/envs/diffusion1/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 530, in __init__
self.process_group = _get_default_group()
File "/root/anaconda3/envs/diffusion1/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 410, in _get_default_group
raise RuntimeError(
RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.
原因:进程没有被初始化,我在进程初始化前加了只初始化主进程的代码,故报此错。
解决:“删掉 if rank == 0:”,有的博主说,只需要初始化主进程,但是我这样处理却报错了,不知为何,但是我的两个进程都初始化就不会报错
- os.environ['MASTER_ADDR'] = 'localhost'
- os.environ['MASTER_PORT'] = '8888'
- os.environ['NCCL_DEBUG'] = 'INFO' # nccl log 日志
-
- dist_backend = 'nccl' if torch.cuda.is_available() else 'gloo'
- dist_init_method = f'tcp://localhost:22222'
-
- # Initialize the process group
- dist.init_process_group(
- backend=dist_backend,
- init_method=dist_init_method,
- world_size=torch.cuda.device_count(),
- rank=rank
- )
- import os
- from datetime import datetime
- import argparse
- import torch.multiprocessing as mp
- import torchvision
- import torchvision.transforms as transforms
- import torch
- import torch.nn as nn
- import torch.distributed as dist
- # from apex.parallel import DistributedDataParallel as DDP
- # from apex import amp
-
-
- class ConvNet(nn.Module):
- def __init__(self, num_classes=10):
- super(ConvNet, self).__init__()
- self.layer1 = nn.Sequential(
- nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
- nn.BatchNorm2d(16),
- nn.ReLU(),
- nn.MaxPool2d(kernel_size=2, stride=2))
- self.layer2 = nn.Sequential(
- nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
- nn.BatchNorm2d(32),
- nn.ReLU(),
- nn.MaxPool2d(kernel_size=2, stride=2))
- self.fc = nn.Linear(7*7*32, num_classes)
-
- def forward(self, x):
- out = self.layer1(x)
- out = self.layer2(out)
- out = out.reshape(out.size(0), -1)
- out = self.fc(out)
- return out
-
-
- def train(gpu, args):
- ############################################################
- rank = args.nr * args.gpus + gpu
- dist.init_process_group(
- backend='nccl',
- init_method='tcp://localhost:22222',
- world_size=args.world_size,
- rank=rank
- )
- ############################################################
- print(f'1111111')
- print(f'2222222')
- print(f'在GPU{rank}上初始化进程组')
- torch.manual_seed(0)
- model = ConvNet()
- torch.cuda.set_device(gpu)
- model.cuda(gpu)
- batch_size = 100
- # define loss function (criterion) and optimizer
- criterion = nn.CrossEntropyLoss().cuda(gpu)
- optimizer = torch.optim.SGD(model.parameters(), 1e-4)
-
- ###############################################################
- # Wrap the model
- model = nn.parallel.DistributedDataParallel(model,
- device_ids=[gpu])
- ###############################################################
-
- # Data loading code
- train_dataset = torchvision.datasets.MNIST(
- root='./data',
- train=True,
- transform=transforms.ToTensor(),
- download=True
- )
- import collections
- print(f'是否是可迭代对象:{isinstance(train_dataset, collections.Iterable)}')
- ################################################################
- train_sampler = torch.utils.data.distributed.DistributedSampler(
- train_dataset,
- num_replicas=args.world_size,
- rank=rank
- )
- ################################################################
- print(f'{train_dataset}')
- train_loader = torch.utils.data.DataLoader(
- dataset=train_dataset,
- batch_size=batch_size,
- shuffle=False,
- num_workers=0,
- pin_memory=True,
- sampler=train_sampler)
- print(f'train_iterator"{train_loader}')
- start = datetime.now()
- total_step = len(train_loader)
- for epoch in range(args.epochs):
- for i, (images, labels) in enumerate(train_loader):
- images = images.cuda(non_blocking=True)
- labels = labels.cuda(non_blocking=True)
- # Forward pass
- outputs = model(images)
- loss = criterion(outputs, labels)
-
- # Backward and optimize
- optimizer.zero_grad()
- loss.backward()
- optimizer.step()
- if (i + 1) % 100 == 0 and gpu == 0:
- print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
- epoch + 1,
- args.epochs,
- i + 1,
- total_step,
- loss.item())
- )
- if gpu == 0:
- print("Training complete in: " + str(datetime.now() - start))
-
-
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--nodes', default=1,
- type=int, metavar='N')
- parser.add_argument('-g', '--gpus', default=1, type=int,
- help='number of gpus per node')
- parser.add_argument('-nr', '--nr', default=0, type=int,
- help='ranking within the nodes')
- parser.add_argument('--epochs', default=200, type=int,
- metavar='N',
- help='number of total epochs to run')
- args = parser.parse_args()
- #########################################################
- args.world_size = args.gpus * args.nodes #
- os.environ['MASTER_ADDR'] = 'localhost' #
- os.environ['MASTER_PORT'] = '8888' #
- mp.spawn(train, nprocs=args.gpus, args=(args,)) #
- #########################################################
-
-
-
- if __name__ == '__main__':
- main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。