当前位置:   article > 正文

机器学习框架Ray -- 2.4 基于Ray的Fashion Minst数据集识别_ray train

ray train

1. 概述

使用 Ray 和 Ray Train,可以在多个 worker 上分发训练任务,从而加速整个训练过程。每个 worker 都在独立的数据子集上训练相同的神经网络结构。在训练过程中,所有 worker 共享并更新同一个神经网络的参数。

这里的并行计算并不是用于比较不同神经网络结构的训练效果,而是用于加速单个神经网络结构的训练。通过在多个 worker 上分发训练任务,可以更快地完成整个训练过程。这种方法特别适用于大型数据集和复杂模型,因为这些情况下单个设备(例如单个 CPU 或 GPU)可能会受到计算能力和内存限制。

下面将使用Ray Train 分布式训练 PyTorch 模型的Fashion MNIST 图像分类问题。

代码的主要组成部分:

  1. 导入所需库和模块
  2. 下载 Fashion MNIST 训练数据和测试数据
  3. 定义一个神经网络模型(NeuralNetwork 类)
  4. 定义训练和验证函数(train_epoch 和 validate_epoch)
  5. 实现一个训练循环函数(train_func),它将用于每个 Ray Train worker
  6. 定义一个 train_fashion_mnist 函数,用于设置和运行分布式训练。这个函数创建一个 TorchTrainer 实例,并使用给定的配置参数(如工作器数量和是否使用 GPU)来初始化训练
  7. 最后,使用 argparse 模块处理命令行参数,并在 __main__ 块中启动 Ray 和分布式训练

目标是设置并运行一个分布式训练任务,使用 Ray Train 在 Fashion MNIST 数据集上训练一个 PyTorch 神经网络模型。这个示例展示了如何使用 Ray Train 轻松地扩展训练任务,使其可以在多个工作器上并行运行。

2. 环境构建

训练环境为《机器学习框架Ray -- 1.4 Ray RLlib的基本使用》中创建的RayRLlib环境。

本案例中除了安装Ray以外,还需要安装pytorch。

Anaconda中环境创建如下:

  1. conda create -n RayRLlib python=3.7
  2. conda activate RayRLlib
  3. conda install pytorch==1.12.1 torchvision==0.13.1 torchaudio==0.12.1 cudatoolkit=11.3 -c pytorch
  4. pip install ipykernel -i https://pypi.tuna.tsinghua.edu.cn/simple
  5. pip install pyarrow
  6. pip install gputil
  7. pip install "ray[rllib]" -i https://pypi.tuna.tsinghua.edu.cn/simple

进入RayRLlib环境中,导入所需包。

  1. import argparse
  2. from typing import Dict
  3. from ray.air import session
  4. import torch
  5. from torch import nn
  6. from torch.utils.data import DataLoader
  7. from torchvision import datasets
  8. from torchvision.transforms import ToTensor
  9. import ray.train as train
  10. from ray.train.torch import TorchTrainer
  11. from ray.air.config import ScalingConfig

3. 数据产生、定义神经网络

下载Fashion MNIST数据集,将自动联网下载数据集,位置在Linux系统根目录的data文件夹内。

  1. # Download training data from open datasets.
  2. # 下载训练数据
  3. training_data = datasets.FashionMNIST(
  4. root="~/data",
  5. train=True,
  6. download=True,
  7. transform=ToTensor(),
  8. )
  9. # Download test data from open datasets.
  10. # 下载测试数据
  11. test_data = datasets.FashionMNIST(
  12. root="~/data",
  13. train=False,
  14. download=True,
  15. transform=ToTensor(),
  16. )

定义神经网络模型。由于Fashion MNIST 数据集每张图片均为28x28大小,神经网络维度需要对应设置28x28。神经网络为若干全连接层:

  1. nn.Linear(28 * 28,512),
  2. nn.ReLU(),
  3. nn.Linear(512,128),
  4. nn.ReLU(),
  5. nn.Linear(128,64),
  6. nn.ReLU(),
  7. nn.Linear(64,10),
  8. nn.LogSoftmax(dim=1)

具体定义神经网络与训练函数。epochs原始代码为4,本文改为50。

