赞
踩
在 PyTorch 中,DDP(DistributedDataParallel)训练模式可以利用单机上的多块显卡进行训练,并且在各卡分配的数据量相同时,能够保证训练时所有显卡的显存消耗相同。多卡训练在原理上是启动多个进程进行训练,进程之间依赖网络通信来同步梯度。
import os
import pathlib
import numpy as np
import tqdm
import argparse

import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms

from utils import load_configs
from data import data_pipeline, get_ood_dataloader, RESIZE, CROP
from model import get_model
from ood_matrics import ACC, FPR, AUROC
from test import test


def setup(rank, world_size):
    """Join the NCCL process group and bind this process to its GPU.

    Args:
        rank: Index of this process, in ``0 .. world_size - 1``.
        world_size: Total number of processes.
    """
    print("multi gpu setup", rank, world_size)
    # setdefault keeps the localhost:12345 rendezvous as the default while
    # letting the environment override it (e.g. to avoid a port clash between
    # two jobs on the same machine).
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '12345')
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # Make cuda:<rank> the current device so the plain 'cuda' device string
    # used by train() resolves to this process's own GPU.
    torch.cuda.set_device(rank)


def cleanup():
    """Tear down the process group created by :func:`setup`."""
    dist.destroy_process_group()


def train(model: torch.nn.Module, ID_dataloader_train, ID_dataloader_val, OOD_dataloader, epoch, lr, checkpoint_path, fpr_threshold=0.95, log_path='./logs/baseline', device='cuda', current_rank=None, trainlog_writer=None):
    """Train ``model`` and, on the main process, validate, log and checkpoint.

    Args:
        model: Network exposing ``od_head``, ``backbone.fc``, optionally
            ``proj_head``, and a ``loss(x, y, 0.4)`` method returning
            ``(loss, acc)``.
        ID_dataloader_train: Training loader. In multi-GPU mode its
            ``sampler`` must provide ``set_epoch``.
        ID_dataloader_val: Validation loader forwarded to ``test``.
        OOD_dataloader: Out-of-distribution loader forwarded to ``test``.
        epoch: Number of epochs to run.
        lr: Base learning rate.
        checkpoint_path: Directory that receives ``.pth`` checkpoints.
        fpr_threshold: Threshold forwarded to ``test``; also used in the
            metric tag and checkpoint file name.
        log_path: TensorBoard log directory used when no writer is supplied.
        device: Device the model and batches are moved to.
        current_rank: Rank of this process, or ``None`` for single-GPU mode.
        trainlog_writer: Optional ``SummaryWriter`` to log into. When ``None``
            the main process creates one under ``log_path`` and closes it on
            exit; a caller-supplied writer is left open.
    """
    is_distributed = current_rank is not None
    # Validation, logging and checkpointing happen only on rank 0, or in the
    # single process when not distributed.
    is_main = current_rank == 0 or current_rank is None

    params = [
        {'params': model.od_head.parameters(), 'lr': lr},
        {'params': model.backbone.fc.parameters()},
        # {'params': model.backbone.layer4.parameters(), 'lr': 0.1 * lr}
    ]
    if hasattr(model, 'proj_head'):
        print("add proj head")
        params.append({'params': model.proj_head.parameters(), 'lr': 0.5 * lr})

    optimizer = AdamW(params, lr=lr)
    lr_scheduler = StepLR(optimizer, step_size=20, gamma=0.7)  # need set the minimum lr_rate

    # Only create (and later close) a writer when the caller did not pass one.
    owns_writer = False
    if is_main and trainlog_writer is None:
        trainlog_writer = SummaryWriter(log_path)
        owns_writer = True

    if is_distributed:
        model = model.to(device)
        model = DDP(model, device_ids=[current_rank])
    else:
        print("load model to cuda:0")
        model = model.to(device)
    # The underlying module carries loss() and the state dict that is saved.
    core_model = model.module if is_distributed else model

    best = 1
    try:
        for i in range(epoch):
            train_acc = 0
            train_loss = 0
            model.train()

            data_loader = ID_dataloader_train
            if is_distributed:
                # Tell the sampler which epoch this is before iterating.
                ID_dataloader_train.sampler.set_epoch(i)
                if current_rank == 0:
                    # Progress bar on rank 0 only.
                    data_loader = tqdm.tqdm(ID_dataloader_train, ncols=100)

            for x, y in data_loader:
                x = x.to(device)
                y = y.to(device)
                # NOTE(review): in DDP mode this calls the wrapped module's
                # loss() directly and so bypasses DistributedDataParallel's
                # own forward pass; gradient synchronisation may therefore
                # not happen. Confirm against the DDP documentation.
                loss, acc = core_model.loss(x, y, 0.4)
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                train_loss += float(loss)
                train_acc += acc

            # Average the per-batch values over the number of batches.
            train_acc /= len(ID_dataloader_train)
            train_loss /= len(ID_dataloader_train)

            if is_main:
                val_acc, val_loss, fpr, auroc = test(core_model, ID_dataloader_val, OOD_dataloader, fpr_threshold)
                print(f"Epoch{i: >5}: Train ACC:{train_acc: >6.4f}, Train Loss:{train_loss: >6.4f}")
                print(f"Epoch{i: >5}: Val ACC:{val_acc: >6.4f}, FPR@{int(100 * fpr_threshold)}:{fpr: >6.4f}, AUROC:{auroc: >6.4f}, ID LOSS: {val_loss: >6.4f}")
                trainlog_writer.add_scalar('train/acc', train_acc, i)
                trainlog_writer.add_scalar('train/loss', train_loss, i)
                trainlog_writer.add_scalar('val/acc', val_acc, i)
                trainlog_writer.add_scalar(f'val/fpr@{int(100 * fpr_threshold)}', fpr, i)
                trainlog_writer.add_scalar('val/auroc', auroc, i)
                trainlog_writer.add_scalar('val/id_loss', val_loss, i)

                # Save every epoch whose FPR is within 0.1 of the best so far;
                # the best value itself only moves on a strict improvement.
                if fpr < best + 0.1:
                    if fpr < best:
                        best = fpr
                    cur_path = os.path.join(checkpoint_path, f'model_{i}_{int(100 * fpr_threshold)}_{fpr:.3f}.pth')
                    torch.save(core_model.state_dict(), cur_path)

            lr_scheduler.step()
    finally:
        # Flush and release the event file even if training is interrupted.
        if owns_writer:
            trainlog_writer.close()


