
PyTorch Multi-GPU Training


Code repository: mnist

Table of Contents

Standard single-machine, single-GPU training

Distributed training with DDP

Distributed training with Horovod

A Minimal PyTorch Parallel-Training Demo

1. At program start, call dist.init_process_group('nccl'); at the end, call dist.destroy_process_group().
2. Launch the script with torchrun --nproc_per_node=GPU_COUNT main.py.
3. After process initialization, get the current GPU id with rank = dist.get_rank() and put both the model and the data on that GPU.
4. Wrap the model:

ddp_model = DistributedDataParallel(model, device_ids=[device_id])

5. Wrap the DataLoader:

dataset = MyDataset()
sampler = DistributedSampler(dataset)
dataloader = DataLoader(dataset, batch_size=2, sampler=sampler)

6. Reshuffle the data at each epoch with sampler.set_epoch(epoch).

7. Save checkpoints on a single rank only:

if rank == 0:
    torch.save(ddp_model.state_dict(), ckpt_path)
dist.barrier()

8. When loading, mind map_location and the module. prefix in the parameter names (a compact end-to-end sketch follows this checklist):

map_location = {'cuda:0': f'cuda:{device_id}'}
state_dict = torch.load(ckpt_path, map_location=map_location)
ddp_model.load_state_dict(state_dict)
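
To tie the checklist together, here is a minimal end-to-end sketch of a torchrun-style script. MyModel, MyDataset, and ckpt_path are hypothetical placeholders for your own model, dataset, and checkpoint path; the loss function and learning rate are likewise just examples, not part of the original demo.

# Minimal sketch, meant for: torchrun --nproc_per_node=GPU_COUNT main.py
# MyModel, MyDataset and ckpt_path are placeholders you must supply yourself.
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def main():
    dist.init_process_group('nccl')                    # step 1: set up the process group
    rank = dist.get_rank()                             # step 3: current process rank
    device_id = rank % torch.cuda.device_count()       # map rank to a local GPU
    torch.cuda.set_device(device_id)

    model = MyModel().cuda(device_id)
    ddp_model = DistributedDataParallel(model, device_ids=[device_id])   # step 4

    dataset = MyDataset()
    sampler = DistributedSampler(dataset)              # step 5: shard data across ranks
    dataloader = DataLoader(dataset, batch_size=2, sampler=sampler)

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=1e-3)
    criterion = torch.nn.CrossEntropyLoss().cuda(device_id)

    for epoch in range(10):
        sampler.set_epoch(epoch)                       # step 6: reshuffle each epoch
        for x, y in dataloader:
            x, y = x.cuda(device_id), y.cuda(device_id)
            loss = criterion(ddp_model(x), y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    if rank == 0:                                      # step 7: save on one rank only
        torch.save(ddp_model.state_dict(), ckpt_path)
    dist.barrier()

    dist.destroy_process_group()                       # step 1 (teardown)

if __name__ == '__main__':
    main()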

 

Standard single-machine, single-GPU training

Taking MNIST as the example, the script mainly consists of data loading, model construction, the optimizer, and the training loop.

import argparse
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from datetime import datetime
from tqdm import tqdm


class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


def train(gpu, args):
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True,
                                               transform=transforms.ToTensor(), download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size,
                                               shuffle=True, num_workers=0, pin_memory=True)
    start = datetime.now()
    for epoch in range(args.epochs):
        if gpu == 0:
            print("Epoch: {}/{}".format(epoch+1, args.epochs))
        pbar = tqdm(train_loader)
        for images, labels in pbar:
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if gpu == 0:
                msg = 'Loss: {:.4f}'.format(loss.item())
                pbar.set_description(msg)
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=2, type=int, help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes')
    parser.add_argument('--epochs', default=10, type=int, metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    train(0, args)


if __name__ == '__main__':
    main()

On a 2080 Ti, training for 2 epochs takes 1 minute 12 seconds.

Distributed training with DDP

The required changes are as follows. 0. Import the necessary libraries:

import os
import torch.multiprocessing as mp
import torch.distributed as dist

1. Modify the main function:

def main():
    parser = argparse.ArgumentParser()
    ...
    args = parser.parse_args()
    args.world_size = args.gpus * args.nodes
    if args.world_size > 1:
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '8889'
        mp.spawn(train, nprocs=args.gpus, args=(args,))
    else:
        train(0, args)

2. Initialize the communication library:

def train(gpu, args):
    if args.world_size > 1:
        rank = args.nr * args.gpus + gpu
        dist.init_process_group(backend='nccl', init_method='env://',
                                world_size=args.world_size, rank=rank)

3. The data fed to each process needs to be partitioned and shuffled, which is what DistributedSampler is for:

if args.world_size > 1:
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=args.world_size, rank=rank)
    shuffle = False
else:
    train_sampler = None
    shuffle = True
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size,
                                           shuffle=shuffle, num_workers=0,
                                           pin_memory=True, sampler=train_sampler)

Here we first compute the global rank of the current process, rank = args.nr * args.gpus + gpu, and then initialize the distributed environment with dist.init_process_group. The backend argument selects the communication backend: mpi, gloo, or nccl. We pick nccl, NVIDIA's official multi-GPU communication library, which is comparatively efficient. MPI is also a common communication protocol in high-performance computing, but you have to install an MPI implementation yourself, such as OpenMPI. Gloo is a built-in backend, but it is less efficient. The init_method argument specifies how initialization is performed to synchronize the processes at startup. Here we use env://, the environment-variable method, which requires four variables: MASTER_PORT, MASTER_ADDR, WORLD_SIZE, and RANK. The first two we have already set; the last two can instead be passed through the world_size and rank arguments of dist.init_process_group.

Other initialization methods include a shared file system (https://pytorch.org/docs/stable/distributed.html#shared-file-system-initialization) and TCP (https://pytorch.org/docs/stable/distributed.html#tcp-initialization). For example, TCP initialization uses init_method='tcp://10.1.1.20:23456', which likewise amounts to providing the master's IP address and port. Note that this call blocks until all processes have synchronized, and it fails if any single process errors out.
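
As a small illustration of the TCP alternative mentioned above, a process-group initialization helper might look like the following sketch; the master address 10.1.1.20:23456 is the example value from the paragraph, and the function name is only for illustration.

import torch.distributed as dist

def init_distributed_tcp(rank, world_size):
    # TCP initialization: every process connects directly to the master's IP and port,
    # so the MASTER_ADDR / MASTER_PORT environment variables are not needed.
    dist.init_process_group(backend='nccl',
                            init_method='tcp://10.1.1.20:23456',  # placeholder master IP:port
                            world_size=world_size,
                            rank=rank)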

Full code below; search for "add" to jump straight to the modified spots.

import argparse
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from datetime import datetime
from tqdm import tqdm
# add 0
import os
import torch.multiprocessing as mp
import torch.distributed as dist


class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


def train(gpu, args):
    # add 2
    if args.world_size > 1:
        rank = args.nr * args.gpus + gpu
        dist.init_process_group(backend='nccl', init_method='env://',
                                world_size=args.world_size, rank=rank)
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True,
                                               transform=transforms.ToTensor(), download=True)
    # add 3
    if args.world_size > 1:
        model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset, num_replicas=args.world_size, rank=rank)
        shuffle = False
    else:
        train_sampler = None
        shuffle = True
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size,
                                               shuffle=shuffle, num_workers=0,
                                               pin_memory=True, sampler=train_sampler)
    start = datetime.now()
    for epoch in range(args.epochs):
        if gpu == 0:
            print("Epoch: {}/{}".format(epoch+1, args.epochs))
        pbar = tqdm(train_loader)
        for i, (images, labels) in enumerate(pbar):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if gpu == 0:
                msg = 'Loss: {:.4f}'.format(loss.item())
                pbar.set_description(msg)
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=2, type=int, help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes')
    parser.add_argument('--epochs', default=10, type=int, metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    # add 1
    args.world_size = args.gpus * args.nodes
    if args.world_size > 1:
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '8889'
        mp.spawn(train, nprocs=args.gpus, args=(args,))
    else:
        train(0, args)


if __name__ == '__main__':
    main()

Training time drops to 48 seconds, which is a significant improvement.

Distributed training with Horovod

Horovod is a deep learning framework focused on distributed training; it adds distributed-training support to TensorFlow, Keras, PyTorch, and MXNet. Using Horovod brings two main benefits:

  • Ease of use: with only a few lines of changes, single-machine TensorFlow, Keras, PyTorch, or MXNet code can be turned into code that supports single-GPU, single-machine multi-GPU, and multi-machine multi-GPU training.
  • Performance: this mainly shows up as scalability. With typical training frameworks, performance degrades as workers are added (especially across nodes), largely because of the comparatively slow inter-node network. On 128 machines with 4 P100 GPUs each over a 25 Gbit/s RoCE network, Horovod reaches about 90% scaling efficiency for both Inception V3 and ResNet-101.

Installation is simple: pip install horovod.

Writing distributed training code with Horovod generally involves six steps (a short sketch of steps 2-5 follows this list):

  • Add hvd.init() to initialize Horovod.
  • Assign a GPU to each worker; usually one worker process maps to one GPU via its rank id. In PyTorch this is torch.cuda.set_device(hvd.local_rank()).
  • Since the effective batch size grows with world_size, adjust the learning rate accordingly, typically the original lr multiplied by world_size.
  • Wrap the framework's optimizer with Horovod's hvd.DistributedOptimizer.
  • Have rank 0 broadcast the initial variables to all workers: hvd.broadcast_parameters(model.state_dict(), root_rank=0).
  • Save checkpoints only on worker 0.
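
The full example below keeps the original single-GPU learning rate, so here is a hedged sketch of steps 2-5, in particular the lr scaling of step 3. `model` and `base_lr` stand in for your own network and original learning rate; they are not part of the article's code.

import torch
import horovod.torch as hvd

hvd.init()
torch.cuda.set_device(hvd.local_rank())          # step 2: one GPU per worker

base_lr = 1e-4                                   # assumed single-GPU learning rate
optimizer = torch.optim.SGD(model.parameters(),  # `model` is your already-built network
                            lr=base_lr * hvd.size())   # step 3: scale lr with world size

# step 4: wrap the framework optimizer so gradients are averaged across workers
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     op=hvd.Average)

# step 5: broadcast the initial parameters and optimizer state from rank 0 to every worker
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)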

Full code below; again, search for "add" to jump straight to the modifications.

import argparse
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from datetime import datetime
from tqdm import tqdm
# add 0
import horovod.torch as hvd


class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out


def train(gpu, args):
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True,
                                               transform=transforms.ToTensor(), download=True)
    # add 2
    if hvd.size() > 1:
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
        shuffle = False
        optimizer = hvd.DistributedOptimizer(optimizer,
                                             named_parameters=model.named_parameters(),
                                             op=hvd.Average)
        hvd.broadcast_parameters(model.state_dict(), root_rank=0)
        hvd.broadcast_optimizer_state(optimizer, root_rank=0)
    else:
        train_sampler = None
        shuffle = True
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size,
                                               shuffle=shuffle, num_workers=0,
                                               sampler=train_sampler, pin_memory=True)
    start = datetime.now()
    for epoch in range(args.epochs):
        if gpu == 0:
            print("Epoch: {}/{}".format(epoch+1, args.epochs))
        if gpu == 0:
            pbar = tqdm(train_loader)
        else:
            pbar = train_loader
        for images, labels in pbar:
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if gpu == 0:
                msg = 'Loss: {:.4f}'.format(loss.item())
                pbar.set_description(msg)
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=2, type=int, help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int, help='ranking within the nodes')
    parser.add_argument('--epochs', default=10, type=int, metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    # add 1
    hvd.init()
    train(hvd.local_rank(), args)


if __name__ == '__main__':
    main()

Launch commands:

# Horovod: single-GPU training
#horovodrun -np 1 -H localhost:1 python train_horovod.py
# Horovod: 2-GPU training
horovodrun -np 2 -H localhost:2 python train_horovod.py
# Horovod: 4-GPU training
#horovodrun -np 4 -H localhost:4 python train_horovod.py

With this approach, two GPUs take only 41 seconds.

A single GPU does not get any faster, but with more workers on the task the throughput goes up: four GPUs need only 20 seconds.


Further reading: A Journey Through Distributed Training · A Concise Tutorial on PyTorch Distributed Training

PyTorch Distributed Training Basics: Using DDP

The main places that change are:
1. launching now goes through a multiprocessing mechanism;
2. several environment variables are introduced;
3. DataLoader gains a sampler argument;
4. the network is wrapped in another layer by DistributedDataParallel(net);
5. the way checkpoints are saved changes (a short sketch follows this list).
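
On point 5, here is a hedged sketch of what the checkpoint change amounts to: saving from the DDP-wrapped model on rank 0, then stripping the module. prefix when loading back into a plain (un-wrapped) model. The function names and default path are illustrative only.

import torch

def save_on_rank0(ddp_model, rank, ckpt_path='model.ckpt'):
    # Save on rank 0 only; the keys of ddp_model.state_dict() carry a "module." prefix.
    if rank == 0:
        torch.save(ddp_model.state_dict(), ckpt_path)

def load_into_plain_model(plain_model, ckpt_path='model.ckpt'):
    # Strip the "module." prefix so the weights fit an un-wrapped model.
    state_dict = torch.load(ckpt_path, map_location='cpu')
    state_dict = {k.replace('module.', '', 1): v for k, v in state_dict.items()}
    plain_model.load_state_dict(state_dict)
    return plain_model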

Pytorch - Distributed Communication Primitives (with source code)

Pytorch - A Minimal Multi-Machine, Multi-GPU Implementation (with source code)

Pytorch - An Analysis of the DDP Implementation

Pytorch - Distributed Training with Horovod

Pytorch - Source-Code Analysis of Horovod Distributed Training
