当前位置:   article > 正文

Accelerate launch pytorch分布式训练

accelerate launch

Accelerate

一、基础概念

  1. Accelerate 是一个库,只需添加四行代码,就可以在任何分布式 configuration 中运行相同的 PyTorch 代码:

     

    + from accelerate import Accelerator
    + accelerator = Accelerator()
     
    + model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    +     model, optimizer, training_dataloader, scheduler
    + )
     
      for batch in training_dataloader:
          optimizer.zero_grad()
          inputs, targets = batch
          inputs = inputs.to(device)
          targets = targets.to(device)
          outputs = model(inputs)
          loss = loss_function(outputs, targets)
    +     accelerator.backward(loss)
          optimizer.step()
          scheduler.step()

    上述代码可以通过 Accelerate 的 CLI 接口在任何系统上启动:

     

     
    accelerate launch {my_script.py}
  2. 安装:

     

     
    pip install accelerate
    conda install -c conda-forge accelerate
    pip install git+https://github.com/huggingface/accelerate
  3. 配置:

     

     
    accelerate config

    然后 accelerate 会向你询问一些问题从而生成配置。

    检查配置:

     

     
    accelerate env
  4. 为了使用 Accelerate,你只需要修改四件事:

    • 首先,导入 Accelerator 并创建一个 accelerator 对象

       

       
      from accelerate import Accelerator
      accelerator = Accelerator()
    • 然后,移除针对你的模型和输入数据的所有 .to(device) 或 .cuda() 的调用。accelerator 将会为你正确处理这个问题,并为你把所有这些对象放在正确的设备上。

      如果你知道你在做什么,你可以保留那些 .to(device) 的调用,但你应该使用 accelerator 对象提供的设备: accelerator.device 。

      要完全停用自动的 device placement ,在初始化 Accelerator 时传递 device_placement=False 。

    • 接着,将所有与训练有关的对象(optimizer, model, training dataloader, learning rate scheduler )传递给accelerator.prepare() 方法。这将确保一切都为训练做好准备。

       

       
      model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
          model, optimizer, train_dataloader, lr_scheduler
      )

      具体而言,training dataloader 将被分片到所有可用的 GPU/TPU 核心上,这样每个设备都能看到训练数据集的不同部分。此外,所有进程的随机数状态将在每次迭代开始时通过 dataloader 进行同步,以确保数据以相同的方式被混洗(如果你决定使用shuffle=True 或任何类型的 random sampler )。

      训练的实际 batch size 将是使用的设备数量乘以你在脚本中设置的 batch size 。另外,你可以在创建 accelerator 对象时使用 split_batches=True 参半,此时无论你在多少个 GPU 上运行你的脚本,实际 batch size 都会保持不变。

      你需要再开始实际的 training loop 之前执行 accelerator.prepare() 。

      只有当 scheduler 需要再每个 optimizer step 中被 stepped 时,才需要把 learning rate scheduler 传递给 prepare() 。

      任何获取 training dataloader length 的方法(例如,你需要记录 total training step)都应该在 accelerator.prepare() 之后进行。

      你可能想、也可能不想把你的validation dataloader 发送到 prepare() ,这取决于你是否想运行分布式评估。

    • 最后,用 accelerator.backward(loss) 替换 loss.backward() 。

    现在,你的脚本将在你的本地机器上运行,也可以在多个 GPU 或 TPU 上运行。你可以使用你喜欢的工具来启动分布式训练,或者你可以使用 Accelerate launcher 启动。

  5. 分布式评估:

    • 可以进行常规评估,此时你需要将 validation dataloader 保持在 accelerator.prepare() 之外。并且,你需要手动将 input 数据放在 accelerator.device 上。

    • 也可以进行分布式评估,此时你需要将 validation dataloader 放置在 accelerator.prepare() 之内:

       

       
      validation_dataloader = accelerator.prepare(validation_dataloader)

      就像 training dataloader,这意味着在分布式评估时,每个设备将仅看到部分 evaluation 数据。这意味着你需要把 predictions 进行 group 。可以通过 accelerator.gather_for_metrics() 方法来实现:

       

       
      for inputs, targets in validation_dataloader:
          predictions = model(inputs)
          # Gather all predictions and targets
          all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
          # Example of use with a Datasets.Metric
          metric.add_batch(all_predictions, all_targets)

      类似 training dataloader ,把 validation dataloader 传入 prepare() 可能会改变该 dataloader :如果你在 � 个 GPU 上运行,则它的长度将被除以 � (因为你的实际 batch size 将被乘以 � ),除非你设置 split_batches=True 。

      任何获取 validation dataloader length 的方法都应该在 accelerator.prepare() 之后进行。

      数据集末尾的一些数据可能是重复的,所以这个 batch 的数据可以平均分配给所有的工作者。因此,应该通过gather_for_metrics() 方法计算指标,以便在收集时自动删除重复的数据。如果出于某种原因,你不希望自动完成这项工作,可以用 accelerator.gather() 来收集所有进程的数据,然后手动完成。

      gather() 和 gather_for_metrics() 要求每个进程上的张量是相同尺寸的。如果你在每个进程上有不同尺寸的张量(例如,当动态填充到一个 batch 的最大长度时),你应该使用 accelerator.gather.pad_across_processes() 方法将张量填充到跨进程的最大尺寸。

  6. 启动分布式脚本:你可以使用常规命令来启动你的分布式训练(如 PyTorch 的 torch.distributed.launch ),它们与 Accelerate 完全兼容。这里唯一需要注意的是: Accelerate 使用 environment 来确定所有有用的信息,所以 torch.distributed.launch 应与标志 --use_env 一起使用。

    Accelerate 还提供了一个 CLI 工具,它统一了所有的 launcher ,所以你只需要记住一个命令:

     

     
    accelerate config

    你需要回答问题,然后 Accelerate 将在你的 cache folder 创建一个 default_config.yaml 文件。这个缓存目录是(根据优先级递减):

    • 环境变量 HF_HOME 的内容,以 accelerate 为后缀。
    • 如果不存在,则环境变量 XDG_CACHE_HOME 的内容,以 huggingface/accelerate 为后缀。
    • 如果也不存在,则为 ~/.cache/huggingface/accelerate 。

    你也可以通过标志 --config_file 来指定你要保存的文件的位置。

    然后,你可以通过运行来测试你的设置是否一切顺利:

     

     
    accelerate test

    这将启动一个简短的脚本,测试分布式环境。你也可以在测试期间指定配置文件的位置:

     

     
    accelerate test --config_file path_to_config.yaml

    如果测试通过,你可以通过如下的命令来执行你的脚本:

     

     
    accelerate launch path_to_script.py --args_for_the_script

    也可以指定配置文件的位置:

     

     
    accelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script
  7. 从 notebook 中启动:在 Accelerate 0.3.0 中引入了一个 notebook_launcher() 从而帮助你在 notebook 上启动训练。

    只要在 notebook 的一个 cell 中定义一个负责整个 train and/or evaluation 的函数,然后用以下代码执行一个 cell :

     

     
    from accelerate import notebook_launcher
     
    notebook_launcher(training_function)

    注意:你的 Accelerator 对象应该只在 training_function 中定义,这是因为初始化应该只在 launcher 内完成。

  8. 在 TPU 上训练:如果你想在 TPU 上启动你的脚本,有一些注意事项是你应该注意的。在幕后,TPU 将为你的 training step (前向传播、反向传播、以及 optimizer step )中发生的所有操作创建一个 graph 。这就是为什么你的第一个训练步总是非常长,因为建立和编译这个 graph 需要一些时间。

    好消息是,这个编译将被缓存,所以第二步和所有后续的 step 将更快。坏消息是,这只适用于你的所有 step 做完全相同的操作,这意味着:

    • 所有 batch 必须有用相同的张量尺寸。
    • 必须使用静态的代码(即,如果单个 step 中存在循环,那么循环次数在每个 step 必须相同)。

    如果上述任何一项在两个 step 之间发生变化,都会触发新的编译,这将再次花费大量时间。在实践中,这意味着:你必须特别注意让你的输入中的所有张量具有相同的形状(所以没有动态填充),并且不应该使用具有 for 循环的层,其中 for 循环的根据 input 的不同而具有不同长度(如 LSTM)。否则,训练会慢得令人难受。

    可以针对 TPU 执行一些特殊的代码:

     

     
    from accelerate import DistributedType
     
    if accelerator.distributed_type == DistributedType.TPU:
        # do something of static shape
    else:
        # go crazy and be dynamic

    最后要注意的是:如果你的模型有 tied weight (比如语言模型将 embedding matrix 的权重与 decoder 的权重绑定),将这个模型移动到 TPU (无论是你自己移动、还是由 prepare() 移动)会破坏绑定。你将需要在之后重新绑定权重。

  9. 在单个进程上执行的语句:有些语句只需要在特定的进程上执行而无需在所有进程上执行,如数据下载、记录日志、以及打印进度条。此时可以执行:

     

     
    if accelerator.is_local_main_process:
        # Is executed once per server
        
    from tqdm.auto import tqdm
    progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)

    local 意思是每台机器上运行:如果你在两台服务器上训练,其中每台服务器有几个 GPU ,则代码将在每台服务器上执行一次。

    如果你希望对所有进程仅执行一次(如,上传模型到 model hub),则可以执行:

     

     
    if accelerator.is_main_process:
        # Is executed once only

    对于 print 语句,你希望在每台机器上执行一次,则可以用 accelerator.print 代替 print 函数。

  10. 延迟执行:当你运行你的常规脚本时,指令是按顺序执行的。使用 Accelerate 在几个 GPU 上同时部署你的脚本会带来一个复杂的问题:虽然每个进程都是按顺序执行所有指令,但有些可能比其他的快。

    你可能需要等待所有进程达到一定程度后再执行某条指令。例如,在确定每个进程都完成了训练之前,你不应该保存一个模型。要做到这一点,可以执行:

     

     
    accelerator.wait_for_everyone()

    这条指令将阻塞所有先到的进程,直到所有其他进程都到达该点(如果你只在一个 GPU 或 CPU 上运行你的脚本,这不会有任何作用)。

  11. 保存/加载模型:保存训练好的模型可能需要一些调整:

    • 首先,你应该等待所有的进程到达脚本中的 “延迟执行” 所描述的那个点。

    • 然后,你应该在保存模型之前 unwrap 你的模型。这是因为在通过 prepare() 方法时,你的模型可能被 wrap 从而用于分布式训练。如:

       

       
      accelerator.wait_for_everyone()
      unwrapped_model = accelerator.unwrap_model(model)
      accelerator.save(unwrapped_model.state_dict(), filename)

      如果你的脚本包含加载 checkpoint 的逻辑,我们也建议你在 unwrapped model 中加载你的权重(这只在 prepare() 后使用加载函数时有用)。如:

       

       
      unwrapped_model = accelerator.unwrap_model(model)
      unwrapped_model.load_state_dict(torch.load(filename))
  12. 保存/加载整个状态:当训练你的模型时,你可能想保存模型、优化器、随机数生成器、以及潜在的 LR scheduler 的当前状态,以便在同一个脚本中恢复训练。你可以分别使用 save_state() 和 load_state() 来做到这一点,只需简单地传入一个保存位置。

    如果你通过 register_for_checkpointing() 注册了任何其他需要存储的 stateful item ,它们也会被保存和/或加载。

    示例:

     

     
    from accelerate import Accelerator
    import torch
     
    accelerator = Accelerator()
     
    my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99)
    my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader)
     
    # Register the LR scheduler
    accelerate.register_for_checkpointing(my_scheduler)
     
    # Save the starting state
    accelerate.save_state("my/save/path")
     
    device = accelerator.device
    my_model.to(device)
     
    # Perform training
    for epoch in range(num_epochs):
        for batch in my_training_dataloader:
            my_optimizer.zero_grad()
            inputs, targets = batch
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = my_model(inputs)
            loss = my_loss_function(outputs, targets)
            accelerator.backward(loss)
            my_optimizer.step()
        my_scheduler.step()
     
    # Restore previous state
    accelerate.load_state("my/save/path")
  13. 梯度裁剪:如果你在脚本中使用梯度剪裁,你应该把对 torch.nn.utils.clip_grad_norm_ 或 torch.nn.utils.clip_grad_value_ 的调用分别替换为 accelerator.clipgrad_norm() 和 accelerator.clipgrad_value() 。

  14. 混合精度训练:如果你用 Accelerate 在混合精度下训练,那么模型内的计算将以混合精度进行,而模型外的每一次计算都将以 full precision 执行。例如,loss 的计算通常在模型外,且涉及 softmax 。然而,你可能想把你的 loss 计算放在 accelerator.autocast上下文管理器中:

     

     
    with accelerator.autocast():
        loss = complex_loss_function(outputs, target)

    混合精度训练的另一个注意事项是:梯度会在开始时跳过一些更新,有时在训练过程中也会跳过。这是因为动态损失缩放 dynamic loss scaling 策略,在训练过程中会有一些时刻,梯度已经溢出,loss scaling factor 会减少,从而避免在下一步再次发生这种情况。

    这意味着你可能会在没有梯度更新的时候就更新你的 learning rate scheduler 。这在一般情况下是没有问题的,但是当你的训练数据非常少,或者你的 scheduler 的第一个学习率值非常重要时,可能会有影响。在这种情况下,你可以跳过 learning rate scheduler 的更新:

     

     
    if not accelerator.optimizer_step_was_skipped:
        lr_scheduler.step()
  15. 梯度累积:要执行梯度累积,请使用 accumulate() 并指定 gradient_accumulation_steps 。在多设备训练时,这也会自动确保梯度同步或不同步,检查是否真的应该执行该 step ,并自动计算损失:

     

     
    accelerator = Accelerator(gradient_accumulation_steps=2)
    model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
     
    for input, label in training_dataloader:
        with accelerator.accumulate(model):
            predictions = model(input)
            loss = loss_function(predictions, label)
            accelerator.backward(loss)
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

    相比之下,传统的梯度累加方法会用更冗长的代码:

     

     
    + from accelerate import Accelerator
    + accelerator = Accelerator()
     
    + model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    +     model, optimizer, training_dataloader, scheduler
    + )
     
      for index, batch in enumerate(training_dataloader):
          inputs, targets = batch
    -     inputs = inputs.to(device)
    -     targets = targets.to(device)
          outputs = model(inputs)
          loss = loss_function(outputs, targets)
          loss = loss / gradient_accumulation_steps
    +     accelerator.backward(loss)
          if (index+1) % gradient_accumulation_steps == 0:
              optimizer.step()
              scheduler.step()
              optimizer.zero_grad()
  16. DeepSpeedDeepSpeed 支持是实验性的,所以底层 API 将在不久的将来发展,可能会有一些轻微的破坏性变化。具体而言, Accelerate 还不支持你自己编写的 DeepSpeed 配置,这将在下一个版本中添加。

  17. 使用 accelerate launch

     

     
    accelerate launch {script_name.py} --arg1 --arg2 ...

    指定单个 GPU

     

     
    CUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...

    在两个 GPU 上混合精度训练:

     

     
    accelerate launch --multi_gpu --mixed_precision=fp16 --num_processes=2 {script_name.py} {--arg1} {--arg2} ...

    建议总是在 accelerate launch 之前执行 accelerate config ,这样就无需再 accelerate launch 中指定各种配置。

  18. 在 notebook 中 launch

    • 确保任何使用 CUDA 的代码在一个函数中,该函数被传递给 notebook_launcher() 。
    • 设置 num_processes 为训练的设备数量(如,GPU, CPU, TPU 数量)。
    • 如果使用 TPU ,在 training loop 函数之外声明你的模型。

    如:

     

     
    from accelerate import notebook_launcher
    args = ("fp16", 42, 64)
    notebook_launcher(training_loop, args, num_processes=2)

    对于 TPU

     

     
    model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id))
     
    args = (model, "fp16", 42, 64)
    notebook_launcher(training_loop, args, num_processes=8)
  19. 启用 FSDP

    • 首先进行配置:accelerate configFSDP 配置的一个例子:

       

       
      compute_environment: LOCAL_MACHINE
      deepspeed_config: {}
      distributed_type: FSDP
      downcast_bf16: 'no'
      fsdp_config:
        fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
        fsdp_backward_prefetch_policy: BACKWARD_PRE
        fsdp_offload_params: false
        fsdp_sharding_strategy: 1
        fsdp_state_dict_type: FULL_STATE_DICT
        fsdp_transformer_layer_cls_to_wrap: GPT2Block
      machine_rank: 0
      main_process_ip: null
      main_process_port: null
      main_training_function: main
      mixed_precision: 'no'
      num_machines: 1
      num_processes: 2
      use_cpu: false

      然后开始训练:

       

       
      accelerate launch examples/nlp_example.py
    • 这些配置参数的含义为:

      • Sharding Strategy

        • FULL_SHARD:对 optimizer states, gradients, parameters 都进行分片。
        • SHARD_GRAD_OP:仅对 optimizer states, gradients 进行分片。
        • NO_SHARD:不进行分片。
      • Offload Params:一个布尔值,指定是否将 parameters 和 gradients 卸载到 CPU 。

      • Auto Wrap Policy:可以为 TRANSFORMER_BASED_WRAP, SIZE_BASED_WRAP, NO_WRAP 。

      • Transformer Layer Class to Wrap:当使用 TRANSFORMER_BASED_WRAP 时,指定特定的 transformer layer class name (大小写敏感)从而执行 wrap 。如 BertLayer, GPTJBlock, T5Block,... 。

      • Min Num Params:使用 SIZE_BASED_WRAP 的最小参数数量。

      • Backward Prefetch:可以为 BACKWARD_PRE, BACKWARD_POST, NO_PREFETCH

      • State Dict Type:可以为 FULL_STATE_DICT, LOCAL_STATE_DICT, SHARDED_STATE_DICT 。

    • 有几个需要注意的地方:

      • PyTorch FSDP 会自动 wrap 子模块,对参数进行扁平化处理,并将参数分片。由于这个原因,任何在 model wrapping 之前创建的 optimizer 都会被破坏,并占用更多的内存。因此,强烈建议在创建 optimizer 之前准备好模型,这也是很有效的。Accelerate 将自动 wrap 模型,并在单个模型的情况下为你创建一个优化器,并发出警告信息:

         

         
        FSDP Warning: When using FSDP, it is efficient and recommended to call prepare for the model before creating the optimizer.

        下面是使用 FSDP 时准备模型和优化器的推荐方法:

         

         
        model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)
        + model = accelerator.prepare(model)
         
        optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr)
         
        - model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
        -        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
        -    )
         
        + optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
        +         optimizer, train_dataloader, eval_dataloader, lr_scheduler
        +    )
      • 在单个模型的情况下,如果你用多个 parameter groups创建了优化器,并且用它们一起调用 prepare ,那么 parameter groups 将被丢失,并显示以下警告:

         

         
        FSDP Warning: When using FSDP, several parameter groups will be conflated into a single one due to nested module wrapping and parameter flattening.

        这是因为,由于嵌套的 FSDP 模块的参数扁平化为一维数组,在 wrapping 前创建的 parameter groups 在 wrapping 后将没有意义。

      • 在有多个模型的情况下,有必要在创建优化器之前准备好模型,否则会抛出一个错误。然后将优化器以与相应模型相同的顺序传递给 prepare() 方法,否则 accelerator.save_state() 和 accelerator.load_state() 将导致错误/意外的行为。

      • 这个功能与 Transformers library 的 run_translation.py 脚本中的 --predict_with_generate 不兼容。

    • 对于更多的控制,用户可以利用 FullyShardedDataParallelPlugin 。在创建这个类的实例后,用户可以把它传递给 Accelerator 类的实例。

  20. 启用 DeepSpeedDeepSpeed 实现了 ZeRO 论文中描述的一切。目前,它提供了如下的支持:Optimizer state partitioningZeRO stage 1)、Gradient partitioningZeRO stage 2)、Parameter partitioningZeRO stage 3)、Custom mixed precision training handling、一系列基于 CUDA 扩展的快速优化器、ZeRO-Offload 到 CPU 和 Disk/NVMe 。

    DeepSpeed ZeRO-2 主要只用于训练,因为它的功能对推理没有用处。DeepSpeed ZeRO-3 也可以用于推理,因为它允许在多个 GPU 上加载巨大的模型。

    • Accelerate 通过两种方式集成 DeepSpeed

      • 通过 deepspeed 配置文件来集成。它支持 DeepSpeed 的所有核心功能,并为用户提供了很大的灵活性。用户可能需要根据配置来改变几行代码。
      • 通过 deepspeed_plugin 来集成。这支持 DeepSpeed 功能的子集,并对其余的配置使用默认选项。用户不需要改变任何代码。
    • 什么被集成了?

      • 训练:DeepSpeed ZeRO 训练支持完整的 ZeRO stages 1, 2 and 3、以及 optimizer states, gradients and parameters 的 CPU/Disk offload 。

        • Stage 1:将 optimizer states 分片到数据并行 workers/GPUs 上。
        • Stage 2:将 optimizer states + gradients 分片到数据并行 workers/GPUs 上。
        • Stage 3:将 optimizer states + gradients + model parameters 分片到数据并行 workers/GPUs 上。
        • Optimizer Offload:将 optimizer states + gradients 卸载到 CPU/Disk ,建立在 ZERO Stage 2 之上。
        • Param Offload:将 model parameters 卸载到 CPU/Disk ,建立在 ZERO Stage 3 之上。

        注意:关于 Disk Offload ,磁盘应该是 NVME 的,以便有好的速度,但技术上可以在任何磁盘上工作。

      • 推断:DeepSpeed ZeRO Inference 支持 ZeRO Stage 3 与 ZeRO-Infinity 。它使用与训练相同的 ZeRO 协议,但它不使用优化器和 lr scheduler 。

    • 如何工作:

      • 首先安装 DeepSpeed version >=0.6.5 。

      • 然后配置:accelerate config 。一个配置的例子:

         

         
        compute_environment: LOCAL_MACHINE
        deepspeed_config:
         gradient_accumulation_steps: 1
         gradient_clipping: 1.0
         offload_optimizer_device: none
         offload_param_device: none
         zero3_init_flag: true
         zero_stage: 2
        distributed_type: DEEPSPEED
        fsdp_config: {}
        machine_rank: 0
        main_process_ip: null
        main_process_port: null
        main_training_function: main
        mixed_precision: fp16
        num_machines: 1
        num_processes: 2
        use_cpu: false
      • 最后执行训练:accelerate launch examples/nlp_example.py 。

    • 配置参数的含义:

      • zero_stage0 表示禁用,1 表示 optimizer state partitioning2 表示 optimizer+gradient state partitioning ,3 表示 optimizer+gradient+parameter partitioning 。
      • gradient_accumulation_steps:一个整数,表示在 averaging 和 applying 这些梯度之前,积累梯度的 training steps 数量。
      • gradient_clipping:一个浮点数,指定启用梯度剪裁的值。
      • offload_optimizer_devicenone 表示禁用 optimizer offloadingcpu 表示 offload optimizer 到 CPUnvme 表示offload optimizer 到 NVMe SSD 。仅适用于 ZeRO >= Stage-2 。
      • offload_param_devicenone 表示禁用 parameter offloadingcpu 表示 offload parameter 到 CPUnvme 表示offload parameter 到 NVMe SSD 。仅适用于 ZeRO Stage-3 。
      • zero3_init_flag:决定是否启用 deepspeed.zero.Init 来构建大规模模型。只适用于 ZeRO Stage-3 。
      • zero3_save_16bit_model:决定是否在使用 ZeRO Stage-3 时保存 16 位模型权重。
      • mixed_precisionno 用于 FP32 训练,fp16 用于 FP16 混合精度训练,bf16用于 BF16 混合精度训练。
    • 当使用配置文件时,需要修改一些代码:

      • DeepSpeed Optimizers and Schedulers

        • 如果是 DeepSpeed Optim + DeepSpeed Scheduler :用户必须使用 enhance.utils.DummyOptim 和enhance.utils.DummyScheduler 来取代他们代码中的 PyTorch/Custom 优化器和调度器:

           

           
          # Creates Dummy Optimizer if `optimizer` was spcified in the config file else creates Adam Optimizer
           optimizer_cls = (
               torch.optim.AdamW
               if accelerator.state.deepspeed_plugin is None
               or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config
               else DummyOptim
           )
           optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate)
           
           # Creates Dummy Scheduler if `scheduler` was spcified in the config file else creates `args.lr_scheduler_type` Scheduler
           if (
               accelerator.state.deepspeed_plugin is None
               or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config
           ):
               lr_scheduler = get_scheduler(
                   name=args.lr_scheduler_type,
                   optimizer=optimizer,
                   num_warmup_steps=args.num_warmup_steps,
                   num_training_steps=args.max_train_steps,
               )
           else:
               lr_scheduler = DummyScheduler(
                   optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps
               )
        • Custom Optim + Custom Scheduler:当 DeepSpeed 配置文件中没有 optimizer key 和 scheduler key 的情况。在这种情况下,不需要用户修改代码,通过 DeepSpeed Plugin 使用集成时就是这种情况。

        • Custom Optim + DeepSpeed Scheduler :这种情况下,用户必须使用accelerate.utils.DummyScheduler 来替换代码中的 PyTorch/Custom scheduler 。

        • DeepSpeed Optim + Custom Scheduler:这将导致一个错误,因为当使用 DeepSpeed Optim 时必须使用 DeepSpeed Scheduler 。

      • DeepSpeed 配置文件中存在一些 "auto" 值,这些值是由 prepare 方法根据所提供的模型、dataloaders 、dummy optimizer 和 dummy schedulers 自动处理的。那些不是 "auto" 的字段必须由用户明确指定。如 zero_stage2_config.json 文件:

         

         
        {
            "fp16": {
                "enabled": true,
                "loss_scale": 0,
                "loss_scale_window": 1000,
                "initial_scale_power": 16,
                "hysteresis": 2,
                "min_loss_scale": 1
            },
            "optimizer": {
                "type": "AdamW",
                "params": {
                    "lr": "auto",
                    "weight_decay": "auto",
                    "torch_adam": true,
                    "adam_w_mode": true
                }
            },
            "scheduler": {
                "type": "WarmupDecayLR",
                "params": {
                    "warmup_min_lr": "auto",
                    "warmup_max_lr": "auto",
                    "warmup_num_steps": "auto",
                    "total_num_steps": "auto"
                }
            },
            "zero_optimization": {
                "stage": 2,
                "allgather_partitions": true,
                "allgather_bucket_size": 2e8,
                "overlap_comm": true,
                "reduce_scatter": true,
                "reduce_bucket_size": "auto",
                "contiguous_gradients": true
            },
            "gradient_accumulation_steps": 1,
            "gradient_clipping": "auto",
            "steps_per_print": 2000,
            "train_batch_size": "auto",
            "train_micro_batch_size_per_gpu": "auto",
            "wall_clock_breakdown": false
        }
    • 保存和加载:

      • 对于 ZeRO Stage-1 和 ZeRO Stage-2,模型的保存和加载不需要改动。

      • 对于 ZeRO Stage-3 ,state_dict 仅只包含占位符,因为模型的权重被分片到多个 GPU 。ZeRO Stage-3 有两个选项:

        • 保存整个 16 位的模型权重,然后使用 model.load_state_dict(torch.load(pytorch_model.bin)) 来直接加载。为此,要么在 DeepSpeed 配置文件中把 zero_optimization.stage3_gather_16bit_weights_on_model_save 设为True ,要么在 DeepSpeed Plugin 中把 zero3_save_16bit_model 设为 True 。

          请注意,这个选项需要在一个 GPU 上整合权重,这可能会很慢,而且对内存要求很高,所以只有在需要时才使用这个功能。

          示例:

           

           
          unwrapped_model = accelerator.unwrap_model(model)
           
          # New Code #
          # Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if
          # `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or
          # `zero3_save_16bit_model` is True in DeepSpeed Plugin.
          # For Zero Stages 1 and 2, models are saved as usual in the output directory.
          # The model name saved is `pytorch_model.bin`
          unwrapped_model.save_pretrained(
              args.output_dir,
              is_main_process=accelerator.is_main_process,
              save_function=accelerator.save,
              state_dict=accelerator.get_state_dict(model),
          )
        • 为了获得 32 位的权重,首先使用 model.save_checkpoint() 保存模型:

           

           
          success = model.save_checkpoint(PATH, ckpt_id, checkpoint_state_dict)
          status_msg = "checkpointing: PATH={}, ckpt_id={}".format(PATH, ckpt_id)
          if success:
              logging.info(f"Success {status_msg}")
          else:
              logging.warning(f"Failure {status_msg}")

          这将在 checkpoint 目录下创建 ZeRO model 和 optimizer 的 partitions 以及 zero_to_fp32.py 脚本。你可以使用这个脚本来做离线整合,这不需要配置文件或 GPU 。如:

           

           
          cd /path/to/checkpoint_dir
          ./zero_to_fp32.py . pytorch_model.bin
          # Processing zero checkpoint at global_step1
          # Detected checkpoint of type zero stage 3, world_size: 2
          # Saving fp32 state dict to pytorch_model.bin (total_numel=60506624)

          要想加载 32 位的模型,做法如下:

           

           
          from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
           
          unwrapped_model = accelerator.unwrap_model(model)
          fp32_model = load_state_dict_from_zero_checkpoint(unwrapped_model, checkpoint_dir)

          如果你仅仅想得到 state_dict,做法如下:

           

           
          from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
           
          state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)

          注意,加载时需要大约 2 倍于 final checkpoint 大小的内存。

    • ZeRO InferenceDeepSpeed ZeRO Inference 支持 ZeRO stage 3 。通过 accelerate 的集成,你只需要 prepare 模型和 dataloader ,如下:

       

       
      model, eval_dataloader = accelerator.prepare(model, eval_dataloader)
    • 注意事项:

      • 目前的集成不支持 DeepSpeed 的 Pipeline Parallelism 。
      • 当前的集成不支持 mpu ,限制了 Megatron-LM 中支持的张量并行。
      • 目前的集成不支持多个模型。
  21. 目前 Accelerate 支持如下的 trackerTensorBoard, WandB, CometML, MLFlow ,如:

     

     
    from accelerate import Accelerator
    from accelerate.utils import LoggerType
     
    accelerator = Accelerator(log_with="all")  # For all available trackers in the environment
    accelerator = Accelerator(log_with="wandb")
    accelerator = Accelerator(log_with=["wandb", LoggerType.TENSORBOARD])

    然后需要初始化 tracker

     

     
    hps = {"num_iterations": 5, "learning_rate": 1e-2}
    accelerator.init_trackers("my_project", config=hps)

    然后记录日志:

     

     
    accelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)

    最后在训练结束时调用:accelerator.end_training() 。

    你也可以通过 accelerator.get_tracker 来获取内置的 tracker 对象:

     

     
    wandb_tracker = accelerator.get_tracker("wandb")
    if accelerator.is_main_process:
        wandb_run.log_artifact(some_artifact_to_log)
  22. 处理大模型:常规的加载预训练模型的方式:

     

     
    import torch
     
    my_model = ModelClass(...)               # step 1
    state_dict = torch.load(checkpoint_file) # step 2
    my_model.load_state_dict(state_dict)     # step 3

    这对于常规大小的模型而言很有效,但是无法处理大型模型:在 step 1 我们在 RAM 中加载一个完整版本的模型,并花一些时间随机初始化权重(这将在 step 3 被丢弃);在 step 2 ,我们在 RAM 中加载另一个完整版本的模型,并使用预训练的权重。

    Accelerate 提供一些工具来帮助处理大模型(这些 API 是实验性质的,未来可能会发生改变):

    • init_empty_weights 上下文管理器:初始化一个模型而无需使用任何内存。这依赖于 PyTorch 1.9 中引入的 meta device 。

       

       
      from accelerate import init_empty_weights
       
      with init_empty_weights():
          my_model = ModelClass(...)

      在该上下文管理器中,每当有一个 parameter 被创建时,它就被立即移动到 meta device 。

    • sharded checkpoints:有可能你的模型太大从而无法装入内存,这并不意味着它不能被加载:如果你有一个或几个 GPU ,这就有更多的内存可用于存储你的模型。此时需要你的 checkpoint 被拆分为几个小文件,即 checkpoint shards 。

      Accelerate 将处理 checkpoint shards ,但是要满足如下格式:你的 checkpoint shards 应该放在一个文件夹中,并且有几个包含部分 state dict 的文件、以及一个 index.json 文件(将 parameter name 映射到包含该 parameter weights 的文件)。如:

       

       
      first_state_dict.bin
      index.json
      second_state_dict.bin

      其中 index.json 内容为:

       

       
      {
        "linear1.weight": "first_state_dict.bin",
        "linear1.bias": "first_state_dict.bin",
        "linear2.weight": "second_state_dict.bin",
        "linear2.bias": "second_state_dict.bin"
      }
    • load_checkpoint_and_dispatch:在 empty model 中加载一个 checkpoint 。它支持 full checkpoints (包含整个 state dict 的单一文件)以及 sharded checkpoints 。它还会在你可用的设备( GPU 、CPU )上自动分配这些权重,所以如果你正在加载一个 sharded checkpoints ,最大的RAM 用量将是最大分片的大小。

      例如:

       

       
      git clone https://huggingface.co/sgugger/sharded-gpt-j-6B
      cd sharded-gpt-j-6B
      git-lfs install
      git pull

      初始化模型:

       

       
      from accelerate import init_empty_weights
      from transformers import AutoConfig, AutoModelForCausalLM
       
      checkpoint = "EleutherAI/gpt-j-6B"
      config = AutoConfig.from_pretrained(checkpoint)
       
      with init_empty_weights():
          model = AutoModelForCausalLM.from_config(config)

      加载权重:

       

       
      from accelerate import load_checkpoint_and_dispatch
       
      model = load_checkpoint_and_dispatch(
          model, "sharded-gpt-j-6B", device_map="auto", no_split_module_classes=["GPTJBlock"]
      )

      通过 device_map="auto"Accelerate 根据可用资源自动决定将模型的每一层放在哪里:

      • 首先,我们使用 GPU 上的最大可用空间。
      • 如果我们仍然需要空间,我们将剩余的权重存储在 CPU 上。
      • 如果没有足够的 RAM ,我们将剩余的权重作为内存映射的张量存储在硬盘上。

      no_split_module_classes=["GPTJBlock"] 表示属于 GPTJBlock 的模块不应该在不同的设备上分割。你应该在这里设置所有包括某种残差连接的 block 。

      可以通过 model.hf_device_map 查看模型的权重的设备。

  23. 分布式训练的复现:

    • 设置随机数种子:

       

       
      from accelerate import set_seed
      set_seed(42)

      它在内部设置了五种随机数种子:

       

       
          random.seed(seed)
          np.random.seed(seed)
          torch.manual_seed(seed)
          torch.cuda.manual_seed_all(seed)
          # ^^ safe to call this function even if cuda is not available
          if is_tpu_available():
              xm.set_rng_state(seed)
    • 设置 batch size:当使用 Accelerate 训练时,传递给 dataloader 的 batch size 是 batch size/GPU ,因此 final batch size 是 batch size * device num 。

    • 设置学习率:学习率应该和 device num 成正比,如:

       

       
      learning_rate = 1e-3
      accelerator = Accelerator()
      learning_rate *= accelerator.num_processes
       
      optimizer = AdamW(params=model.parameters(), lr=learning_rate)
  24. 梯度同步:在 DDP 中,PyTorch 在一些特定的点上进行进程间通信。然而在梯度累积时,你会累积 n 个 loss 并跳过 .backward() 。这可能会导致明显的减速,因为所有的进程都需要与它们进行更多次的通信。

    可以通过 no_sync 上下文管理器来避免:

     

     
      ddp_model, dataloader = accelerator.prepare(model, dataloader)
     
      for index, batch in enumerate(dataloader):
          inputs, targets = batch
          # Trigger gradient synchronization on the last batch
          if index != (len(dataloader)-1):
    -         with ddp_model.no_sync():
    +         with accelerator.no_sync(model):
                  # Gradients only accumulate
                  outputs = ddp_model(inputs)
                  loss = loss_func(outputs, targets)
                  accelerator.backward(loss)
          else:
              # Gradients finally sync
              outputs = ddp_model(inputs)
              loss = loss_func(outputs)
              accelerator.backward(loss)

    或者直接使用 accelerator.accumulate

     

     
    ddp_model, dataloader = accelerator.prepare(model, dataloader)
     
    for batch in dataloader:
        with accelerator.accumulate(model):
            optimizer.zero_grad()
            inputs, targets = batch
            outputs = model(inputs)
            loss = loss_function(outputs, targets)
            accelerator.backward(loss)
  25. 进程间同步:

     

     
    accelerator.wait_for_everyone()

    这将阻塞所有进程直到所有进程都达到该点。

    用途:

    • 加载数据集:

       

       
      with accelerator.main_process_first():
          datasets = load_dataset("glue", "mrpc")

      这等价于:

       

       
      # First do something on the main process
      if accelerator.is_main_process:
          datasets = load_dataset("glue", "mrpc")
      else:
          accelerator.wait_for_everyone()
       
      # And then send it to the rest of them
      if not accelerator.is_main_process:
          datasets = load_dataset("glue", "mrpc")
      else:
          accelerator.wait_for_everyone()
    • 存取 state_dict

       

       
      if accelerator.is_main_process:
          model = accelerator.unwrap_model(model)
          torch.save(model.state_dict(), "weights.pth")
          
      with accelerator.main_process_first():
          state = torch.load("weights.pth")
          model.load_state_dict(state)
    • 在 global main 进程上 tokenizing,然后传播到每个 worker :

       

       
      datasets = load_dataset("glue", "mrpc")
       
      with accelerator.main_process_first():
          tokenized_datasets = datasets.map(
              tokenize_function,
              batched=True,
              remove_columns=["idx", "sentence1", "sentence2"],
          )

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/175654?site
推荐阅读
相关标签
  

闽ICP备14008679号