赞
踩
分布式训练可以分为数据并行、模型并行,流水线并行和混合并行。分布式算法又有典型的parameter server和ring all-reduce。无论是哪一种分布式技术一个核心的关键就是如何进行communication,这是实现分布式训练的基础,因此要想掌握分布式训练或当前流行的大模型训练务必对worker间的通信方式有所了解。
Pytorch的分布式训练的通信是依赖torch.distributed
模块来实现的,torch.distributed
提供了point-2-point communication 和collective communication两种通信方式。
point-2-point communication提供了send和recv语义,用于任务间的通信
collective communication主要提供了scatter/broadcast/gather/reduce/all_reduce/all_gather 语义,不同的backend在提供的通信语义上具有一定的差异性。
All-reduce:所有节点上的数据都会被收集起来,然后进行某种操作(通常是求和或求平均),然后将结果广播回每个节点。这个操作在并行计算中常用于全局梯度更新。
All-gather: 每个节点上的数据都被广播到其他所有节点上。每个节点最终都会收到来自所有其他节点的数据集合。这个操作在并行计算中用于收集各个节点的局部数据,以进行全局聚合或分析。
Broadcast:一台节点上的数据被广播到其他所有节点上。通常用于将模型参数或其他全局数据分发到所有节点。
Reduce: 将所有节点上的数据进行某种操作(如求和、求平均、取最大值等)后,将结果发送回指定节点。这个操作常用于在并行计算中进行局部聚合。
Scatter: 从一个节点的数据集合中将数据分发到其他节点上。通常用于将一个较大的数据集合分割成多个部分,然后分发到不同节点上进行并行处理。
Gather: 将各个节点上的数据收集到一个节点上。通常用于将多个节点上的局部数据收集到一个节点上进行汇总或分析。
Pytorch 中分布式的基本使用流程如下:
init_process_group
初始化进程组。DDP(model, device_ids=device_ids)
torch.distributed.launch
或者torchrun
在每个主机上执行一次脚本,开始训练destory_process_group()
销毁进程组| Backend | ``gloo`` | ``mpi`` | ``nccl`` |
|----------------|-----|-----|-----|-----|-----|-----|
| Device | CPU | GPU | CPU | GPU | CPU | GPU |
|----------------|-----|-----|-----|-----|-----|-----|
| send | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| recv | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| broadcast | ✓ | ✓ | ✓ | ? | ✘ | ✓ |
| all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ |
| reduce | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| all_gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| scatter | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
| reduce_scatter | ✘ | ✘ | ✘ | ✘ | ✘ | ✓ |
| all_to_all | ✘ | ✘ | ✓ | ? | ✘ | ✓ |
| barrier | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
torch.distributed.launch
或者torchrun
内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。调用任何其他方法之前,需要使用torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None)
或torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)
函数初始化。两者都阻塞,直到所有进程都joined。
""" python run.py """ #!/usr/bin/env python import os from typing import Callable import torch import torch.distributed as dist import torch.multiprocessing as mp # Initialization def init_process(rank: int, size: int, fn: Callable[[int, int], None], backend="gloo"): """Initialize the distributed environment.""" os.environ["MASTER_ADDR"] = "127.0.0.1" os.environ["MASTER_PORT"] = "29500" dist.init_process_group(backend, rank=rank, world_size=size) fn(rank, size) def run_p2p(rank_id, size) -> None: tensor = torch.zeros(1) if rank_id == 0: tensor += 1 # Send the tensor to process 1 dist.send(tensor=tensor, dst=1) print("after send, Rank ", rank_id, " has data ", tensor[0]) dist.recv(tensor=tensor, src=1) print("after recv, Rank ", rank_id, " has data ", tensor[0]) else: # Receive tensor from process 0 dist.recv(tensor=tensor, src=0) print("after recv, Rank ", rank_id, " has data ", tensor[0]) tensor += 1 dist.send(tensor=tensor, dst=0) print("after send, Rank ", rank_id, " has data ", tensor[0]) def run_reduce(rank_id, size) -> None: tensor = torch.arange(end=2, dtype=torch.int64) + 1 + 2 * rank_id print('before reudce',' Rank ', rank_id, ' has data ', tensor) dist.all_reduce(tensor=tensor, op=dist.ReduceOp.SUM) print('after reudce',' Rank ', rank_id, ' has data ', tensor) if __name__ == "__main__": size = 2 processes = [] mp.set_start_method(method="spawn") for rank in range(size): # p = mp.Process(target=init_process, args=(rank, size, run_p2p)) p = mp.Process(target=init_process, args=(rank, size, run_reduce)) p.start() processes.append(p) for p in processes: p.join()
输出
python run.py
after send, Rank 0 has data tensor(1.)
after recv, Rank 1 has data tensor(1.)
after send, Rank 1 has data tensor(2.)
after recv, Rank 0 has data tensor(2.)
-----------------------------------------
python run.py
before reudce Rank 1 has data tensor([3, 4])
before reudce Rank 0 has data tensor([1, 2])
after reudce Rank 0 has data tensor([4, 6])
after reudce Rank 1 has data tensor([4, 6])
在单节点分布式训练或多节点分布式训练的两种情况下,该实用程序将启动每个节点给定数量的进程(–nproc-per-node)。如果用于GPU训练,这个数字需要小于或等于当前系统(nproc_per_node)上的GPU数量,并且每个进程从GPU 0到GPU (nproc_per_node - 1)将在单个GPU上操作。
training_script::位置参数,单 GPU 训练脚本的完整路径,该工具将并行启动该脚本。
–nnodes:指定用来分布式训练脚本的节点数
–node_rank:多节点分布式训练时,指定当前节点的 rank。
–nproc_per_node:指定当前节点上,使用 GPU 训练的进程数。建议将该参数设置为当前节点的 GPU 数量,这样每个进程都能单独控制一个 GPU,效率最高。
–master_addr:master 节点(rank 为 0)的地址,应该为 ip 地址或者 node 0 的 hostname。对于单节点多进程训练的情况,该参数可以设置为 127.0.0.1。
–master_port:指定分布式训练中,master 节点使用的端口号,必须与其他应用的端口号不冲突。
python -m torchrun --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
python -m torchrun --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
Node 2:
python -m torchrun --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
import os import torch import torch.distributed as dist def run(rank_id, size): tensor = torch.arange(2, dtype=torch.float64) + 1 + rank_id tensor = tensor.to(f"cuda:{rank_id}") print("---->before reudce", " Rank ", rank_id, " has data ", tensor, "<----") dist.all_reduce(tensor, op=dist.ReduceOp.SUM) print("---->after reudce", " Rank ", rank_id, " has data ", tensor, "<----") def main(): local_rank = int(os.environ["LOCAL_RANK"]) # 注意这个参数,即使代码中不使用。因为 torchrun工具默认传递该参数 rank = int(os.environ["RANK"]) dist.init_process_group(backend="nccl") run(rank, size=0) if __name__ == "__main__": main()
输出:
torchrun --nproc_per_node=2 --nnode=1 --master_addr="127.0.0.1" --master_port=29500 run.py
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
---->before reudce Rank 1 has data tensor([2., 3.], device='cuda:1', dtype=torch.float64) <----
---->before reudce Rank 0 has data tensor([1., 2.], device='cuda:0', dtype=torch.float64) <----
---->after reudce Rank 0 has data tensor([3., 5.], device='cuda:1',dtype=torch.float64) <----
---->after reudce Rank 1 has data tensor([3., 5.], device='cuda:0',dtype=torch.float64) <----
多节点测试(数据并行)
在 Env 方式中,在 init_process_group 中,无需指定任何参数
必须在 rank==0 的进程内保存参数。
该方式下,使用 torchrun在每台主机上,为其创建多进程,其中:nproc_per_node 参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数
nnodes 参数指定当前 job 包含多少个节点
node_rank 指定当前节点的优先级
master_addr 和 master_port 分别指定 master 节点的 ip:port
若没有为每个进程合理分配 GPU,则默认使用当前主机上所有的 GPU。即使一台主机上有多个进程,也会共用 GPU。
使用 torch.distributed.launch 工具时,将会为当前主机创建 nproc_per_node 个进程,每个程独立执行训练脚本。同时,它还会为每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程
需要合理利用 local_rank 参数,来合理分配本地的 GPU 资源
每条命令表示一个进程。若已开启的进程未达到 word_size 的数量,则所有进程会一直等待
# sample.py import torch.distributed as dist import torch.utils.data.distributed # ...... import argparse parser = argparse.ArgumentParser() # 注意这个参数,必须要以这种形式指定,即使代码中不使用。因为 launch 工具默认传递该参数 parser.add_argument("--local_rank", type=int) args = parser.parse_args() # ...... dist.init_process_group(backend='nccl', init_method='env://') # ...... trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform) train_sampler = torch.utils.data.distributed.DistributedSampler(trainset) trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler) # ...... # 根据 local_rank,配置当前进程使用的 GPU net = Net() device = torch.device('cuda', args.local_rank) net = net.to(device) net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank], output_device=args.local_rank)
执行方式
torchrun --nproc_per_node=2 --nnodes=3 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 sample.py
torchrun --nproc_per_node=2 --nnodes=3 --node_rank=1 --master_addr="192.168.1.201" --master_port=23456 sample.py
torchrun --nproc_per_node=2 --nnodes=3 --node_rank=2 --master_addr="192.168.1.201" --master_port=23456 sample.py
参考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。