当前位置:   article > 正文

torch, cuda, 单机多卡与DDP分布式_单机单卡修改ddp

单机单卡修改ddp

一. 设备与cuda

一个 device 指一个 cpu 或 gpu, 并不是计算机这一粒度, 因为一台计算机可以装多张显卡.
nvidia 部分 GPU, 在驱动也就绪的情况下可被 torch 识别为 cuda device.
可通过 torch.cuda.is_available():bool 验证cuda {硬件, 驱动, 库} 是否就绪, 通过 torch.cuda.device_count():int 查看显卡个数.

安装

去 torch 官网, 选择 平台与版本, 动态生成 pip install 命令.
装 torch 时直接就会安装 cuda 和 cuDNN 了, 不必像早期的教程那样去 nvidia 网站独立安装 cuda 和 cuDNN 了.

C:\Users\yichu>pip3 install torch==1.8.2+cu111 torchvision==0.9.2+cu111 torchaudio===0.8.2 -f https://download.pytorch.org/whl/lts/1.8/torch_lts.html
Looking in indexes: http://mirrors.aliyun.com/pypi/simple/
Looking in links: https://download.pytorch.org/whl/lts/1.8/torch_lts.html
Collecting torch==1.8.2+cu111
  Downloading https://download.pytorch.org/whl/lts/1.8/cu111/torch-1.8.2%2Bcu111-cp38-cp38-win_amd64.whl (3057.4 MB)
     |█████████████████████▌          | 2055.2 MB 6.4 MB/s eta 0:02:37
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

安装包超过3G, 非常大. 装后site-packages\torch\lib\目录下有 torch_cuda_cu.dll, cudnn_cnn_infer64_8.dll,cudnn64_8.dll 等文件, 多达5个G.

gpu型号与cuda版本的兼容性

torch.cuda.is_available() 的验证并不充分, 需要 torch.zeros(1).cuda() 进一步验证.
简单讲, 高版本cuda兼容低版本硬件, 反之则不行.
可通过 torch.version.cuda 查看 cuda 版本.
在这里插入图片描述
报错显示 A10 型号的gpu较新, 我实际安装的的是 cuda_10.2, 重装 cuda_11.3 应该就好了.

GPU 训练改写

要显式地把 tensor 搬到 GPU 上运行.
一般地用
常见报错:

  1. 如果 input tensor 在 cpu 上, model 的 weight parameter 在 GPU 上, 运行就会报错 RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu!

debug 模式

需要配置环境变量 os.environ['CUDA_LAUNCH_BLOCKING'] = '1'.

二. 并行原理

数据并行 与 模型并行

并行是为了加速, 模式有:

  • 数据并行, 所有设备运行完整的网络副本, 类比于多个工人一起搬砖, 速度快.
  • 模型并行, 不同设备运行网络的不同子模块, 类比于工厂的流水线, 不同工人干不同的工序. 当模型参数过大时, 得用这种.

x机x卡

  • 单机0卡, 普通写法, 默认就是 cpu 运行.
  • 多机0卡, 无法以 DDP 等方式在cpu集群上分布式训练, 而 tf 可以.
  • 单机单卡, tensor 搬到 cuda:0 上即可.
  • 单机多卡
    最简单的就是DP策略.
    但因为GIL锁的限制(一个 Python 进程只能利用一个 CPU 核),DP的性能是低于DDP的. 所以此时依旧推荐 DDP.
  • 多机单卡
    todo
  • 多机多卡
    todo

三. Data Parallel 实现

单进程多线程控制同一台机器上的多个 GPU.

梯度同步

数据并行实践中用的更多, 它引入了 主设备 的角色, 工作流如下:

  1. 主设备初始化网络, 分发副本至各设备.
  2. 数据拆分, 各 device 拿到不同的输入, 执行自己的 forward 并计算 gradient.
  3. 各 replica 的梯度加和, 在主设备上反向传播作更新.
  4. 主设备计算出最新的网络参数, 转到步骤一, 作分发.

class torch.nn.parallel.data_parallel.DataParallel(Module)
构造调用如 net = torch.nn.DataParallel(model, device_ids=[0, 1, 2]).

源码解读

重要成员定义见下:

class DataParallel(Module):
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
    	pass

	def forward(self, *inputs, **kwargs):
	    with torch.autograd.profiler.record_function("DataParallel.forward"):
	        # t:torch.nn.parameter.Parameter
	        for t in chain(self.module.parameters(), self.module.buffers()):
	            #t.device device(type='cuda', index=0)
	            if t.device != self.src_device_obj:
	                raise RuntimeError(
	        # before scatter, kwargs:Dict[str,Tensor]
	        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
	        # after scatter, kwargs: Tuple[Dict[str,Tensor]]

	        # 我看到的现象, device_ids=[0,1,2,3], 但散列之后总数变少了呢
	        # List[transformers.models.distilbert.modeling_distilbert.DistilBertForSequenceClassification]
	        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
	        outputs = self.parallel_apply(replicas, inputs, kwargs)
	        return self.gather(outputs, self.output_device)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

scatter()方法将一个batch的数据散列成若干份, 同 device 个数相等, 便于数据并行. 在单机多卡环境, pdb debug有印证.

# (Pdb) p kwargs[0]['labels']
# tensor([1, 1, 0,  ..., 0, 0, 0], device='cuda:0')
# (Pdb) p kwargs[1]['labels']
# tensor([1, 0, 0,  ..., 1, 0, 0], device='cuda:1')
  • 1
  • 2
  • 3
  • 4

然后

  1. 通过replicate.py 中的 replicate()将模型作复制.
  2. 然后又调用了 parallel_apply.py 中的 parallel_apply() 方法.
    见下:

replicate.py

def replicate(network, devices, detach=False):
    devices = [_get_device_index(x, True) for x in devices]
    num_replicas = len(devices)

    params = list(network.parameters())
    param_indices = {param: idx for idx, param in enumerate(params)}
    param_copies = _broadcast_coalesced_reshape(params, devices, detach)

    buffers = list(network.buffers())
    buffers_rg = []
    buffers_not_rg = []
    for buf in buffers:
        if buf.requires_grad and not detach:
            buffers_rg.append(buf)
        else:
            buffers_not_rg.append(buf)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

parallel_apply

torch.nn.parallel.parallel_apply.parallel_apply.py

def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
    threads = [threading.Thread(target=_worker,
                                args=(i, module, input, kwargs, device))
                for i, (module, input, kwargs, device) in
                enumerate(zip(modules, inputs, kwargs_tup, devices))]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

该方法多线程地作并发.

tips: 因为 GIL 的存在, py 单进程无法利用cpu多核的能力搞并行, 所以即便是单机多卡, 也推荐下文的 DDP.

四. Distributed Data Parallel 实现

  • DistributedDataParallel. 相当于一个 wrapper 把原 model 包装了起来. 其 module 字段代表传入的原始 model.
class DistributedDataParallel(Module):
	def __init__(self, module, device_ids, output_device, ...):
		pass
  • 1
  • 2
  • 3

启动

多进程控制多 GPU, 启动命令为

export NCCL_DEBUG=INFO
torchrun \
--master_addr=$MASTER_ADDR \
--master_port=$MASTER_PORT \
--nproc_per_node=1 \
--nnodes=$WORLD_SIZE \
--node_rank=$RANK \
model_train.py \
[--程序传参名字=参数值]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

torchrun 位于 Scripts 目录中, 它用来启动训练任务的进程, 为之分配不同的环境变量. 每个进程都可以通过 os.environ[“xx”] 拿到相应的不同值:

  • MASTER_ADDR, master 进程的地址, 可以是 ipv4 addr, 也可以是形如dlcd60pvomf3pmel-master-0 的主机名.
  • MASTER_PORT, master 进程的端口
  • WORLD_SIZE, 所有进程的个数
  • RANK, 在 WORLD_SIZE 中的进程排序, 值域为 [0, WORLD_SIZE -1]
  • LOCAL_RANK:本机中的进程排序, 值域为 [0, 本机GPU个数-1].

训练代码适配

可参见官方例子[2], 我也写了一版, 见下:

from torch import nn
model.to(device)

dist.init_process_group('nccl', rank=flags.rank, world_size=flags.world_size)
# 没有上一行, 直接构造 ddp 实例会报错
model = nn.parallel.DistributedDataParallel (model,
            device_ids=[args.local_rank])
# 这里不需要 model.module
optimizer = torch.optim.SGD(model.parameters(), 1e-4)


while train_loop:
	X,y=X.to(device), y.to(device)
	loss = loss_fn(model(X), y)
	optimizer.zero_grad()
	loss.backward()
	optimizer.step()


if torch.distributed.get_rank() == 0:
	# 因为原 model 被 DDP 包裹起来了, 所以这里要 model.module
	torch.save(model.module.state_dict(),'out/model.pt')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

完整任务设计

  1. 并发地训练, 上文解读过了, 就是多进程执行同样的代码, 靠 rank 参数区分.
  2. 数据切分. 如果是本地数据, 可用 DistributedSampler, 它挑选不同的切片数据送往训练.
  3. 定时 eval, 只有 rank=0 的进程去 eval, 此时全局训练被阻塞暂停了么? todo
  4. 定时 save, 只有 rank=0 的进程去 save, 疑问同上, 待验证.

step 与 梯度

todo

参考

  1. zhihu, PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
  2. ddp 官方demo
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/420486
推荐阅读
相关标签
  

闽ICP备14008679号