赞
踩
使用 Ray 和 Ray Train,可以在多个 worker 上分发训练任务,从而加速整个训练过程。每个 worker 都在独立的数据子集上训练相同的神经网络结构。在训练过程中,所有 worker 共享并更新同一个神经网络的参数。
这里的并行计算并不是用于比较不同神经网络结构的训练效果,而是用于加速单个神经网络结构的训练。通过在多个 worker 上分发训练任务,可以更快地完成整个训练过程。这种方法特别适用于大型数据集和复杂模型,因为这些情况下单个设备(例如单个 CPU 或 GPU)可能会受到计算能力和内存限制。
下面将使用 Ray Train 对 PyTorch 模型进行分布式训练,解决 Fashion MNIST 图像分类问题。
代码的主要组成部分:
目标是设置并运行一个分布式训练任务,使用 Ray Train 在 Fashion MNIST 数据集上训练一个 PyTorch 神经网络模型。这个示例展示了如何使用 Ray Train 轻松地扩展训练任务,使其可以在多个工作器上并行运行。
训练环境为《机器学习框架Ray -- 1.4 Ray RLlib的基本使用》中创建的RayRLlib环境。
本案例中除了安装Ray以外,还需要安装pytorch。
Anaconda中环境创建如下:
- conda create -n RayRLlib python=3.7
- conda activate RayRLlib
- conda install pytorch==1.12.1 torchvision==0.13.1 torchaudio==0.12.1 cudatoolkit=11.3 -c pytorch
- pip install ipykernel -i https://pypi.tuna.tsinghua.edu.cn/simple
- pip install pyarrow
- pip install gputil
- pip install "ray[rllib]" -i https://pypi.tuna.tsinghua.edu.cn/simple
进入RayRLlib环境中,导入所需包。
- import argparse
- from typing import Dict
- from ray.air import session
-
- import torch
- from torch import nn
- from torch.utils.data import DataLoader
- from torchvision import datasets
- from torchvision.transforms import ToTensor
-
- import ray.train as train
- from ray.train.torch import TorchTrainer
- from ray.air.config import ScalingConfig
下载Fashion MNIST数据集,将自动联网下载数据集,位置在用户主目录(~)下的data文件夹内(代码中 root="~/data",并非系统根目录)。
# Fetch the Fashion-MNIST train/test splits into ~/data,
# downloading from the open dataset mirror on first use.
_fmnist_kwargs = dict(root="~/data", download=True, transform=ToTensor())

