赞
踩
在使用PyTorch进行多GPU分布式训练时,主要目标是利用多个GPU并行处理数据,从而加速训练过程。这通常涉及到数据的并行加载、模型的并行更新以及跨GPU的梯度同步。
PyTorch提供了torch.nn.parallel.DistributedDataParallel
(DDP)这样的工具,它可以轻松地将模型包装起来以进行分布式训练。此外,PyTorch还提供了一个torch.distributed
包,它包含了一些用于分布式训练的低级API。
为了进行多GPU分布式训练,你需要遵循以下步骤:
初始化分布式环境:使用torch.distributed.init_process_group
初始化分布式环境,并指定后端(如"nccl"或"gloo")。
创建模型和数据加载器:定义你的模型和数据加载器,确保它们可以在多个进程中运行。
包装模型:使用torch.nn.parallel.DistributedDataParallel
将你的模型包装起来,以便在多个GPU上进行分布式训练。
训练循环:在训练循环中,将数据加载到GPU上,前向传播模型,计算损失,反向传播梯度,并使用优化器更新模型参数。确保梯度在所有GPU上同步。
保存和加载模型:在训练过程中,定期保存模型检查点,并在需要时从检查点恢复训练。
通过这些步骤,你可以利用PyTorch进行高效的多GPU分布式训练。
使用PyTorch进行多GPU分布式训练通常有两种方式在多个设备之间分配计算:
数据并行性,其中单个模型在多个设备或机器上进行复制。它们各自处理不同的数据批次,然后合并它们的结果。这种设置有许多变体,区别在于不同的模型副本如何合并结果,以及它们是否在每个批次上都保持同步,或者它们是否更加松散地耦合等。
模型并行性,其中单个模型的不同部分在不同的设备上运行,共同处理单个数据批次。这种方法最适合具有自然并行架构的模型,例如具有多个分支的模型。
本问主要讨论数据并行性,特别是同步数据并行性,其中模型的不同副本在它们处理每个批次后保持同步。同步性使模型的收敛行为与在单个设备上训练时看到的行为保持一致。
具体来说,本文主要探讨了如何使用PyTorch的DistributedDataParallel模块包装器来在多个GPU(通常是2到16个)上训练Keras,而你的代码几乎不需要做任何修改。这些GPU安装在一台机器上(单主机,多设备训练)。这是研究人员和小规模工业工作流程中最常见的设置。
import os os.environ["KERAS_BACKEND"] = "torch" import torch import numpy as np import keras def get_model(): # Make a simple convnet with batch normalization and dropout. inputs = keras.Input(shape=(28, 28, 1)) x = keras.layers.Rescaling(1.0 / 255.0)(inputs) x = keras.layers.Conv2D(filters=12, kernel_size=3, padding="same", use_bias=False)( x ) x = keras.layers.BatchNormalization(scale=False, center=True)(x) x = keras.layers.ReLU()(x) x = keras.layers.Conv2D( filters=24, kernel_size=6, use_bias=False, strides=2, )(x) x = keras.layers.BatchNormalization(scale=False, center=True)(x) x = keras.layers.ReLU()(x) x = keras.layers.Conv2D( filters=32, kernel_size=6, padding="same", strides=2, name="large_k", )(x) x = keras.layers.BatchNormalization(scale=False, center=True)(x) x = keras.layers.ReLU()(x) x = keras.layers.GlobalAveragePooling2D()(x) x = keras.layers.Dense(256, activation="relu")(x) x = keras.layers.Dropout(0.5)(x) outputs = keras.layers.Dense(10)(x) model = keras.Model(inputs, outputs) return model def get_dataset(): # Load the data and split it between train and test sets (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data() # Scale images to the [0, 1] range x_train = x_train.astype("float32") x_test = x_test.astype("float32") # Make sure images have shape (28, 28, 1) x_train = np.expand_dims(x_train, -1) x_test = np.expand_dims(x_test, -1) print("x_train shape:", x_train.shape) # Create a TensorDataset dataset = torch.utils.data.TensorDataset( torch.from_numpy(x_train), torch.from_numpy(y_train) ) return dataset
然后,定义一个简单的PyTorch训练循环,该循环针对GPU进行(注意调用 .cuda())。
def train_model(model, dataloader, num_epochs, optimizer, loss_fn): for epoch in range(num_epochs): running_loss = 0.0 running_loss_count = 0 for batch_idx, (inputs, targets) in enumerate(dataloader): inputs = inputs.cuda(non_blocking=True) targets = targets.cuda(non_blocking=True) # Forward pass outputs = model(inputs) loss = loss_fn(outputs, targets) # Backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() running_loss += loss.item() running_loss_count += 1 # Print loss statistics print( f"Epoch {epoch + 1}/{num_epochs}, " f"Loss: {running_loss / running_loss_count}" )
在这种设置中,程序员有一台机器上配备了多个GPU(通常是2到16个)。每个设备将运行你的模型的一个副本(称为副本)。为了简化,在以下内容中,我们将假设我们正在处理8个GPU。
在训练的每一步中:
在实践中,同步更新模型副本权重的过程是在每个单独的权重变量级别上处理的。这是通过镜像变量对象来完成的。
要在单主机上使用Keras模型进行多设备同步训练,程序员会使用torch.nn.parallel.DistributedDataParallel
模块包装器。以下是它的工作原理:
torch.multiprocessing.start_processes
来启动多个Python进程,每个设备一个进程。每个进程将运行per_device_launch_fn
函数。per_device_launch_fn
函数执行以下操作:
torch.distributed.init_process_group
和torch.cuda.set_device
来配置该进程要使用的设备。torch.utils.data.distributed.DistributedSampler
和torch.utils.data.DataLoader
将我们的数据转换为一个分布式数据加载器。torch.nn.parallel.DistributedDataParallel
将我们的模型转换为一个分布式PyTorch模块。train_model
函数。train_model
函数随后将在每个进程中运行,每个进程中的模型使用单独的设备。以下是流程,其中每个步骤都被拆分为自己的实用函数:
# Config num_gpu = torch.cuda.device_count() num_epochs = 2 batch_size = 64 print(f"Running on {num_gpu} GPUs") def setup_device(current_gpu_index, num_gpus): # Device setup os.environ["MASTER_ADDR"] = "localhost" os.environ["MASTER_PORT"] = "56492" device = torch.device("cuda:{}".format(current_gpu_index)) torch.distributed.init_process_group( backend="nccl", init_method="env://", world_size=num_gpus, rank=current_gpu_index, ) torch.cuda.set_device(device) def cleanup(): torch.distributed.destroy_process_group() def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size): sampler = torch.utils.data.distributed.DistributedSampler( dataset, num_replicas=num_gpus, rank=current_gpu_index, shuffle=False, ) dataloader = torch.utils.data.DataLoader( dataset, sampler=sampler, batch_size=batch_size, shuffle=False, ) return dataloader def per_device_launch_fn(current_gpu_index, num_gpu): # Setup the process groups setup_device(current_gpu_index, num_gpu) dataset = get_dataset() model = get_model() # prepare the dataloader dataloader = prepare_dataloader(dataset, current_gpu_index, num_gpu, batch_size) # Instantiate the torch optimizer optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) # Instantiate the torch loss function loss_fn = torch.nn.CrossEntropyLoss() # Put model on device model = model.to(current_gpu_index) ddp_model = torch.nn.parallel.DistributedDataParallel( model, device_ids=[current_gpu_index], output_device=current_gpu_index ) train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn) cleanup()
if __name__ == "__main__":
# We use the "fork" method rather than "spawn" to support notebooks
torch.multiprocessing.start_processes(
per_device_launch_fn,
args=(num_gpu,),
nprocs=num_gpu,
join=True,
start_method="fork",
)
本文主要介绍了如何在PyTorch中实现单主机多设备的同步训练。这种训练方式中,一台机器配备多个GPU,每个GPU上运行模型的一个副本。训练过程中,全局数据批次被分割成多个局部批次,每个GPU独立处理一个局部批次,然后通过有效的合并方式同步更新所有模型副本的权重。为了实现这种训练方式,需要使用torch.nn.parallel.DistributedDataParallel
模块和其他相关工具。此外,还介绍了如何使用多个Python进程和分布式数据加载器来进一步提高训练效率。最后提到了启动多个进程的时间问题。总的来说,这是一种利用多GPU并行计算能力来加速深度学习模型训练的方法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。