This article is written for my own study. Please bear with any rough formatting; the content reflects my personal understanding, and corrections are welcome if you spot any errors.
The first script implements ResNet (residual network) and trains it on Fashion-MNIST:

import torch
import torchvision
import time
from torch import nn
from IPython import display
from torchvision import transforms
from torch.nn import functional as F
from torch.utils import data
import matplotlib.pyplot as plt
from matplotlib_inline import backend_inline

mydevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def accuracy(y_hat, y):
    """Count the number of correct predictions."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype) == y  # bool tensor: True where prediction matches label
    return float(cmp.type(y.dtype).sum())


def evaluate_accuracy_gpu(net, data_iter, device=None):
    """Compute the accuracy of a model on a dataset using a GPU."""
    if isinstance(net, nn.Module):
        net.eval()  # switch to evaluation mode
        if not device:
            device = next(iter(net.parameters())).device
    # number of correct predictions, total number of predictions
    metric = Accumulator(2)
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                # Required for BERT fine-tuning (introduced later)
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]


def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    axes.set_xlabel(xlabel), axes.set_ylabel(ylabel)
    axes.set_xscale(xscale), axes.set_yscale(yscale)
    axes.set_xlim(xlim), axes.set_ylim(ylim)
    if legend:
        axes.legend(legend)
    axes.grid()


class Accumulator:
    """Accumulate sums over n variables."""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


class Animator:
    """Plot data in an animated figure."""
    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        # Incrementally plot multiple lines
        if legend is None:
            legend = []
        backend_inline.set_matplotlib_formats('svg')
        self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)
        if nrows * ncols == 1:
            self.axes = [self.axes, ]
        # Use a lambda to capture the axis-configuration arguments
        self.config_axes = lambda: set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        # Add multiple data points into the figure
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        display.display(self.fig)
        # The following two lines enable live plotting in PyCharm
        # plt.draw()
        # plt.pause(interval=0.001)
        display.clear_output(wait=True)


# Use a font that can render Chinese characters in figure labels
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei']


class Timer:
    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        self.tik = time.time()

    def stop(self):
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def sum(self):
        """Return the sum of recorded times."""
        return sum(self.times)


def load_data_fashion_mnist(batch_size, resize=None):
    """Download the Fashion-MNIST dataset and load it into memory."""
    trans = [transforms.ToTensor()]
    if resize:
        trans.insert(0, transforms.Resize(resize))
    trans = transforms.Compose(trans)
    # download=False assumes the data already exists under ../data;
    # set download=True on the first run
    mnist_train = torchvision.datasets.FashionMNIST(
        root="../data", train=True, transform=trans, download=False)
    mnist_test = torchvision.datasets.FashionMNIST(
        root="../data", train=False, transform=trans, download=False)
    return (data.DataLoader(mnist_train, batch_size, shuffle=True,
                            num_workers=4),
            data.DataLoader(mnist_test, batch_size, shuffle=False,
                            num_workers=4))


def train(net, train_iter, test_iter, num_epochs, lr, device):
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)
    # torch.cuda.get_device_name() raises on a CPU device, so fall back gracefully
    print('training on',
          torch.cuda.get_device_name(device) if device.type == 'cuda' else device)
    net.to(device)
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    animator = Animator(xlabel='epoch', xlim=[1, num_epochs],
                        legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = Timer(), len(train_iter)
    for epoch in range(num_epochs):
        # sum of training loss, sum of training accuracy, number of examples
        metric = Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(l * X.shape[0], accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    plt.title(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
              f'test acc {test_acc:.3f}\n'
              f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
              f'on {str(device)}')
    plt.show()


class Residual(nn.Module):
    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        if use_1x1conv:
            # 1x1 convolution to match the skip path's channels and stride
            self.conv3 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        Y += X
        return F.relu(Y)


# Case where the input and output shapes are identical
blk = Residual(3, 3)
X = torch.rand(4, 3, 6, 6)
Y = blk(X)
print(f'input and output shapes match: {Y.shape == X.shape}')

# Increase the number of output channels while halving the height and width
blk = Residual(3, 6, use_1x1conv=True, strides=2)
print(f'more output channels, halved height and width: {blk(X).shape}')

b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
                   nn.BatchNorm2d(64), nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))


def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk


b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))

net = nn.Sequential(b1, b2, b3, b4, b5,
                    nn.AdaptiveAvgPool2d((1, 1)),
                    nn.Flatten(), nn.Linear(512, 10))

X = torch.rand(size=(1, 1, 224, 224))
for layer in net:
    X = layer(X)
    print(layer.__class__.__name__, 'output shape:\t\t', X.shape)

lr, num_epochs, batch_size = 0.05, 5, 256
train_iter, test_iter = load_data_fashion_mnist(batch_size, resize=96)
train(net, train_iter, test_iter, num_epochs, lr, mydevice)
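The shape printout above traces a 224x224 input, while training resizes Fashion-MNIST to 96x96. Because every stage up to AdaptiveAvgPool2d is convolutional and the adaptive pooling collapses whatever spatial size remains, the same net accepts the smaller inputs directly. A minimal sanity check (my addition, reusing the definitions from the script above):

# Sanity check (not in the original script): trace shapes for the
# 96x96 inputs actually used during training.
X = torch.rand(size=(1, 1, 96, 96))
for layer in net:
    X = layer(X)
    print(layer.__class__.__name__, 'output shape:\t\t', X.shape)

At this resolution the last residual stage still emits a 3x3 feature map, which the adaptive pooling reduces to 1x1 before the Linear classifier.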
The second script implements DenseNet (densely connected network) and trains it on Fashion-MNIST. The imports, device setup, and helper utilities (accuracy, evaluate_accuracy_gpu, set_axes, Accumulator, Animator, Timer, load_data_fashion_mnist, train) are repeated verbatim from the ResNet script above, so only the DenseNet-specific code follows:

def conv_block(input_channels, num_channels):
    return nn.Sequential(
        nn.BatchNorm2d(input_channels), nn.ReLU(),
        nn.Conv2d(input_channels, num_channels, kernel_size=3, padding=1))


class DenseBlock(nn.Module):
    def __init__(self, num_convs, input_channels, num_channels):
        super(DenseBlock, self).__init__()
        layer = []
        for i in range(num_convs):
            layer.append(conv_block(
                num_channels * i + input_channels, num_channels))
        self.net = nn.Sequential(*layer)

    def forward(self, X):
        for blk in self.net:
            Y = blk(X)
            # Concatenate each block's input and output along the channel dimension
            X = torch.cat((X, Y), dim=1)
        return X


blk = DenseBlock(2, 3, 10)
X = torch.randn(4, 3, 8, 8)
Y = blk(X)
print(f'output has 3 + 2 * 10 = 23 channels: {Y.shape}')


def transition_block(input_channels, num_channels):
    return nn.Sequential(
        nn.BatchNorm2d(input_channels), nn.ReLU(),
        nn.Conv2d(input_channels, num_channels, kernel_size=1),
        nn.AvgPool2d(kernel_size=2, stride=2))


blk = transition_block(23, 10)
print(f'channels reduced to 10, height and width halved: {blk(Y).shape}')

b1 = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

# num_channels is the current number of channels
num_channels, growth_rate = 64, 32
num_convs_in_dense_blocks = [4, 4, 4, 4]
blks = []
for i, num_convs in enumerate(num_convs_in_dense_blocks):
    blks.append(DenseBlock(num_convs, num_channels, growth_rate))
    # Output channels of the previous dense block
    num_channels += num_convs * growth_rate
    # Add a transition layer between dense blocks to halve the channel count
    if i != len(num_convs_in_dense_blocks) - 1:
        blks.append(transition_block(num_channels, num_channels // 2))
        num_channels = num_channels // 2

net = nn.Sequential(
    b1, *blks,
    nn.BatchNorm2d(num_channels), nn.ReLU(),
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten(),
    nn.Linear(num_channels, 10))

lr, num_epochs, batch_size = 0.1, 5, 256
train_iter, test_iter = load_data_fashion_mnist(batch_size, resize=96)
train(net, train_iter, test_iter, num_epochs, lr, mydevice)
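As a cross-check on the channel bookkeeping in the loop above, the final num_channels fed into the Linear layer can be replayed by hand. A short sketch (my addition, mirroring the constants in the script):

# Replay the channel arithmetic of the dense/transition stages (not in
# the original script); the constants match the network built above.
ch = 64                              # channels after the stem block b1
for i, n in enumerate([4, 4, 4, 4]):
    ch += n * 32                     # each dense block adds num_convs * growth_rate
    if i != 3:
        ch //= 2                     # a transition layer halves channels between blocks
print(ch)                            # 248: the in_features of the final nn.Linear

So the classifier sees 248 channels: 64 -> 192 -> 96 -> 224 -> 112 -> 240 -> 120 -> 248.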
Parts of this article reference: Bilibili — 跟李沐学AI (Learn AI with Li Mu); Baidu Baike.