training_data = datasets.FashionMNIST(train=True, **_fmnist_kwargs)
test_data = datasets.FashionMNIST(train=False, **_fmnist_kwargs)
定义神经网络模型。由于Fashion MNIST 数据集每张图片均为28x28大小,神经网络维度需要对应设置28x28。神经网络为若干全连接层:
- nn.Linear(28 * 28,512),
- nn.ReLU(),
- nn.Linear(512,128),
- nn.ReLU(),
- nn.Linear(128,64),
- nn.ReLU(),
- nn.Linear(64,10),
- nn.LogSoftmax(dim=1)
具体定义神经网络与训练函数。epochs原始代码为4,本文改为50。
本案例中,若使用GPU训练,--num-workers需要设置为GPU数量;--num-workers在仅CPU训练时最大可以设置为略小于CPU的线程数。
# Define model
class NeuralNetwork(nn.Module):
    """Fully-connected classifier for 28x28 Fashion-MNIST images.

    Flattens each (N, 1, 28, 28) batch to (N, 784), runs it through
    four Linear layers with ReLU activations, and returns per-class
    log-probabilities via LogSoftmax over the 10 classes.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 10),
            nn.LogSoftmax(dim=1),
        )

    def forward(self, x):
        # (N, 1, 28, 28) -> (N, 784) -> (N, 10) log-probabilities
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
-
# Training loop for one epoch
def train_epoch(dataloader, model, loss_fn, optimizer):
    """Run one optimization pass over `dataloader`, updating `model` in place.

    `size` is the per-worker shard size (the dataset is split across
    session.get_world_size() Ray workers) and is used only for the
    progress printout every 100 batches.
    """
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            # Use a separate name for the scalar so the tensor `loss`
            # is not shadowed by a float.
            loss_val, current = loss.item(), batch * len(X)
            print(f"loss: {loss_val:>7f} [{current:>5d}/{size:>5d}]")
-
# Evaluation loop
def validate_epoch(dataloader, model, loss_fn):
    """Evaluate `model` over `dataloader`; print accuracy and return mean loss."""
    shard_size = len(dataloader.dataset) // session.get_world_size()
    batch_count = len(dataloader)
    model.eval()
    total_loss = 0.0
    hit_count = 0.0
    with torch.no_grad():
        for features, labels in dataloader:
            outputs = model(features)
            total_loss += loss_fn(outputs, labels).item()
            hit_count += (outputs.argmax(1) == labels).type(torch.float).sum().item()
    avg_loss = total_loss / batch_count
    accuracy = hit_count / shard_size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * accuracy):>0.1f}%, "
        f"Avg loss: {avg_loss:>8f} \n"
    )
    return avg_loss
-
# Ray Train per-worker entry point
def train_func(config: Dict):
    """Per-worker training loop executed by Ray Train.

    Expected `config` keys: "lr", "batch_size" (global batch size,
    split evenly across workers), "epochs".  Reports the validation
    loss to the Ray session after every epoch.
    """
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    # Each worker processes an equal share of the global batch.
    worker_batch_size = batch_size // session.get_world_size()

    # Create data loaders; prepare_data_loader shards them and moves
    # batches to the right device for this worker.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model; prepare_model wraps it for distributed training.
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    # NeuralNetwork already ends in LogSoftmax, so NLLLoss is the correct
    # criterion here; CrossEntropyLoss would apply log-softmax a second time.
    loss_fn = nn.NLLLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        session.report(dict(loss=loss))
-
# Fashion-MNIST training launcher
def train_fashion_mnist(num_workers=1, use_gpu=True):
    """Build a TorchTrainer around `train_func` and run it to completion."""
    loop_config = {"lr": 1e-3, "batch_size": 64, "epochs": 50}
    scaling = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=loop_config,
        scaling_config=scaling,
    )
    result = trainer.fit()
    print(f"Last result: {result.metrics}")
- ...
- trainer = TorchTrainer(
- train_loop_per_worker=train_func,
- train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 50},
- scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
- )
-
- ...
- parser.add_argument(
- "--num-workers",
- "-n",
- type=int,
- default=20,
- help="Sets number of workers for training.",
- )
- parser.add_argument(
- "--use-gpu", action="store_true", default=False, help="Enables GPU training"
- )
- ...
- ...
- trainer = TorchTrainer(
- train_loop_per_worker=train_func,
- train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 50},
- scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
- )
-
- ...
- parser.add_argument(
- "--num-workers",
- "-n",
- type=int,
- default=1, # 若使用GPU,此处为GPU的数量对应,否则会卡在pending
- help="Sets number of workers for training.",
- )
- parser.add_argument(
- "--use-gpu", action="store_true", default=True, help="Enables GPU training"
- )
- ...
if __name__ == "__main__":
    # CLI entry point: parse cluster/worker options, then start training.
    parser = argparse.ArgumentParser()
    parser.add_argument("--address", required=False, type=str,
                        help="the address to use for Ray")
    parser.add_argument("--num-workers", "-n", type=int, default=20,
                        help="Sets number of workers for training.")
    parser.add_argument("--use-gpu", action="store_true", default=False,
                        help="Enables GPU training")
    parser.add_argument("--smoke-test", action="store_true", default=False,
                        help="Finish quickly for testing.")

    args, _ = parser.parse_known_args()

    import ray

    if args.smoke_test:
        # Small local cluster: 2 workers + 1 for the trainer.
        ray.init(num_cpus=3)
        train_fashion_mnist()
    else:
        ray.init(address=args.address)
        train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)
使用GPU时,显示如下:
以CPU与GPU分别以相同的学习率训练50epochs,最终准确率都在80%以上。
后续补充如何修改超参数,以提高预测精度。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。