赞
踩
迁移学习(Transfer Learning):在一个任务上训练得到的模型包含的知识可以部分或全部地转移到另一个任务上。允许模型将从一个任务中学到的知识应用到另一个相关的任务中。适用于数据稀缺的情况,可减少对大量标记数据的需求。
迁移学习是一种机器学习方法,具体是指将已经在某一领域(或任务)学习到的知识或模型,应用到另一个不同但相关的领域(或任务)中,以提高在该新任务上的学习效率和效果。这种知识或模型的迁移可以包括网络参数、特征表示、数据间的关系等多种形式的知识。
迁移学习通常可以分为以下几种类型:
迁移学习的实现流程通常包括以下几个步骤:
狼狗数据集提取自ImageNet分类数据集
from download import download
dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/intermediate/Canidae_data.zip"
download(dataset_url, "./datasets-Canidae", kind="zip", replace=True)
输出数据集目录结构:
使用mindspore.dataset.ImageFolderDataset接口来加载数据集,并进行相关图像增强操作
import mindspore as ms import mindspore.dataset as ds import mindspore.dataset.vision as vision # 数据集目录路径 data_path_train = "./datasets-Canidae/data/Canidae/train/" data_path_val = "./datasets-Canidae/data/Canidae/val/" # 创建训练数据集 def create_dataset_canidae(dataset_path, usage): """数据加载""" data_set = ds.ImageFolderDataset(dataset_path, num_parallel_workers=workers, shuffle=True,) # 数据增强操作 mean = [0.485 * 255, 0.456 * 255, 0.406 * 255] std = [0.229 * 255, 0.224 * 255, 0.225 * 255] scale = 32 if usage == "train": # Define map operations for training dataset trans = [ vision.RandomCropDecodeResize(size=image_size, scale=(0.08, 1.0), ratio=(0.75, 1.333)), vision.RandomHorizontalFlip(prob=0.5), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] else: # Define map operations for inference dataset trans = [ vision.Decode(), vision.Resize(image_size + scale), vision.CenterCrop(image_size), vision.Normalize(mean=mean, std=std), vision.HWC2CHW() ] # 数据映射操作 data_set = data_set.map( operations=trans, input_columns='image', num_parallel_workers=workers) # 批量操作 data_set = data_set.batch(batch_size) return data_set dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size() # 数据集可视化 data = next(dataset_train.create_dict_iterator()) images = data["image"] labels = data["label"] print("Tensor of image", images.shape) print("Labels:", labels)
import matplotlib.pyplot as plt import numpy as np # class_name对应label,按文件夹字符串从小到大的顺序标记label class_name = {0: "dogs", 1: "wolves"} plt.figure(figsize=(5, 5)) for i in range(4): # 获取图像及其对应的label data_image = images[i].asnumpy() data_label = labels[i] # 处理图像供展示使用 data_image = np.transpose(data_image, (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) data_image = std * data_image + mean data_image = np.clip(data_image, 0, 1) # 显示图像 plt.subplot(2, 2, i+1) plt.imshow(data_image) plt.title(class_name[int(labels[i].asnumpy())]) plt.axis("off") plt.show()
使用ResNet50模型进行训练。搭建好模型框架后,通过将pretrained参数设置为True来下载ResNet50的预训练模型并将权重参数加载到网络中。
下载ResNet50的预训练模型
1.构建模型
from typing import Type, Union, List, Optional from mindspore import nn, train from mindspore.common.initializer import Normal weight_init = Normal(mean=0, sigma=0.02) gamma_init = Normal(mean=1, sigma=0.02) class ResidualBlockBase(nn.Cell): expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, norm: Optional[nn.Cell] = None, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlockBase, self).__init__() if not norm: self.norm = nn.BatchNorm2d(out_channel) else: self.norm = norm self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.conv2 = nn.Conv2d(in_channel, out_channel, kernel_size=3, weight_init=weight_init) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): """ResidualBlockBase construct.""" identity = x # shortcuts分支 out = self.conv1(x) # 主分支第一层:3*3卷积层 out = self.norm(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
残差模块:
class ResidualBlock(nn.Cell): expansion = 4 # 最后一个卷积核的数量是第一个卷积核数量的4倍 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlock, self).__init__() self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=1, weight_init=weight_init) self.norm1 = nn.BatchNorm2d(out_channel) self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.norm2 = nn.BatchNorm2d(out_channel) self.conv3 = nn.Conv2d(out_channel, out_channel * self.expansion, kernel_size=1, weight_init=weight_init) self.norm3 = nn.BatchNorm2d(out_channel * self.expansion) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): identity = x # shortscuts分支 out = self.conv1(x) # 主分支第一层:1*1卷积层 out = self.norm1(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm2(out) out = self.relu(out) out = self.conv3(out) # 主分支第三层:1*1卷积层 out = self.norm3(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
网络层
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]], channel: int, block_nums: int, stride: int = 1): down_sample = None # shortcuts分支 if stride != 1 or last_out_channel != channel * block.expansion: down_sample = nn.SequentialCell([ nn.Conv2d(last_out_channel, channel * block.expansion, kernel_size=1, stride=stride, weight_init=weight_init), nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init) ]) layers = [] layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample)) in_channel = channel * block.expansion # 堆叠残差网络 for _ in range(1, block_nums): layers.append(block(in_channel, channel)) return nn.SequentialCell(layers)
组合
from mindspore import load_checkpoint, load_param_into_net class ResNet(nn.Cell): def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]], layer_nums: List[int], num_classes: int, input_channel: int) -> None: super(ResNet, self).__init__() self.relu = nn.ReLU() # 第一个卷积层,输入channel为3(彩色图像),输出channel为64 self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init) self.norm = nn.BatchNorm2d(64) # 最大池化层,缩小图片的尺寸 self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same') # 各个残差网络结构块定义, self.layer1 = make_layer(64, block, 64, layer_nums[0]) self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2) self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2) self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2) # 平均池化层 self.avg_pool = nn.AvgPool2d() # flattern层 self.flatten = nn.Flatten() # 全连接层 self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes) def construct(self, x): x = self.conv1(x) x = self.norm(x) x = self.relu(x) x = self.max_pool(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.layer4(x) x = self.avg_pool(x) x = self.flatten(x) x = self.fc(x) return x def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]], layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str, input_channel: int): model = ResNet(block, layers, num_classes, input_channel) if pretrained: # 加载预训练模型 download(url=model_url, path=pretrianed_ckpt, replace=True) param_dict = load_checkpoint(pretrianed_ckpt) load_param_into_net(model, param_dict) return model def resnet50(num_classes: int = 1000, pretrained: bool = False): "ResNet50模型" resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt" resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt" return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)
使用固定特征进行训练的时候,需要冻结除最后一层之外的所有网络层。通过设置requires_grad == False
冻结参数,以便不在反向传播中计算梯度。
import mindspore as ms import matplotlib.pyplot as plt import os import time net_work = resnet50(pretrained=True) # 全连接层输入层的大小 in_channels = net_work.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net_work.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net_work.avg_pool = avg_pool # 冻结除最后一层外的所有参数 for param in net_work.get_parameters(): if param.name not in ["fc.weight", "fc.bias"]: param.requires_grad = False # 定义优化器和损失函数 opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5) loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') def forward_fn(inputs, targets): logits = net_work(inputs) loss = loss_fn(logits, targets) return loss grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters) def train_step(inputs, targets): loss, grads = grad_fn(inputs, targets) opt(grads) return loss # 实例化模型 model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
训练和评估
import mindspore as ms import matplotlib.pyplot as plt import os import time dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size() num_epochs = 5 # 创建迭代器 data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs) data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs) best_ckpt_dir = "./BestCheckpoint"
可视化模型预测
使用固定特征得到的best.ckpt文件对对验证集的狼和狗图像数据进行预测。若预测字体为蓝色即为预测正确,若预测字体为红色则预测错误。
import mindspore as ms import matplotlib.pyplot as plt import os import time # 开始循环训练 print("Start Training Loop ...") best_acc = 0 for epoch in range(num_epochs): losses = [] net_work.set_train() epoch_start = time.time() # 为每轮训练读入数据 for i, (images, labels) in enumerate(data_loader_train): labels = labels.astype(ms.int32) loss = train_step(images, labels) losses.append(loss) # 每个epoch结束后,验证准确率 acc = model1.eval(dataset_val)['Accuracy'] epoch_end = time.time() epoch_seconds = (epoch_end - epoch_start) * 1000 step_seconds = epoch_seconds/step_size_train print("-" * 20) print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % ( epoch+1, num_epochs, sum(losses)/len(losses), acc )) print("epoch time: %5.3f ms, per step time: %5.3f ms" % ( epoch_seconds, step_seconds )) if acc > best_acc: best_acc = acc if not os.path.exists(best_ckpt_dir): os.mkdir(best_ckpt_dir) ms.save_checkpoint(net_work, best_ckpt_path) print("=" * 80) print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, " f"save the best ckpt file in {best_ckpt_path}", flush=True)
import matplotlib.pyplot as plt import mindspore as ms def visualize_model(best_ckpt_path, val_ds): net = resnet50() # 全连接层输入层的大小 in_channels = net.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net.avg_pool = avg_pool # 加载模型参数 param_dict = ms.load_checkpoint(best_ckpt_path) ms.load_param_into_net(net, param_dict) model = train.Model(net) # 加载验证集的数据进行验证 data = next(val_ds.create_dict_iterator()) images = data["image"].asnumpy() labels = data["label"].asnumpy() class_name = {0: "dogs", 1: "wolves"} # 预测图像类别 output = model.predict(ms.Tensor(data['image'])) pred = np.argmax(output.asnumpy(), axis=1) # 显示图像及图像的预测值 plt.figure(figsize=(5, 5)) for i in range(4): plt.subplot(2, 2, i + 1) # 若预测正确,显示为蓝色;若预测错误,显示为红色 color = 'blue' if pred[i] == labels[i] else 'red' plt.title('predict:{}'.format(class_name[pred[i]]), color=color) picture_show = np.transpose(images[i], (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) picture_show = std * picture_show + mean picture_show = np.clip(picture_show, 0, 1) plt.imshow(picture_show) plt.axis('off') plt.show() visualize_model(best_ckpt_path, dataset_val)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。