[AI Algorithms][PyTorch]: Distributed Training

1. Data-Parallel Training

PyTorch provides several options for data-parallel training. For applications that grow gradually from simple to complex and from prototype to production, the common trajectory is:

  1. Use single-device training if the data and model fit on one GPU and training speed is not a concern.
  2. Use single-machine multi-GPU DataParallel if there are multiple GPUs on the server and you want to speed up training with minimal code changes.
  3. Use single-machine multi-GPU DistributedDataParallel if you want to speed up training further and are willing to write a bit more code to set it up.
  4. Use multi-machine DistributedDataParallel and a launch script if the application needs to scale across machine boundaries.
  5. Use torchelastic to launch distributed training if failures (e.g., OOM) are expected, or if resources can join and leave dynamically during training.

1. Single-Machine, Single-GPU Training

device = torch.device("cuda:0")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
# tensor 放入 gpu0内
mytensor = my_tensor.to(device)
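
As a minimal sketch of how this is used in a training step (the model, optimizer, loss, and data loader below are illustrative assumptions, not from the original article), each batch is simply moved onto the same device as the model:

import torch
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Illustrative model, optimizer, and loss
model = nn.Linear(128, 10).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

def train_one_epoch(dataloader):
    model.train()
    for inputs, targets in dataloader:
        # Move each batch onto the same device as the model
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()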

2. Single-Machine, Multi-GPU Training – DataParallel

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

Parameters

  • module (Module) – module to be parallelized
  • device_ids (list of python:int or torch.device) – CUDA devices (default: all devices); you can use this to choose which cuda:<id> devices to parallelize over
  • output_device (int or torch.device) – device location of output (default: device_ids[0])

Implements data parallelism at the module level.

This container parallelizes the application of the given module by splitting the input across the specified devices, chunking along the batch dimension (other objects are copied once per device). In the forward pass, the module is replicated on each device, and each replica handles a portion of the input. During the backward pass, gradients from each replica are summed into the original module.

The batch size should be larger than the number of GPUs used.

The DataParallel package enables single-machine multi-GPU parallelism with the lowest coding barrier: it requires only a one-line change to the application code. The tutorial Optional: Data Parallelism shows an example. Note that although DataParallel is very easy to use, it usually does not deliver the best performance.

This is because the DataParallel implementation replicates the model in every forward pass, and its single-process, multi-threaded parallelism naturally suffers from GIL contention. For better performance, consider using DistributedDataParallel.

DataParallel automatically splits your data and dispatches jobs to model replicas on multiple GPUs. After each replica finishes its job, DataParallel collects and merges the results before returning them to you.

# Wrap a model so that batches are split across GPUs 0, 1, and 2
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
output = net(input_var)  # input_var can be on any device, including CPU

# Typical pattern: wrap only when more than one GPU is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = LSTMModel_(config['args_lstm_model']).float()
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    net = torch.nn.DataParallel(net)
net.to(device)
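
One practical point not covered above: DataParallel wraps the original model, which remains accessible as net.module, so a common (hedged) pattern is to save the inner module's state dict so that checkpoint keys are not prefixed with "module.". The file name here is illustrative:

# Save the underlying model rather than the DataParallel wrapper,
# so the checkpoint can later be loaded without the "module." key prefix
if isinstance(net, torch.nn.DataParallel):
    torch.save(net.module.state_dict(), "checkpoint.pth")  # illustrative path
else:
    torch.save(net.state_dict(), "checkpoint.pth")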

Note:

  1. When training an LSTM in parallel with batch_first=False, pay attention to the layout of the input features: DataParallel splits the input along dim 0, which in that case is the sequence dimension rather than the batch dimension.

  2. To keep the RNN weights in a single contiguous chunk of memory after the module is replicated onto each GPU, re-flatten them once per replica, e.g.:

     if not hasattr(self, '_flattened'):
         self.history_encoder.flatten_parameters()
         setattr(self, '_flattened', True)
    
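
For context, a hedged sketch of where that snippet typically lives (LSTMModel_ and history_encoder are illustrative names, not necessarily the article's actual model): flatten_parameters() is called inside forward() so that each DataParallel replica compacts its own copy of the weights.

import torch
import torch.nn as nn

class LSTMModel_(nn.Module):  # illustrative stand-in
    def __init__(self, input_size=64, hidden_size=128):
        super().__init__()
        self.history_encoder = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.head = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # Each replica flattens its own copy of the LSTM weights exactly once
        if not hasattr(self, '_flattened'):
            self.history_encoder.flatten_parameters()
            setattr(self, '_flattened', True)
        out, _ = self.history_encoder(x)
        return self.head(out[:, -1])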

3. Single-Machine, Multi-GPU Training – DistributedDataParallel

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)

  • Parameters

    module (Module) – module to be parallelized

    device_ids (list of python:int or torch.device) – CUDA devices.

    1. For single-device modules, device_ids can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, device_ids can also be None.

    2. For multi-device modules and CPU modules, device_ids must be None. When device_ids is None for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: None)

    output_device (int or torch.device) – Device location of output for single-device CUDA modules. For multi-device modules and CPU modules, it must be None, and the module itself dictates the output location. (default: device_ids[0] for single-device modules)

    broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the beginning of the forward function. (default: True)

    process_group – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by torch.distributed.init_process_group(), will be used. (default: None)

    bucket_cap_mb – DistributedDataParallel will bucket parameters into multiple buckets so that gradient reduction of each bucket can potentially overlap with backward computation. bucket_cap_mb controls the bucket size in MegaBytes (MB). (default: 25)

    find_unused_parameters (bool) – Traverse the autograd graph from all tensors contained in the return value of the wrapped module’s forward function. Parameters that don’t receive gradients as part of this graph are preemptively marked as being ready to be reduced. In addition, parameters that may have been used in the wrapped module’s forward function but were not part of loss computation and thus would also not receive gradients are preemptively marked as ready to be reduced. (default: False)

    check_reduction – This argument is deprecated.

    gradient_as_bucket_view (bool) – When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad() function in torch/optim/optimizer.py as a solution.

  • Variables

    ~DistributedDataParallel.module (Module) – the module to be parallelized.

Example:

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model)  # uses the default process group

Compared with DataParallel, DistributedDataParallel requires one more setup step: calling init_process_group. DDP uses multi-process parallelism, so there is no GIL contention between model replicas. In addition, the model is broadcast at DDP construction time rather than in every forward pass, which also helps speed up training. DDP comes with several performance optimization techniques:

  • Each process maintains its own optimizer and performs a complete optimization step in every iteration. Although this may look redundant, the gradients have already been gathered and averaged across processes and are therefore identical in every process, so no parameter-broadcast step is needed, reducing the time spent transferring tensors between nodes.
  • Each process contains an independent Python interpreter, eliminating the extra interpreter overhead and “GIL thrashing” that comes from a single Python process driving multiple execution threads, model replicas, or GPUs. This matters especially for models that make heavy use of the Python runtime, including models with recurrent layers or many small components.

Implements distributed data parallelism, based on the torch.distributed package, at the module level.

This container parallelizes the application of the given module by splitting the input across the specified devices, chunking along the batch dimension. The module is replicated on each machine and each device, and each such replica handles a portion of the input. During the backward pass, gradients from each node are averaged.

The batch size should be larger than the number of GPUs used locally.

Creating this class requires torch.distributed to already be initialized, by calling torch.distributed.init_process_group().

DistributedDataParallel has been shown to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data-parallel training.

To use DistributedDataParallel on a host with N GPUs, you should spawn N processes, making sure that each process runs exclusively on a single GPU from 0 to N-1. This can be done either by setting CUDA_VISIBLE_DEVICES for every process or by calling:

>>> torch.cuda.set_device(i)

where i is from 0 to N-1. In each process, you should construct this module as follows:

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

To spawn multiple processes per node, you can use torch.distributed.launch or torch.multiprocessing.spawn; a sketch using the latter is shown below.
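
As a hedged, self-contained sketch of the torch.multiprocessing.spawn route (it assumes a single machine with at least one GPU; the model, data, and hyperparameters are illustrative, not from the original article):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    # Each process joins the default process group
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    # Illustrative model; each process owns one replica and one optimizer
    model = nn.Linear(32, 4).cuda(rank)
    model = DDP(model, device_ids=[rank], output_device=rank)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    for _ in range(10):
        inputs = torch.randn(64, 32, device=f"cuda:{rank}")
        targets = torch.randn(64, 4, device=f"cuda:{rank}")
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()   # gradients are all-reduced across processes here
        optimizer.step()  # every process takes the same step

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)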

NOTE:

  1. The nccl backend is currently the fastest and most strongly recommended backend when using GPUs. This applies to both single-node and multi-node distributed training.
  2. If you use torch.save in one process to checkpoint the module and torch.load in some other processes to recover it, make sure that map_location is configured properly for every process. Without map_location, torch.load will recover the module onto the device where it was saved (see the sketch after these notes).
  3. Parameters are never broadcast between processes. The module performs an all-reduce step on gradients and assumes that the optimizer will modify them in the same way in all processes. Buffers (e.g., BatchNorm statistics) are broadcast from the module in the rank 0 process to all other replicas in the system in every iteration.
  4. If you combine DistributedDataParallel with the Distributed RPC Framework, you should always use torch.distributed.autograd.backward() to compute gradients and torch.distributed.optim.DistributedOptimizer to optimize parameters.
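
A brief hedged sketch of the checkpoint pattern from note 2 (ddp_model stands for the DDP-wrapped model built as above, and the file name is illustrative): rank 0 saves, every rank waits at a barrier, and then each rank loads the weights onto its own GPU via map_location.

import torch
import torch.distributed as dist

CHECKPOINT = "ddp_checkpoint.pth"  # illustrative path

rank = dist.get_rank()
if rank == 0:
    # Only one process writes the checkpoint
    torch.save(ddp_model.state_dict(), CHECKPOINT)

# Make sure the file exists before any process tries to read it
dist.barrier()

# Map tensors saved from cuda:0 onto this process's own GPU
map_location = {"cuda:0": f"cuda:{rank}"}
ddp_model.load_state_dict(torch.load(CHECKPOINT, map_location=map_location))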
init_process_group()

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='', pg_options=None)

Initializes the default distributed process group, which also initializes the distributed package.

There are two main ways to initialize a process group:

  • Specify store, rank, and world_size explicitly.

  • Specify init_method (a URL string) which indicates where/how to discover peers. Optionally specify rank and world_size, or encode all required parameters in the URL and omit them.

If neither is specified, init_method is assumed to be “env://”.

  • Parameters

    backend (str or Backend) – The backend to use. Depending on build-time configurations, valid values include mpi, gloo, and nccl. This field should be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO). If using multiple processes per machine with nccl backend, each process must have exclusive access to every GPU it uses, as sharing GPUs between processes can result in deadlocks.

    init_method (str, optional) – URL specifying how to initialize the process group. Default is “env://” if no init_method or store is specified. Mutually exclusive with store.

    world_size (int, optional) – Number of processes participating in the job. Required if store is specified.

    rank (int, optional) – Rank of the current process (it should be a number between 0 and world_size-1). Required if store is specified.

    store (Store, optional) – Key/value store accessible to all workers, used to exchange connection/address information. Mutually exclusive with init_method.

    timeout (timedelta, optional) – Timeout for operations executed against the process group. Default value equals 30 minutes. This is applicable for the gloo backend. For nccl, this is applicable only if the environment variable NCCL_BLOCKING_WAIT or NCCL_ASYNC_ERROR_HANDLING is set to 1. When NCCL_BLOCKING_WAIT is set, this is the duration for which the process will block and wait for collectives to complete before throwing an exception. When NCCL_ASYNC_ERROR_HANDLING is set, this is the duration after which collectives will be aborted asynchronously and the process will crash. NCCL_BLOCKING_WAIT will provide errors to the user which can be caught and handled, but due to its blocking nature, it has a performance overhead. On the other hand, NCCL_ASYNC_ERROR_HANDLING has very little performance overhead, but crashes the process on errors. This is done since CUDA execution is async and it is no longer safe to continue executing user code since failed async NCCL operations might result in subsequent CUDA operations running on corrupted data. Only one of these two environment variables should be set.

    group_name (str, optional, deprecated) – Group name.

    pg_options (ProcessGroupOptions, optional) – Process group options specifying what additional options need to be passed in during the construction of specific process groups. As of now, the only option supported is ProcessGroupNCCL.Options for the nccl backend; is_high_priority_stream can be specified so that the nccl backend picks up high-priority CUDA streams when there are compute kernels waiting.
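
To make the two initialization routes concrete, here is a brief hedged sketch (addresses, ports, and sizes are placeholders chosen for illustration):

import os
import torch.distributed as dist

# Route 1: env:// (the default) – rank, world size, and master address
# come from environment variables, as set e.g. by a launch script.
os.environ["MASTER_ADDR"] = "127.0.0.1"   # placeholder address
os.environ["MASTER_PORT"] = "29500"       # placeholder port
os.environ["RANK"] = "0"
os.environ["WORLD_SIZE"] = "1"
dist.init_process_group(backend="gloo", init_method="env://")
dist.destroy_process_group()

# Route 2: a TCP init_method URL with rank/world_size passed explicitly
dist.init_process_group(
    backend="gloo",
    init_method="tcp://127.0.0.1:29501",  # placeholder host:port
    rank=0,
    world_size=1,
)
dist.destroy_process_group()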
