This article builds on [Megatron-LM Source Code Series (II): Tensor Model Parallel and Sequence Parallel Training] and adds an introduction to pipeline model parallel training; for the general idea of pipeline parallelism, see [Pipeline Model Parallel Training in Megatron-LM Explained (Pipeline Parallel)]. In pipeline parallelism the network is split vertically at layer granularity, and communication happens horizontally between the different stages of the pipeline. In the figure below (2 machines, 16 GPUs), each colored block is one pipeline communication group, and forward-pass communication proceeds from left to right.
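To make the grouping concrete, here is a minimal sketch, simplified from the grouping logic in megatron/core/parallel_state.py and assuming an illustrative configuration of 16 GPUs with pipeline_model_parallel_size=4 (the actual groups in the figure depend on the tensor/data parallel settings):

# Minimal sketch: how ranks could be grouped into pipeline communication groups.
# Assumed config (not from the post): world_size=16, pipeline_model_parallel_size=4.
world_size = 16
pipeline_model_parallel_size = 4
num_pipeline_groups = world_size // pipeline_model_parallel_size   # 4 groups
for i in range(num_pipeline_groups):
    ranks = list(range(i, world_size, num_pipeline_groups))        # e.g. group 0 -> [0, 4, 8, 12]
    print(f"pipeline group {i}: {ranks}")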
The pipeline model parallel training code lives under the megatron/core/pipeline_parallel directory and has two main files: p2p_communication.py and schedules.py. p2p_communication.py uses several helper functions defined in megatron/core/parallel_state.py for looking up the pipeline process group and the previous/next stage ranks (a usage sketch follows).
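A minimal usage sketch of these parallel_state helpers (assuming the model parallel groups have already been initialized via parallel_state.initialize_model_parallel):

# Minimal sketch: the parallel_state lookups that p2p_communication.py relies on.
from megatron.core import parallel_state

group     = parallel_state.get_pipeline_model_parallel_group()      # process group of this pipeline
rank      = parallel_state.get_pipeline_model_parallel_rank()       # stage index of this rank
prev_rank = parallel_state.get_pipeline_model_parallel_prev_rank()  # global rank of the previous stage
next_rank = parallel_state.get_pipeline_model_parallel_next_rank()  # global rank of the next stage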
All communication used by pipeline model parallelism is point-to-point (p2p); at the bottom it maps onto the send and recv primitives of the communication library. In practice the following wrapper interfaces are defined, each of which is a thin wrapper around _communicate with different flags:

recv_forward: receive the activation tensor from the previous stage.

def recv_forward(tensor_shape: Shape,
dtype: torch.dtype,
batch_p2p_comm: bool = True,
timers: Callable = None) -> torch.Tensor:
...
input_tensor, _, _ = _communicate(
tensor_send_next=None,
tensor_send_prev=None,
recv_prev=True,
recv_next=False,
tensor_shape=tensor_shape,
batch_p2p_comm=batch_p2p_comm,
dtype=dtype)
...
send_forward: send the output tensor of the current stage to the next stage.

def send_forward(output_tensor: torch.Tensor,
batch_p2p_comm: bool = True,
timers: Callable = None) -> None:
...
_communicate(
tensor_send_next=output_tensor,
tensor_send_prev=None,
recv_prev=False,
recv_next=False,
tensor_shape=None,
batch_p2p_comm=batch_p2p_comm,
dtype=None)
...
send_backward: send the gradient of the stage input back to the previous stage.

def send_backward(input_tensor_grad: torch.Tensor,
batch_p2p_comm: bool = True,
timers: Callable = None) -> None:
...
_communicate(
tensor_send_next=None,
tensor_send_prev=input_tensor_grad,
recv_prev=False,
recv_next=False,
tensor_shape=None,
batch_p2p_comm=batch_p2p_comm,
dtype=None)
...
send_backward_recv_forward: send the input gradient to the previous stage and, in the same call, receive the next microbatch's activation from it.

def send_backward_recv_forward(input_tensor_grad: torch.Tensor,
tensor_shape: Shape,
dtype: torch.dtype,
batch_p2p_comm: bool = True,
timers: Callable = None) -> torch.Tensor:
...
input_tensor, _, _ = _communicate(
tensor_send_next=None,
tensor_send_prev=input_tensor_grad,
recv_prev=True,
recv_next=False,
tensor_shape=tensor_shape,
batch_p2p_comm=batch_p2p_comm,
dtype=dtype)
...
send_forward_recv_forward: send the output tensor to the next stage and receive a new input activation from the previous stage (the body below matches this interface).

def send_forward_recv_forward(output_tensor: torch.Tensor,
                              recv_prev: bool,
                              tensor_shape: Shape,
                              dtype: torch.dtype,
                              batch_p2p_comm: bool = True,
                              overlap_p2p_comm: bool = False,
                              timers: Callable = None) -> torch.Tensor:
    ...
    input_tensor, _, wait_handles = _communicate(
        tensor_send_next=output_tensor,
        tensor_send_prev=None,
        recv_prev=recv_prev,
        recv_next=False,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        wait_on_reqs=(not overlap_p2p_comm),
        dtype=dtype)
    ...
send_backward_recv_backward: send the input gradient to the previous stage and receive the output gradient from the next stage.

def send_backward_recv_backward(input_tensor_grad: torch.Tensor,
                                recv_next: bool,
                                tensor_shape: Shape,
                                dtype: torch.dtype,
                                batch_p2p_comm: bool = True,
                                overlap_p2p_comm: bool = False,
                                timers: Callable = None) -> torch.Tensor:
    ...
    _, output_tensor_grad, wait_handles = _communicate(
        tensor_send_next=None,
        tensor_send_prev=input_tensor_grad,
        recv_prev=False,
        recv_next=recv_next,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        wait_on_reqs=(not overlap_p2p_comm),
        dtype=dtype)
    ...
send_forward_backward_recv_forward_backward: in one call, send the forward output to the next stage, send the input gradient to the previous stage, and receive both a new input activation and a new output gradient.

def send_forward_backward_recv_forward_backward(
        output_tensor: torch.Tensor,
        input_tensor_grad: torch.Tensor,
        recv_prev: bool,
        recv_next: bool,
        tensor_shape: Shape,
        dtype: torch.dtype,
        batch_p2p_comm: bool = True,
        timers: Callable = None) -> Tuple[torch.Tensor, torch.Tensor]:
    ...
    input_tensor, output_tensor_grad, _ = _communicate(
        tensor_send_next=output_tensor,
        tensor_send_prev=input_tensor_grad,
        recv_prev=recv_prev,
        recv_next=recv_next,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        dtype=dtype)
    ...
All of these p2p interfaces call the _communicate function directly. _communicate is defined as follows:
def _communicate(*, tensor_send_next: Optional[torch.Tensor],
tensor_send_prev: Optional[torch.Tensor],
recv_prev: bool,
recv_next: bool,
tensor_shape: Shape,
batch_p2p_comm: bool = True,
wait_on_reqs: bool = True,
dtype: Optional[torch.dtype],
variable_seq_lengths: bool = False,
use_ring_exchange_p2p: bool = False,
) -> Tuple[torch.Tensor, torch.Tensor]:
Notes on the parameters:

batch_p2p_comm: if True, use batch_isend_irecv; if False, fall back to separate isend and irecv calls.
wait_on_reqs: when separate isend/irecv calls are used, block until each request has completed before returning.
use_ring_exchange_p2p: use a custom ring_exchange kernel instead of torch.distributed.batch_isend_irecv().
Inside _communicate, the receive buffers are initialized first:
if recv_prev:
...
tensor_recv_prev = torch.empty(recv_prev_shape,
requires_grad=True,
device=torch.cuda.current_device(),
dtype=dtype)
if recv_next:
...
tensor_recv_next = torch.empty(recv_next_shape,
requires_grad=True,
device=torch.cuda.current_device(),
dtype=dtype)
After initialization, p2p_func is called to do the actual sends and receives. Here _batched_p2p_ops uses batch_isend_irecv, while _p2p_ops issues individual isend and irecv calls; torch.distributed.ring_exchange is also supported as an alternative.
# Send tensors in both the forward and backward directions as appropriate.
if use_ring_exchange_p2p:
    def _ring_exchange_wrapper(**kwargs):
        torch.distributed.ring_exchange(**kwargs)
        return []
    p2p_func = _ring_exchange_wrapper
elif batch_p2p_comm:
    assert wait_on_reqs
    p2p_func = _batched_p2p_ops
else:
    p2p_func = _p2p_ops

reqs = p2p_func(tensor_send_prev=tensor_send_prev,
                tensor_recv_prev=tensor_recv_prev,
                tensor_send_next=tensor_send_next,
                tensor_recv_next=tensor_recv_next,
                group=get_pipeline_model_parallel_group())
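For context, here is a simplified sketch of what the batched path does. It is an illustration built on the standard torch.distributed API (P2POp and batch_isend_irecv), not a verbatim copy of Megatron's _batched_p2p_ops; the function name and parameters are assumptions:

import torch.distributed as dist

def batched_p2p_ops_sketch(tensor_send_prev, tensor_recv_prev,
                           tensor_send_next, tensor_recv_next,
                           prev_rank, next_rank, group):
    # Collect every send/recv as a P2POp and launch them in one batched call,
    # which lets NCCL schedule them together and avoids ordering deadlocks.
    ops = []
    if tensor_send_prev is not None:
        ops.append(dist.P2POp(dist.isend, tensor_send_prev, prev_rank, group))
    if tensor_recv_prev is not None:
        ops.append(dist.P2POp(dist.irecv, tensor_recv_prev, prev_rank, group))
    if tensor_send_next is not None:
        ops.append(dist.P2POp(dist.isend, tensor_send_next, next_rank, group))
    if tensor_recv_next is not None:
        ops.append(dist.P2POp(dist.irecv, tensor_recv_next, next_rank, group))
    reqs = dist.batch_isend_irecv(ops) if ops else []
    return reqs   # the caller waits on these requests (wait_on_reqs)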
get_forward_backward_func is the entry point used during training to select the forward/backward schedule. There are three options:

forward_backward_no_pipelining: no pipeline parallelism; forward and backward are run per microbatch on a single stage.
forward_backward_pipelining_without_interleaving: 1F1B training in the style of PipeDream-2BW.
forward_backward_pipelining_with_interleaving: an improvement on PipeDream-2BW's 1F1B that supports multiple stages (model chunks) per device.

def get_forward_backward_func():
pipeline_model_parallel_size = parallel_state.get_pipeline_model_parallel_world_size()
if pipeline_model_parallel_size > 1:
if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None:
forward_backward_func = forward_backward_pipelining_with_interleaving
else:
forward_backward_func = forward_backward_pipelining_without_interleaving
else:
forward_backward_func = forward_backward_no_pipelining
return forward_backward_func
forward_backward_func is used in practice (megatron/training.py) as shown below: after the model-specific forward_step_func is written, it is wrapped by forward_backward_func.
forward_backward_func = get_forward_backward_func()
fwd_bwd_timers = timers if args.timing_log_level > 1 else None
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=data_iterator,
model=model,
num_microbatches=get_num_microbatches(),
dtype=args.params_dtype,
tensor_shape=(args.seq_length, args.micro_batch_size, args.hidden_size),
grad_scaler=optimizer.scale_loss,
sequence_parallel=args.sequence_parallel,
overlap_p2p_comm=args.overlap_p2p_comm,
batch_p2p_comm=not args.overlap_p2p_comm,
forward_only=False,
timers=fwd_bwd_timers)
forward_backward_no_pipelining works at microbatch granularity: it runs the forward pass and immediately the backward pass for each microbatch. The first n-1 microbatches perform no gradient synchronization; only the last one does. No cross-stage communication is involved, and sequence parallelism is not supported. The losses from the forward passes are collected in forward_data_store and returned.
if no_sync_func is None and isinstance(model, torchDDP):
    no_sync_func = model.no_sync
if no_sync_func is None:
    no_sync_func = contextlib.nullcontext

# First n-1 microbatches: no gradient synchronization.
with no_sync_func():
    for i in range(num_microbatches - 1):
        output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
                                     input_tensor, forward_data_store, timers,
                                     collect_non_loss_data, dtype, enable_autocast)
        if not forward_only:
            backward_step(grad_scaler, input_tensor, output_tensor,
                          output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)

# Last microbatch: gradient synchronization happens here.
output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
                             input_tensor, forward_data_store, timers,
                             collect_non_loss_data, dtype, enable_autocast)
if not forward_only:
    backward_step(grad_scaler, input_tensor, output_tensor,
                  output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)

return forward_data_store
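To make the forward/backward pairing clearer, here is a minimal sketch of what a backward step has to do in this scheme. It is an assumption-laden simplification, not the exact backward_step from schedules.py (which handles lists of tensors and model types):

import torch

# Minimal sketch (assumption, simplified): run autograd for one microbatch and return
# the gradient of this stage's input, which the pipeline schedules send upstream.
def backward_step_sketch(grad_scaler, input_tensor, output_tensor, output_tensor_grad):
    if input_tensor is not None:
        input_tensor.retain_grad()                    # make sure .grad is populated
    if output_tensor_grad is None and grad_scaler is not None:
        output_tensor = grad_scaler(output_tensor)    # scale the loss on the last stage
    # On the last stage output_tensor_grad is None and autograd starts from the loss;
    # on intermediate stages it is the gradient received from the next stage.
    torch.autograd.backward(output_tensor, grad_tensors=output_tensor_grad)
    return None if input_tensor is None else input_tensor.grad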
forward_backward_pipelining_without_interleaving implements the non-interleaved 1F1B schedule and returns the loss on the last stage. Execution starts with the warmup microbatches: each iteration first calls recv_forward to get the previous rank's output from the preceding stage (the first stage reads its input from the dataloader instead), then runs forward_step for the current stage, and finally passes the result to the next stage via send_forward.
def forward_backward_pipelining_without_interleaving(*, ...):
    ...
    # Compute number of warmup microbatches.
    num_warmup_microbatches = \
        (parallel_state.get_pipeline_model_parallel_world_size() -
         parallel_state.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(num_warmup_microbatches, num_microbatches)
    num_microbatches_remaining = num_microbatches - num_warmup_microbatches
    ...
    # Run warmup forward passes.
    for i in range(num_warmup_microbatches):
        input_tensor = recv_forward(recv_tensor_shapes, dtype, timers=timers)
        output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
                                     input_tensor, forward_data_store, timers,
                                     collect_non_loss_data, dtype, enable_autocast)
        send_forward(output_tensor, send_tensor_shapes, timers=timers)
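A quick worked example of the warmup count (the numbers below are assumed for illustration, not taken from the post): earlier stages need more warmup forwards because they must keep feeding downstream stages before any gradient can come back.

# Illustrative only: warmup depth per stage for pipeline_parallel_size=4, num_microbatches=8.
pipeline_parallel_size, num_microbatches = 4, 8
for rank in range(pipeline_parallel_size):
    num_warmup = min(pipeline_parallel_size - rank - 1, num_microbatches)
    remaining = num_microbatches - num_warmup
    print(f"rank {rank}: {num_warmup} warmup microbatches, {remaining} 1F1B steps")
# rank 0: 3 warmup, rank 1: 2, rank 2: 1, rank 3: 0 (the last stage starts 1F1B immediately)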
After warmup, execution enters the 1F1B steady state: each iteration first runs forward_step and then calls send_forward_recv_backward, which sends the output to the next stage and receives the output gradient from it. Note that forward_step_func is model-specific and is defined when the model is set up.
# Run 1F1B in steady state.
for i in range(num_microbatches_remaining):
last_iteration = (i == (num_microbatches_remaining - 1))
output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
input_tensor, forward_data_store,
timers, collect_non_loss_data, dtype, enable_autocast)
...
output_tensor_grad = \
send_forward_recv_backward(output_tensor,
send_tensor_shapes, dtype,
timers=timers)
During 1F1B, the forward inputs and outputs are kept in buffers for the backward pass. Each step pops one (input, output) pair, runs backward_step on it, and then sends the resulting input gradient back to the previous stage via send_backward_recv_forward:
# 1. Add input_tensor and output_tensor to end of list.
input_tensors.append(input_tensor)
output_tensors.append(output_tensor)
deallocate_output_tensor(output_tensor[0], deallocate_pipeline_outputs)

# 2. Pop input_tensor and output_tensor from the start of the list for
#    the backward pass.
input_tensor = input_tensors.pop(0)
output_tensor = output_tensors.pop(0)

input_tensor_grad = \
    backward_step(grad_scaler, input_tensor, output_tensor,
                  output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)

# 3. Send the backward result to the previous stage.
if last_iteration:
    input_tensor = None
    send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
else:
    input_tensor = \
        send_backward_recv_forward(
            input_tensor_grad, recv_tensor_shapes, dtype, timers=timers)
Similar to warmup, there is a final phase at the end that waits for all remaining backward passes to finish (the cooldown backward passes):
# Run cooldown backward passes.
if not forward_only:
for i in range(num_warmup_microbatches):
...
input_tensor_grad = \
backward_step(grad_scaler, input_tensor, output_tensor,
output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)
send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
...
Finally, consider a question: in forward_backward_pipelining_without_interleaving, how is it guaranteed that the backward pass uses the same parameters as the forward pass? This is handled by DDP's model.no_sync context manager, which disables gradient synchronization. For the first N-1 microbatches the backward pass only accumulates gradients; only after the N-th microbatch finishes its backward pass are the gradients actually reduced and the parameters updated.
# Disable async grad reductions
if no_sync_func is None and isinstance(model, torchDDP):
    no_sync_func = model.no_sync
if no_sync_func is None:
    no_sync_func = contextlib.nullcontext
...
# Disable gradient synchronization.
disable_grad_sync()
...
# Run 1F1B.
...
# Run cooldown backward passes.
if not forward_only:
    for i in range(num_warmup_microbatches):
        if i == num_warmup_microbatches - 1:
            if grad_sync_func is None or rank == 0:
                # Re-enable gradient synchronization.
                enable_grad_sync()
        ...
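For reference, here is a hedged illustration of the general DDP no_sync pattern the schedule relies on; it is not Megatron-specific, and run_microbatches is a hypothetical helper made up for this example:

import contextlib

# Gradients are only accumulated locally inside the no_sync context; the all-reduce
# happens on the final microbatch, which is exactly what the schedule above relies on.
def run_microbatches(model, microbatches, loss_fn):
    no_sync = model.no_sync if hasattr(model, "no_sync") else contextlib.nullcontext
    with no_sync():
        for batch in microbatches[:-1]:              # first n-1 microbatches: accumulate only
            loss_fn(model(batch)).backward()
    loss_fn(model(microbatches[-1])).backward()      # last microbatch triggers the all-reduce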
forward_backward_pipelining_with_interleaving is also defined in megatron/core/pipeline_parallel/schedules.py. Its interleaved 1F1B schedule follows the same basic idea as forward_backward_pipelining_without_interleaving; the difference is that the model on each device is further split into multiple chunks.

Recall that when interleaved pipelining is enabled, virtual_pipeline_model_parallel_size must be set in the initialize_model_parallel function of megatron/core/parallel_state.py; it specifies how many stages each device handles. For example, for a 16-layer transformer trained with tensor_model_parallel_size=1, pipeline_model_parallel_size=4, virtual_pipeline_model_parallel_size=2, the model is split into 4*2=8 stages of 2 layers each, and each of the 4 GPU devices in a pipeline group handles 2 stages. If virtual_pipeline_model_parallel_size is left as None, interleaving is not enabled. The resulting layer assignment is listed below (a small snippet reproducing it follows the listing).
GPU 0: [1, 2] [9, 10]
GPU 1: [3, 4] [11, 12]
GPU 2: [5, 6] [13, 14]
GPU 3: [7, 8] [15, 16]
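The assignment above can be reproduced with a few lines (illustrative only; it hard-codes the 16-layer / 4-stage / 2-chunk configuration of the example and uses 1-based layer ids):

# Illustrative: reproduce the chunk assignment above.
num_layers, pp_size, vp_size = 16, 4, 2
layers_per_chunk = num_layers // (pp_size * vp_size)                 # 2 layers per chunk
for pp_rank in range(pp_size):
    chunks = []
    for vp_rank in range(vp_size):
        offset = vp_rank * (num_layers // vp_size) + pp_rank * layers_per_chunk
        chunks.append([offset + i + 1 for i in range(layers_per_chunk)])
    print(f"GPU {pp_rank}: {chunks}")   # GPU 0: [[1, 2], [9, 10]], GPU 1: [[3, 4], [11, 12]], ...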
For interleaved pipeline parallelism, the per-chunk construction of the model happens in setup_model_and_optimizer.
def setup_model_and_optimizer(model_provider_func,
model_type,
no_wd_decay_cond=None,
scale_lr_cond=None,
lr_mult=1.0):
args = get_args()
model = get_model(model_provider_func, model_type)
unwrapped_model = unwrap_model(model,
(torchDDP, LocalDDP, Float16Module))
......
return model, optimizer, opt_param_scheduler
setup_model_and_optimizer calls the get_model function in megatron/training.py. get_model returns a list of models whose length equals args.virtual_pipeline_model_parallel_size, one model per virtual pipeline stage. Note that each model gets its virtual_pipeline_model_parallel_rank set via mpu.set_virtual_pipeline_model_parallel_rank(i).
def get_model(model_provider_func, model_type=ModelType.encoder_or_decoder, wrap_with_ddp=True):
    ......
    # Build model.
    if mpu.get_pipeline_model_parallel_world_size() > 1 and \
       ......
        model = []
        # Build a model list with args.virtual_pipeline_model_parallel_size entries.
        for i in range(args.virtual_pipeline_model_parallel_size):
            mpu.set_virtual_pipeline_model_parallel_rank(i)
            # Set pre_process and post_process only after virtual rank is set.
            pre_process = mpu.is_pipeline_first_stage()
            post_process = mpu.is_pipeline_last_stage()
            this_model = model_provider_func(
                pre_process=pre_process,
                post_process=post_process
            )
            this_model.model_type = model_type
            model.append(this_model)
    ......
The ParallelTransformer class in megatron/model/transformer.py splits the model by layers for pipeline parallelism. The basic idea is to divide the transformer's layer list into N equal parts; each rank keeps its own module (derived from torch.nn.Module) holding layer_num/N layers, and which layers a rank holds is determined by computing an offset into the original layer list.
During the split, _get_num_layers is called first to get the number of layers after partitioning:
self.num_layers = _get_num_layers(args, model_type,
layer_type==LayerType.decoder)
Here we only look at the decoder-only case in _get_num_layers. num_layers is 0 only when a standalone embedding stage is used and the current rank is 0 in the pipeline group; otherwise num_layers equals args.num_layers divided by the pipeline parallel size. For example, with an 8-layer transformer and a pipeline parallel size of 2, self.num_layers becomes 4.
def _get_num_layers(args, model_type, is_decoder=False):
    """Compute the number of transformer layers resident on the current rank."""
    ......
    elif mpu.get_pipeline_model_parallel_world_size() > 1:
        if is_encoder_and_decoder_model:
            ......
        else:
            assert args.num_layers % args.transformer_pipeline_model_parallel_size == 0, \
                'num_layers must be divisible by transformer_pipeline_model_parallel_size'
            num_layers = (
                0
                if args.standalone_embedding_stage
                and mpu.get_pipeline_model_parallel_rank() == 0
                else args.num_layers // args.transformer_pipeline_model_parallel_size
            )
Once the partitioned num_layers is known, in the non-interleaved case the offset for a decoder-only model is computed from the pipeline rank:
class ParallelTransformer(MegatronModule):
"""Transformer class."""
def __init__(...):
......
if args.virtual_pipeline_model_parallel_size is not None:
......
else:
# Non-interleaved case.
# Each stage gets a contiguous set of layers.
if args.model_type == ModelType.encoder_and_decoder and \
mpu.get_pipeline_model_parallel_world_size() > 1:
......
else:
# Decoder-only model.
offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
In the interleaved case, self.num_layers is further divided by args.virtual_pipeline_model_parallel_size into smaller chunks.
class ParallelTransformer(MegatronModule):
    """Transformer class."""
    def __init__(...):
        ......
        if args.virtual_pipeline_model_parallel_size is not None:
            assert args.num_layers % args.virtual_pipeline_model_parallel_size == 0, \
                'num_layers_per_stage must be divisible by ' \
                'virtual_pipeline_model_parallel_size'
            assert args.model_type != ModelType.encoder_and_decoder
            # In the interleaved case, self.num_layers is further divided by
            # args.virtual_pipeline_model_parallel_size.
            # With 8 layers, pipeline parallel size 2 and virtual pipeline size 2,
            # self.num_layers goes from 4 down to 2.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # Non-interleaved case: offset is the pipeline rank times the layers per stage.
            if args.model_type == ModelType.encoder_and_decoder and \
               ......
            else:
                offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
In the non-interleaved case, once the offset is known, the ModuleList held by the current rank is built; each entry is one layer, and the number passed in is the layer's index. For example, with num_layer=8 and pipeline_stage=2, the offset is 0 on device0 and 4 on device1, so device0's ModuleList is ModuleList(l1, l2, l3, l4) and device1's is ModuleList(l5, l6, l7, l8).
class ParallelTransformer(MegatronModule):
"""Transformer class."""
def __init__(...):
......
self.layers = torch.nn.ModuleList(
[build_layer(i + 1 + offset) for i in range(self.num_layers)])
Before discussing how the interleaved offset works, recall how training.py builds the model list for each device: with interleaving, each device actually holds args.virtual_pipeline_model_parallel_size models forming one virtual group, and each model only contains part of the original network's layers. The model_provider_func here eventually reaches the ParallelTransformer constructor.
def get_model(model_provider_func, ...):
......
model = []
for i in range(args.virtual_pipeline_model_parallel_size):
mpu.set_virtual_pipeline_model_parallel_rank(i)
......
this_model = model_provider_func(
pre_process=pre_process,
post_process=post_process
)
this_model.model_type = model_type
model.append(this_model)
The offset definition for the interleaved case is a bit more involved:
offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
args.num_layers // args.virtual_pipeline_model_parallel_size) + \
(mpu.get_pipeline_model_parallel_rank() * self.num_layers)
The first term, mpu.get_virtual_pipeline_model_parallel_rank() * (args.num_layers // args.virtual_pipeline_model_parallel_size), first separates the layers into the large virtual groups; within each virtual group the layers are then split into smaller chunks. Taking layer_num=8, stage_num=2, virtual_pipeline_model_parallel_size=2 as an example, the virtual-group offsets are 0 and 4, self.num_layers is 2, and mpu.get_pipeline_model_parallel_rank() is 0 or 1 (since there are 2 stages), so the possible combined offsets are 0, 2, 4, 6. The final layer indices are then derived from the offset in the same way as in the non-interleaved case.
self.layers = torch.nn.ModuleList(
[build_layer(i + 1 + offset) for i in range(self.num_layers)])
In the end, device0's model has two ModuleLists, ModuleList[l0, l1] and ModuleList[l4, l5]; device1's model also has two ModuleLists, ModuleList[l2, l3] and ModuleList[l6, l7] (layers numbered from 0 here, following the code comment above):

device0 (stage1): {ModuleList[l0, l1], ModuleList[l4, l5]}
device1 (stage2): {ModuleList[l2, l3], ModuleList[l6, l7]}
At the start, just like the without_interleaving case, automatic gradient synchronization has to be disabled; and since there are multiple model chunks, the inputs and outputs have to be stored per chunk in input_tensors and output_tensors.
disable_grad_sync()
# Model chunk IDs with synchronized grads
synchronized_model_chunks = set()
input_tensors = [[] for _ in range(len(model))]
output_tensors = [[] for _ in range(len(model))]
With sequence parallelism, the tensor is split along the sequence dimension into get_tensor_model_parallel_world_size() parts:
if sequence_parallel:
seq_length, batch_size, hidden = tensor_shape
tensor_shape = (
seq_length // parallel_state.get_tensor_model_parallel_world_size(),
batch_size,
hidden,
)
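For instance (the numbers below are assumed for illustration), with a tensor parallel size of 8 the p2p tensor shape shrinks along the sequence dimension:

# Illustrative only: how the p2p tensor shape changes under sequence parallelism.
seq_length, micro_batch_size, hidden_size = 2048, 4, 4096
tensor_model_parallel_world_size = 8
tensor_shape = (seq_length // tensor_model_parallel_world_size, micro_batch_size, hidden_size)
print(tensor_shape)   # (256, 4, 4096)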
Before warmup, the number of microbatches each device needs during the warmup phase, num_warmup_microbatches, is computed.

First the total total_num_microbatches is computed as the product of the passed-in num_microbatches and len(model); with virtual_stage=2, num_model_chunks is 2.

If num_microbatches equals pipeline_parallel_size, the schedule behaves like GPipe: all forwards run first, then all backwards, and every microbatch is processed during warmup.

If num_microbatches does not equal pipeline_parallel_size, warmup is counted in units of pipeline_parallel_size and num_warmup_microbatches is the sum of two parts: the first part is the staircase where each device handles exactly one more microbatch than the next one (proportional to its rank), given by (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2; the second part is the same for every device, (num_model_chunks - 1) * pipeline_parallel_size. In total:

num_warmup_microbatches = (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2
                          + (num_model_chunks - 1) * pipeline_parallel_size
The corresponding code:
# Compute number of warmup and remaining microbatches.
num_model_chunks = len(model)
total_num_microbatches = num_microbatches * num_model_chunks
......
if num_microbatches == pipeline_parallel_size:
num_warmup_microbatches = total_num_microbatches
all_warmup_microbatches = True
else:
num_warmup_microbatches = (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2
num_warmup_microbatches += (num_model_chunks - 1) * pipeline_parallel_size
num_warmup_microbatches = min(num_warmup_microbatches, total_num_microbatches)
num_microbatches_remaining = total_num_microbatches - num_warmup_microbatches
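A quick worked example of the interleaved warmup count (the numbers below are assumed for illustration):

# Illustrative only: pipeline_parallel_size=2, num_model_chunks=2, num_microbatches=8.
pipeline_parallel_size, num_model_chunks, num_microbatches = 2, 2, 8
total_num_microbatches = num_microbatches * num_model_chunks          # 16 virtual microbatches
for rank in range(pipeline_parallel_size):
    warmup = (pipeline_parallel_size - rank - 1) * 2 \
             + (num_model_chunks - 1) * pipeline_parallel_size
    warmup = min(warmup, total_num_microbatches)
    print(f"rank {rank}: {warmup} warmup, {total_num_microbatches - warmup} remaining")
# rank 0: 4 warmup / 12 remaining; rank 1: 2 warmup / 14 remaining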
Several helper functions are also implemented here:

get_model_chunk_id(microbatch_id, forward): get the model_chunk_id corresponding to microbatch_id (see the sketch after this list).
is_first_microbatch_for_model_chunk(microbatch_id: int): whether microbatch_id is the first microbatch of its model chunk.
is_last_microbatch_for_model_chunk(microbatch_id: int): whether microbatch_id is the last microbatch of its model chunk.
forward_step_helper(microbatch_id, checkpoint_activations_microbatch): run the forward pass for microbatch_id.
backward_step_helper(microbatch_id): run the backward pass for microbatch_id.
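As a rough sketch of the first helper (an assumption, simplified from schedules.py; it assumes the interleaved ordering where microbatches are issued in groups of pipeline_parallel_size per model chunk):

# Rough sketch: map a microbatch id to the model chunk it belongs to.
def get_model_chunk_id_sketch(microbatch_id, forward, pipeline_parallel_size, num_model_chunks):
    microbatch_id_in_group = microbatch_id % (pipeline_parallel_size * num_model_chunks)
    model_chunk_id = microbatch_id_in_group // pipeline_parallel_size
    if not forward:
        # Backward passes walk the chunks in reverse order.
        model_chunk_id = num_model_chunks - model_chunk_id - 1
    return model_chunk_id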
Taking the steady state of interleaved 1F1B as an example, the corresponding flow is:
# Loop over each microbatch in the steady state.
for k in range(num_microbatches_remaining):
    ......
    if config.overlap_p2p_comm:
        ......
    else:  # no p2p overlap
        # First one forward step; forward_k is the microbatch_id. Unlike the
        # non-interleaved schedule, the forward helper derives the model_chunk_id
        # from the microbatch_id and runs forward_step on that chunk.
        output_tensor = forward_step_helper(forward_k, checkpoint_activations_microbatch)

        # Then one backward step, analogous to the forward.
        backward_k = k
        input_tensor_grad = backward_step_helper(backward_k)
        ......
        # After the forward and backward are done, communicate with the forward
        # and backward neighbor ranks.
        input_tensor, output_tensor_grad = \
            p2p_communication.send_forward_backward_recv_forward_backward(
                output_tensor, input_tensor_grad,
                recv_prev=recv_prev, recv_next=recv_next,
                tensor_shape=tensor_shape,
                config=config)
        deallocate_output_tensor(output_tensor, config.deallocate_pipeline_outputs)

# After this batch of microbatches finishes, synchronize the gradients.
enable_grad_sync()
if config.grad_sync_func is not None:
    params = []
    for model_chunk_id in range(num_model_chunks):
        if model_chunk_id not in synchronized_model_chunks:
            params.extend(model[model_chunk_id].parameters())
            synchronized_model_chunks.add(model_chunk_id)
    if params:
        config.grad_sync_func(params)

return forward_data_store
Note some current limitations of interleaving:

When len(model) > 1 or mpu.get_pipeline_model_parallel_world_size() > 1, only args.DDP_impl='local' is supported.
num_microbatches % pipeline_parallel_size == 0 is required, i.e. the number of microbatches must be divisible by the pipeline parallel size.