赞
踩
源码(包含数据)获取链接:
百度网盘
:
链接:https://pan.baidu.com/s/1kdbZ1t5hU55B4az6tDPO6Q 提取码:1111
gitee仓库
:
https://gitee.com/gaoqiangmath/Reset
flower_data 分为训练数据train以及测试数据valid
其中训练数据中有102的文件,每个文件对应一种花,里面有对应花的图片
其中测试数据valid中也对应102个文件,每个文件对应一种花,里面有对应花的图片
cat_to_name_json 里面存储的是花的具体名字
import os
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import torch
from torch import nn
import torch.optim as optim
import torchvision
#pip install torchvision
from torchvision import transforms, models, datasets
#https://pytorch.org/docs/stable/torchvision/index.html
import imageio
import time
import warnings
warnings.filterwarnings("ignore")
import random
import sys
import copy
import json
from PIL import Image
# 设置好读取数据的目录
data_dir = './flower_data/'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/valid'
'''
制作好数据源:
data_transforms中指定了所有图像预处理操作(数据增强)
ImageFolder假设所有的文件按文件夹保存好,每个文件夹下面存贮同一类别的图片,文件夹的名字为分类的名字
'''
data_transforms = {
'train':
transforms.Compose([
transforms.Resize([96, 96]), #设置好每张图片大小相同
transforms.RandomRotation(45),#随机旋转,-45到45度之间随机选
transforms.CenterCrop(64),#将图片随机裁剪为64x64
transforms.RandomHorizontalFlip(p=0.5),#随机水平翻转 选择一个概率概率
transforms.RandomVerticalFlip(p=0.5),#随机垂直翻转
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1),#参数1为亮度,参数2为对比度,参数3为饱和度,参数4为色相
transforms.RandomGrayscale(p=0.025),#概率转换成灰度率,3通道就是R=G=B
transforms.ToTensor(), #由于PyTorch框架数据格式必须为Tensor,因此转换数据为Tensor
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])#对三通道分别做均值,标准差
]),
'valid':
transforms.Compose([
transforms.Resize([64, 64]), #必须与训练集中transforms.CenterCrop(64)大小相同
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
#指定batch_size 为128 也即每次从集合中拿128个样本点进行训练或者测试
batch_size = 128
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'valid']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True) for x in ['train', 'valid']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'valid']} #可有可无
class_names = image_datasets['train'].classes #将训练集中每个文件名作为每个花的类名
结果呈现:
读取花的实际名字:
with open('cat_to_name.json', 'r') as f:
cat_to_name = json.load(f)
# 是否用GPU训练
train_on_gpu = torch.cuda.is_available()
if not train_on_gpu:
print('CUDA is not available. Training on CPU ...')
else:
print('CUDA is available! Training on GPU ...')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
feature_extract = True #把其他层都冻住,只保留输出层自己修改 (迁移学习)
def set_parameter_requires_grad(model, feature_extracting): #此函数后面把feature_extract 带入到feature_extracting
if feature_extracting: #设置模型中所有参数不可反向传播(也即不可修改,包括输出层,后面会重新定义输出层)
for param in model.parameters():
param.requires_grad = False
model_name = 'resnet' #可选的比较多 ['resnet', 'alexnet', 'vgg', 'squeezenet', 'densenet', 'inception'] 一般先选择最基本的模型
model_ft = models.resnet18()#18层的能快点,条件好点的也可以选152
model_ft
得到的model_ft 部分如下:
由此可知model_ft输出层(fc)中输入为512,输出为1000,也即默认是做1000分类的,但本例子中花一共只有102个分类,因此需要修改输出层。
# 定义函数自定义修改模型输出层大小
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
model_ft = models.model_name(pretrained=use_pretrained) #采用迁移学习,模型参数采用预训练的参数
set_parameter_requires_grad(model_ft, feature_extract) #冻层,只允许输出层修改
num_ftrs = model_ft.fc.in_features #得到模型输出层的输入大小
model_ft.fc = nn.Linear(num_ftrs, num_classes)#重新定义了输出层,设定输出层输出为num_classes,也即做102分类,因为本例花的类别只有102类
input_size = 64#输入大小根据自己配置来
return model_ft, input_size
#得到模型,并且保存
model_ft, input_size = initialize_model(model_name, 102, feature_extract, use_pretrained=True)
##GPU还是CPU计算
model_ft = model_ft.to(device)
## 模型保存,名字自己起
filename='checkpoint.pth'
##由于迁移学习策略中设置了所有参数的feature_extract为True,param.requires_grad为False,但是由于自定义了输出层,因此只有输出层中的参数保留在了params_to_update中
params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
params_to_update = []
for name,param in model_ft.named_parameters():
if param.requires_grad == True:
#由于initialize_model()函数重新定义了输出层,因此输出层中的param.requires_grad == True
params_to_update.append(param)
print("\t",name)
else:
for name,param in model_ft.named_parameters():
if param.requires_grad == True:
print("\t",name)
因此得到 params_to_update 的结果为:输出层中的权重和偏置(fc.weight fc.bias)
因此,自定义输出层完成,模型 model_ft 部分结果如下:
由此可以看出自定义输出层完成,模型 model_ft 输出层(fc)的输出成功修改为 102,因为花的类别一共只有102个类别。
优化器设置,也就是设置参数优化的算法,常见的梯度下降算法有SGD、Adam等(绝大多数情况采用Adam),另外也设置学习率。最用用以优化模型参数。
本例中需要优化的为模型 model_ft 输出层的参数,模型修改 一节中把输出层的参数(fc.weight fc.bias)保存在 params_to_update 中。
# 优化器设置
optimizer_ft = optim.Adam(params_to_update, lr=1e-2)#params_to_update表示需要训练的参数 lr表示学习率
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)#定义学习率衰减策略 学习率每7个epoch衰减成原来的1/10
criterion = nn.CrossEntropyLoss() #损失函数 (分类任务,用的是交叉熵)
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25,filename='best.pt'):
#model:要训练的模型
#dataloaders:训练的数据,在数据读取与预处理中已经定义
#criterion:损失函数,在优化器设置中已经定义
#optimizer:优化器,在优化器设置中已经定义
#num_epochs:迭代次数,可以自行修改
#filename:保存文件的路径
#计算时间
since = time.time()
#也要记录最好的那一次的准确率
best_acc = 0
#模型也得放到你的CPU或者GPU
model.to(device)
#训练过程中打印一堆损失和指标
val_acc_history = [] #训练集准确率
train_acc_history = [] #测试集准确率
train_losses = [] #训练集损失
valid_losses = [] #测试集损失
#学习率
LRs = [optimizer.param_groups[0]['lr']] #为了打印出学习率,可有可无
#最好的那次模型,后续会变的,先初始化
best_model_wts = copy.deepcopy(model.state_dict()) #用于保存模型最好的权重参数
#一个个epoch来遍历
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# 训练和验证
for phase in ['train', 'valid']:
if phase == 'train':
model.train() # 训练
else:
model.eval() # 验证
running_loss = 0.0
running_corrects = 0
# 把数据都取个遍
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)#放到你的CPU或GPU
labels = labels.to(device)
# 清零
optimizer.zero_grad()
# 只有训练的时候计算和更新梯度
outputs = model(inputs)
loss = criterion(outputs, labels)
_, preds = torch.max(outputs, 1)
# 训练阶段更新权重
if phase == 'train':
loss.backward()
optimizer.step()
# 计算损失
running_loss += loss.item() * inputs.size(0)#0表示batch那个维度
running_corrects += torch.sum(preds == labels.data)#预测结果最大的和真实值是否一致
epoch_loss = running_loss / len(dataloaders[phase].dataset)#算平均
epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
time_elapsed = time.time() - since#一个epoch我浪费了多少时间
print('Time elapsed {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
# 得到最好那次的模型
if phase == 'valid' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
state = {
'state_dict': model.state_dict(),#字典里key就是各层的名字,值就是训练好的权重
'best_acc': best_acc,
'optimizer' : optimizer.state_dict(),
}
torch.save(state, filename)
if phase == 'valid':
val_acc_history.append(epoch_acc)
valid_losses.append(epoch_loss)
#scheduler.step(epoch_loss)#学习率衰减
if phase == 'train':
train_acc_history.append(epoch_acc)
train_losses.append(epoch_loss)
print('Optimizer learning rate : {:.7f}'.format(optimizer.param_groups[0]['lr']))
LRs.append(optimizer.param_groups[0]['lr'])
print()
scheduler.step()#学习率衰减
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 训练完后用最好的一次当做模型最终的结果,等着一会测试
model.load_state_dict(best_model_wts)
return model, val_acc_history, train_acc_history, valid_losses, train_losses, LRs
由于前面迁移学习的设定,模型训练只训练了输出层:
model_ft, val_acc_history, train_acc_history, valid_losses, train_losses, LRs = train_model(model_ft, dataloaders, criterion, optimizer_ft, num_epochs=20)
得到的结果如下:(选择迭代二十次,最终准确率只有0.36)
将迁移学习策略修改为解冻所有层(即所有层的参数都自己训练):
for param in model_ft.parameters(): #将所有层解冻
param.requires_grad = True
# 再继续训练所有的参数,学习率调小一点
optimizer = optim.Adam(model_ft.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
# 损失函数
criterion = nn.CrossEntropyLoss()
# 加载之前训练好的权重参数----》将上一步仅输出层训练的结果加载进来
checkpoint = torch.load(filename)
best_acc = checkpoint['best_acc']
model_ft.load_state_dict(checkpoint['state_dict']) #当然也可以不加载,即模型训练(所有层)与模型训练(所有层)完全分离开来,形成两种不同的策略。
#模型训练(所有层)
model_ft, val_acc_history, train_acc_history, valid_losses, train_losses, LRs = train_model(model_ft, dataloaders, criterion, optimizer, num_epochs=10,)
得出的结果如下:若进行所有层训练,那么仅迭代9次,预测准确率就已经到达0.64!
如果模型在运行过程中,发生了异常,或者未运行完成而关闭,那么可以直接加载模型
'''
model_ft, input_size = initialize_model(model_name, 102, feature_extract, use_pretrained=True)
# GPU模式
model_ft = model_ft.to(device)
# 保存文件的名字
filename='best.pt'
# 加载模型
checkpoint = torch.load(filename)
best_acc = checkpoint['best_acc']
model_ft.load_state_dict(checkpoint['state_dict'])
'''
当模型进行训练了以后,可以进一步基于验证集进行预测(这里我为了方便,还是用了测试集)
# 得到一个batch的测试数据
dataiter = iter(dataloaders['valid']) #得到验证集的一个迭代器(这里本来应该用验证集的,我为了方便直接用了测试集)
images, labels = dataiter.next() #一批一批地取数(例如验证集一共有1000个样本点,一批一批地取,一次取128个样本点)
model_ft.eval() #模型开启验证模型,仅用作验证,并不要更新模型参数
if train_on_gpu:
output = model_ft(images.cuda())
else:
output = model_ft(images)
print(output.shape)
得到的结果为:torch.size([128,102]) 对应一个batch(128)中128个样本点分别为102个类别花的概率
进一步,选出最大的概率,最大概率对应的花,即为样本点预测的花的类别
_, preds_tensor = torch.max(output, 1)
preds = np.squeeze(preds_tensor.numpy()) if not train_on_gpu else np.squeeze(preds_tensor.cpu().numpy())
preds
某个样本点属于不同花的概率结果如下:
def im_convert(tensor):
""" 展示数据"""
image = tensor.to("cpu").clone().detach()
image = image.numpy().squeeze()
image = image.transpose(1,2,0)
image = image * np.array((0.229, 0.224, 0.225)) + np.array((0.485, 0.456, 0.406))
image = image.clip(0, 1)
return image
fig=plt.figure(figsize=(20, 20))
columns =4
rows = 2
for idx in range (columns*rows):
ax = fig.add_subplot(rows, columns, idx+1, xticks=[], yticks=[])
plt.imshow(im_convert(images[idx]))
ax.set_title("{} ({})".format(cat_to_name[str(preds[idx])], cat_to_name[str(labels[idx].item())]),
color=("green" if cat_to_name[str(preds[idx])]==cat_to_name[str(labels[idx].item())] else "red"))
plt.show()
其中若预测对,字体为绿色,预测错,字体为红色:
#导包
import os
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import torch
from torch import nn
import torch.optim as optim
import torchvision
#pip install torchvision
from torchvision import transforms, models, datasets
#https://pytorch.org/docs/stable/torchvision/index.html
import imageio
import time
import warnings
warnings.filterwarnings("ignore")
import random
import sys
import copy
import json
from PIL import Image
# 设置好读取数据的目录
data_dir = './flower_data/'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/valid
#数据增强
'''
制作好数据源:
data_transforms中指定了所有图像预处理操作(数据增强)
ImageFolder假设所有的文件按文件夹保存好,每个文件夹下面存贮同一类别的图片,文件夹的名字为分类的名字
'''
data_transforms = {
'train':
transforms.Compose([
transforms.Resize([96, 96]), #设置好每张图片大小相同
transforms.RandomRotation(45),#随机旋转,-45到45度之间随机选
transforms.CenterCrop(64),#将图片随机裁剪为64x64
transforms.RandomHorizontalFlip(p=0.5),#随机水平翻转 选择一个概率概率
transforms.RandomVerticalFlip(p=0.5),#随机垂直翻转
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1),#参数1为亮度,参数2为对比度,参数3为饱和度,参数4为色相
transforms.RandomGrayscale(p=0.025),#概率转换成灰度率,3通道就是R=G=B
transforms.ToTensor(), #由于PyTorch框架数据格式必须为Tensor,因此转换数据为Tensor
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])#对三通道分别做均值,标准差
]),
'valid':
transforms.Compose([
transforms.Resize([64, 64]), #必须与训练集中transforms.CenterCrop(64)大小相同
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
#指定batch_size 为128 也即每次从集合中拿128个样本点进行训练或者测试
batch_size = 128
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'valid']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True) for x in ['train', 'valid']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'valid']} #可有可无
class_names = image_datasets['train'].classes #将训练集中每个文件名作为每个花的类名
with open('cat_to_name.json', 'r') as f: #读取花的实际名字
cat_to_name = json.load(f)
# 是否用GPU训练
train_on_gpu = torch.cuda.is_available()
if not train_on_gpu:
print('CUDA is not available. Training on CPU ...')
else:
print('CUDA is available! Training on GPU ...')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#迁移学习策略----》冻层
feature_extract = True
def set_parameter_requires_grad(model, feature_extracting): #此函数后面把feature_extract 带入到feature_extracting
if feature_extracting: #设置模型中所有参数不可反向传播(也即不可修改,包括输出层,后面会重新定义输出层)
for param in model.parameters():
param.requires_grad = False #把其他层都冻住
#模型选择
model_name = 'resnet' #可选的比较多 ['resnet', 'alexnet', 'vgg', 'squeezenet', 'densenet', 'inception'] 一般先选择最基本的模型
model_ft = models.resnet18()#18层的能快点,条件好点的也可以选152
model_ft
#模型修改(自定义输出层)
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
model_ft = models.model_name(pretrained=use_pretrained) #采用迁移学习,模型参数采用预训练的参数
set_parameter_requires_grad(model_ft, feature_extract) #冻层,只允许输出层修改
num_ftrs = model_ft.fc.in_features #得到模型输出层的输入大小
model_ft.fc = nn.Linear(num_ftrs, num_classes)#重新定义了输出层,设定输出层输出为num_classes,也即做102分类,因为本例花的类别只有102类
input_size = 64#输入大小根据自己配置来
return model_ft, input_size
#得到模型,并且保存
model_ft, input_size = initialize_model(model_name, 102, feature_extract, use_pretrained=True)
##GPU还是CPU计算
model_ft = model_ft.to(device)
## 模型保存,名字自己起
filename='checkpoint.pth'
##由于迁移学习策略中设置了所有参数的feature_extract为True,param.requires_grad为False,但是由于自定义了输出层,因此只有输出层中的参数保留在了params_to_update中
params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
params_to_update = []
for name,param in model_ft.named_parameters():
if param.requires_grad == True:
#由于initialize_model()函数重新定义了输出层,因此输出层中的param.requires_grad == True
params_to_update.append(param)
print("\t",name)
else:
for name,param in model_ft.named_parameters():
if param.requires_grad == True:
print("\t",name)
# 优化器设置
optimizer_ft = optim.Adam(params_to_update, lr=1e-2)#params_to_update表示需要训练的参数 lr表示学习率
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)#定义学习率衰减策略 学习率每7个epoch衰减成原来的1/10
criterion = nn.CrossEntropyLoss() #损失函数 (分类任务,用的是交叉熵)
#模型训练
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25,filename='best.pt'):
#model:要训练的模型
#dataloaders:训练的数据,在数据读取与预处理中已经定义
#criterion:损失函数,在优化器设置中已经定义
#optimizer:优化器,在优化器设置中已经定义
#num_epochs:迭代次数,可以自行修改
#filename:保存文件的路径
#计算时间
since = time.time()
#也要记录最好的那一次的准确率
best_acc = 0
#模型也得放到你的CPU或者GPU
model.to(device)
#训练过程中打印一堆损失和指标
val_acc_history = [] #训练集准确率
train_acc_history = [] #测试集准确率
train_losses = [] #训练集损失
valid_losses = [] #测试集损失
#学习率
LRs = [optimizer.param_groups[0]['lr']] #为了打印出学习率,可有可无
#最好的那次模型,后续会变的,先初始化
best_model_wts = copy.deepcopy(model.state_dict()) #用于保存模型最好的权重参数
#一个个epoch来遍历
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# 训练和验证
for phase in ['train', 'valid']:
if phase == 'train':
model.train() # 训练
else:
model.eval() # 验证
running_loss = 0.0
running_corrects = 0
# 把数据都取个遍
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)#放到你的CPU或GPU
labels = labels.to(device)
# 清零
optimizer.zero_grad()
# 只有训练的时候计算和更新梯度
outputs = model(inputs)
loss = criterion(outputs, labels)
_, preds = torch.max(outputs, 1)
# 训练阶段更新权重
if phase == 'train':
loss.backward()
optimizer.step()
# 计算损失
running_loss += loss.item() * inputs.size(0)#0表示batch那个维度
running_corrects += torch.sum(preds == labels.data)#预测结果最大的和真实值是否一致
epoch_loss = running_loss / len(dataloaders[phase].dataset)#算平均
epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
time_elapsed = time.time() - since#一个epoch我浪费了多少时间
print('Time elapsed {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
# 得到最好那次的模型
if phase == 'valid' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
state = {
'state_dict': model.state_dict(),#字典里key就是各层的名字,值就是训练好的权重
'best_acc': best_acc,
'optimizer' : optimizer.state_dict(),
}
torch.save(state, filename)
if phase == 'valid':
val_acc_history.append(epoch_acc)
valid_losses.append(epoch_loss)
#scheduler.step(epoch_loss)#学习率衰减
if phase == 'train':
train_acc_history.append(epoch_acc)
train_losses.append(epoch_loss)
print('Optimizer learning rate : {:.7f}'.format(optimizer.param_groups[0]['lr']))
LRs.append(optimizer.param_groups[0]['lr'])
print()
scheduler.step()#学习率衰减
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 训练完后用最好的一次当做模型最终的结果,等着一会测试
model.load_state_dict(best_model_wts)
return model, val_acc_history, train_acc_history, valid_losses, train_losses, LRs
model_ft, val_acc_history, train_acc_history, valid_losses, train_losses, LRs = train_model(model_ft, dataloaders, criterion, optimizer_ft, num_epochs=20)
#模型结果呈现
# 得到一个batch的测试数据
dataiter = iter(dataloaders['valid']) #得到验证集的一个迭代器(这里本来应该用验证集的,我为了方便直接用了测试集)
images, labels = dataiter.next() #一批一批地取数(例如验证集一共有1000个样本点,一批一批地取,一次取128个样本点)
model_ft.eval() #模型开启验证模型,仅用作验证,并不要更新模型参数
if train_on_gpu:
output = model_ft(images.cuda())
else:
output = model_ft(images)
print(output.shape)
_, preds_tensor = torch.max(output, 1)
preds = np.squeeze(preds_tensor.numpy()) if not train_on_gpu else np.squeeze(preds_tensor.cpu().numpy())
preds
def im_convert(tensor):
"""模型展示"""
image = tensor.to("cpu").clone().detach()
image = image.numpy().squeeze()
image = image.transpose(1,2,0)
image = image * np.array((0.229, 0.224, 0.225)) + np.array((0.485, 0.456, 0.406))
image = image.clip(0, 1)
return image
fig=plt.figure(figsize=(20, 20))
columns =4
rows = 2
for idx in range (columns*rows):
ax = fig.add_subplot(rows, columns, idx+1, xticks=[], yticks=[])
plt.imshow(im_convert(images[idx]))
ax.set_title("{} ({})".format(cat_to_name[str(preds[idx])], cat_to_name[str(labels[idx].item())]),
color=("green" if cat_to_name[str(preds[idx])]==cat_to_name[str(labels[idx].item())] else "red"))
plt.show()
#导包
import os
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import torch
from torch import nn
import torch.optim as optim
import torchvision
#pip install torchvision
from torchvision import transforms, models, datasets
#https://pytorch.org/docs/stable/torchvision/index.html
import imageio
import time
import warnings
warnings.filterwarnings("ignore")
import random
import sys
import copy
import json
from PIL import Image
# 设置好读取数据的目录
data_dir = './flower_data/'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/valid
#数据增强
'''
制作好数据源:
data_transforms中指定了所有图像预处理操作(数据增强)
ImageFolder假设所有的文件按文件夹保存好,每个文件夹下面存贮同一类别的图片,文件夹的名字为分类的名字
'''
data_transforms = {
'train':
transforms.Compose([
transforms.Resize([96, 96]), #设置好每张图片大小相同
transforms.RandomRotation(45),#随机旋转,-45到45度之间随机选
transforms.CenterCrop(64),#将图片随机裁剪为64x64
transforms.RandomHorizontalFlip(p=0.5),#随机水平翻转 选择一个概率概率
transforms.RandomVerticalFlip(p=0.5),#随机垂直翻转
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1),#参数1为亮度,参数2为对比度,参数3为饱和度,参数4为色相
transforms.RandomGrayscale(p=0.025),#概率转换成灰度率,3通道就是R=G=B
transforms.ToTensor(), #由于PyTorch框架数据格式必须为Tensor,因此转换数据为Tensor
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])#对三通道分别做均值,标准差
]),
'valid':
transforms.Compose([
transforms.Resize([64, 64]), #必须与训练集中transforms.CenterCrop(64)大小相同
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
#指定batch_size 为128 也即每次从集合中拿128个样本点进行训练或者测试
batch_size = 128
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'valid']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True) for x in ['train', 'valid']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'valid']} #可有可无
class_names = image_datasets['train'].classes #将训练集中每个文件名作为每个花的类名
with open('cat_to_name.json', 'r') as f: #读取花的实际名字
cat_to_name = json.load(f)
# 是否用GPU训练
train_on_gpu = torch.cuda.is_available()
if not train_on_gpu:
print('CUDA is not available. Training on CPU ...')
else:
print('CUDA is available! Training on GPU ...')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#迁移学习策略----》解冻
feature_extract = True
def set_parameter_requires_grad(model, feature_extracting): #此函数后面把feature_extract 带入到feature_extracting
if feature_extracting:
for param in model.parameters():
param.requires_grad = True #解冻
#模型选择
model_name = 'resnet' #可选的比较多 ['resnet', 'alexnet', 'vgg', 'squeezenet', 'densenet', 'inception'] 一般先选择最基本的模型
model_ft = models.resnet18()#18层的能快点,条件好点的也可以选152
model_ft
#模型修改(自定义输出层)
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
model_ft = models.model_name(pretrained=use_pretrained) #采用迁移学习,模型参数采用预训练的参数
set_parameter_requires_grad(model_ft, feature_extract) #解冻,所有层允许修改
num_ftrs = model_ft.fc.in_features #得到模型输出层的输入大小
model_ft.fc = nn.Linear(num_ftrs, num_classes)#重新定义了输出层,设定输出层输出为num_classes,也即做102分类,因为本例花的类别只有102类
input_size = 64#输入大小根据自己配置来
return model_ft, input_size
#得到模型,并且保存
model_ft, input_size = initialize_model(model_name, 102, feature_extract, use_pretrained=True)
##GPU还是CPU计算
model_ft = model_ft.to(device)
## 模型保存,名字自己起
filename='checkpoint.pth'
##由于迁移学习策略中设置了所有参数的feature_extract为True,param.requires_grad为True,因此所有的参数均保留在了params_to_update中
params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
params_to_update = []
for name,param in model_ft.named_parameters():
if param.requires_grad == True
params_to_update.append(param)
print("\t",name)
else:
for name,param in model_ft.named_parameters():
if param.requires_grad == True:
print("\t",name)
# 优化器设置
optimizer_ft = optim.Adam(params_to_update, lr=1e-2)#params_to_update表示需要训练的参数 lr表示学习率
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)#定义学习率衰减策略 学习率每7个epoch衰减成原来的1/10
criterion = nn.CrossEntropyLoss() #损失函数 (分类任务,用的是交叉熵)
#模型训练
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25,filename='best.pt'):
#model:要训练的模型
#dataloaders:训练的数据,在数据读取与预处理中已经定义
#criterion:损失函数,在优化器设置中已经定义
#optimizer:优化器,在优化器设置中已经定义
#num_epochs:迭代次数,可以自行修改
#filename:保存文件的路径
#计算时间
since = time.time()
#也要记录最好的那一次的准确率
best_acc = 0
#模型也得放到你的CPU或者GPU
model.to(device)
#训练过程中打印一堆损失和指标
val_acc_history = [] #训练集准确率
train_acc_history = [] #测试集准确率
train_losses = [] #训练集损失
valid_losses = [] #测试集损失
#学习率
LRs = [optimizer.param_groups[0]['lr']] #为了打印出学习率,可有可无
#最好的那次模型,后续会变的,先初始化
best_model_wts = copy.deepcopy(model.state_dict()) #用于保存模型最好的权重参数
#一个个epoch来遍历
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# 训练和验证
for phase in ['train', 'valid']:
if phase == 'train':
model.train() # 训练
else:
model.eval() # 验证
running_loss = 0.0
running_corrects = 0
# 把数据都取个遍
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)#放到你的CPU或GPU
labels = labels.to(device)
# 清零
optimizer.zero_grad()
# 只有训练的时候计算和更新梯度
outputs = model(inputs)
loss = criterion(outputs, labels)
_, preds = torch.max(outputs, 1)
# 训练阶段更新权重
if phase == 'train':
loss.backward()
optimizer.step()
# 计算损失
running_loss += loss.item() * inputs.size(0)#0表示batch那个维度
running_corrects += torch.sum(preds == labels.data)#预测结果最大的和真实值是否一致
epoch_loss = running_loss / len(dataloaders[phase].dataset)#算平均
epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
time_elapsed = time.time() - since#一个epoch我浪费了多少时间
print('Time elapsed {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
# 得到最好那次的模型
if phase == 'valid' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
state = {
'state_dict': model.state_dict(),#字典里key就是各层的名字,值就是训练好的权重
'best_acc': best_acc,
'optimizer' : optimizer.state_dict(),
}
torch.save(state, filename)
if phase == 'valid':
val_acc_history.append(epoch_acc)
valid_losses.append(epoch_loss)
#scheduler.step(epoch_loss)#学习率衰减
if phase == 'train':
train_acc_history.append(epoch_acc)
train_losses.append(epoch_loss)
print('Optimizer learning rate : {:.7f}'.format(optimizer.param_groups[0]['lr']))
LRs.append(optimizer.param_groups[0]['lr'])
print()
scheduler.step()#学习率衰减
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 训练完后用最好的一次当做模型最终的结果,等着一会测试
model.load_state_dict(best_model_wts)
return model, val_acc_history, train_acc_history, valid_losses, train_losses, LRs
model_ft, val_acc_history, train_acc_history, valid_losses, train_losses, LRs = train_model(model_ft, dataloaders, criterion, optimizer_ft, num_epochs=20)
#模型结果呈现
# 得到一个batch的测试数据
dataiter = iter(dataloaders['valid']) #得到验证集的一个迭代器(这里本来应该用验证集的,我为了方便直接用了测试集)
images, labels = dataiter.next() #一批一批地取数(例如验证集一共有1000个样本点,一批一批地取,一次取128个样本点)
model_ft.eval() #模型开启验证模型,仅用作验证,并不要更新模型参数
if train_on_gpu:
output = model_ft(images.cuda())
else:
output = model_ft(images)
print(output.shape)
_, preds_tensor = torch.max(output, 1)
preds = np.squeeze(preds_tensor.numpy()) if not train_on_gpu else np.squeeze(preds_tensor.cpu().numpy())
preds
def im_convert(tensor):
"""模型展示"""
image = tensor.to("cpu").clone().detach()
image = image.numpy().squeeze()
image = image.transpose(1,2,0)
image = image * np.array((0.229, 0.224, 0.225)) + np.array((0.485, 0.456, 0.406))
image = image.clip(0, 1)
return image
fig=plt.figure(figsize=(20, 20))
columns =4
rows = 2
for idx in range (columns*rows):
ax = fig.add_subplot(rows, columns, idx+1, xticks=[], yticks=[])
plt.imshow(im_convert(images[idx]))
ax.set_title("{} ({})".format(cat_to_name[str(preds[idx])], cat_to_name[str(labels[idx].item())]),
color=("green" if cat_to_name[str(preds[idx])]==cat_to_name[str(labels[idx].item())] else "red"))
plt.show()
模型训练好后,需要进一步将模型部署到服务端,以供客户端使用。
模型训练好后,需构造结构如下:
其中flower_data 为花的数据
best.pth 为建模好的模型
flask_server.py 为部署在服务端的代码
flask_predict.py 为部署在客户端的代码
进一步写 flask_server.py 的代码
``python
#导包
import io
import json
import flask
import torch
import torch
import torch.nn.functional as F
from PIL import Image
from torch import nn
#from torchvision import transforms as T
from torchvision import transforms, models, datasets
from torch.autograd import Variable
# 初始化Flask app
app = flask.Flask(__name__)
model = None
use_gpu = False
# 加载模型进来
def load_model():
"""Load the pre-trained model, you can use your model just as easily.
"""
global model
#这里我们直接加载官方工具包里提供的训练好的模型(代码会自动下载)括号内参数为是否下载模型对应的配置信息
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = nn.Sequential(nn.Linear(num_ftrs, 102)) # 类别数自己根据自己任务来
#print(model)
checkpoint = torch.load('best.pth')
model.load_state_dict(checkpoint['state_dict'])
#将模型指定为测试格式
model.eval()
#是否使用gpu
if use_gpu:
model.cuda()
#数据预处理(目的是将客户端输入进来的图片预处理成目标的图片格式,也即做模型建模时候的图片格式)
def prepare_image(image, target_size):
"""Do image preprocessing before prediction on any data.
:param image: original image
:param target_size: target image size
:return:
preprocessed image
"""
#针对不同模型,image的格式不同,但需要统一至RGB格式
if image.mode != 'RGB':
image = image.convert("RGB")
# Resize the input image and preprocess it.(按照所使用的模型将输入图片的尺寸修改,并转为tensor)
image = transforms.Resize(target_size)(image)
image = transforms.ToTensor()(image)
# Convert to Torch.Tensor and normalize. mean与std (RGB三通道)这里的参数和数据集中是对应的,训练过程中一致
image = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])(image) #这个要和模型建模的时候设置的参数一致
# Add batch_size axis.增加一个维度,用于按batch测试 本次这里一次测试一张
image = image[None]
if use_gpu:
image = image.cuda()
return Variable(image, volatile=True) #不需要求导
# 开启服务 这里的predict只是一个名字,可自定义
@app.route("/predict", methods=["POST"])
def predict():
# Initialize the data dictionary that will be returned from the view.
#做一个标志,刚开始无图像传入时为false,传入图像时为true
data = {"success": False}
# 如果收到请求
if flask.request.method == 'POST':
#判断是否为图像
if flask.request.files.get("image"):
# Read the image in PIL format
# 将收到的图像进行读取
image = flask.request.files["image"].read()
image = Image.open(io.BytesIO(image)) #二进制数据
# 利用上面的预处理函数将读入的图像进行预处理
image = prepare_image(image, target_size=(64, 64))
preds = F.softmax(model(image), dim=1)
results = torch.topk(preds.cpu().data, k=3, dim=1)
results = (results[0].cpu().numpy(), results[1].cpu().numpy())
#将data字典增加一个key,value,其中value为list格式
data['predictions'] = list()
# Loop over the results and add them to the list of returned predictions
for prob, label in zip(results[0][0], results[1][0]):
#label_name = idx2label[str(label)]
r = {"label": str(label), "probability": float(prob)}
#将预测结果添加至data字典
data['predictions'].append(r)
# Indicate that the request was a success.
data["success"] = True
# 将最终结果以json格式文件传出
return flask.jsonify(data)
"""
test_json = {
"status_code": 200,
"success": {
"message": "image uploaded",
"code": 200
},
"video":{
"video_name":opt['source'].split('/')[-1],
"video_path":opt['source'],
"description":"1",
"length": str(hour)+','+str(minute)+','+str(round(second,4)),
"model_object_completed":model_flag
}
"status_txt": "OK"
}
response = requests.post(
'http://xxx.xxx.xxx.xxx:8090/api/ObjectToKafka/',,
data={'json': str(test_json)})
"""
if __name__ == '__main__':
print("Loading PyTorch model and Flask starting server ...")
print("Please wait until server has fully started")
#先加载模型
load_model()
#再开启服务
app.run(port='5012')
运行flask_server.py 代码后(开启服务端),结果如下:开辟了红框中的一个端口
进一步写 flask_predict.py 的代码,以便给客户端发送请求
import requests
import argparse
# url和端口 这个端口要和开启后的服务端显示的端口一致
flask_url = 'http://127.0.0.1:5012/predict'
def predict_result(image_path):
#啥方法都行
image = open(image_path, 'rb').read()
payload = {'image': image}
#request发给server.
r = requests.post(flask_url, files=payload).json()
# 成功的话在返回.
if r['success']:
# 输出结果.
for (i, result) in enumerate(r['predictions']):
print('{}. {}: {:.4f}'.format(i + 1, result['label'],
result['probability']))
# 失败了就打印.
else:
print('Request failed')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Classification demo')
parser.add_argument('--file', default='./flower_data/train_filelist/image_06998.jpg', type=str, help='test image file') #default='./flower_data/train_filelist/image_06998.jpg' 这个就是自己想要预测的照片,可以是任何自己想预测的照片
args = parser.parse_args()
predict_result(args.file)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。