
Megatron-LM Source Code Series (3): Pipeline Model Parallel Training Explained


github: https://github.com/NVIDIA/Megatron-LM

This article builds on [Megatron-LM Source Code Series (2): Tensor Model Parallel and Sequence Parallel Training] and adds an introduction to pipeline model parallel training; for the overall idea of pipeline parallelism, see [Understanding Megatron-LM Pipeline Model Parallel Training (Pipeline Parallel)]. In pipeline parallelism the network is split vertically at layer granularity, and communication happens horizontally between the different stages of the pipeline. In the figure below (2 nodes x 16 GPUs), each colored block is one pipeline communication group, and forward-pass communication proceeds from left to right.
[Figure: pipeline communication groups on 2 nodes x 16 GPUs; forward communication flows left to right]

The pipeline model parallel training code lives in the megatron/core/pipeline_parallel directory and has two main files: p2p_communication.py and schedules.py.

1. p2p_communication.py

1.1 p2p interface definitions

p2p_communication.py uses four functions defined in megatron/core/parallel_state.py:

  • get_pipeline_model_parallel_group: returns the pipeline parallel communication group the current rank belongs to
  • get_pipeline_model_parallel_rank: returns the current rank's index within its pipeline parallel group
  • get_pipeline_model_parallel_prev_rank: returns the previous rank in the pipeline parallel group
  • get_pipeline_model_parallel_next_rank: returns the next rank in the pipeline parallel group

All communication used by pipeline model parallelism is point-to-point (p2p), built on the send and recv primitives of the underlying communication library. In practice the following interfaces are defined (a toy example of how one stage's send pairs with the next stage's receive follows the interface list):

  • recv_forward: receives data from the previous rank in the pipeline parallel group; internally it calls _communicate directly
def recv_forward(tensor_shape: Shape,
                 dtype: torch.dtype,
                 batch_p2p_comm: bool = True,
                 timers: Callable = None) -> torch.Tensor:
        ...
        input_tensor, _, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=None,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape,
            batch_p2p_comm=batch_p2p_comm,
            dtype=dtype)
        ...
  • recv_backward: receives data from the next rank in the pipeline parallel group; internally it calls _communicate directly
def recv_backward(tensor_shape: Shape,
                  dtype: torch.dtype,
                  batch_p2p_comm: bool = True,
                  timers: Callable = None) -> torch.Tensor:
    ...
        _, output_tensor_grad, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=None,
            recv_prev=False,
            recv_next=True,
            tensor_shape=tensor_shape,
            batch_p2p_comm=batch_p2p_comm,
            dtype=dtype)
    ...
  • send_forward: sends data to the next rank in the pipeline parallel group; internally it calls _communicate directly
def send_forward(output_tensor: torch.Tensor,
                 batch_p2p_comm: bool = True,
                 timers: Callable = None) -> None:
        ...
        _communicate(
            tensor_send_next=output_tensor,
            tensor_send_prev=None,
            recv_prev=False,
            recv_next=False,
            tensor_shape=None,
            batch_p2p_comm=batch_p2p_comm,
            dtype=None)
        ...
  • send_backward: sends data (gradients) to the previous rank in the pipeline parallel group; internally it calls _communicate directly
def send_backward(input_tensor_grad: torch.Tensor,
                  batch_p2p_comm: bool = True,
                  timers: Callable = None) -> None:
        ...
        _communicate(
            tensor_send_next=None,
            tensor_send_prev=input_tensor_grad,
            recv_prev=False,
            recv_next=False,
            tensor_shape=None,
            batch_p2p_comm=batch_p2p_comm,
            dtype=None)
        ...
  • send_forward_recv_backward: sends data to the next rank while receiving gradients from the next rank, in a single call; internally it calls _communicate directly
def send_forward_recv_backward(output_tensor: torch.Tensor,
                               tensor_shape: Shape,
                               dtype: torch.dtype,
                               batch_p2p_comm: bool = True,
                               timers: Callable = None) -> torch.Tensor:
        ...
        _, output_tensor_grad, _ = _communicate(
            tensor_send_next=output_tensor,
            tensor_send_prev=None,
            recv_prev=False,
            recv_next=True,
            tensor_shape=tensor_shape,
            batch_p2p_comm=batch_p2p_comm,
            dtype=dtype)
        ...
  • send_backward_recv_forward: sends gradients to the previous rank while receiving data from the previous rank, in a single call; internally it calls _communicate directly
def send_backward_recv_forward(input_tensor_grad: torch.Tensor,
                               tensor_shape: Shape,
                               dtype: torch.dtype,
                               batch_p2p_comm: bool = True,
                               timers: Callable = None) -> torch.Tensor:
        ...
        input_tensor, _, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=input_tensor_grad,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape,
            batch_p2p_comm=batch_p2p_comm,
            dtype=dtype)
        ...
  • send_forward_recv_forward: sends data to the next rank while receiving data from the previous rank, in a single call; internally it calls _communicate directly
def send_forward_recv_forward(output_tensor: torch.Tensor,
                              recv_prev: bool,
                              tensor_shape: Shape,
                              dtype: torch.dtype,
                              batch_p2p_comm: bool = True,
                              overlap_p2p_comm: bool = False,
                              timers: Callable = None) -> torch.Tensor:
    ...
    input_tensor, _, wait_handles = _communicate(
        tensor_send_next=output_tensor,
        tensor_send_prev=None,
        recv_prev=recv_prev,
        recv_next=False,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        wait_on_reqs=(not overlap_p2p_comm),
        dtype=dtype)
    ...
  • send_backward_recv_backward: sends gradients to the previous rank while receiving gradients from the next rank, in a single call; internally it calls _communicate directly
def send_backward_recv_backward(input_tensor_grad: torch.Tensor,
                                recv_next: bool,
                                tensor_shape: Shape,
                                dtype: torch.dtype,
                                batch_p2p_comm: bool = True,
                                overlap_p2p_comm: bool = False,
                                timers: Callable = None) -> torch.Tensor:
    ...
    _, output_tensor_grad, wait_handles = _communicate(
        tensor_send_next=None,
        tensor_send_prev=input_tensor_grad,
        recv_prev=False,
        recv_next=recv_next,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        wait_on_reqs=(not overlap_p2p_comm),
        dtype=dtype)
    ...
  • send_forward_backward_recv_forward_backward: full-duplex communication with both the previous and next ranks in the pipeline parallel group, i.e. sending and receiving in both directions at once; internally it calls _communicate directly
def send_forward_backward_recv_forward_backward(
        output_tensor: torch.Tensor,
        input_tensor_grad: torch.Tensor,
        recv_prev: bool,
        recv_next: bool,
        tensor_shape: Shape,
        dtype: torch.dtype,
        batch_p2p_comm: bool = True,
        timers: Callable = None) -> Tuple[torch.Tensor, torch.Tensor]:
    ...
    input_tensor, output_tensor_grad, _ = _communicate(
        tensor_send_next=output_tensor,
        tensor_send_prev=input_tensor_grad,
        recv_prev=recv_prev,
        recv_next=recv_next,
        tensor_shape=tensor_shape,
        batch_p2p_comm=batch_p2p_comm,
        dtype=dtype)
    ...
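Before looking at _communicate itself, here is a minimal toy sketch of how a send on one stage pairs with a receive on the next stage, using raw torch.distributed send/recv rather than the Megatron wrappers. The shape, dtype, and the gloo backend are arbitrary choices for illustration; launch it with torchrun --nproc_per_node=2.

# Toy 2-rank pairing demo (assumed setup, not Megatron code): rank 0 plays the role of
# send_forward, rank 1 plays the role of recv_forward.
#   torchrun --nproc_per_node=2 p2p_demo.py
import torch
import torch.distributed as dist

def main():
    dist.init_process_group(backend="gloo")  # gloo so the demo also runs on CPU-only machines
    rank = dist.get_rank()
    shape = (4, 8)  # hypothetical (micro_batch, hidden) activation shape

    if rank == 0:
        # "send_forward": the earlier stage sends its activations to the next stage.
        activations = torch.randn(shape)
        dist.send(activations, dst=1)
    else:
        # "recv_forward": the later stage pre-allocates an empty buffer and receives into it,
        # mirroring the receive-buffer allocation done inside _communicate.
        buf = torch.empty(shape)
        dist.recv(buf, src=0)
        print(f"rank {rank} received activations with mean {buf.mean():.4f}")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()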

1.2 The _communicate implementation

All of the p2p interfaces above call the _communicate function directly; it is defined as follows:

def _communicate(*, tensor_send_next: Optional[torch.Tensor],
                 tensor_send_prev: Optional[torch.Tensor],
                 recv_prev: bool,
                 recv_next: bool,
                 tensor_shape: Shape,
                 batch_p2p_comm: bool = True,
                 wait_on_reqs: bool = True,
                 dtype: Optional[torch.dtype],
                 variable_seq_lengths: bool = False,
                 use_ring_exchange_p2p: bool = False,
                 ) -> Tuple[torch.Tensor, torch.Tensor]:

Parameter descriptions:

  • tensor_send_next: tensor to send to the next rank
  • tensor_send_prev: tensor to send to the previous rank
  • recv_prev: whether to receive data from the previous rank
  • recv_next: whether to receive data from the next rank
  • tensor_shape: shape of the tensors to be received; all received tensors share the same shape
  • batch_p2p_comm: if True, use batch_isend_irecv; if False, use isend/irecv instead
  • wait_on_reqs: when using isend/irecv, block until each request completes
  • dtype: dtype of the received tensors
  • variable_seq_lengths: set to True when the sequence length is not fixed during training
  • use_ring_exchange_p2p: use a custom ring_exchange kernel instead of torch.distributed.batch_isend_irecv()

_communicate first initializes the receive buffers; note that they are allocated with requires_grad=True.

    if recv_prev:
        ...
        tensor_recv_prev = torch.empty(recv_prev_shape,
                                       requires_grad=True,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)
    if recv_next:
        ...
        tensor_recv_next = torch.empty(recv_next_shape,
                                       requires_grad=True,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)

After the buffers are allocated, p2p_func is called to send and receive the data: _batched_p2p_ops uses batch_isend_irecv, _p2p_ops uses isend/irecv, and a custom torch.distributed.ring_exchange is also supported (a simplified sketch of _batched_p2p_ops follows the snippet).

    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange_p2p:
        def _ring_exchange_wrapper(**kwargs):
            torch.distributed.ring_exchange(**kwargs)
            return []
        p2p_func = _ring_exchange_wrapper
    elif batch_p2p_comm:
        assert wait_on_reqs
        p2p_func = _batched_p2p_ops
    else:
        p2p_func = _p2p_ops

    reqs = p2p_func(tensor_send_prev=tensor_send_prev,
                    tensor_recv_prev=tensor_recv_prev,
                    tensor_send_next=tensor_send_next,
                    tensor_recv_next=tensor_recv_next,
                    group=get_pipeline_model_parallel_group())
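For reference, here is a simplified sketch of what _batched_p2p_ops does, condensed from p2p_communication.py (details such as tag handling may differ between versions, so treat it as an approximation): it collects one torch.distributed.P2POp per non-None tensor and submits them all at once via batch_isend_irecv, returning the asynchronous request handles that _communicate later waits on (unless overlap is requested). _p2p_ops issues the same operations as individual isend/irecv calls instead.

# Simplified sketch of _batched_p2p_ops (not a verbatim copy of the Megatron source).
import torch
from megatron.core.parallel_state import (
    get_pipeline_model_parallel_next_rank,
    get_pipeline_model_parallel_prev_rank,
)

def _batched_p2p_ops_sketch(*, tensor_send_prev, tensor_recv_prev,
                            tensor_send_next, tensor_recv_next, group):
    ops = []
    if tensor_send_prev is not None:
        ops.append(torch.distributed.P2POp(torch.distributed.isend, tensor_send_prev,
                                           get_pipeline_model_parallel_prev_rank(), group))
    if tensor_recv_prev is not None:
        ops.append(torch.distributed.P2POp(torch.distributed.irecv, tensor_recv_prev,
                                           get_pipeline_model_parallel_prev_rank(), group))
    if tensor_send_next is not None:
        ops.append(torch.distributed.P2POp(torch.distributed.isend, tensor_send_next,
                                           get_pipeline_model_parallel_next_rank(), group))
    if tensor_recv_next is not None:
        ops.append(torch.distributed.P2POp(torch.distributed.irecv, tensor_recv_next,
                                           get_pipeline_model_parallel_next_rank(), group))
    # batch_isend_irecv launches all ops together and returns a list of async requests.
    return torch.distributed.batch_isend_irecv(ops) if ops else []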

2. schedules.py

2.1 get_forward_backward_func

get_forward_backward_func is the entry point used during training; it selects which forward/backward routine to use, out of three options:

  1. forward_backward_no_pipelining: pipeline parallelism is not used (i.e. there is no cross-stage communication)
  2. forward_backward_pipelining_without_interleaving: the 1F1B schedule in the style of PipeDream-2BW
  3. forward_backward_pipelining_with_interleaving: the improvement on PipeDream-2BW described in the Megatron-2 paper, where each GPU handles multiple stages.
def get_forward_backward_func():
    pipeline_model_parallel_size = parallel_state.get_pipeline_model_parallel_world_size()
    if pipeline_model_parallel_size > 1:
        if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

forward_backward_func is used in practice as follows (megatron/training.py): after the forward pass forward_step_func is written, it is wrapped by forward_backward_func (a hypothetical sketch of a forward_step_func follows the snippet).

    forward_backward_func = get_forward_backward_func()
    fwd_bwd_timers = timers if args.timing_log_level > 1 else None
    losses_reduced = forward_backward_func(
        forward_step_func=forward_step_func,
        data_iterator=data_iterator,
        model=model,
        num_microbatches=get_num_microbatches(),
        dtype=args.params_dtype,
        tensor_shape=(args.seq_length, args.micro_batch_size, args.hidden_size),
        grad_scaler=optimizer.scale_loss,
        sequence_parallel=args.sequence_parallel,
        overlap_p2p_comm=args.overlap_p2p_comm,
        batch_p2p_comm=not args.overlap_p2p_comm,
        forward_only=False,
        timers=fwd_bwd_timers)
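For context, here is a hypothetical sketch of what a forward_step_func can look like, modeled loosely on pretrain_gpt.py; get_batch and the loss bookkeeping below are placeholders rather than the real Megatron helpers. The scheduler only relies on it returning an output tensor plus a loss-reduction callable that is invoked on the last pipeline stage.

# Hypothetical forward_step_func sketch (placeholder helpers, not Megatron code).
from functools import partial
import torch

def loss_func(loss_mask, output_tensor):
    # Reduce the per-token loss to a scalar; the dict is what ends up in forward_data_store.
    losses = output_tensor.float().view(-1)
    loss = torch.sum(losses * loss_mask.view(-1)) / loss_mask.sum()
    return loss, {"lm loss": loss}

def get_batch(data_iterator):
    # Placeholder: unpack whatever the data iterator yields.
    batch = next(data_iterator)
    return (batch["tokens"], batch["labels"], batch["loss_mask"],
            batch["attention_mask"], batch["position_ids"])

def forward_step_func(data_iterator, model):
    tokens, labels, loss_mask, attention_mask, position_ids = get_batch(data_iterator)
    output_tensor = model(tokens, position_ids, attention_mask, labels=labels)
    # The scheduler calls the returned closure on output_tensor on the last stage.
    return output_tensor, partial(loss_func, loss_mask)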

2.2 forward_backward_no_pipelining

forward_backward_no_pipelining works at microbatch granularity: each microbatch runs its forward pass immediately followed by its backward pass. The first n-1 microbatches do not trigger gradient synchronization; only the last one does. No cross-stage communication is involved, and sequence parallelism is not supported. The forward losses are collected in forward_data_store and returned (a standalone gradient-accumulation sketch of the same no_sync pattern follows the snippet).

    if no_sync_func is None and isinstance(model, torchDDP):
        no_sync_func = model.no_sync
    if no_sync_func is None:
        no_sync_func = contextlib.nullcontext
        
    # First n-1 microbatches
    with no_sync_func():
        for i in range(num_microbatches - 1):
            output_tensor = forward_step(forward_step_func, data_iterator,
                                         model, num_microbatches, input_tensor, forward_data_store,
                                         timers, collect_non_loss_data, dtype, enable_autocast)
            if not forward_only:
                backward_step(grad_scaler, input_tensor, output_tensor,
                              output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)
                              
    # Last microbatch
    output_tensor = forward_step(forward_step_func, data_iterator,
                                 model, num_microbatches, input_tensor, forward_data_store,
                                 timers, collect_non_loss_data, dtype, enable_autocast)
    if not forward_only:
        backward_step(grad_scaler, input_tensor, output_tensor,
                      output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)
    return forward_data_store
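The same no_sync pattern in isolation, as a minimal gradient-accumulation sketch (toy model, loss, and data are assumed here for illustration; this is not Megatron code): the DDP all-reduce is skipped for the first n-1 microbatches and only triggered by the backward of the last one.

# Minimal gradient-accumulation sketch mirroring the no_sync usage above.
import contextlib
import torch

def accumulate_over_microbatches(model, microbatches, no_sync_func=None):
    # model: ideally a torch.nn.parallel.DistributedDataParallel instance
    # microbatches: list of (inputs, targets) pairs
    if no_sync_func is None:
        no_sync_func = getattr(model, "no_sync", contextlib.nullcontext)
    # First n-1 microbatches: grads accumulate locally, the all-reduce is skipped.
    with no_sync_func():
        for inputs, targets in microbatches[:-1]:
            loss = torch.nn.functional.mse_loss(model(inputs), targets)
            loss.backward()
    # Last microbatch: backward outside no_sync triggers the gradient all-reduce.
    inputs, targets = microbatches[-1]
    loss = torch.nn.functional.mse_loss(model(inputs), targets)
    loss.backward()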

2.3 forward_backward_pipelining_without_interleaving

This implements the non-interleaved 1F1B schedule, with the loss returned on the last stage. Execution starts with the warmup microbatches: each warmup step first calls recv_forward to fetch the previous stage's output (the first stage instead reads its input from the dataloader), then runs forward_step for the current stage, and finally passes the result to the next stage via send_forward (a worked example of the warmup count per rank follows the snippet).

def forward_backward_pipelining_without_interleaving(*,...):
    ...
    # Compute number of warmup microbatches.
    num_warmup_microbatches = \
        (parallel_state.get_pipeline_model_parallel_world_size() -
         parallel_state.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(
        num_warmup_microbatches,
        num_microbatches)
    num_microbatches_remaining = \
        num_microbatches - num_warmup_microbatches
    ...
    
    # Run warmup forward passes.
    for i in range(num_warmup_microbatches):
        input_tensor = recv_forward(recv_tensor_shapes, dtype, timers=timers)
        output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
                                     input_tensor, forward_data_store,
                                     timers, collect_non_loss_data, dtype, enable_autocast)
        send_forward(output_tensor, send_tensor_shapes, timers=timers)
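A small worked example of the warmup formula above, with example values chosen arbitrarily: earlier stages need more warmup forwards so that every stage has work queued before the 1F1B steady state begins.

# Worked example of num_warmup_microbatches for the non-interleaved schedule.
pipeline_parallel_size = 4   # assumed example values
num_microbatches = 8

for rank in range(pipeline_parallel_size):
    num_warmup = min(pipeline_parallel_size - rank - 1, num_microbatches)
    remaining = num_microbatches - num_warmup
    print(f"rank {rank}: warmup={num_warmup}, steady-state 1F1B={remaining}")
# rank 0: warmup=3, steady-state 1F1B=5
# rank 1: warmup=2, steady-state 1F1B=6
# rank 2: warmup=1, steady-state 1F1B=7
# rank 3: warmup=0, steady-state 1F1B=8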

After warmup, the schedule enters the 1F1B steady state: it runs forward_step and then calls send_forward_recv_backward to pass the output to the next stage while receiving that stage's gradient. Note that forward_step_func is model-specific and is defined when the model is set up.

    # Run 1F1B in steady state.
    for i in range(num_microbatches_remaining):
        last_iteration = (i == (num_microbatches_remaining - 1))

        output_tensor = forward_step(forward_step_func, data_iterator, model, num_microbatches,
                                     input_tensor, forward_data_store,
                                     timers, collect_non_loss_data, dtype, enable_autocast)
    ...
            output_tensor_grad = \
                send_forward_recv_backward(output_tensor,
                                           send_tensor_shapes, dtype,
                                           timers=timers)

In 1F1B, the forward inputs and outputs are buffered for the backward pass. Each step pops one (input, output) pair, runs backward_step on it, and then sends the resulting gradient back to the previous stage via send_backward_recv_forward:

            # 1. Add input_tensor and output_tensor to end of list.
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)
            deallocate_output_tensor(output_tensor[0], deallocate_pipeline_outputs)

            # 2. Pop input_tensor and output_tensor from the start of the list for
            # the backward pass.
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)
            input_tensor_grad = \
                backward_step(grad_scaler, input_tensor, output_tensor,
                              output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)
            
            # 3. Pass the backward result to the previous stage
            if last_iteration:
                input_tensor = None
                send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
            else:
                input_tensor = \
                    send_backward_recv_forward(
                        input_tensor_grad, recv_tensor_shapes, dtype, timers=timers)

As with warmup, there is a final phase at the end that waits for all remaining backward passes to finish (cooldown backward passes):

    # Run cooldown backward passes.
    if not forward_only:
        for i in range(num_warmup_microbatches):
            ...
            input_tensor_grad = \
                backward_step(grad_scaler, input_tensor, output_tensor,
                              output_tensor_grad, model_type, timers, deallocate_pipeline_outputs)
            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
            ...

Finally, a question worth thinking about: in forward_backward_pipelining_without_interleaving, how is it guaranteed that the backward pass uses the same parameters as the forward pass? This is handled with DDP's model.no_sync context manager, which suppresses gradient synchronization. For the first N-1 microbatches, backward only accumulates gradients; the actual gradient reduction and parameter update happen only after the N-th microbatch's backward completes.

    # Disable async grad reductions
    if no_sync_func is None and isinstance(model, torchDDP):
        no_sync_func = model.no_sync
    if no_sync_func is None:
        no_sync_func = contextlib.nullcontext
    ...
    # Disable gradient synchronization
    disable_grad_sync()
    ...
    # Run 1F1B
    ...
    
    # Run cooldown backward passes.
    if not forward_only:
        for i in range(num_warmup_microbatches):
            if i == num_warmup_microbatches-1:
                if grad_sync_func is None or rank == 0:
                    # Re-enable gradient synchronization
                    enable_grad_sync()
    ...

2.4 Model construction for forward_backward_pipelining_with_interleaving

forward_backward_pipelining_with_interleaving is also defined in megatron/core/pipeline_parallel/schedules.py. The basic idea of its interleaved 1F1B schedule is the same as forward_backward_pipelining_without_interleaving; the difference is that the model is further split into multiple chunks.

Recall that to enable interleaved pipelining, virtual_pipeline_model_parallel_size must be passed to the initialize_model_parallel function in megatron/core/parallel_state.py; it specifies how many stages each device handles. For example, for a 16-layer transformer trained with tensor_model_parallel_size=1, pipeline_model_parallel_size=4, virtual_pipeline_model_parallel_size=2, the model is split into 4*2=8 stages of 2 layers each, and each of the 4 GPU devices in a pipeline communication group handles 2 stages. If virtual_pipeline_model_parallel_size is left as None, interleaving is not enabled.

            GPU 0: [1, 2] [9, 10]
            GPU 1: [3, 4] [11, 12]
            GPU 2: [5, 6] [13, 14]
            GPU 3: [7, 8] [15, 16]

For interleaved pipeline parallelism, the chunked model is built via setup_model_and_optimizer.

def setup_model_and_optimizer(model_provider_func,
                              model_type,
                              no_wd_decay_cond=None,
                              scale_lr_cond=None,
                              lr_mult=1.0):
    args = get_args()
    model = get_model(model_provider_func, model_type)
    unwrapped_model = unwrap_model(model,
                                   (torchDDP, LocalDDP, Float16Module))
    ......
    return model, optimizer, opt_param_scheduler

setup_model_and_optimizer calls the get_model method in megatron/training.py. get_model returns a list of models whose length equals args.virtual_pipeline_model_parallel_size, one model per virtual pipeline stage. Note that each model has its virtual_pipeline_model_parallel_rank set, i.e. mpu.set_virtual_pipeline_model_parallel_rank(i).

def get_model(model_provider_func, model_type=ModelType.encoder_or_decoder, wrap_with_ddp=True):
    ......
    # Build model.
    if mpu.get_pipeline_model_parallel_world_size() > 1 and \
        ......
        model = []
        # Build a model list whose length equals args.virtual_pipeline_model_parallel_size
        for i in range(args.virtual_pipeline_model_parallel_size):
            mpu.set_virtual_pipeline_model_parallel_rank(i)
            # Set pre_process and post_process only after virtual rank is set.
            pre_process = mpu.is_pipeline_first_stage()
            post_process = mpu.is_pipeline_last_stage()
            this_model = model_provider_func(
                pre_process=pre_process,
                post_process=post_process
            )
            this_model.model_type = model_type
            model.append(this_model)
    ......

The ParallelTransformer class in megatron/model/transformer.py splits the model by layer for pipeline parallelism. The basic idea is to split the transformer's layer list evenly into N parts; each rank keeps its own module (inheriting from torch.nn.Module) holding layer_num/N layers, and which layers it gets is determined by computing an offset into the original layer list.

During the split, _get_num_layers is called first to get the per-rank layer count:

self.num_layers = _get_num_layers(args, model_type,
                                  layer_type==LayerType.decoder)

Only the decoder-only case is analyzed here. num_layers is 0 only when a standalone embedding stage is used and the current rank is rank 0 of its communication group; otherwise num_layers is args.num_layers divided by the pipeline parallel size. For example, with an 8-layer transformer and a pipeline parallel size of 2, self.num_layers becomes 4.

def _get_num_layers(args, model_type, is_decoder=False):
    """Compute the number of transformer layers resident on the current rank."""
    ......
    elif mpu.get_pipeline_model_parallel_world_size() > 1:
        if is_encoder_and_decoder_model:
            ......
        else:
            assert args.num_layers % args.transformer_pipeline_model_parallel_size == 0, \
                'num_layers must be divisible by transformer_pipeline_model_parallel_size'
            
            num_layers = (
                0
                if args.standalone_embedding_stage
                and mpu.get_pipeline_model_parallel_rank() == 0 else
                args.num_layers // args.transformer_pipeline_model_parallel_size
            )

With the per-rank num_layers known, in the non-interleaved case the offset for a decoder-only model is computed from the pipeline rank:

class ParallelTransformer(MegatronModule):
    """Transformer class."""
    def __init__(...):
        ......
        if args.virtual_pipeline_model_parallel_size is not None:
            ......
        else:
            # Non-interleaved case
            # Each stage gets a contiguous set of layers.
            if args.model_type == ModelType.encoder_and_decoder and \
                    mpu.get_pipeline_model_parallel_world_size() > 1:
                ......
            else:
                # Decoder-only model
                offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers

In the interleaved case, self.num_layers is further divided by args.virtual_pipeline_model_parallel_size to form smaller chunks.

class ParallelTransformer(MegatronModule):
    """Transformer class."""
    def __init__(...):
        ......
        if args.virtual_pipeline_model_parallel_size is not None:
            assert args.num_layers % args.virtual_pipeline_model_parallel_size == 0, \
                'num_layers_per_stage must be divisible by ' \
                'virtual_pipeline_model_parallel_size'
            assert args.model_type != ModelType.encoder_and_decoder

            # In the interleaved case, self.num_layers is further divided by args.virtual_pipeline_model_parallel_size.
            # With 8 layers, pipeline parallel size 2, and virtual pipeline parallel size 2, self.num_layers goes from 4 to 2.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            
            # In the interleaved case, the assignment looks as follows:
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # In the non-interleaved case, the offset is the pipeline rank times the per-stage layer count.
            if args.model_type == ModelType.encoder_and_decoder and \
                ......
            else:
                offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers

In the non-interleaved case, once offset is known, the ModuleList actually held by the current rank is created. Each entry of the ModuleList is one layer, and the number passed to build_layer is that layer's index. For example, with num_layer=8 and pipeline_stage=2, the offset is 0 on device0 and 4 on device1, so device0 builds ModuleList(l1,l2,l3,l4) and device1 builds ModuleList(l5,l6,l7,l8).

class ParallelTransformer(MegatronModule):
    """Transformer class."""
    def __init__(...):
        ......
        self.layers = torch.nn.ModuleList(
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])

Before looking at the interleaved offset, recall how training.py creates the model list for each device: in the interleaved case, each device actually holds args.virtual_pipeline_model_parallel_size models, forming one virtual group, and each model keeps only part of the original network's layers. The model_provider_func here eventually calls the ParallelTransformer constructor.

def get_model(model_provider_func, ...):
        ......
        model = []
        for i in range(args.virtual_pipeline_model_parallel_size):
            mpu.set_virtual_pipeline_model_parallel_rank(i)
            ......
            this_model = model_provider_func(
                pre_process=pre_process,
                post_process=post_process
            )
            this_model.model_type = model_type
            model.append(this_model)

The interleaved offset is a little more involved; it is defined as:

    offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
        args.num_layers // args.virtual_pipeline_model_parallel_size) + \
        (mpu.get_pipeline_model_parallel_rank() * self.num_layers)

mpu.get_virtual_pipeline_model_parallel_rank() * (args.num_layers // args.virtual_pipeline_model_parallel_size) first separates the large virtual groups, and within each virtual group the smaller chunks are carved out. Take layer_num=8, stage_num=2, virtual_pipeline_model_parallel_size=2 as an example: the virtual-group offsets are 0 and 4, self.num_layers is 2, and mpu.get_pipeline_model_parallel_rank() is 0 or 1 (2 stages), so the combined offset can be 0, 2, 4, or 6. The offset is then used to compute the final layer indices in the same way as in the non-interleaved case.

    self.layers = torch.nn.ModuleList(
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])

In the end, device0's model holds two ModuleLists, ModuleList[l0, l1] and ModuleList[l4, l5]; device1's model likewise holds two, ModuleList[l2, l3] and ModuleList[l6, l7] (a short sketch that reproduces this assignment from the offset formula follows the mapping below).

device0(stage1) : {ModuleList[l0, l1], ModuleList[l4, l5]}
device1(stage2) : {ModuleList[l2, l3], ModuleList[l6, l7]}
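The following standalone sketch (a re-implementation for illustration, not Megatron code) enumerates, for each pipeline rank and virtual rank, the chunk offset and the layer indices that chunk holds, reproducing the mapping above.

# Enumerate interleaved layer assignment: 8 layers, 2 pipeline stages, 2 virtual stages.
num_layers = 8                       # assumed example values
pipeline_parallel_size = 2
virtual_pipeline_parallel_size = 2
layers_per_chunk = num_layers // pipeline_parallel_size // virtual_pipeline_parallel_size

for pp_rank in range(pipeline_parallel_size):
    chunks = []
    for vp_rank in range(virtual_pipeline_parallel_size):
        # Same formula as the offset above: virtual-group base + per-stage offset.
        offset = vp_rank * (num_layers // virtual_pipeline_parallel_size) + \
                 pp_rank * layers_per_chunk
        chunks.append([f"l{offset + i}" for i in range(layers_per_chunk)])
    print(f"device{pp_rank}: {chunks}")
# device0: [['l0', 'l1'], ['l4', 'l5']]
# device1: [['l2', 'l3'], ['l6', 'l7']]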

2.5 Execution flow of forward_backward_pipelining_with_interleaving

At the start, as in the without_interleaving case, automatic gradient reduction has to be disabled. Since there are multiple model chunks, the inputs and outputs of every chunk are tracked in input_tensors and output_tensors.

    disable_grad_sync()

    # Model chunk IDs with synchronized grads
    synchronized_model_chunks = set()

    input_tensors = [[] for _ in range(len(model))]
    output_tensors = [[] for _ in range(len(model))]

If sequence parallelism is enabled, the tensor is split along the sequence dimension into get_tensor_model_parallel_world_size() parts.

    if sequence_parallel:
        seq_length, batch_size, hidden = tensor_shape
        tensor_shape = (
            seq_length // parallel_state.get_tensor_model_parallel_world_size(),
            batch_size,
            hidden,
        )

Before warmup, the number of warmup microbatches num_warmup_microbatches needed by each device is computed:

  • First, the total total_num_microbatches is computed as the product of the incoming num_microbatches and len(model); with virtual_stage=2, num_model_chunks here is 2.

  • If num_microbatches equals pipeline_parallel_size, the schedule follows the same idea as GPipe: run all forwards first and then all backwards, so all microbatches are processed during warmup.

  • If num_microbatches does not equal pipeline_parallel_size, warmup is counted in units of pipeline_parallel_size, and num_warmup_microbatches has two parts: the first is the staircase part, where adjacent devices are offset from each other by one step according to their rank, computed as (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2; the second is the constant extra amount, computed as (num_model_chunks - 1) * pipeline_parallel_size.
    [Figure: warmup microbatch counts per rank in the interleaved schedule]

  • Implementation (a worked numeric example follows the snippet):

    # Compute number of warmup and remaining microbatches.
    num_model_chunks = len(model)
    total_num_microbatches = num_microbatches * num_model_chunks

    ......
    if num_microbatches == pipeline_parallel_size:
        num_warmup_microbatches = total_num_microbatches
        all_warmup_microbatches = True
    else:
        num_warmup_microbatches = (pipeline_parallel_size - pipeline_parallel_rank - 1) * 2
        num_warmup_microbatches += (num_model_chunks - 1) * pipeline_parallel_size
        num_warmup_microbatches = min(num_warmup_microbatches, total_num_microbatches)
    num_microbatches_remaining = total_num_microbatches - num_warmup_microbatches
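A worked example of the interleaved warmup count, with arbitrary example values: 2 pipeline stages, 2 model chunks per device, and 8 microbatches (so 16 total microbatch slots). Each rank gets the staircase term plus the fixed (num_model_chunks - 1) * pipeline_parallel_size term.

# Worked example of num_warmup_microbatches for the interleaved schedule.
pipeline_parallel_size = 2    # assumed example values
num_model_chunks = 2
num_microbatches = 8          # must be a multiple of pipeline_parallel_size
total_num_microbatches = num_microbatches * num_model_chunks

for rank in range(pipeline_parallel_size):
    if num_microbatches == pipeline_parallel_size:
        num_warmup = total_num_microbatches        # degenerates to GPipe
    else:
        num_warmup = (pipeline_parallel_size - rank - 1) * 2
        num_warmup += (num_model_chunks - 1) * pipeline_parallel_size
        num_warmup = min(num_warmup, total_num_microbatches)
    print(f"rank {rank}: warmup={num_warmup}, remaining={total_num_microbatches - num_warmup}")
# rank 0: warmup=4, remaining=12
# rank 1: warmup=2, remaining=14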

Several helper functions are also implemented here (a simplified sketch of get_model_chunk_id follows this list):

  1. get_model_chunk_id(microbatch_id, forward): gets the model_chunk_id for a given microbatch_id
  2. is_first_microbatch_for_model_chunk(microbatch_id: int): whether microbatch_id is the first microbatch of its model chunk
  3. is_last_microbatch_for_model_chunk(microbatch_id: int): whether microbatch_id is the last microbatch of its model chunk
  4. forward_step_helper(microbatch_id, checkpoint_activations_microbatch): runs the forward pass for microbatch_id
  5. backward_step_helper(microbatch_id): runs the backward pass for microbatch_id
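A simplified sketch of the get_model_chunk_id logic, condensed from the helper in schedules.py (the real helper is a closure over the local variables above; treat the exact expression as an approximation): microbatches are issued in groups of pipeline_parallel_size per chunk, and the backward pass walks the chunks in reverse order.

# Simplified sketch of get_model_chunk_id (not the exact closure from schedules.py).
def get_model_chunk_id(microbatch_id, forward, pipeline_parallel_size, num_model_chunks):
    # Microbatches are scheduled in groups of pipeline_parallel_size per model chunk.
    microbatch_id_in_group = microbatch_id % (pipeline_parallel_size * num_model_chunks)
    model_chunk_id = microbatch_id_in_group // pipeline_parallel_size
    if not forward:
        # The backward pass visits the chunks in reverse order.
        model_chunk_id = num_model_chunks - model_chunk_id - 1
    return model_chunk_id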

interleaving 1F1B稳定状态为例,对应的流程如下:

# Process each remaining microbatch
for k in range(num_microbatches_remaining):
        ......
        if config.overlap_p2p_comm:
            ......
        else: # no p2p overlap
            # First one forward step; forward_k is the microbatch_id.
            # Unlike without-interleaving, the forward derives model_chunk_id from the microbatch_id and runs that chunk's forward_step.
            output_tensor = forward_step_helper(forward_k, checkpoint_activations_microbatch)

            # Then one backward step, analogous to the forward
            backward_k = k
            input_tensor_grad = backward_step_helper(backward_k)
            
            ......
            
            # After the forward and backward compute, communicate with the previous and next ranks
            input_tensor, output_tensor_grad = \
                p2p_communication.send_forward_backward_recv_forward_backward(
                    output_tensor, input_tensor_grad,
                    recv_prev=recv_prev, recv_next=recv_next,
                    tensor_shape=tensor_shape, config=config)
            deallocate_output_tensor(output_tensor, config.deallocate_pipeline_outputs)

# After this batch of microbatches finishes, synchronize the gradients
enable_grad_sync()
if config.grad_sync_func is not None:
    params = []
    for model_chunk_id in range(num_model_chunks):
        if model_chunk_id not in synchronized_model_chunks:
            params.extend(model[model_chunk_id].parameters())
            synchronized_model_chunks.add(model_chunk_id)
    if params:
        config.grad_sync_func(params)
return forward_data_store

Note some current limitations of interleaving:

  • Encoder-decoder models are not supported
  • When len(model) > 1 with mpu.get_pipeline_model_parallel_world_size() > 1, only args.DDP_impl='local' is supported
  • num_microbatches % pipeline_parallel_size == 0 is required