def main(rank: int, world_size: int, configs):
    """Build the model and data loaders, then run training.

    Args:
        rank: Rank of this process, or ``None`` to run in single-GPU mode.
            ``mp.spawn`` supplies it as the first positional argument.
        world_size: Number of processes, or ``None`` in single-GPU mode.
        configs: Mapping read for ``model_path``, ``batch_size``, ``epoch``,
            ``lr``, ``checkpoint_path``, ``fpr_threshold`` and ``log_path``;
            it is also passed to ``get_model`` and ``get_ood_dataloader``.
    """
    multi_gpus = rank is not None
    if multi_gpus:
        setup(rank, world_size)

    # load model
    model = get_model(configs)

    # Loading our own checkpoint is best-effort: on any ordinary failure keep
    # the weights get_model() produced. Exception (not a bare except) lets
    # KeyboardInterrupt and SystemExit propagate.
    try:
        params = torch.load(configs['model_path'], map_location='cpu')
        model.load_state_dict(params, strict=False)
    except Exception as err:
        print("train from pretrained imagenet weight.")
        print(f"pretrained checkpoint was not loaded: {err!r}")
    else:
        print("train from our pretrained model.")

    # dataset
    transform = transforms.Compose([
        # transforms.Resize(256),
        transforms.RandomResizedCrop(CROP, (0.8, 1.0)),
        transforms.RandomHorizontalFlip(0.5),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=min(0.5, 0.4)),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])
    train_max_data_len = None
    val_max_data_len = 5000
    imagenet_train_loader, imagenet_val_loader = data_pipeline(
        transform,
        configs['batch_size'],
        train_max_data_len=train_max_data_len,
        val_max_data_len=val_max_data_len,
        multi_gpus=multi_gpus,
        random_max_len=50000)
    ood_dataloader = get_ood_dataloader(configs, batch_size=configs['batch_size'], max_data_len=val_max_data_len, multi_gpus=multi_gpus)

    train(
        model,
        imagenet_train_loader,
        imagenet_val_loader,
        ood_dataloader,
        configs['epoch'],
        configs['lr'],
        configs['checkpoint_path'],
        configs['fpr_threshold'],
        configs['log_path'],
        current_rank=rank,
    )

    if multi_gpus:
        cleanup()


