赞
踩
两行初始化代码
weight_init = Normal(mean=0, sigma=0.02)
这行代码定义了一个初始化器weight_init,它将使用均值为0,标准差为0.02的正态分布来初始化网络中的权重。这种初始化策略有助于在网络的初始阶段避免梯度消失或爆炸的问题,因为权重被初始化为接近于0但又不至于太小的值。
gamma_init = Normal(mean=1, sigma=0.02)
这行代码定义了一个初始化器gamma_init,通常用于初始化批量归一化(Batch Normalization)层中的尺度参数(gamma)。批量归一化是一种常用的技术,用于加速神经网络的训练过程,并通过提高稳定性来改善性能。在这种情况下,gamma参数的初始化通常设置为接近1的值,因为它们在批量归一化层中起到缩放的作用,初始化为1意味着在初始阶段不会改变激活的尺度。
class ResidualBlockBase(nn.Cell): expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, norm: Optional[nn.Cell] = None, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlockBase, self).__init__() if not norm: self.norm = nn.BatchNorm2d(out_channel) else: self.norm = norm self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.conv2 = nn.Conv2d(in_channel, out_channel, kernel_size=3, weight_init=weight_init) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): """ResidualBlockBase construct.""" identity = x # shortcuts分支 out = self.conv1(x) # 主分支第一层:3*3卷积层 out = self.norm(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
这段代码定义了一个名为ResidualBlockBase
的类,它是一个残差块(Residual Block)的基础实现。残差块是残差网络的基本构建块,它允许网络在增加层数的同时,仍然能够通过反向传播有效地训练。
函数解析:
in_channel
:输入通道数。out_channel
:输出通道数。stride
:卷积层的步幅。norm
:可选的规范化层,默认为None
,此时会使用批量归一化。down_sample
:可选的下采样层,用于匹配输入和输出的维度,默认为None
。super(ResidualBlockBase, self).__init__()
。nn.BatchNorm2d(out_channel)
。conv1
和conv2
,其中conv1
可能包含步幅stride
用于降采样。relu
。down_sample
。x
:输入数据,通常是特征图。x
作为身份映射(shortcuts分支)。conv1
处理输入数据,然后进行规范化、ReLU激活。conv2
处理激活后的数据,然后进行规范化。down_sample
,如果存在,则对身份映射进行下采样以匹配输出的维度。out
与身份映射identity
相加,实现跳跃连接(skip connection)。out
。class ResidualBlockBase(nn.Cell): expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等 def __init__(self, in_channel: int, out_channel: int, stride: int = 1, norm: Optional[nn.Cell] = None, down_sample: Optional[nn.Cell] = None) -> None: super(ResidualBlockBase, self).__init__() if not norm: self.norm = nn.BatchNorm2d(out_channel) else: self.norm = norm self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=stride, weight_init=weight_init) self.conv2 = nn.Conv2d(in_channel, out_channel, kernel_size=3, weight_init=weight_init) self.relu = nn.ReLU() self.down_sample = down_sample def construct(self, x): """ResidualBlockBase construct.""" identity = x # shortcuts分支 out = self.conv1(x) # 主分支第一层:3*3卷积层 out = self.norm(out) out = self.relu(out) out = self.conv2(out) # 主分支第二层:3*3卷积层 out = self.norm(out) if self.down_sample is not None: identity = self.down_sample(x) out += identity # 输出为主分支与shortcuts之和 out = self.relu(out) return out
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[3], line 1
----> 1 class ResidualBlockBase(nn.Cell):
2 expansion: int = 1 # 最后一个卷积核数量与第一个卷积核数量相等
4 def __init__(self, in_channel: int, out_channel: int,
5 stride: int = 1, norm: Optional[nn.Cell] = None,
6 down_sample: Optional[nn.Cell] = None) -> None:
NameError: name 'nn' is not defined
这段代码定义了一个名为ResidualBlock
的类,它也是残差网络(ResNet)中的一个残差块,但是与之前的基础版残差块相比,这个类实现了更复杂的残差学习结构,这个残差块包含了三个卷积层,其中第一个和第三个卷积层是1x1的,用于降维和升维,第二个卷积层是3x3的,用于提取特征。
函数解析
in_channel
:输入通道数。out_channel
:输出通道数,这里的输出通道数是指第一个卷积层之后的通道数,最终的输出通道数将是这个数的4倍,由expansion
变量控制。stride
:第二个卷积层的步幅,用于降采样。down_sample
:可选的下采样层,用于匹配输入和输出的维度,默认为None
。super(ResidualBlock, self).__init__()
。conv1
、conv2
和conv3
,其中conv1
和conv3
是1x1卷积,conv2
是3x3卷积,conv2
可能包含步幅stride
用于降采样。norm1
、norm2
和norm3
。relu
。down_sample
。x
:输入数据,通常是特征图。x
作为身份映射(shortcuts分支)。conv1
处理输入数据,然后进行规范化、ReLU激活。conv2
处理激活后的数据,然后进行规范化、ReLU激活。conv3
处理激活后的数据,然后进行规范化。down_sample
,如果存在,则对身份映射进行下采样以匹配输出的维度。out
与身份映射identity
相加,实现跳跃连接(skip connection)。out
。def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]], channel: int, block_nums: int, stride: int = 1): down_sample = None # shortcuts分支 if stride != 1 or last_out_channel != channel * block.expansion: down_sample = nn.SequentialCell([ nn.Conv2d(last_out_channel, channel * block.expansion, kernel_size=1, stride=stride, weight_init=weight_init), nn.BatchNorm2d(channel * block.expansion, gamma_init=gamma_init) ]) layers = [] layers.append(block(last_out_channel, channel, stride=stride, down_sample=down_sample)) in_channel = channel * block.expansion # 堆叠残差网络 for _ in range(1, block_nums): layers.append(block(in_channel, channel)) return nn.SequentialCell(layers)
这段代码定义了一个名为make_layer
的函数,它的作用是创建并堆叠多个残差块(Residual Blocks),形成残差网络(ResNet)中的一个层。这个函数可以根据给定的参数生成不同数量的残差块,并且可以处理不同层的输入和输出通道数不匹配的情况。
函数解析
last_out_channel
:前一个层的输出通道数。block
:残差块类型,可以是ResidualBlockBase
或ResidualBlock
。channel
:当前层的输入通道数(对于第一个残差块)。block_nums
:要堆叠的残差块数量。stride
:第一个残差块的步幅,用于降采样。down_sample
变量,用于可能需要的下采样操作。stride
不等于1或者last_out_channel
不等于channel * block.expansion
,则创建一个下采样层,包括一个1x1的卷积层和一个批量归一化层。layers
列表,并将第一个残差块添加到列表中,这个残差块使用down_sample
作为其下采样层。layers
列表中,每个残差块的输入通道数是前一个残差块的输出通道数。nn.SequentialCell(layers)
将堆叠的残差块封装为一个顺序层(Sequential Layer),并返回这个层。from mindspore import load_checkpoint, load_param_into_net class ResNet(nn.Cell): def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]], layer_nums: List[int], num_classes: int, input_channel: int) -> None: super(ResNet, self).__init__() self.relu = nn.ReLU() # 第一个卷积层,输入channel为3(彩色图像),输出channel为64 self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, weight_init=weight_init) self.norm = nn.BatchNorm2d(64) # 最大池化层,缩小图片的尺寸 self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same') # 各个残差网络结构块定义, self.layer1 = make_layer(64, block, 64, layer_nums[0]) self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2) self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2) self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2) # 平均池化层 self.avg_pool = nn.AvgPool2d() # flattern层 self.flatten = nn.Flatten() # 全连接层 self.fc = nn.Dense(in_channels=input_channel, out_channels=num_classes) def construct(self, x): x = self.conv1(x) x = self.norm(x) x = self.relu(x) x = self.max_pool(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.layer4(x) x = self.avg_pool(x) x = self.flatten(x) x = self.fc(x) return x def _resnet(model_url: str, block: Type[Union[ResidualBlockBase, ResidualBlock]], layers: List[int], num_classes: int, pretrained: bool, pretrianed_ckpt: str, input_channel: int): model = ResNet(block, layers, num_classes, input_channel) if pretrained: # 加载预训练模型 download(url=model_url, path=pretrianed_ckpt, replace=True) param_dict = load_checkpoint(pretrianed_ckpt) load_param_into_net(model, param_dict) return model def resnet50(num_classes: int = 1000, pretrained: bool = False): "ResNet50模型" resnet50_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/models/application/resnet50_224_new.ckpt" resnet50_ckpt = "./LoadPretrainedModel/resnet50_224_new.ckpt" return _resnet(resnet50_url, ResidualBlock, [3, 4, 6, 3], num_classes, pretrained, resnet50_ckpt, 2048)
函数部分解析:
block
:残差块类型,可以是ResidualBlockBase
或ResidualBlock
。layer_nums
:一个列表,指定每个残差网络结构块中残差块的数量。num_classes
:输出类别数,用于全连接层的输出尺寸。input_channel
:全连接层的输入通道数。super(ResNet, self).__init__()
。relu
。conv1
,用于处理输入图像。norm
。max_pool
,用于缩小图像尺寸。layer1
到layer4
,每个块由make_layer
函数创建。avg_pool
。flatten
,用于将特征图展平为一维向量。fc
,用于最终分类。x
:输入数据,通常是特征图。conv1
处理输入数据,然后进行批量归一化和ReLU激活。max_pool
处理激活后的数据。layer1
到layer4
处理数据。avg_pool
处理数据。flatten
将特征图展平为一维向量。fc
进行最终的分类。model_url
:预训练模型的URL。block
:残差块类型。layers
:残差块的数量列表。num_classes
:输出类别数。pretrained
:一个布尔值,指示是否加载预训练模型。pretrianed_ckpt
:预训练模型的本地路径。input_channel
:全连接层的输入通道数。model
。pretrained
为True,则从model_url
下载预训练模型并将其加载到model
中。num_classes
:输出类别数,默认为1000。pretrained
:一个布尔值,指示是否加载预训练模型,默认为False。_resnet
函数创建并返回ResNet50模型,可以选择是否加载预训练的权重。这部分代码附有注释,看代码即可
import mindspore as ms import matplotlib.pyplot as plt import os import time net_work = resnet50(pretrained=True) # 全连接层输入层的大小 in_channels = net_work.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net_work.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net_work.avg_pool = avg_pool # 冻结除最后一层外的所有参数 for param in net_work.get_parameters(): if param.name not in ["fc.weight", "fc.bias"]: param.requires_grad = False # 定义优化器和损失函数 opt = nn.Momentum(params=net_work.trainable_params(), learning_rate=lr, momentum=0.5) loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean') def forward_fn(inputs, targets): logits = net_work(inputs) loss = loss_fn(logits, targets) return loss grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters) def train_step(inputs, targets): loss, grads = grad_fn(inputs, targets) opt(grads) return loss # 实例化模型 model1 = train.Model(net_work, loss_fn, opt, metrics={"Accuracy": train.Accuracy()})
训练这块之前有做过类似操作,不过多赘述
import mindspore as ms import matplotlib.pyplot as plt import os import time dataset_train = create_dataset_canidae(data_path_train, "train") step_size_train = dataset_train.get_dataset_size() dataset_val = create_dataset_canidae(data_path_val, "val") step_size_val = dataset_val.get_dataset_size() num_epochs = 5 # 创建迭代器 data_loader_train = dataset_train.create_tuple_iterator(num_epochs=num_epochs) data_loader_val = dataset_val.create_tuple_iterator(num_epochs=num_epochs) best_ckpt_dir = "./BestCheckpoint" best_ckpt_path = "./BestCheckpoint/resnet50-best-freezing-param.ckpt"
import mindspore as ms import matplotlib.pyplot as plt import os import time # 开始循环训练 print("Start Training Loop ...") best_acc = 0 for epoch in range(num_epochs): losses = [] net_work.set_train() epoch_start = time.time() # 为每轮训练读入数据 for i, (images, labels) in enumerate(data_loader_train): labels = labels.astype(ms.int32) loss = train_step(images, labels) losses.append(loss) # 每个epoch结束后,验证准确率 acc = model1.eval(dataset_val)['Accuracy'] epoch_end = time.time() epoch_seconds = (epoch_end - epoch_start) * 1000 step_seconds = epoch_seconds/step_size_train print("-" * 20) print("Epoch: [%3d/%3d], Average Train Loss: [%5.3f], Accuracy: [%5.3f]" % ( epoch+1, num_epochs, sum(losses)/len(losses), acc )) print("epoch time: %5.3f ms, per step time: %5.3f ms" % ( epoch_seconds, step_seconds )) if acc > best_acc: best_acc = acc if not os.path.exists(best_ckpt_dir): os.mkdir(best_ckpt_dir) ms.save_checkpoint(net_work, best_ckpt_path) print("=" * 80) print(f"End of validation the best Accuracy is: {best_acc: 5.3f}, " f"save the best ckpt file in {best_ckpt_path}", flush=True)
看代码注释即可
import matplotlib.pyplot as plt import mindspore as ms def visualize_model(best_ckpt_path, val_ds): net = resnet50() # 全连接层输入层的大小 in_channels = net.fc.in_channels # 输出通道数大小为狼狗分类数2 head = nn.Dense(in_channels, 2) # 重置全连接层 net.fc = head # 平均池化层kernel size为7 avg_pool = nn.AvgPool2d(kernel_size=7) # 重置平均池化层 net.avg_pool = avg_pool # 加载模型参数 param_dict = ms.load_checkpoint(best_ckpt_path) ms.load_param_into_net(net, param_dict) model = train.Model(net) # 加载验证集的数据进行验证 data = next(val_ds.create_dict_iterator()) images = data["image"].asnumpy() labels = data["label"].asnumpy() class_name = {0: "dogs", 1: "wolves"} # 预测图像类别 output = model.predict(ms.Tensor(data['image'])) pred = np.argmax(output.asnumpy(), axis=1) # 显示图像及图像的预测值 plt.figure(figsize=(5, 5)) for i in range(4): plt.subplot(2, 2, i + 1) # 若预测正确,显示为蓝色;若预测错误,显示为红色 color = 'blue' if pred[i] == labels[i] else 'red' plt.title('predict:{}'.format(class_name[pred[i]]), color=color) picture_show = np.transpose(images[i], (1, 2, 0)) mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) picture_show = std * picture_show + mean picture_show = np.clip(picture_show, 0, 1) plt.imshow(picture_show) plt.axis('off') plt.show()
2024-07-07 23:39:40 Mindstorm
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。