# Load the data
import os
import numpy as np
import torch
from PIL import Image
from torch import nn

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

data_dir = "./resizedImages"
bus_dir = data_dir + os.sep + 'bus'
car_dir = data_dir + os.sep + 'car'
truck_dir = data_dir + os.sep + 'truck'
if not os.path.exists(data_dir):
    os.mkdir(data_dir)
if not os.path.exists(car_dir):
    os.mkdir(car_dir)
if not os.path.exists(bus_dir):
    os.mkdir(bus_dir)
if not os.path.exists(truck_dir):
    os.mkdir(truck_dir)

width = 200
height = width

path = "./车辆分类数据集/bus"
busData = os.listdir(path)
for img_item in busData:
    if img_item == "desktop.ini":
        continue
    img = Image.open(path + os.sep + img_item)
    img = img.resize((width, height), Image.LANCZOS)
    img.save(bus_dir + os.sep + img_item)

path = "./车辆分类数据集/car"
carData = os.listdir(path)
for img_item in carData:
    if img_item == "desktop.ini":
        continue
    img = Image.open(path + os.sep + img_item)
    img = img.resize((width, height), Image.LANCZOS)
    img.save(car_dir + os.sep + img_item)

path = "./车辆分类数据集/truck"
truckData = os.listdir(path)
for img_item in truckData:
    if img_item == "desktop.ini":
        continue
    img = Image.open(path + os.sep + img_item)
    img = img.resize((width, height), Image.LANCZOS)
    img.save(truck_dir + os.sep + img_item)
# The resized images are stored under resizedImages in the project root, one subfolder per class
# Hyperparameters
epochs = 10
lr = 0.001
batch_size = 32
# Split the dataset
import random
import shutil
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

train_dir = './Classfication-train_data'
test_dir = './Classfication-test_data'
if not os.path.exists(train_dir):
    os.mkdir(train_dir)
if not os.path.exists(test_dir):
    os.mkdir(test_dir)
vehicles = os.listdir(data_dir)

bus_dir = train_dir + os.sep + 'bus'
car_dir = train_dir + os.sep + 'car'
truck_dir = train_dir + os.sep + 'truck'
if not os.path.exists(car_dir):
    os.mkdir(car_dir)
if not os.path.exists(bus_dir):
    os.mkdir(bus_dir)
if not os.path.exists(truck_dir):
    os.mkdir(truck_dir)

bus_dir = test_dir + os.sep + 'bus'
car_dir = test_dir + os.sep + 'car'
truck_dir = test_dir + os.sep + 'truck'
if not os.path.exists(car_dir):
    os.mkdir(car_dir)
if not os.path.exists(bus_dir):
    os.mkdir(bus_dir)
if not os.path.exists(truck_dir):
    os.mkdir(truck_dir)

# Project layout:
# ./ (project root)
#     resizedImages/
#         bus/
#         car/
#         truck/
#     Classfication-train_data/
#         bus/
#         car/
#         truck/
#     Classfication-test_data/
#         bus/
#         car/
#         truck/

split_rate = 0.8  # train : test = 8 : 2

# Copy files into the split folders
for folder in vehicles:  # car, bus, truck
    print(folder)
    file_names = np.array(os.listdir(os.path.join(data_dir, folder)))
    train_number = int(len(file_names) * split_rate)
    total_number = list(range(len(file_names)))
    print(f"Training samples: {train_number}, test samples: {len(file_names) - train_number}")
    if len(os.listdir(train_dir + os.sep + folder)) != 0:
        continue
    random.shuffle(total_number)  # shuffle the indices
    # Take training and test files according to the shuffled indices
    train_files = file_names[total_number[0: train_number]]
    test_files = file_names[total_number[train_number:]]
    for file in train_files:
        if file == "desktop.ini":
            continue
        path_train = os.path.join(data_dir, folder) + os.sep + file
        path_train_copy = train_dir + os.sep + folder + os.sep + file
        shutil.copy(path_train, path_train_copy)  # copy the file into the training folder
    for file in test_files:
        if file == "desktop.ini":
            continue
        path_test = os.path.join(data_dir, folder) + os.sep + file
        path_test_copy = test_dir + os.sep + folder + os.sep + file
        shutil.copy(path_test, path_test_copy)  # copy the file into the test folder

transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])  # normalization

# Datasets and loaders
train_data = ImageFolder(root=train_dir, transform=transform)
train_loader = DataLoader(dataset=train_data, shuffle=True, batch_size=batch_size)
test_data = ImageFolder(root=test_dir, transform=transform)
test_loader = DataLoader(dataset=test_data, shuffle=True, batch_size=batch_size)
print()
print(f"Training set size: {len(train_data)}")
print(f"Test set size: {len(test_data)}")
from torch import nn

# Manual 2D cross-correlation
def corr2d(X, K, kernel_size):
    '''
    X, shape (batch_size, H, W)
    K, shape (k_h, k_w)
    '''
    batch_size, H, W = X.shape
    k_h, k_w = kernel_size
    Y = torch.zeros((batch_size, H - k_h + 1, W - k_w + 1)).to(device)
    for i in range(Y.shape[1]):        # output rows
        for j in range(Y.shape[2]):    # output columns
            Y[:, i, j] = (X[:, i: i + k_h, j: j + k_w] * K).sum(dim=2).sum(dim=1)
    return Y

# Multi-channel input:
# the kernel has as many channels as the input
def corr2d_multi_in(X, K, kernel_size):
    res = corr2d(X[:, 0, :, :], K[0, :, :], kernel_size)
    for i in range(1, X.shape[1]):
        res += corr2d(X[:, i, :, :], K[i, :, :], kernel_size)
    return res

# Multi-channel output:
# the number of output channels equals the number of kernels.
# Stacking along dim=1 yields the usual (batch_size, C_out, H', W') layout.
def corr2d_multi_in_out(X, K, kernel_size):
    return torch.stack([corr2d_multi_in(X, k, kernel_size) for k in K], dim=1)

# Wrap the convolution operation into a layer
class myConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size):
        super(myConv2d, self).__init__()
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        self.kernel_size = kernel_size
        # Parameters are registered on the module and moved to the GPU
        # together with the module via net.to(device)
        self.weight = nn.Parameter(torch.randn((out_channels, in_channels) + self.kernel_size))
        self.bias = nn.Parameter(torch.randn(out_channels, 1, 1))

    def forward(self, x):
        return corr2d_multi_in_out(x, self.weight, self.kernel_size) + self.bias
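To validate the manual implementation, a quick sanity check (a minimal sketch, not part of the original experiment; X_check and K_check are made-up tensors) compares corr2d_multi_in_out against torch.nn.functional.conv2d, which computes the same cross-correlation; the two should agree up to floating-point tolerance.

# Illustrative sanity check: the manual convolution should match F.conv2d
import torch.nn.functional as F
X_check = torch.randn(2, 3, 8, 8).to(device)   # (batch, C_in, H, W)
K_check = torch.randn(4, 3, 3, 3).to(device)   # (C_out, C_in, k_h, k_w)
manual = corr2d_multi_in_out(X_check, K_check, (3, 3))  # (2, 4, 6, 6)
builtin = F.conv2d(X_check, K_check)                    # (2, 4, 6, 6)
print(torch.allclose(manual, builtin, atol=1e-4))       # expected: True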
# Manual 2D CNN
import torch.nn.functional as F

class myCNN(torch.nn.Module):
    def __init__(self, num_classes):
        super(myCNN, self).__init__()
        self.conv = nn.Sequential(
            myConv2d(in_channels=3, out_channels=32, kernel_size=3),
            torch.nn.BatchNorm2d(32),
            torch.nn.ReLU(inplace=True)
        )
        self.fc = torch.nn.Linear(32, num_classes)

    def forward(self, X):
        # As the assignment requires, the image passes through one convolution layer,
        # producing an output of shape (batch_size, C_out, H, W)
        out = self.conv(X)
        # Average pooling reduces each feature map to 1x1;
        # the images are 200x200, so they are 198x198 after the convolution
        out = F.avg_pool2d(out, 198)
        out = out.squeeze()
        # Feed the pooled features into the fully connected layer
        out = self.fc(out)
        return out
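As a hedged sketch (the dummy batch below is made up, and the Python-loop convolution makes even one forward pass slow), a single forward pass confirms the expected shapes: 200×200 inputs shrink to 198×198 after the 3×3 convolution, then to 1×1 after pooling.

# Illustrative shape check on a dummy batch (not part of the original training run)
dummy = torch.randn(2, 3, 200, 200).to(device)
net_check = myCNN(num_classes=3).to(device)
print(net_check(dummy).shape)  # expected: torch.Size([2, 3])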
# 2D convolution implemented with nn.Conv2d
class nnCNN(nn.Module):
    def __init__(self, num_classes):
        super(nnCNN, self).__init__()
        # Three convolution layers
        self.conv = nn.Sequential(
            # Padding is used here because the 200x200 images otherwise produced
            # fractional sizes during downsampling, an oversight in the data preparation
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True)
        )
        self.fc = nn.Linear(128, num_classes)

    def forward(self, X):
        out = self.conv(X)
        out = F.avg_pool2d(out, kernel_size=out.size()[2:])
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
# Training utilities
import time
from torch.nn import CrossEntropyLoss
from torch.optim import SGD

criterion = CrossEntropyLoss()
criterion = criterion.to(device)

# Train with evaluation after each epoch
def train_with_test(net, train_loader, test_loader, batch_size=32, epochs=10, lr=0.001,
                    optimizer=None, disp=True, skip=False):
    if disp:
        print(net)
    if optimizer is None:
        optimizer = SGD(net.parameters(), lr=lr)
    train_batch_num = len(train_loader)
    train_loss_list = []
    train_acc = []
    train_loss_in_batch, train_acc_in_batch = [], []
    test_loss_list = []
    test_acc = []
    begin_time = time.time()
    for epoch in range(epochs):
        train_loss, acc = 0, 0
        train_sample_num = 0
        jmp = 25
        for batch_idx, (data, target) in enumerate(train_loader):
            if len(data) != batch_size:
                continue
            if skip and batch_idx < jmp:
                continue
            if skip and batch_idx % 2 == 0:
                continue  # speed up training; the manual convolution is very slow, and we mainly want to see that it works
            tb1 = time.time()
            data, target = data.to(device), target.to(device)
            prediction = net(data)
            loss = criterion(prediction, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # per-batch loss (detached so the computation graph can be freed)
            train_loss_in_batch.append(loss.detach().cpu() / len(prediction))
            # epoch loss
            train_loss += loss.item()
            # per-batch accuracy
            train_batch_acc_num = (prediction.argmax(dim=1) == target).sum()
            train_acc_in_batch.append(train_batch_acc_num.cpu() / len(prediction))
            # epoch accuracy
            acc += train_batch_acc_num
            train_sample_num += len(prediction)
            tb2 = time.time()
            if skip or batch_idx % 4 == 0:
                print(f"\t|Train-in-batch:{batch_idx+1}/{len(train_loader)}, loss:{train_loss/train_sample_num}, acc:{acc/train_sample_num}, time: {tb2-tb1}s")
        train_acc.append(acc.cpu() / train_sample_num)
        train_loss_list.append(train_loss / train_sample_num)

        # Evaluation
        test_batch_num = len(test_loader)
        total_loss = 0
        sample_num2 = 0
        acc2 = 0
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(test_loader):
                if len(data) != batch_size:
                    continue
                data, target = data.to(device), target.to(device)
                prediction = net(data)
                loss = criterion(prediction, target)
                # running loss
                total_loss += loss.item()
                sample_num2 += len(prediction)
                # running number of correct predictions
                acc_num2 = (prediction.argmax(dim=1) == target).sum()
                acc2 += acc_num2
        test_loss_list.append(total_loss / sample_num2)
        test_acc.append(acc2.cpu() / sample_num2)
        print('***epoch: %d***train loss: %.5f***train acc:%5f***test loss:%.5f***test acc:%5f'
              % (epoch + 1, train_loss_list[epoch], train_acc[epoch], test_loss_list[epoch], test_acc[epoch]))
        print()
    end_time = time.time()
    print('Total time for %d epochs: %.2fs' % (epochs, end_time - begin_time))
    # Return all losses and accuracies
    return train_loss_in_batch, train_acc_in_batch, train_loss_list, train_acc, test_loss_list, test_acc

# Standalone evaluation
def test_epoch(net, data_loader, disp=False):
    # net.eval()
    if disp:
        print(net)
    test_batch_num = len(data_loader)
    total_loss = 0
    acc = 0
    sample_num = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            if len(data) != 32:
                continue
            tb1 = time.time()
            data, target = data.to(device), target.to(device)
            prediction = net(data)
            loss = criterion(prediction, target)
            total_loss += loss.item()
            sample_num += len(prediction)
            acc_num = (prediction.argmax(dim=1) == target).sum()
            acc += acc_num
            tb2 = time.time()
            print(f"\t|Test-in-batch:{batch_idx+1}/{len(data_loader)}, loss:{total_loss/sample_num}, acc:{acc/sample_num}, time: {tb2-tb1}s")
    loss = total_loss / test_batch_num
    test_acc = acc / sample_num
    return loss, test_acc
# Smaller batches to reduce GPU memory pressure
train_loader16 = DataLoader(dataset=train_data, shuffle=True, batch_size=16)
test_loader16 = DataLoader(dataset=test_data, shuffle=True, batch_size=16)
# Plotting helpers
import matplotlib.pyplot as plt

def plot_batching(train_loss, train_acc):
    plt.figure(1)
    plt.xlabel('batch')
    plt.ylabel('loss')
    plt.title('Loss-Rate')
    # the per-batch losses are tensors; convert them for matplotlib
    loss_values = [l.detach().numpy() for l in train_loss]
    plt.plot(range(len(train_loss)), loss_values, 'b-', label=u'train_loss')
    plt.legend()

    plt.figure(2)
    plt.xlabel("batch")
    plt.ylabel("Acc")
    plt.title("Accuracy")
    plt.plot(range(len(train_acc)), train_acc, 'r-', label=u'train_acc')
    plt.legend()

def plot_learning(train_loss, train_acc, test_loss, test_acc):
    plt.figure(1)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.title('Loss-Rate')
    plt.plot(range(len(train_loss)), train_loss, 'b-', label=u'train_loss')
    plt.plot(range(len(test_loss)), test_loss, 'r-', label=u'test_loss')
    plt.legend()

    plt.figure(2)
    plt.xlabel("epoch")
    plt.ylabel("Acc")
    plt.title("Accuracy")
    plt.plot(range(len(train_acc)), train_acc, 'b-', label=u'train_acc')
    plt.plot(range(len(test_acc)), test_acc, 'r-', label=u'test_acc')
    plt.legend()
    plt.show()
# Manual implementation
torch.cuda.empty_cache()
net11 = myCNN(3).to(device)
train_l_111, train_acc_111, train_l_112, train_acc_112, test_l11, test_acc11 = train_with_test(
net11, train_loader, test_loader, batch_size=32, epochs=1, lr=0.01, optimizer=None, disp=True, skip=True)
# Plot the per-batch training results
plot_batching(train_l_111, train_acc_111)
# Convolution with nn.Conv2d
torch.cuda.empty_cache()
net12 = nnCNN(3).to(device)
train_l_121, train_acc_121, train_l_122, train_acc_122, test_l12, test_acc12 = train_with_test(
net12, train_loader, test_loader, epochs=10, lr=0.001)
plot_learning(train_l_122, train_acc_122, test_l12, test_acc12)
# Vary the number of convolution layers: use only one layer
class nnCNNof1(nn.Module):
    def __init__(self, num_classes):
        super(nnCNNof1, self).__init__()
        # A single convolution layer
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        )
        self.fc = nn.Linear(32, num_classes)

    def forward(self, X):
        out = self.conv(X)
        out = F.avg_pool2d(out, kernel_size=out.size()[2:])
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

torch.cuda.empty_cache()
net13 = nnCNNof1(3).to(device)
train_l_131, train_acc_131, train_l_132, train_acc_132, test_l13, test_acc13 = train_with_test(
    net13, train_loader, test_loader, epochs=10, lr=lr)
# Plot for comparison
plot_learning(train_l_122, train_acc_122, test_l12, test_acc12)
plot_learning(train_l_132, train_acc_132, test_l13, test_acc13)
# Vary the learning rate
lr12 = 0.01
# lr = 0.001
torch.cuda.empty_cache()
net14 = nnCNN(3).to(device)
train_l_141, train_acc_141, train_l_142, train_acc_142, test_l14, test_acc14 = train_with_test(
net14, train_loader, test_loader, epochs=10, lr=lr12)
# Plot for comparison
plot_learning(train_l_122, train_acc_122, test_l12, test_acc12)
plot_learning(train_l_142, train_acc_142, test_l14, test_acc14)
# Dilated convolution with nn.Conv2d
class nnDCNN(nn.Module):
    def __init__(self, num_classes):
        super(nnDCNN, self).__init__()
        # Three dilated convolution layers
        self.conv = nn.Sequential(
            # Padding is added so the spatial sizes stay integral;
            # next time the images should be resized to a power of two
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1, dilation=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=2, dilation=2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=4, dilation=5),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
        )
        # Output layer: map the channel count to the number of classes
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        out = self.conv(x)
        out = F.avg_pool2d(out, kernel_size=out.size()[2:])
        out = out.squeeze()
        out = self.fc(out)
        return out
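For reference, a Conv2d layer with kernel size k, stride s, padding p, and dilation d maps a height H to floor((H + 2p − d·(k−1) − 1) / s) + 1. The small helper below (an illustrative sketch, not part of the original code) traces a 200×200 input through the three layers above:

# Illustrative helper: trace spatial sizes through the dilated stack above
def conv_out_size(h, k=3, s=1, p=0, d=1):
    # PyTorch's output-size formula for Conv2d
    return (h + 2 * p - d * (k - 1) - 1) // s + 1

h = 200
for p, d in [(1, 1), (2, 2), (4, 5)]:  # (padding, dilation) of the three layers
    h = conv_out_size(h, p=p, d=d)
    print(h)  # prints 200, 200, 198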
torch.cuda.empty_cache()
net21 = nnDCNN(num_classes=3).to(device)
train_l_211, train_acc_211, train_l_212, train_acc_212, test_l21, test_acc21 = train_with_test(
net21, train_loader, test_loader, epochs=10, lr=0.001)
plot_learning(train_l_212, train_acc_212, test_l21, test_acc21)
plot_learning(train_l_122, train_acc_122, test_l12, test_acc12)
# Vary the number of layers
class nnDCNNof6(nn.Module):
    def __init__(self, num_classes):
        super(nnDCNNof6, self).__init__()
        # Seven dilated convolution layers (deeper variant)
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1, dilation=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=2, dilation=2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=4, dilation=5),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=8, dilation=10),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=16, dilation=15),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, stride=1, padding=32, dilation=20),
            nn.BatchNorm2d(1024),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=1024, out_channels=2048, kernel_size=3, stride=1, padding=64, dilation=25),
            nn.BatchNorm2d(2048),
            nn.ReLU(inplace=True),
        )
        # Output layer: map the channel count to the number of classes
        self.fc = nn.Linear(2048, num_classes)

    def forward(self, x):
        out = self.conv(x)
        out = F.avg_pool2d(out, kernel_size=out.size()[2:])
        out = out.squeeze()
        out = self.fc(out)
        return out

torch.cuda.empty_cache()
net22 = nnDCNNof6(num_classes=3).to(device)
train_l_221, train_acc_221, train_l_222, train_acc_222, test_l22, test_acc22 = train_with_test(
    net22, train_loader, test_loader, epochs=10, lr=0.001)
# Plot for comparison
plot_learning(train_l_212, train_acc_212, test_l21, test_acc21)
plot_learning(train_l_222, train_acc_222, test_l22, test_acc22)
# Vary batch_size
torch.cuda.empty_cache()
net23 = nnDCNN(num_classes=3).to(device)
train_l_231, train_acc_231, train_l_232, train_acc_232, test_l23, test_acc23 = train_with_test(
net23, train_loader16, test_loader16,batch_size=16, epochs=10, lr=0.001)
# Plot for comparison
plot_learning(train_l_212, train_acc_212, test_l21, test_acc21)
plot_learning(train_l_232, train_acc_232, test_l23, test_acc23)
Implement a residual network with the given structure, run experiments on at least one dataset, and analyze the results in terms of training time, prediction accuracy, loss curves, and so on (preferably with charts).
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=[1, 1], padding=1):
        super(ResBlock, self).__init__()
        self.layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels)
        )
        # Projection shortcut when the spatial size or channel count changes
        self.shortcut = nn.Sequential()
        if stride[0] != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.layer(x)
        shortcut = self.shortcut(x)
        # Zero-pad the shortcut channels if they are still narrower than the output
        if shortcut.size(1) != out.size(1):
            shortcut = F.pad(shortcut, (0, 0, 0, 0, 0, out.size(1) - shortcut.size(1)))
        out += shortcut
        out = F.relu(out)
        return out
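A quick shape check (illustrative only; the input tensor is made up) shows how a block with stride=[2, 1] halves the spatial size while the 1×1 projection keeps the shortcut compatible:

# Illustrative shape check for ResBlock (not part of the original experiment)
block_check = ResBlock(in_channels=64, out_channels=128, stride=[2, 1]).to(device)
x_check = torch.randn(2, 64, 50, 50).to(device)
print(block_check(x_check).shape)  # expected: torch.Size([2, 128, 25, 25])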
class nnResNet(nn.Module):
    def __init__(self, num_classes=3) -> None:
        super(nnResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64)
        )
        self.conv2 = self.set_layer(64, [[1, 1], [1, 1]])
        self.conv3 = self.set_layer(128, [[2, 1], [1, 1]])
        self.conv4 = self.set_layer(256, [[2, 1], [1, 1]])
        self.conv5 = self.set_layer(512, [[2, 1], [1, 1]])
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.Linear(256, 128),
            nn.Linear(128, num_classes)
        )

    def set_layer(self, out_channels, strides):
        layers = []
        for stride in strides:
            layers.append(ResBlock(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    # Input images are 200x200
    def forward(self, x):
        out = self.conv1(x)          # [32, 64, 200, 200]
        out = self.conv2(out)        # [32, 64, 200, 200]
        out = self.conv3(out)        # [32, 128, 100, 100]
        out = self.conv4(out)        # [32, 256, 50, 50]
        out = self.conv5(out)        # [32, 512, 25, 25]
        out = F.avg_pool2d(out, 25)  # [32, 512, 1, 1]
        out = out.squeeze()          # [32, 512]
        out = self.fc(out)
        return out
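For a rough sense of model size before training (an illustrative snippet, not in the original code), the trainable parameters can be counted directly:

# Illustrative: count the trainable parameters of the residual network
resnet_check = nnResNet(num_classes=3)
num_params = sum(p.numel() for p in resnet_check.parameters() if p.requires_grad)
print(f"trainable parameters: {num_params}")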
torch.cuda.empty_cache()
net31=nnResNet(num_classes=3).to(device)
train_l_311, train_acc_311, train_l_312, train_acc_312, test_l31, test_acc31 = train_with_test(
net31, train_loader16, test_loader16, batch_size=16, epochs=10, lr=0.001, disp=True, skip=True)
# Plot the training curves
plot_learning(train_l_312, train_acc_312, test_l31, test_acc31)
First, this experiment covered three model families: plain convolutions, dilated convolutions, and residual networks. I implemented and trained them by hand and learned about their properties and strengths.
Second, several parts of the pipeline could be improved. When preparing the dataset I simply resized every image to 200×200; since 200 is not a power of two, downsampling produced fractional sizes, which I worked around by adding padding. A sounder choice would be a power of two such as 256, which halves cleanly (256 → 128 → 64 → 32 → ...), whereas 200 → 100 → 50 → 25 → 12.5 breaks after three halvings.
Finally, my command of hyperparameters improved, including a better understanding of batch size, learning rate, and model depth, which added to my practical machine learning experience.