
PyTorch Study Notes -- Building SEResNet50


Contents

1--Introduction to ResNet50

1-1--The Stem Block Stage

1-2--The Stages

1-3--ResNet50 Core Code

2--Introduction to SENet

3--Introduction to SEResNet50

4--Example: Classifying CIFAR10 with SEResNet50

5--References


1--Introduction to ResNet50

Analysis: The figure above shows the overall structure of ResNet50. Besides the Input and Output stages, it contains a Stem Block stage, four Stage modules (Stage1-4), and a Subsequent Processing stage.

1-1--The Stem Block Stage

Analysis: The input to the Stem Block stage is a three-channel image (C = 3, W = 224, H = 224). It first passes through a convolution (kernel_size = 7 x 7, stride = 2, 64 filters), batch normalization, and a ReLU, followed by a max-pooling operation, producing an output of shape (C = 64, W = 56, H = 56). This stage can be understood as the pre-processing step before the four Stages. Its core code is as follows:

self.Stem = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
)
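
As a quick sanity check (a minimal sketch of my own, not part of the network class above), you can push a dummy 224 x 224 image through an identical Sequential block and confirm the (C = 64, W = 56, H = 56) output:

import torch
from torch import nn

stem = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
)
x = torch.randn(1, 3, 224, 224)    # dummy input batch
print(stem(x).shape)               # torch.Size([1, 64, 56, 56])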

1-2--The Stages

The figure above shows the network structure of the four Stages. Each Stage contains two kinds of blocks: the Conv Block and the Identity Block.

Taking Stage1 as an example, the difference between the two structures is as follows:

As the figure shows, the Conv Block has an extra Conv and BN operation on its right-hand (shortcut) branch compared with the Identity Block. The core code for building both blocks is as follows:

## Import third-party libraries
import torch
from torch import nn

# Build the Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride = 1, is_1x1conv = False):
        super(Block, self).__init__()
        # Output channels of the three sub-blocks inside each block (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv          # whether this is a Conv Block
        self.relu = nn.ReLU(inplace = True)   # ReLU operation
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size = 1, stride = stride, bias = False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size = 3, stride = 1, padding = 1, bias = False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size = 1, stride = 1, bias = False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block needs an extra Conv + BN on the shortcut (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size = 1, stride = stride, bias = False),
                nn.BatchNorm2d(filter3)
            )

    def forward(self, x):
        x_shortcut = x            # value carried on the right-hand branch of the diagram
        x1 = self.conv1(x)        # first sub-block
        x1 = self.conv2(x1)       # middle sub-block
        x1 = self.conv3(x1)       # last sub-block
        if self.is_1x1conv:       # extra Conv + BN for the Conv Block
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut      # Add operation
        x1 = self.relu(x1)        # ReLU operation
        return x1

Details worth noting:

  1. The output of each Stage is the input of the next Stage;
  2. In the Conv Block, the first sub-block on the main branch and the 1 x 1 conv on the shortcut use a different stride in Stage1 than in Stage2-4 (Stage1: stride = 1; Stage2-4: stride = 2);
  3. In both the Conv Block and the Identity Block, the channel count of the last sub-block is four times that of the first and middle sub-blocks (points 2 and 3 are exercised in the sketch after this list);
  4. In both the Conv Block and the Identity Block, the last sub-block has no ReLU (unlike the first two sub-blocks).
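
A minimal sanity-check sketch (my own addition, not from the original post) that runs the Block defined above: a Stage1-style Conv Block keeps the 56 x 56 resolution while expanding the channels to 256, a Stage2-style Conv Block halves the resolution, and an Identity Block preserves the shape.

conv_block_s1 = Block(64, (64, 64, 256), stride = 1, is_1x1conv = True)      # Stage1-style Conv Block
conv_block_s2 = Block(256, (128, 128, 512), stride = 2, is_1x1conv = True)   # Stage2-style Conv Block
identity_block = Block(256, (64, 64, 256), stride = 1, is_1x1conv = False)   # Identity Block

x = torch.randn(1, 64, 56, 56)       # dummy Stem output
y = conv_block_s1(x)
print(y.shape)                       # torch.Size([1, 256, 56, 56]) -- channels x4, resolution kept
print(conv_block_s2(y).shape)        # torch.Size([1, 512, 28, 28]) -- resolution halved
print(identity_block(y).shape)       # torch.Size([1, 256, 56, 56]) -- shape preserved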

1-3--ResNet50 Core Code:

# Build ResNet50
class Resnet(nn.Module):
    def __init__(self, cfg):
        super(Resnet, self).__init__()
        classes = cfg['classes']   # number of classes
        num = cfg['num']           # for ResNet50: [3, 4, 6, 3]; number of Conv Blocks + Identity Blocks per Stage
        # Stem Block
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        )
        # Stage1
        filters = (64, 64, 256)    # channels
        self.Stage1 = self._make_layer(in_channels = 64, filters = filters, num = num[0], stride = 1)
        # Stage2
        filters = (128, 128, 512)  # channels
        self.Stage2 = self._make_layer(in_channels = 256, filters = filters, num = num[1], stride = 2)
        # Stage3
        filters = (256, 256, 1024) # channels
        self.Stage3 = self._make_layer(in_channels = 512, filters = filters, num = num[2], stride = 2)
        # Stage4
        filters = (512, 512, 2048) # channels
        self.Stage4 = self._make_layer(in_channels = 1024, filters = filters, num = num[3], stride = 2)
        # Global average pooling
        self.global_average_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Fully connected layer -- the Subsequent Processing stage after the four Stages
        self.fc = nn.Sequential(
            nn.Linear(2048, classes)
        )

    # Build a single Stage
    def _make_layer(self, in_channels, filters, num, stride = 1):
        layers = []
        # Conv Block
        block_1 = Block(in_channels, filters, stride = stride, is_1x1conv = True)
        layers.append(block_1)
        # Stack the Identity Blocks, based on [3, 4, 6, 3]
        for i in range(1, num):
            layers.append(Block(filters[2], filters, stride = 1, is_1x1conv = False))
        # Return the Conv Block plus Identity Blocks that make up one Stage
        return nn.Sequential(*layers)

    def forward(self, x):
        # Stem Block stage
        x = self.conv1(x)
        # The four Stages
        x = self.Stage1(x)
        x = self.Stage2(x)
        x = self.Stage3(x)
        x = self.Stage4(x)
        # Subsequent Processing stage
        x = self.global_average_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
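
A short usage sketch (illustrative only, using the cfg keys defined above) that instantiates the plain ResNet50 and checks the output shape on a dummy batch:

cfg = {'num': (3, 4, 6, 3), 'classes': 1000}   # hypothetical 1000-class setting
model = Resnet(cfg)
x = torch.randn(2, 3, 224, 224)
print(model(x).shape)              # torch.Size([2, 1000])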

2--Introduction to SENet

Analysis: The figure above is taken from the paper "Squeeze-and-Excitation Networks". Intuitively, SENet re-weights each channel of the input feature map; the figure below shows the main steps of this weighting process.

Analysis: As the figure shows, the per-channel weights are obtained through global average pooling, a fully connected layer, a ReLU, another fully connected layer, and a Sigmoid; the weighting itself is then applied by the Scale (element-wise multiplication) step. In the example in this post, 1 x 1 convolution layers replace the fully connected layers in an attempt to reduce semantic loss in the feature map. The core code for computing the channel weights is as follows:

# SENet (refer to the SENet diagram)
self.se = nn.Sequential(
    nn.AdaptiveAvgPool2d((1, 1)),                        # global average pooling
    nn.Conv2d(filter3, filter3 // 16, kernel_size = 1),  # 16 is the reduction ratio r; filter3 // 16 is C/r; a 1x1 conv replaces the FC layer
    nn.ReLU(),
    nn.Conv2d(filter3 // 16, filter3, kernel_size = 1),
    nn.Sigmoid()
)
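
The standalone sketch below (my own illustration, mirroring the snippet above with an assumed channel count of 256) shows how the resulting 1 x 1 weight map is broadcast-multiplied back onto the feature map, which is exactly the Scale step:

channels, r = 256, 16
se = nn.Sequential(
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Conv2d(channels, channels // r, kernel_size = 1),
    nn.ReLU(),
    nn.Conv2d(channels // r, channels, kernel_size = 1),
    nn.Sigmoid()
)
feat = torch.randn(1, channels, 56, 56)
weights = se(feat)                 # shape (1, 256, 1, 1): one weight per channel
out = feat * weights               # broadcast over H and W -> per-channel scaling (the Scale step)
print(weights.shape, out.shape)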

3--Introduction to SEResNet50

Analysis: The figure above inserts the SENet module into the Residual block, i.e. the SE module is added after the last sub-block of both the Conv Block and the Identity Block in ResNet. The core code for the SE-based Conv Block and Identity Block is as follows:

# Build the SE-based Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride = 1, is_1x1conv = False):
        super(Block, self).__init__()
        # Output channels of the three sub-blocks inside each block (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv          # whether this is a Conv Block
        self.relu = nn.ReLU(inplace = True)   # ReLU operation
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size = 1, stride = stride, bias = False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size = 3, stride = 1, padding = 1, bias = False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size = 1, stride = 1, bias = False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block needs an extra Conv + BN on the shortcut (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size = 1, stride = stride, bias = False),
                nn.BatchNorm2d(filter3)
            )
        # SENet (refer to the SENet diagram)
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),                        # global average pooling
            nn.Conv2d(filter3, filter3 // 16, kernel_size = 1),  # 16 is the reduction ratio r; filter3 // 16 is C/r; a 1x1 conv replaces the FC layer
            nn.ReLU(),
            nn.Conv2d(filter3 // 16, filter3, kernel_size = 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x_shortcut = x
        x1 = self.conv1(x)        # first sub-block
        x1 = self.conv2(x1)       # middle sub-block
        x1 = self.conv3(x1)       # last sub-block
        x2 = self.se(x1)          # per-channel weights from the SE branch
        x1 = x1 * x2              # re-weight the channels
        if self.is_1x1conv:       # extra Conv + BN for the Conv Block
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut      # Add operation
        x1 = self.relu(x1)        # ReLU operation
        return x1
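
As a quick check (an illustrative sketch, not from the post), the SE-enhanced Block keeps exactly the same input and output shapes as the plain residual Block; only the per-channel weighting changes:

se_block = Block(64, (64, 64, 256), stride = 1, is_1x1conv = True)
x = torch.randn(1, 64, 56, 56)
print(se_block(x).shape)           # torch.Size([1, 256, 56, 56]) -- same shape as without SE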

4--Example: Classifying CIFAR10 with SEResNet50

Runnable code (with detailed comments):

## Import third-party libraries
from torch import nn
import time
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.optim as optim


# Build the SE-based Conv Block and Identity Block structures
class Block(nn.Module):
    def __init__(self, in_channels, filters, stride = 1, is_1x1conv = False):
        super(Block, self).__init__()
        # Output channels of the three sub-blocks inside each block (filter1 = filter2 = filter3 / 4)
        filter1, filter2, filter3 = filters
        self.is_1x1conv = is_1x1conv          # whether this is a Conv Block
        self.relu = nn.ReLU(inplace = True)   # ReLU operation
        # First sub-block: stride = 1 (Stage1) or stride = 2 (Stage2-4)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, filter1, kernel_size = 1, stride = stride, bias = False),
            nn.BatchNorm2d(filter1),
            nn.ReLU()
        )
        # Middle sub-block
        self.conv2 = nn.Sequential(
            nn.Conv2d(filter1, filter2, kernel_size = 3, stride = 1, padding = 1, bias = False),
            nn.BatchNorm2d(filter2),
            nn.ReLU()
        )
        # Last sub-block: no ReLU here
        self.conv3 = nn.Sequential(
            nn.Conv2d(filter2, filter3, kernel_size = 1, stride = 1, bias = False),
            nn.BatchNorm2d(filter3),
        )
        # The Conv Block needs an extra Conv + BN on the shortcut (see the Conv Block diagram)
        if is_1x1conv:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, filter3, kernel_size = 1, stride = stride, bias = False),
                nn.BatchNorm2d(filter3)
            )
        # SENet (refer to the SENet diagram)
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),                        # global average pooling
            nn.Conv2d(filter3, filter3 // 16, kernel_size = 1),  # 16 is the reduction ratio r; filter3 // 16 is C/r; a 1x1 conv replaces the FC layer
            nn.ReLU(),
            nn.Conv2d(filter3 // 16, filter3, kernel_size = 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x_shortcut = x
        x1 = self.conv1(x)        # first sub-block
        x1 = self.conv2(x1)       # middle sub-block
        x1 = self.conv3(x1)       # last sub-block
        x2 = self.se(x1)          # per-channel weights from the SE branch
        x1 = x1 * x2              # re-weight the channels
        if self.is_1x1conv:       # extra Conv + BN for the Conv Block
            x_shortcut = self.shortcut(x_shortcut)
        x1 = x1 + x_shortcut      # Add operation
        x1 = self.relu(x1)        # ReLU operation
        return x1


# Build SEResNet50
class SEResnet(nn.Module):
    def __init__(self, cfg):
        super(SEResnet, self).__init__()
        classes = cfg['classes']   # number of classes
        num = cfg['num']           # for ResNet50: [3, 4, 6, 3]; number of Conv Blocks + Identity Blocks per Stage
        # Stem Block
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        )
        # Stage1
        filters = (64, 64, 256)    # channels
        self.Stage1 = self._make_layer(in_channels = 64, filters = filters, num = num[0], stride = 1)
        # Stage2
        filters = (128, 128, 512)  # channels
        self.Stage2 = self._make_layer(in_channels = 256, filters = filters, num = num[1], stride = 2)
        # Stage3
        filters = (256, 256, 1024) # channels
        self.Stage3 = self._make_layer(in_channels = 512, filters = filters, num = num[2], stride = 2)
        # Stage4
        filters = (512, 512, 2048) # channels
        self.Stage4 = self._make_layer(in_channels = 1024, filters = filters, num = num[3], stride = 2)
        # Adaptive average pooling; (1, 1) is the output size (H x W)
        self.global_average_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Fully connected layer -- the Subsequent Processing stage after the four Stages
        self.fc = nn.Sequential(
            nn.Linear(2048, classes)
        )

    # Build a single Stage
    def _make_layer(self, in_channels, filters, num, stride = 1):
        layers = []
        # Conv Block
        block_1 = Block(in_channels, filters, stride = stride, is_1x1conv = True)
        layers.append(block_1)
        # Stack the Identity Blocks, based on [3, 4, 6, 3]
        for i in range(1, num):
            layers.append(Block(filters[2], filters, stride = 1, is_1x1conv = False))
        # Return the Conv Block plus Identity Blocks that make up one Stage
        return nn.Sequential(*layers)

    def forward(self, x):
        # Stem Block stage
        x = self.conv1(x)
        # The four Stages
        x = self.Stage1(x)
        x = self.Stage2(x)
        x = self.Stage3(x)
        x = self.Stage4(x)
        # Subsequent Processing stage
        x = self.global_average_pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


# SEResNet50 configuration (this helper builds SEResnet; keeping it separate makes it easy to switch to other ResNet variants)
def SeResNet50():
    cfg = {
        'num': (3, 4, 6, 3),   # ResNet50: number of Blocks per Stage (1 Conv Block each, the rest Identity Blocks)
        'classes': 10          # number of classes in the dataset
    }
    return SEResnet(cfg)       # build the SEResnet network


## Load the dataset
def load_dataset(batch_size):
    # Download the training set
    train_set = torchvision.datasets.CIFAR10(
        root = "data/cifar-10", train = True,
        download = True, transform = transforms.ToTensor()
    )
    # Download the test set
    test_set = torchvision.datasets.CIFAR10(
        root = "data/cifar-10", train = False,
        download = True, transform = transforms.ToTensor()
    )
    train_iter = torch.utils.data.DataLoader(
        train_set, batch_size = batch_size, shuffle = True, num_workers = 4
    )
    test_iter = torch.utils.data.DataLoader(
        test_set, batch_size = batch_size, shuffle = True, num_workers = 4
    )
    return train_iter, test_iter


# Train the model
def train(net, train_iter, criterion, optimizer, num_epochs, device, num_print, lr_scheduler = None, test_iter = None):
    net.train()                 # training mode
    record_train = list()       # training-set accuracy per epoch
    record_test = list()        # test-set accuracy per epoch
    for epoch in range(num_epochs):
        print("========== epoch: [{}/{}] ==========".format(epoch + 1, num_epochs))
        total, correct, train_loss = 0, 0, 0
        start = time.time()
        for i, (X, y) in enumerate(train_iter):
            X, y = X.to(device), y.to(device)   # move to GPU or CPU
            output = net(X)                     # forward pass
            loss = criterion(output, y)         # compute the loss
            optimizer.zero_grad()               # reset gradients
            loss.backward()                     # back-propagate
            optimizer.step()                    # update parameters
            train_loss += loss.item()           # accumulate the loss
            total += y.size(0)                  # accumulate the sample count
            correct += (output.argmax(dim = 1) == y).sum().item()   # accumulate correct predictions
            train_acc = 100.0 * correct / total # training accuracy so far
            if (i + 1) % num_print == 0:
                print("step: [{}/{}], train_loss: {:.3f} | train_acc: {:6.3f}% | lr: {:.6f}"
                      .format(i + 1, len(train_iter), train_loss / (i + 1),
                              train_acc, get_cur_lr(optimizer)))
        # Adjust the learning rate
        if lr_scheduler is not None:
            lr_scheduler.step()
        # Report the training time for this epoch
        print("--- cost time: {:.4f}s ---".format(time.time() - start))
        if test_iter is not None:   # evaluate on the test set after each epoch (calls the test function below)
            record_test.append(test(net, test_iter, criterion, device))
        record_train.append(train_acc)
    # Return the per-epoch training and test accuracies
    return record_train, record_test


# Evaluate the model
def test(net, test_iter, criterion, device):
    total, correct = 0, 0
    net.eval()                    # evaluation mode
    with torch.no_grad():         # no gradients needed
        print("*************** test ***************")
        for X, y in test_iter:
            X, y = X.to(device), y.to(device)   # move to CPU or GPU
            output = net(X)                     # forward pass
            loss = criterion(output, y)         # compute the loss
            total += y.size(0)                  # total number of test samples
            correct += (output.argmax(dim = 1) == y).sum().item()   # number of correct predictions
    test_acc = 100.0 * correct / total          # test accuracy
    # Report the test loss (of the last batch) and the test accuracy
    print("test_loss: {:.3f} | test_acc: {:6.3f}%"
          .format(loss.item(), test_acc))
    print("************************************\n")
    # Switch back to training mode (the test set is used once per epoch, so restore training mode before the next epoch)
    net.train()
    return test_acc


# Return the current learning rate
def get_cur_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']


# Plot the per-epoch training and test accuracies
def learning_curve(record_train, record_test = None):
    plt.style.use("ggplot")
    plt.plot(range(1, len(record_train) + 1), record_train, label = "train acc")
    if record_test is not None:
        plt.plot(range(1, len(record_test) + 1), record_test, label = "test acc")
    plt.legend(loc = 4)
    plt.title("learning curve")
    plt.xticks(range(0, len(record_train) + 1, 5))
    plt.yticks(range(0, 101, 5))
    plt.xlabel("epoch")
    plt.ylabel("accuracy")
    plt.show()


BATCH_SIZE = 128         # batch size
NUM_EPOCHS = 12          # number of epochs
NUM_CLASSES = 10         # number of classes
LEARNING_RATE = 0.01     # learning rate
MOMENTUM = 0.9           # momentum
WEIGHT_DECAY = 0.0005    # weight decay
NUM_PRINT = 100
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"   # run on GPU or CPU


def main():
    net = SeResNet50()
    net = net.to(DEVICE)                              # move the network to GPU or CPU
    train_iter, test_iter = load_dataset(BATCH_SIZE)  # load the training and test sets
    criterion = nn.CrossEntropyLoss()                 # loss function
    # Optimizer
    optimizer = optim.SGD(
        net.parameters(),
        lr = LEARNING_RATE,
        momentum = MOMENTUM,
        weight_decay = WEIGHT_DECAY,
        nesterov = True
    )
    # Learning-rate schedule (step_size: update every step_size epochs; gamma: multiplicative factor applied to lr)
    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = 0.1)
    record_train, record_test = train(net, train_iter, criterion, optimizer, NUM_EPOCHS, DEVICE, NUM_PRINT,
                                      lr_scheduler, test_iter)
    learning_curve(record_train, record_test)         # plot the accuracy curves


if __name__ == "__main__":   # guard needed for the multi-worker DataLoader on Windows/macOS
    main()
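
If you only want to verify the network before starting a full training run, a single dummy CIFAR10-sized batch is enough (a quick sanity check of my own, assuming the definitions above):

net = SeResNet50()
x = torch.randn(4, 3, 32, 32)      # CIFAR10-sized dummy batch
print(net(x).shape)                # torch.Size([4, 10])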

5--References

Code reference

ResNet50 reference

SENet reference