本案例中,若使用GPU训练,--num-workers需要设置为GPU数量;--num-workers在仅CPU训练时最大可以设置为略小于CPU的线程数。

  1. # Define model
  2. # 定义神经网络模型
  3. class NeuralNetwork(nn.Module):
  4. def __init__(self):
  5. super(NeuralNetwork, self).__init__()
  6. self.flatten = nn.Flatten()
  7. self.linear_relu_stack = nn.Sequential(
  8. # nn.Linear(28 * 28, 512),
  9. # nn.ReLU(),
  10. # nn.Linear(512, 512),
  11. # nn.ReLU(),
  12. # nn.Linear(512, 10),
  13. # nn.ReLU(),
  14. nn.Linear(28 * 28,512),
  15. nn.ReLU(),
  16. nn.Linear(512,128),
  17. nn.ReLU(),
  18. nn.Linear(128,64),
  19. nn.ReLU(),
  20. nn.Linear(64,10),
  21. nn.LogSoftmax(dim=1),
  22. )
  23. def forward(self, x):
  24. x = self.flatten(x)
  25. logits = self.linear_relu_stack(x)
  26. return logits
  27. # 定义训练函数
  28. def train_epoch(dataloader, model, loss_fn, optimizer):
  29. size = len(dataloader.dataset) // session.get_world_size()
  30. model.train()
  31. for batch, (X, y) in enumerate(dataloader):
  32. # Compute prediction error
  33. pred = model(X)
  34. loss = loss_fn(pred, y)
  35. # Backpropagation
  36. optimizer.zero_grad()
  37. loss.backward()
  38. optimizer.step()
  39. if batch % 100 == 0:
  40. loss, current = loss.item(), batch * len(X)
  41. print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
  42. # 定义验证函数
  43. def validate_epoch(dataloader, model, loss_fn):
  44. size = len(dataloader.dataset) // session.get_world_size()
  45. num_batches = len(dataloader)
  46. model.eval()
  47. test_loss, correct = 0, 0
  48. with torch.no_grad():
  49. for X, y in dataloader:
  50. pred = model(X)
  51. test_loss += loss_fn(pred, y).item()
  52. correct += (pred.argmax(1) == y).type(torch.float).sum().item()
  53. test_loss /= num_batches
  54. correct /= size
  55. print(
  56. f"Test Error: \n "
  57. f"Accuracy: {(100 * correct):>0.1f}%, "
  58. f"Avg loss: {test_loss:>8f} \n"
  59. )
  60. return test_loss
  61. # 定义 Ray Train 工作函数
  62. def train_func(config: Dict):
  63. batch_size = config["batch_size"]
  64. lr = config["lr"]
  65. epochs = config["epochs"]
  66. worker_batch_size = batch_size // session.get_world_size()
  67. # Create data loaders.
  68. train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
  69. test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)
  70. train_dataloader = train.torch.prepare_data_loader(train_dataloader)
  71. test_dataloader = train.torch.prepare_data_loader(test_dataloader)
  72. # Create model.
  73. model = NeuralNetwork()
  74. model = train.torch.prepare_model(model)
  75. loss_fn = nn.CrossEntropyLoss()
  76. optimizer = torch.optim.SGD(model.parameters(), lr=lr)
  77. for _ in range(epochs):
  78. train_epoch(train_dataloader, model, loss_fn, optimizer)
  79. loss = validate_epoch(test_dataloader, model, loss_fn)
  80. session.report(dict(loss=loss))
  81. # fashion mnist训练函数
  82. def train_fashion_mnist(num_workers=1, use_gpu=True):
  83. trainer = TorchTrainer(
  84. train_loop_per_worker=train_func,
  85. train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 50},
  86. scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
  87. )
  88. result = trainer.fit()
  89. print(f"Last result: {result.metrics}")
  • 如果使用CPU训练模型,可参考修改对应代码段(以20线程CPU为例):
  1. ...
  2. trainer = TorchTrainer(
  3. train_loop_per_worker=train_func,
  4. train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 50},
  5. scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
  6. )
  7. ...
  8. parser.add_argument(
  9. "--num-workers",
  10. "-n",
  11. type=int,
  12. default=20,
  13. help="Sets number of workers for training.",
  14. )
  15. parser.add_argument(
  16. "--use-gpu", action="store_true", default=False, help="Enables GPU training"
  17. )
  18. ...
  • 如果使用单GPU计算,可参考修改对应代码段:
  1. ...
  2. trainer = TorchTrainer(
  3. train_loop_per_worker=train_func,
  4. train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 50},
  5. scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
  6. )
  7. ...
  8. parser.add_argument(
  9. "--num-workers",
  10. "-n",
  11. type=int,
  12. default=1, # 若使用GPU,此处为GPU的数量对应,否则会卡在pending
  13. help="Sets number of workers for training.",
  14. )
  15. parser.add_argument(
  16. "--use-gpu", action="store_true", default=True, help="Enables GPU training"
  17. )
  18. ...

4.训练模型

  1. if __name__ == "__main__":
  2. parser = argparse.ArgumentParser()
  3. parser.add_argument(
  4. "--address", required=False, type=str, help="the address to use for Ray"
  5. )
  6. parser.add_argument(
  7. "--num-workers",
  8. "-n",
  9. type=int,
  10. default=20,
  11. help="Sets number of workers for training.",
  12. )
  13. parser.add_argument(
  14. "--use-gpu", action="store_true", default=False, help="Enables GPU training"
  15. )
  16. parser.add_argument(
  17. "--smoke-test",
  18. action="store_true",
  19. default=False,
  20. help="Finish quickly for testing.",
  21. )
  22. args, _ = parser.parse_known_args()
  23. import ray
  24. if args.smoke_test:
  25. # 2 workers + 1 for trainer.
  26. ray.init(num_cpus=3)
  27. train_fashion_mnist()
  28. else:
  29. ray.init(address=args.address)
  30. train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)

使用GPU时,显示如下: 

以CPU与GPU分别以相同的学习率训练50epochs,最终准确率都在80%以上。

后续补充如何修改超参数,以提高预测精度。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/236017
推荐阅读
相关标签
  

闽ICP备14008679号