赞
踩
最近开始学习神经网络的量化,经过一番探索,终于在基于LeNet的手写体识别模型上成功量化,并且量化后的参数全为8bit无符号整型,可以直接进行FPGA的部署。
首先下载anaconda进行安装,安装完成后创建一个pytorch环境:
conda create -n pytorch python=3.8
其中python的版本可以任意选择,创建完成后进入pytorch官网查找需要的版本(https://pytorch.org/),如果要装GPU版的,还需要安装对应电脑显卡版本的CUDA,由于LeNet网络较为简单,经测试,使用官方的手写数字训练50个epoch只需要花10多分钟,因此可以只用CPU版本的pytorch。
进入创建的conda环境,复制上图的命令即可安装:
conda activate pytorch
conda install pytorch torchvision torchaudio cpuonly -c pytorch
创建完成后,可以下载pycharm作为IDE,并将刚在conda中创建的pytorch环境配置为pycharm工程环境。
直接上代码:
import torch from torch import nn class LeNet(nn.Module): def __init__(self): super(LeNet, self).__init__() self.c1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2) self.Relu = nn.ReLU() self.s2 = nn.AvgPool2d(kernel_size=2, stride=2) self.c3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5) self.s4 = nn.AvgPool2d(kernel_size=2, stride=2) self.c5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5) self.flatten = nn.Flatten() self.f6 = nn.Linear(120, 84) self.output = nn.Linear(84, 10) def forward(self, x): x = self.Relu(self.c1(x)) x = self.s2(x) x = self.Relu(self.c3(x)) x = self.s4(x) x = self.c5(x) x = self.flatten(x) x = self.f6(x) x = self.output(x) return x
为了量化的方便,将LeNet原本的激活函数Sigmoid改为了Relu,经测试能够有效识别。
训练函数:
import torch from torch import nn from net import LeNet from torch.optim import lr_scheduler from torchvision import datasets, transforms import os import matplotlib.pyplot as plt # 解决画图中文显示问题 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 数据转化为tensor格式 data_transform = transforms.Compose([transforms.ToTensor()]) # 加载训练数据集 train_dataset = datasets.MNIST(root='./data', train=True, transform=data_transform, download=True) train_dataloader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=32, shuffle=True) # 加载测试数据集 test_dataset = datasets.MNIST(root='./data', train=False, transform=data_transform, download=True) test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=1000, shuffle=True) device = "cuda" if torch.cuda.is_available() else "cpu" # 调用net定义的模型 model = LeNet().to(device) # 定义损失函数(交叉熵) loss_fn = nn.CrossEntropyLoss() # 定义一个优化器 optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9) # 学习率每隔10轮,变为原来的0.5 lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5) # 定义画图函数 def matplot_loss(train_loss, val_loss): plt.plot(train_loss, label='train_loss') plt.plot(val_loss, label='val_loss') plt.legend(loc='best') plt.ylabel('loss') plt.xlabel('epoch') plt.title("训练集和验证集loss值对比图") plt.show() def matplot_acc(train_acc, val_acc): plt.plot(train_acc, label='train_acc') plt.plot(val_acc, label='val_acc') plt.legend(loc='best') plt.ylabel('acc') plt.xlabel('epoch') plt.title("训练集和验证集acc值对比图") plt.show() # 定义训练函数 def train(dataloader, model, loss_fn, optimizer): model.train() loss, current, n = 0.0, 0.0, 0 for batch, (X, y) in enumerate(dataloader): # 前向传播 X, y = X.to(device), y.to(device) output = model(X) cur_loss = loss_fn(output, y) _, pred = torch.max(output, axis=1) cur_acc = torch.sum(y == pred)/output.shape[0] optimizer.zero_grad() cur_loss.backward() optimizer.step() loss += cur_loss.item() current += cur_acc.item() n = n + 1 train_loss = loss / n train_acc = current / n print("train_loss" + str(train_loss)) print("train_acc" + str(train_acc)) return train_loss, train_acc def val(dataloader, model, loss_fn): model.eval() loss, current, n = 0.0, 0.0, 0 with torch.no_grad(): for X, y in dataloader: # 前向传播 X, y = X.to(device), y.to(device) output = model(X) cur_loss = loss_fn(output, y) _, pred = torch.max(output, axis=1) cur_acc = torch.sum(y == pred) / output.shape[0] loss += cur_loss.item() current += cur_acc.item() n = n + 1 val_loss = loss / n val_acc = current / n print("val_loss" + str(val_loss)) print("val_acc" + str(val_acc)) return val_loss, val_acc # 开始训练 epoch = 50 min_acc = 0 loss_train = [] acc_train = [] loss_val = [] acc_val = [] for t in range(epoch): print(f'epoch{t+1}\n------------------') train_loss, train_acc = train(train_dataloader, model, loss_fn, optimizer) val_loss, val_acc = val(test_dataloader, model, loss_fn) loss_train.append(train_loss) acc_train.append(train_acc) loss_val.append(val_loss) acc_val.append(val_acc) # 保存最好的模型权重 if val_acc >= min_acc: folder = 'save_model' if not os.path.exists(folder): os.mkdir(folder) min_acc = val_acc print('save best model') torch.save(model.state_dict(), folder+'/best_model.pth') if t == epoch - 1: torch.save(model.state_dict(), folder+'/last_model.pth') matplot_loss(loss_train, loss_val) matplot_acc(acc_train, acc_val) print('Done!')
训练过程中没有对输入数据进行正则化处理,让输入数据保持在0~1之间,这也是为了后续的量化方便。读者感兴趣的话可以测试训练后的模型,本文重点主要讲量化过程,这里不附测试代码了,文章最后也会贴我的github网址,量化的所有代码和模型文件都在里面。
量化和反量化的原理可以参考神经网络量化入门–基本原理,由于该文章中的量化参数S(scale)仍然是浮点型,因此本文在此基础上将其改写为整型右移位的形式,例如0.0015可以近似用3 >>11 表示,量化网络的代码:
import torch from torch import nn import torch.nn.functional as F # 定义量化和反量化函数 def quantize_tensor(x, num_bits=8): qmin = 0. qmax = 2.**num_bits - 1. min_val, max_val = x.min(), x.max() scale = (max_val - min_val) / (qmax - qmin) initial_zero_point = qmin - min_val / scale zero_point = 0 if initial_zero_point < qmin: zero_point = qmin elif initial_zero_point > qmax: zero_point = qmax else: zero_point = initial_zero_point zero_point = int(zero_point) q_x = zero_point + x / scale q_x.clamp_(qmin, qmax).round_() q_x = q_x.round().int() return q_x, scale, zero_point def dequantize_tensor(q_x, scale, zero_point): return scale * (q_x.float() - zero_point) # 定义量化卷积和量化全连接 class QuantLinear(nn.Linear): def __init__(self, in_features, out_features, bias=True): super(QuantLinear, self).__init__(in_features, out_features, bias) # out = conv(in * (q_x - z_p) + bias * 256 / scale) * scale self.quant_flag = False self.scale = None self.shift = None self.zero_point = None self.qx_minus_zeropoint = None self.bias_divide_scale = None def linear_quant(self, quantize_bit=8): self.weight.data, self.scale, self.zero_point = quantize_tensor(self.weight.data, num_bits=quantize_bit) self.quant_flag = True def load_quant(self, scale, shift, zero_point): # true_scale = scale >> shift self.scale = scale self.shift = shift self.zero_point = zero_point self.qx_minus_zeropoint = self.weight - self.zero_point self.qx_minus_zeropoint = self.qx_minus_zeropoint.round().int() self.bias_divide_scale = (self.bias * 256) / (self.scale / 2 ** self.shift) self.bias_divide_scale = self.bias_divide_scale.round().int() self.quant_flag = True def forward(self, x): if self.quant_flag == True: # weight = dequantize_tensor(self.weight, self.scale, self.zero_point) # return F.linear(x, weight, self.bias) return (F.linear(x, self.qx_minus_zeropoint, self.bias_divide_scale) * self.scale) >> self.shift else: return F.linear(x, self.weight, self.bias) class QuantAvePool2d(nn.AvgPool2d): def __init__(self, kernel_size, stride, padding=0): super(QuantAvePool2d, self).__init__(kernel_size, stride, padding) self.quant_flag = False def pool_quant(self, quantize_bit=8): self.quant_flag = True def load_quant(self): self.quant_flag = True def forward(self, x): if self.quant_flag == True: return F.avg_pool2d(x.float(), self.kernel_size, self.stride, self.padding).round().int() else: return F.avg_pool2d(x, self.kernel_size, self.stride, self.padding) class QuantConv2d(nn.Conv2d): def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True): super(QuantConv2d, self).__init__(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias) self.quant_flag = False self.scale = None self.shift = None self.zero_point = None self.qx_minus_zeropoint = None self.bias_divide_scale = None def conv_quant(self, quantize_bit=8): self.weight.data, self.scale, self.zero_point = quantize_tensor(self.weight.data, num_bits=quantize_bit) self.quant_flag = True def load_quant(self, scale, shift, zero_point): # true_scale = scale >> shift self.scale = scale self.shift = shift self.zero_point = zero_point self.qx_minus_zeropoint = self.weight - self.zero_point self.qx_minus_zeropoint = self.qx_minus_zeropoint.round().int() self.bias_divide_scale = (self.bias * 256) / (self.scale / 2 ** self.shift) self.bias_divide_scale = self.bias_divide_scale.round().int() self.quant_flag = True def forward(self, x): if self.quant_flag == True: # weight = dequantize_tensor(self.weight, self.scale, self.zero_point) # return F.conv2d(x, weight, self.bias, self.stride, # self.padding, self.dilation, self.groups) return (F.conv2d(x, self.qx_minus_zeropoint, self.bias_divide_scale, self.stride, self.padding, self.dilation, self.groups) * self.scale) >> self.shift else: return F.conv2d(x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups) # 定义网络模型 class LeNet(nn.Module): # 初始化网络 def __init__(self): super(LeNet, self).__init__() self.c1 = QuantConv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2) self.Relu = nn.ReLU() self.s2 = QuantAvePool2d(kernel_size=2, stride=2) self.c3 = QuantConv2d(in_channels=6, out_channels=16, kernel_size=5) self.s4 = QuantAvePool2d(kernel_size=2, stride=2) self.c5 = QuantConv2d(in_channels=16, out_channels=120, kernel_size=5) self.flatten = nn.Flatten() self.f6 = QuantLinear(120, 84) self.output = QuantLinear(84, 10) def forward(self, x): x = self.Relu(self.c1(x)) x = self.s2(x) x = self.Relu(self.c3(x)) x = self.s4(x) x = self.c5(x) x = self.flatten(x) x = self.f6(x) x = self.output(x) print(x) return x def linear_quant(self, quantize_bit=8): # Should be a less manual way to quantize # Leave it for the future self.c1.conv_quant(quantize_bit) self.s2.pool_quant(quantize_bit) self.c3.conv_quant(quantize_bit) self.s4.pool_quant(quantize_bit) self.c5.conv_quant(quantize_bit) self.f6.linear_quant(quantize_bit) self.output.linear_quant(quantize_bit) def load_quant(self, c1_sc: int, c1_sh: int, c1_zp: int, c3_sc: int, c3_sh: int, c3_zp: int, c5_sc: int, c5_sh: int, c5_zp: int, f6_sc: int, f6_sh: int, f6_zp: int, out_sc: int, out_sh: int, out_zp: int): self.c1.load_quant(c1_sc, c1_sh, c1_zp) self.s2.load_quant() self.c3.load_quant(c3_sc, c3_sh, c3_zp) self.s4.load_quant() self.c5.load_quant(c5_sc, c5_sh, c5_zp) self.f6.load_quant(f6_sc, f6_sh, f6_zp) self.output.load_quant(out_sc, out_sh, out_zp) if __name__ == "__main__": x = torch.rand([1, 1, 28, 28]).round().int() model = LeNet() model.linear_quant() model.eval() # y = model(x) with torch.no_grad(): model.load_quant(26, 2, 90, 26, 2, 90, 26, 2, 90, 26, 2, 90, 26, 2, 90) y = model(x)
看不懂的话可以先看github上的这个仓库https://github.com/mepeichun/Efficient-Neural-Network-Bilibili,这个作者在其中详细写了量化、剪枝和知识蒸馏的简单过程,本文的量化代码即是参考它来写的。但源代码仍只能够进行浮点推理,本文在此基础之上增加了整型的前向推理过程,具体的方式就是重写卷积、池化和全连接层,将它们的计算方式改为整型,在每一层中增加了load_quant方法,用于手动导入训练好的整型权重。由于pytorch需要卷积时的数据类型匹配,该网络的输入图像也需要是int型。
该网络使用linear_quant方法,利用第2节训练好的权重就可以进行第一步量化,即weight量化,量化和测试代码:
import torch from torch import nn from net_quant import LeNet device = "cuda" if torch.cuda.is_available() else "cpu" # 调用net定义的模型 model = LeNet().to(device) model.load_state_dict(torch.load("D:/ws_pytorch/LeNet5/save_model/best_model.pth")) # 量化 model.linear_quant() # 模型保存 folder = 'weight/quantization/' for name in model.state_dict(): # print("################" + name + "################") # print(model.state_dict()[name]) file = open(folder + name + ".txt", "w") file.write(str(model.state_dict()[name])) file.close() file = open(folder + "c1_scale_zero.txt", "w") file.write(str(model.c1.scale)) file.write("\n" + str(model.c1.zero_point)) file.close() file = open(folder + "c3_scale_zero.txt", "w") file.write(str(model.c3.scale)) file.write("\n" + str(model.c3.zero_point)) file.close() file = open(folder + "c5_scale_zero.txt", "w") file.write(str(model.c5.scale)) file.write("\n" + str(model.c5.zero_point)) file.close() file = open(folder + "f6_scale_zero.txt", "w") file.write(str(model.f6.scale)) file.write("\n" + str(model.f6.zero_point)) file.close() file = open(folder + "output_scale_zero.txt", "w") file.write(str(model.output.scale)) file.write("\n" + str(model.output.zero_point)) file.close()
代码中模型保存部分用于将量化过程中生成的权重和bias,以及S(scale)参数和Z(zero point)参数保存到txt文档中,方便进行测试和FPGA部署,至此所有的参数就训练完成,下一步通过导入权重和bias,以及调用load_quant方法导入每一层的S参数和Z参数,即可进行验证。
验证代码如下:
import torch from torch import nn from net_quant import LeNet import time import numpy as np from PIL import Image from torchvision import transforms from torchvision.datasets import ImageFolder from torch.autograd import Variable def read_8bit_img(filepath): # 读取8bit数据 image = Image.open(filepath).convert('L') resize = transforms.Resize([28, 28]) image = resize(image) image = np.copy(image) image = torch.tensor(image) image = Variable(torch.unsqueeze(torch.unsqueeze(image, dim=0).int(), dim=0).int()).to(device) image = image.clone().detach().to(device) return image def read_float_img(filepath): image = Image.open(filepath).convert('L') resize = transforms.Resize([28, 28]) image = resize(image) image = np.copy(image) image = torch.tensor(image) image = Variable(torch.unsqueeze(torch.unsqueeze(image, dim=0).float(), dim=0).float()).to(device) image = image.clone().detach().to(device) return image device = "cuda" if torch.cuda.is_available() else "cpu" # 调用net定义的模型 model1 = LeNet().to(device) model1.load_state_dict(torch.load("D:/ws_pytorch/LeNet5/save_model/best_model.pth")) model2 = LeNet().to(device) model2.load_state_dict(torch.load("D:/ws_pytorch/LeNet5/save_model/quant_model.pth")) model2.load_quant(23, 12, 99, 13, 12, 141, 3, 11, 128, 13, 13, 126, 13, 12, 127) # 定义损失函数(交叉熵) loss_fn = nn.CrossEntropyLoss() # 分类类别 classes = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] model1.eval() model2.eval() float_image1 = read_float_img('data/mydata/2/2.jpg') # float_image2 = read_float_img('data/mydata/4/4.jpg') byte_image1 = read_8bit_img('data/mydata/2/2.jpg') # byte_image2 = read_8bit_img('data/mydata/4/4.jpg') # 量化前测试 print("量化前测试") start1 = time.time() with torch.no_grad(): for i in range(1): pred = model1(float_image1) end1 = time.time() predicted= classes[torch.argmax(pred[0])] print(f'predicted:"{predicted}"') print("耗时" + str(end1 - start1)) print("#" * 20) # 量化后测试 # model2.linear_quant() print("量化后测试") start2 = time.time() with torch.no_grad(): for i in range(1): pred = model2(byte_image1) end2 = time.time() predicted = classes[torch.argmax(pred[0])] print(f'predicted:"{predicted}"') print("耗时" + str(end2 - start2))
该段代码对量化前后的网络进行了准确度和时间的对比,通过load_quant函数手动加载量化过程中的S参数和Z参数,其中每一层的参数有3个,共有5个卷积核全连接层,因此输入的数据有15个。3个参数分别对应的是:整型尺度scale,移位个数shift和零点zero_point。其中scale >> shift即为实际的S参数,这两个值需要在上一节得到S参数的基础之上进行手动计算,这里会造成一定的精度损失。贴上本文每一层S和Z参数的训练结果:
layer | S | P | scale | shift |
---|---|---|---|---|
c1 | 0.0056 | 99 | 23 | 12 |
c3 | 0.0032 | 141 | 13 | 12 |
c5 | 0.0015 | 128 | 3 | 11 |
f6 | 0.0016 | 126 | 13 | 13 |
output | 0.0032 | 127 | 13 | 12 |
需要注意的是,原本的卷积可以写为:
o u t = ∑ k e r n e l i n ∗ w e i g h t + b i a s out = \sum_{kernel}in*weight + bias out=kernel∑in∗weight+bias
本文量化后的卷积公式为:
o u t = ( ( ∑ k e r n e l i n ∗ ( w e i g h t − Z ) + b i a s ∗ g a i n s c a l e > > s h i f t ) ∗ s c a l e ) > > s h i f t out = ((\sum_{kernel}in*(weight-Z)+\frac{bias * gain}{scale >> shift}) * scale) >> shift out=((kernel∑in∗(weight−Z)+scale>>shiftbias∗gain)∗scale)>>shift
其中的每个参数都为整型,第二个公式中的weight为量化后的整型权重,gain为输入图像的增益,由于本文中的输入数据由原始的0 ~ 1浮点变为了0 ~ 255整型,因此为了保证计算结果实现整体的缩放,bias一项也需要乘以gain,这里gain为256。
上述代码的测试结果如下:
图中输出了output层的输出tensor、预测结果与所用时间,可以看到量化后的模型的输出tensor相较于量化之前并没有差距太大,检测时间大大降低。附上github网址https://github.com/bird1and1fish/LeNet5。但笔者还没来得及对代码进行整理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。