if __name__ == "__main__":
    parser = argparse.ArgumentParser("OOD DET")
    parser.add_argument('--config', type=str, default='./configs/imagenet_train.yml', help='please input the path of yaml config file.')
    parser.add_argument('--gpus', type=str, default='0', help='gpu ids used for training')
    args = parser.parse_args()

    os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus
    # check if number gpus >= 1
    N_GPUS = len(args.gpus.split(','))  # N_GPUS is a shared global parameter

    configs = load_configs(args.config)
    print(configs)

    # dir check: exist_ok avoids the check-then-create race and makes
    # re-running with existing directories a no-op.
    for dir_key in ('checkpoint_path', 'log_path'):
        pathlib.Path(configs[dir_key]).mkdir(parents=True, exist_ok=True)

    if N_GPUS > 1:
        # One process per listed GPU; mp.spawn passes the rank as first arg.
        world_size = N_GPUS
        mp.spawn(main, args=(world_size, configs), nprocs=N_GPUS, join=True)
    else:
        main(None, None, configs)
程序往往既可能运行在单卡模式下,也可能运行在多卡模式下,因此希望脚本可以自适应地在不同模式之间切换。
以上脚本的启动方式为
python train.py --config ./configs/xxx.yml --gpus 0,1,2
当我们只提供一个GPU时,脚本会切换到单卡模式运行;如果提供多张显卡,那么脚本会自动切换到DDP模式。
DDP模式有一些需要注意的点。首先,如果运行在DDP模式下,训练开始之前需要运行 setup 函数,训练结束之后需要运行 cleanup 函数。其中 setup 函数中的 torch.cuda.set_device(rank) 比较重要,可以防止在0号卡上加载多个模型或者加载数据。
同时在训练时有一些需要注意的case:
模型需要用DDP初始化
# Move the model onto the target device before wrapping it.
model = model.to(device)
# Wrap the model in DistributedDataParallel, passing this process's rank as its only device id.
model = DDP(model, device_ids=[current_rank])
模型用DDP包装之后,如果想要访问原模型的方法或者属性,需要通过 model.module.xxx 这种方式。
dataloader的加载需要指定sampler
from torch.utils.data.distributed import DistributedSampler
# shuffle stays False because a sampler is supplied; ordering is delegated to the DistributedSampler.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=False, num_workers=12, prefetch_factor=4, sampler=DistributedSampler(train_set))
并且shuffle需要设置为False。
每一个新开始的epoch,需要重新设置 sampler
if current_rank is not None:
    # Tell the sampler which epoch this is before iterating the loader.
    ID_dataloader_train.sampler.set_epoch(i)
    if current_rank == 0:
        # Show a progress bar on rank 0 only.
        data_loader = tqdm.tqdm(ID_dataloader_train, ncols=100)
i 表示第i个epoch。
logs写出或者保存checkpoint的时候,只需要rank=0的进程保存即可。
当前进程的 rank 可以通过 dist.get_rank() 查询。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。