
[Technical Notes] A Walkthrough of a PyTorch Multi-GPU Training Example




In PyTorch, DDP (DistributedDataParallel) can train on multiple GPUs within a single machine, and it keeps memory consumption equal across all GPUs during training (assuming each GPU is assigned the same amount of data). Under the hood, multi-GPU training launches one process per GPU, and the processes share gradients through network communication.


Example

import os
import pathlib
import numpy as np
import tqdm
import argparse
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from utils import load_configs
from data import data_pipeline, get_ood_dataloader, RESIZE, CROP
from model import get_model
from ood_matrics import ACC, FPR, AUROC
from test import test



def setup(rank, world_size):
    # rank: process ID, in [0, world_size - 1]
    # world_size: total number of processes
    print("multi gpu setup", rank, world_size)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12345'

    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    return 

def cleanup():
    dist.destroy_process_group()


def train(model: torch.nn.Module, ID_dataloader_train, ID_dataloader_val, OOD_dataloader, epoch, lr, checkpoint_path, fpr_threshold=0.95, log_path='./logs/baseline', device='cuda', current_rank=None, trainlog_writer=None):
    
    
    params = [
            {'params': model.od_head.parameters(), 'lr': lr},
            {'params': model.backbone.fc.parameters()},
            # {'params': model.backbone.layer4.parameters(), 'lr': 0.1 * lr}
    ]

    if hasattr(model, 'proj_head'):
        print("add proj head")
        params.append({'params': model.proj_head.parameters(), 'lr': 0.5 * lr})

    
    optimizer = AdamW(params, lr=lr)
    lr_scheduler = StepLR(optimizer, step_size=20, gamma=0.7)
    # TODO: a minimum learning rate should be enforced here
    
    if current_rank == 0 or current_rank is None:
        trainlog_writer = SummaryWriter(log_path)
    
    if current_rank is None:
        print("load model to cuda:0")
        model = model.to(device)
        
    else:
        model = model.to(device)
        model = DDP(model, device_ids=[current_rank])

    best = 1  # lowest FPR seen so far; FPR lies in [0, 1]
    
    for i in range(epoch):
        train_acc = 0
        train_loss = 0
        model.train()

        data_loader = ID_dataloader_train
        if current_rank is not None:
            ID_dataloader_train.sampler.set_epoch(i)
            if current_rank == 0:
                data_loader = tqdm.tqdm(ID_dataloader_train, ncols=100)

        for x, y in data_loader:

            x = x.to(device)
            y = y.to(device)
            

            if current_rank is None:
                loss, acc = model.loss(x, y, 0.4)
            else:
                # NOTE: model.module bypasses the DDP wrapper's forward(), which is
                # where DDP arms its gradient-sync hooks, so gradients may not be
                # averaged across processes here; a common fix is to have forward()
                # return the loss and call the wrapped model directly.
                loss, acc = model.module.loss(x, y, 0.4)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            train_loss += float(loss)
            train_acc += acc

        train_acc /= len(ID_dataloader_train)
        train_loss /= len(ID_dataloader_train)
        
        if current_rank == 0 or current_rank is None:
            
            val_acc, val_loss, fpr, auroc = test(model if current_rank is None else model.module, ID_dataloader_val, OOD_dataloader, fpr_threshold)
            print(f"Epoch{i: >5}: Train ACC:{train_acc: >6.4f}, Train Loss:{train_loss: >6.4f}")
            print(f"Epoch{i: >5}: Val ACC:{val_acc: >6.4f}, FPR@{int(100 * fpr_threshold)}:{fpr: >6.4f}, AUROC:{auroc: >6.4f}, ID LOSS: {val_loss: >6.4f}")

            trainlog_writer.add_scalar('train/acc', train_acc, i)
            trainlog_writer.add_scalar('train/loss', train_loss, i)
            trainlog_writer.add_scalar('val/acc', val_acc, i)
            trainlog_writer.add_scalar(f'val/fpr@{int(100 * fpr_threshold)}', fpr, i)
            trainlog_writer.add_scalar('val/auroc', auroc, i)
            trainlog_writer.add_scalar('val/id_loss', val_loss, i)
        
            if fpr < best + 0.1: # save the model
                if fpr < best: 
                    best = fpr
                cur_path = os.path.join(checkpoint_path, f'model_{i}_{int(100 * fpr_threshold)}_{fpr:.3f}.pth')
                torch.save(model.state_dict() if current_rank is None else model.module.state_dict(), cur_path)
    
        lr_scheduler.step()
    return 


def main(rank:int, world_size:int, configs):

    multi_gpus = False
    if rank is not None:
        setup(rank, world_size)
        multi_gpus = True
    
    # load model
    model = get_model(configs)

    # load pretrained model
    try:
        params = torch.load(configs['model_path'], map_location='cpu')
        model.load_state_dict(params, strict=False)
        print("train from our pretrained model.")
    except Exception:
        print("train from pretrained imagenet weight.")
    
    # dataset
    transform = transforms.Compose([
        # transforms.Resize(256),
        transforms.RandomResizedCrop(CROP, (0.8, 1.0)),
        transforms.RandomHorizontalFlip(0.5),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.4),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])
    

    train_max_data_len = None
    val_max_data_len = 5000
    imagenet_train_loader, imagenet_val_loader = data_pipeline(
        transform, configs['batch_size'], train_max_data_len=train_max_data_len, 
        val_max_data_len=val_max_data_len, multi_gpus=multi_gpus, random_max_len=50000)

    ood_dataloader = get_ood_dataloader(configs, batch_size=configs['batch_size'], max_data_len=val_max_data_len, multi_gpus=multi_gpus)
    
    train(
        model, 
        imagenet_train_loader, 
        imagenet_val_loader, 
        ood_dataloader, 
        configs['epoch'], 
        configs['lr'], 
        configs['checkpoint_path'],
        configs['fpr_threshold'],
        configs['log_path'],
        current_rank=rank,
    )

    if rank is not None:
        cleanup()



if __name__ == "__main__":
    
    parser = argparse.ArgumentParser("OOD DET")
    parser.add_argument('--config', type=str, default='./configs/imagenet_train.yml', help='please input the path of yaml config file.')
    parser.add_argument('--gpus', type=str, default='0', help='gpu ids used for training')
    
    args = parser.parse_args()

    os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus

    # number of GPUs requested on the command line
    N_GPUS = len(args.gpus.split(','))

    configs = load_configs(args.config)
    print(configs)

    # dir check
    if not os.path.exists(configs['checkpoint_path']):
        # os.mkdir(configs['checkpoint_path'])
        pathlib.Path(configs['checkpoint_path']).mkdir(parents=True)

    if not os.path.exists(configs['log_path']):
        # os.mkdir(configs['log_path'])
        pathlib.Path(configs['log_path']).mkdir(parents=True)

    if N_GPUS > 1:
        world_size = N_GPUS
        mp.spawn(main, args=(world_size, configs), nprocs=N_GPUS, join=True)
    else:
        main(None, None, configs)

Walkthrough

In practice, the same program may need to run in either single-GPU or multi-GPU mode, so it is desirable for the script to switch between the two automatically.

The script above is launched as follows:

python train.py --config ./configs/xxx.yml --gpus 0,1,2

When only one GPU is provided, the script runs in single-GPU mode; when several GPUs are provided, it automatically switches to DDP mode.

DDP mode comes with a few caveats. First, when running under DDP, the setup function must be called before training starts and the cleanup function after it finishes. Within setup, the call torch.cuda.set_device(rank) is particularly important: it prevents every process from loading its model replica or its data onto GPU 0.
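
Incidentally, the same initialization can be driven by the torchrun launcher instead of mp.spawn. A minimal sketch, assuming the script is started as torchrun --nproc_per_node=N train.py (an alternative launch method, not what the script above uses):

import os
import torch
import torch.distributed as dist

def setup_from_env() -> int:
    # torchrun exports MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE, and LOCAL_RANK,
    # so init_process_group can read the rank and world size from the environment
    dist.init_process_group("nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)  # pin this process to its own GPU
    return local_rank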

Beyond that, there are several cases to watch for during training:

  1. The model must be wrapped with DDP

    model = model.to(device)
    model = DDP(model, device_ids=[current_rank])

    After wrapping, the underlying model's own methods and attributes must be accessed as model.module.xxx. Be aware that DDP arms its gradient-synchronization hooks inside the wrapper's forward(), so computing the loss entirely through model.module (as the training loop above does) may skip gradient averaging across processes.

  2. The DataLoader must be created with a DistributedSampler

    from torch.utils.data.distributed import DistributedSampler
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=False, num_workers=12, prefetch_factor=4, sampler=DistributedSampler(train_set))
    

    Also, shuffle must be set to False, because the DistributedSampler takes over both shuffling and sharding the dataset across processes; see the sketch after this item.
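
    A minimal sketch of a loader that switches between single- and multi-GPU mode, in the spirit of the multi_gpus flag passed to data_pipeline above (make_loader is a hypothetical helper, not the author's actual code):

    from torch.utils.data import DataLoader
    from torch.utils.data.distributed import DistributedSampler

    def make_loader(dataset, batch_size, multi_gpus):
        if multi_gpus:
            # the sampler shards the dataset across processes and shuffles it
            # itself, so the DataLoader must not shuffle
            sampler = DistributedSampler(dataset, shuffle=True)
            return DataLoader(dataset, batch_size=batch_size, shuffle=False, sampler=sampler)
        return DataLoader(dataset, batch_size=batch_size, shuffle=True)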

  3. At the start of every new epoch, the sampler must be re-seeded

     if current_rank is not None:
            ID_dataloader_train.sampler.set_epoch(i)
            if current_rank == 0:
                data_loader = tqdm.tqdm(ID_dataloader_train, ncols=100)
    

    Here i is the epoch index. Without set_epoch, DistributedSampler derives its shuffle from a fixed seed and would replay the same ordering every epoch, as the sketch below illustrates.
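
    A minimal sketch of the intended pattern (reusing DataLoader and DistributedSampler from the sketch above; train_set, batch_size, and num_epochs are placeholders):

    sampler = DistributedSampler(train_set, shuffle=True)
    loader = DataLoader(train_set, batch_size=batch_size, shuffle=False, sampler=sampler)
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # re-seed so each epoch sees a fresh shuffle
        for x, y in loader:
            ...  # training step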

  4. Logs and checkpoints should be written by the rank-0 process only.
    The current rank can be queried with dist.get_rank(); a helper sketch follows.
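
    A small helper in this spirit that works in both single- and multi-GPU mode (a sketch; is_main_process is a hypothetical name, and state_dict / checkpoint_file stand in for the values used in the training loop):

    import torch
    import torch.distributed as dist

    def is_main_process() -> bool:
        # single-GPU runs never initialize a process group
        if dist.is_available() and dist.is_initialized():
            return dist.get_rank() == 0
        return True

    if is_main_process():
        torch.save(state_dict, checkpoint_file)  # only rank 0 touches disk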
