赞
踩
本文为李沐老师《动手学深度学习》笔记小结,用于个人复习并记录学习历程,适用于初学者
训练深层神经网络是十分困难的,特别是在较短的时间内使他们收敛更加棘手。 本节将介绍批量规范化(batch normalization),这是一种流行且有效的技术,可持续加速深层网络的收敛速度。
- import torch
- from torch import nn
-
-
- def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
- # 通过is_grad_enabled来判断当前模式是训练模式还是预测模式
- if not torch.is_grad_enabled():
- # 如果是在预测模式下,直接使用传入的移动平均所得的均值和方差
- X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
- else:
- assert len(X.shape) in (2, 4)
- if len(X.shape) == 2:
- # 使用全连接层的情况,计算特征维上的均值和方差
- mean = X.mean(dim=0)
- var = ((X - mean) ** 2).mean(dim=0)
- else:
- # 使用二维卷积层的情况,计算通道维上(axis=1)的均值和方差。
- # 这里我们需要保持X的形状以便后面可以做广播运算
- mean = X.mean(dim=(0, 2, 3), keepdim=True)
- var = ((X - mean) ** 2).mean(dim=(0, 2, 3), keepdim=True)
- # 训练模式下,用当前的均值和方差做标准化
- X_hat = (X - mean) / torch.sqrt(var + eps)
- # 更新移动平均的均值和方差
- moving_mean = momentum * moving_mean + (1.0 - momentum) * mean
- moving_var = momentum * moving_var + (1.0 - momentum) * var
- Y = gamma * X_hat + beta # 缩放和移位
- return Y, moving_mean.data, moving_var.data
- class BatchNorm(nn.Module):
- # num_features:完全连接层的输出数量或卷积层的输出通道数。
- # num_dims:2表示完全连接层,4表示卷积层
- def __init__(self, num_features, num_dims):
- super().__init__()
- if num_dims == 2:
- shape = (1, num_features)
- else:
- shape = (1, num_features, 1, 1)
- # 参与求梯度和迭代的拉伸和偏移参数,分别初始化成1和0
- self.gamma = nn.Parameter(torch.ones(shape))
- self.beta = nn.Parameter(torch.zeros(shape))
- # 非模型参数的变量初始化为0和1
- self.moving_mean = torch.zeros(shape)
- self.moving_var = torch.ones(shape)
-
- def forward(self, X):
- # 如果X不在内存上,将moving_mean和moving_var
- # 复制到X所在显存上
- if self.moving_mean.device != X.device:
- self.moving_mean = self.moving_mean.to(X.device)
- self.moving_var = self.moving_var.to(X.device)
- # 保存更新过的moving_mean和moving_var
- Y, self.moving_mean, self.moving_var = batch_norm(
- X, self.gamma, self.beta, self.moving_mean,
- self.moving_var, eps=1e-5, momentum=0.9)
- return Y
批量规范化是在卷积层或全连接层之后、相应的激活函数之前应用的。
- net = nn.Sequential(
- nn.Conv2d(1, 6, kernel_size=5), BatchNorm(6, num_dims=4), nn.Sigmoid(),
- nn.AvgPool2d(kernel_size=2, stride=2),
- nn.Conv2d(6, 16, kernel_size=5), BatchNorm(16, num_dims=4), nn.Sigmoid(),
- nn.AvgPool2d(kernel_size=2, stride=2), nn.Flatten(),
- nn.Linear(16*4*4, 120), BatchNorm(120, num_dims=2), nn.Sigmoid(),
- nn.Linear(120, 84), BatchNorm(84, num_dims=2), nn.Sigmoid(),
- nn.Linear(84, 10))
和之前多篇文章中提到的一样,不再赘述,只给出代码
- from IPython import display
- import torchvision
- from torch.utils import data
- from torchvision import transforms
- import matplotlib.pyplot as plt
-
- def load_data_fashion_mnist(batch_size, resize=None):
- """下载Fashion-MNIST数据集,然后将其加载到内存中"""
- trans = [transforms.ToTensor()]
- if resize:
- trans.insert(0, transforms.Resize(resize))
- trans = transforms.Compose(trans)
- mnist_train = torchvision.datasets.FashionMNIST(
- root="../data", train=True, transform=trans, download=0)
- mnist_test = torchvision.datasets.FashionMNIST(
- root="../data", train=False, transform=trans, download=0)
- return (data.DataLoader(mnist_train, batch_size, shuffle=True,
- num_workers=get_dataloader_workers()),
- data.DataLoader(mnist_test, batch_size, shuffle=False,
- num_workers=get_dataloader_workers()))
-
- def get_dataloader_workers():
- """使用4个进程来读取数据"""
- return 4
-
- batch_size = 128
- train_iter, test_iter = load_data_fashion_mnist(batch_size, resize=224)
-
- lr, num_epochs, batch_size = 1.0, 10, 256
- train_iter, test_iter = load_data_fashion_mnist(batch_size)
-
- def accuracy(y_hat, y): #@save
- """计算预测正确的数量"""
- if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
- y_hat = y_hat.argmax(axis=1) #找出输入张量(tensor)中最大值的索引
- cmp = y_hat.type(y.dtype) == y
- return float(cmp.type(y.dtype).sum())
- class Accumulator: #@save
- """在n个变量上累加"""
- def __init__(self, n):
- self.data = [0.0] * n
-
- def add(self, *args):
- self.data = [a + float(b) for a, b in zip(self.data, args)]
-
- def reset(self):
- self.data = [0.0] * len(self.data)
-
- def __getitem__(self, idx):
- return self.data[idx]
-
- import matplotlib.pyplot as plt
- from matplotlib_inline import backend_inline
-
- def use_svg_display():
- """使⽤svg格式在Jupyter中显⽰绘图"""
- backend_inline.set_matplotlib_formats('svg')
-
- def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
- """设置matplotlib的轴"""
- axes.set_xlabel(xlabel)
- axes.set_ylabel(ylabel)
- axes.set_xscale(xscale)
- axes.set_yscale(yscale)
- axes.set_xlim(xlim)
- axes.set_ylim(ylim)
- if legend:
- axes.legend(legend)
- axes.grid()
-
- class Animator: #@save
- """在动画中绘制数据"""
- def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
- ylim=None, xscale='linear', yscale='linear',
- fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
- figsize=(3.5, 2.5)):
- # 增量地绘制多条线
- if legend is None:
- legend = []
- use_svg_display()
- self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)
- if nrows * ncols == 1:
- self.axes = [self.axes, ]
- # 使用lambda函数捕获参数
- self.config_axes = lambda: set_axes(
- self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
- self.X, self.Y, self.fmts = None, None, fmts
-
- def add(self, x, y):
- # 向图表中添加多个数据点
- if not hasattr(y, "__len__"):
- y = [y]
- n = len(y)
- if not hasattr(x, "__len__"):
- x = [x] * n
- if not self.X:
- self.X = [[] for _ in range(n)]
- if not self.Y:
- self.Y = [[] for _ in range(n)]
- for i, (a, b) in enumerate(zip(x, y)):
- if a is not None and b is not None:
- self.X[i].append(a)
- self.Y[i].append(b)
- self.axes[0].cla()
- for x, y, fmt in zip(self.X, self.Y, self.fmts):
- self.axes[0].plot(x, y, fmt)
- self.config_axes()
- display.display(self.fig)
- display.clear_output(wait=True)
-
- def evaluate_accuracy_gpu(net, data_iter, device=None): #@save
- """使用GPU计算模型在数据集上的精度"""
- if isinstance(net, nn.Module):
- net.eval() # 设置为评估模式
- if not device:
- device = next(iter(net.parameters())).device
- # 正确预测的数量,总预测的数量
- metric = Accumulator(2)
- with torch.no_grad():
- for X, y in data_iter:
- if isinstance(X, list):
- # BERT微调所需的(之后将介绍)
- X = [x.to(device) for x in X]
- else:
- X = X.to(device)
- y = y.to(device)
- metric.add(accuracy(net(X), y), y.numel())
- return metric[0] / metric[1]
-
- import time
- class Timer: #@save
- """记录多次运行时间"""
- def __init__(self):
- self.times = []
- self.start()
-
- def start(self):
- """启动计时器"""
- self.tik = time.time()
-
- def stop(self):
- """停止计时器并将时间记录在列表中"""
- self.times.append(time.time() - self.tik)
- return self.times[-1]
-
- def avg(self):
- """返回平均时间"""
- return sum(self.times) / len(self.times)
-
- def sum(self):
- """返回时间总和"""
- return sum(self.times)
-
- def cumsum(self):
- """返回累计时间"""
- return np.array(self.times).cumsum().tolist()
-
- def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
- """用GPU训练模型(在第六章定义)"""
- def init_weights(m):
- if type(m) == nn.Linear or type(m) == nn.Conv2d:
- nn.init.xavier_uniform_(m.weight)
- net.apply(init_weights)
- print('training on', device)
- net.to(device)
- optimizer = torch.optim.SGD(net.parameters(), lr=lr)
- loss = nn.CrossEntropyLoss()
- animator = Animator(xlabel='epoch', xlim=[1, num_epochs],
- legend=['train loss', 'train acc', 'test acc'])
- timer, num_batches = Timer(), len(train_iter)
- for epoch in range(num_epochs):
- # 训练损失之和,训练准确率之和,样本数
- metric = Accumulator(3)
- net.train()
- for i, (X, y) in enumerate(train_iter):
- timer.start()
- optimizer.zero_grad()
- X, y = X.to(device), y.to(device)
- y_hat = net(X)
- l = loss(y_hat, y)
- l.backward()
- optimizer.step()
- with torch.no_grad():
- metric.add(l * X.shape[0], accuracy(y_hat, y), X.shape[0])
- timer.stop()
- train_l = metric[0] / metric[2]
- train_acc = metric[1] / metric[2]
- if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
- animator.add(epoch + (i + 1) / num_batches,
- (train_l, train_acc, None))
- test_acc = evaluate_accuracy_gpu(net, test_iter)
- animator.add(epoch + 1, (None, None, test_acc))
- print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
- f'test acc {test_acc:.3f}')
- print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
- f'on {str(device)}')
-
- def try_gpu(i=0): #@save
- """如果存在,则返回gpu(i),否则返回cpu()"""
- if torch.cuda.device_count() >= i + 1:
- return torch.device(f'cuda:{i}')
- return torch.device('cpu')
和以前一样,我们将在Fashion-MNIST数据集上训练网络。 这个代码与我们第一次训练LeNet时几乎完全相同,主要区别在于学习率大得多。
- begin = time.time()
- train_ch6(net, train_iter, test_iter, num_epochs, lr, try_gpu())
- end = time.time()
- print(end - begin)
这个结果,对比当时不用批量归一化层的LeNet,训练的收敛速度快了许多,loss变小了,train acc提高了许多,但是test acc没有提高太多,出现了过拟合。
除了使用我们刚刚定义的BatchNorm
,我们也可以直接使用深度学习框架中定义的BatchNorm
。 该代码看起来几乎与我们上面的代码相同。
- net = nn.Sequential(
- nn.Conv2d(1, 6, kernel_size=5), nn.BatchNorm2d(6), nn.Sigmoid(),
- nn.AvgPool2d(kernel_size=2, stride=2),
- nn.Conv2d(6, 16, kernel_size=5), nn.BatchNorm2d(16), nn.Sigmoid(),
- nn.AvgPool2d(kernel_size=2, stride=2), nn.Flatten(),
- nn.Linear(256, 120), nn.BatchNorm1d(120), nn.Sigmoid(),
- nn.Linear(120, 84), nn.BatchNorm1d(84), nn.Sigmoid(),
- nn.Linear(84, 10))
下面,我们使用相同超参数来训练模型。 请注意,通常高级API变体运行速度快得多,因为它的代码已编译为C++或CUDA,而我们的自定义代码由Python实现。
- begin = time.time()
- train_ch6(net, train_iter, test_iter, num_epochs, lr, try_gpu())
- end = time.time()
从结果可以看到,运行速度快了,并且过拟合也小了许多。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。