当前位置:   article > 正文



Ghost Bottleneck模块




  1. import copy
  2. import numpy as np
  3. import torch
  4. import torch.nn as nn
  5. import torch.nn.functional as F
  6. def _make_divisible(v, divisor, min_value=None):
  7. """
  8. This function is taken from the original tf repo.
  9. It ensures that all layers have a channel number that is divisible by 8 mg
  10. It can be seen here:
  11. https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
  12. """
  13. if min_value is None:
  14. min_value = divisor
  15. new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
  16. # Make sure that round down does not go down by more than 10%.
  17. if new_v < 0.9 * v:
  18. new_v += divisor
  19. return new_v
  20. def hard_sigmoid(x, inplace: bool = False):
  21. if inplace:
  22. return x.add_(3.0).clamp_(0.0, 6.0).div_(6.0)
  23. else:
  24. return F.relu6(x + 3.0) / 6.0
  25. class SqueezeExcite(nn.Module):
  26. def __init__(
  27. self,
  28. in_chs,
  29. se_ratio=0.25,
  30. reduced_base_chs=None,
  31. #act_layer=nn.ReLU,
  32. act_layer=nn.SiLU,
  33. gate_fn=hard_sigmoid,
  34. divisor=4,
  35. **_,
  36. ):
  37. super(SqueezeExcite, self).__init__()
  38. self.gate_fn = gate_fn
  39. reduced_chs = _make_divisible(
  40. (reduced_base_chs or in_chs) * se_ratio, divisor,
  41. )
  42. self.avg_pool = nn.AdaptiveAvgPool2d(1)
  43. self.conv_reduce = nn.Conv2d(in_chs, reduced_chs, 1, bias=True)
  44. self.act1 = act_layer(inplace=True)
  45. self.conv_expand = nn.Conv2d(reduced_chs, in_chs, 1, bias=True)
  46. def forward(self, x):
  47. x_se = self.avg_pool(x)
  48. x_se = self.conv_reduce(x_se)
  49. x_se = self.act1(x_se)
  50. x_se = self.conv_expand(x_se)
  51. x = x * self.gate_fn(x_se)
  52. return x
  53. class ConvBnAct(nn.Module):
  54. def __init__(self, in_chs, out_chs, kernel_size, stride=1, act_layer=nn.SiLU):
  55. super(ConvBnAct, self).__init__()
  56. self.conv = nn.Conv2d(
  57. in_chs, out_chs, kernel_size, stride, kernel_size // 2, bias=False,
  58. )
  59. self.bn1 = nn.BatchNorm2d(out_chs)
  60. self.act1 = act_layer(inplace=True)
  61. def forward(self, x):
  62. x = self.conv(x)
  63. x = self.bn1(x)
  64. x = self.act1(x)
  65. return x
  66. class RepGhostModule(nn.Module):
  67. def __init__(
  68. self, inp, oup, kernel_size=1, dw_size=3, stride=1, relu=True, deploy=False, reparam_bn=True, reparam_identity=False
  69. ):
  70. super(RepGhostModule, self).__init__()
  71. init_channels = oup
  72. new_channels = oup
  73. self.deploy = deploy
  74. self.primary_conv = nn.Sequential(
  75. nn.Conv2d(
  76. inp, init_channels, kernel_size, stride, kernel_size // 2, bias=False,
  77. ),
  78. nn.BatchNorm2d(init_channels),
  79. nn.SiLU(inplace=True) if relu else nn.Sequential(),
  80. )
  81. fusion_conv = []
  82. fusion_bn = []
  83. if not deploy and reparam_bn:
  84. fusion_conv.append(nn.Identity())
  85. fusion_bn.append(nn.BatchNorm2d(init_channels))
  86. if not deploy and reparam_identity:
  87. fusion_conv.append(nn.Identity())
  88. fusion_bn.append(nn.Identity())
  89. self.fusion_conv = nn.Sequential(*fusion_conv)
  90. self.fusion_bn = nn.Sequential(*fusion_bn)
  91. self.cheap_operation = nn.Sequential(
  92. nn.Conv2d(
  93. init_channels,
  94. new_channels,
  95. dw_size,
  96. 1,
  97. dw_size // 2,
  98. groups=init_channels,
  99. bias=deploy,
  100. ),
  101. nn.BatchNorm2d(new_channels) if not deploy else nn.Sequential(),
  102. # nn.ReLU(inplace=True) if relu else nn.Sequential(),
  103. )
  104. if deploy:
  105. self.cheap_operation = self.cheap_operation[0]
  106. if relu:
  107. self.relu = nn.SiLU(inplace=False)
  108. else:
  109. self.relu = nn.Sequential()
  110. def forward(self, x):
  111. x1 = self.primary_conv(x)#mg
  112. x2 = self.cheap_operation(x1)
  113. for conv, bn in zip(self.fusion_conv, self.fusion_bn):
  114. x2 = x2 + bn(conv(x1))
  115. return self.relu(x2)
  116. def get_equivalent_kernel_bias(self):
  117. kernel3x3, bias3x3 = self._fuse_bn_tensor(self.cheap_operation[0], self.cheap_operation[1])
  118. for conv, bn in zip(self.fusion_conv, self.fusion_bn):
  119. kernel, bias = self._fuse_bn_tensor(conv, bn, kernel3x3.shape[0], kernel3x3.device)
  120. kernel3x3 += self._pad_1x1_to_3x3_tensor(kernel)
  121. bias3x3 += bias
  122. return kernel3x3, bias3x3
  123. @staticmethod
  124. def _pad_1x1_to_3x3_tensor(kernel1x1):
  125. if kernel1x1 is None:
  126. return 0
  127. else:
  128. return torch.nn.functional.pad(kernel1x1, [1, 1, 1, 1])
  129. @staticmethod
  130. def _fuse_bn_tensor(conv, bn, in_channels=None, device=None):
  131. in_channels = in_channels if in_channels else bn.running_mean.shape[0]
  132. device = device if device else bn.weight.device
  133. if isinstance(conv, nn.Conv2d):
  134. kernel = conv.weight
  135. assert conv.bias is None
  136. else:
  137. assert isinstance(conv, nn.Identity)
  138. kernel_value = np.zeros((in_channels, 1, 1, 1), dtype=np.float32)
  139. for i in range(in_channels):
  140. kernel_value[i, 0, 0, 0] = 1
  141. kernel = torch.from_numpy(kernel_value).to(device)
  142. if isinstance(bn, nn.BatchNorm2d):
  143. running_mean = bn.running_mean
  144. running_var = bn.running_var
  145. gamma = bn.weight
  146. beta = bn.bias
  147. eps = bn.eps
  148. std = (running_var + eps).sqrt()
  149. t = (gamma / std).reshape(-1, 1, 1, 1)
  150. return kernel * t, beta - running_mean * gamma / std
  151. assert isinstance(bn, nn.Identity)
  152. return kernel, torch.zeros(in_channels).to(kernel.device)
  153. def switch_to_deploy(self):
  154. if len(self.fusion_conv) == 0 and len(self.fusion_bn) == 0:
  155. return
  156. kernel, bias = self.get_equivalent_kernel_bias()
  157. self.cheap_operation = nn.Conv2d(in_channels=self.cheap_operation[0].in_channels,
  158. out_channels=self.cheap_operation[0].out_channels,
  159. kernel_size=self.cheap_operation[0].kernel_size,
  160. padding=self.cheap_operation[0].padding,
  161. dilation=self.cheap_operation[0].dilation,
  162. groups=self.cheap_operation[0].groups,
  163. bias=True)
  164. self.cheap_operation.weight.data = kernel
  165. self.cheap_operation.bias.data = bias
  166. self.__delattr__('fusion_conv')
  167. self.__delattr__('fusion_bn')
  168. self.fusion_conv = []
  169. self.fusion_bn = []
  170. self.deploy = True
  171. class RepGhostBottleneck(nn.Module):
  172. """RepGhost bottleneck w/ optional SE"""
  173. def __init__(
  174. self,
  175. in_chs,
  176. mid_chs,
  177. out_chs,
  178. dw_kernel_size=3,
  179. stride=1,
  180. se_ratio=0.0,
  181. shortcut=True,
  182. reparam=True,
  183. reparam_bn=True,
  184. reparam_identity=False,
  185. deploy=False,
  186. ):
  187. super(RepGhostBottleneck, self).__init__()
  188. has_se = se_ratio is not None and se_ratio > 0.0
  189. self.stride = stride
  190. self.enable_shortcut = shortcut
  191. self.in_chs = in_chs
  192. self.out_chs = out_chs
  193. # Point-wise expansion
  194. self.ghost1 = RepGhostModule(
  195. in_chs,
  196. mid_chs,
  197. relu=True,
  198. reparam_bn=reparam and reparam_bn,
  199. reparam_identity=reparam and reparam_identity,
  200. deploy=deploy,
  201. )
  202. # Depth-wise convolution
  203. if self.stride > 1:
  204. self.conv_dw = nn.Conv2d(
  205. mid_chs,
  206. mid_chs,
  207. dw_kernel_size,
  208. stride=stride,
  209. padding=(dw_kernel_size - 1) // 2,
  210. groups=mid_chs,
  211. bias=False,
  212. )
  213. self.bn_dw = nn.BatchNorm2d(mid_chs)
  214. # Squeeze-and-excitation
  215. if has_se:
  216. self.se = SqueezeExcite(mid_chs, se_ratio=se_ratio)
  217. else:
  218. self.se = None
  219. # Point-wise linear projection
  220. self.ghost2 = RepGhostModule(
  221. mid_chs,
  222. out_chs,
  223. relu=False,
  224. reparam_bn=reparam and reparam_bn,
  225. reparam_identity=reparam and reparam_identity,
  226. deploy=deploy,
  227. )
  228. # shortcut
  229. if in_chs == out_chs and self.stride == 1:
  230. self.shortcut = nn.Sequential()
  231. else:
  232. self.shortcut = nn.Sequential(
  233. nn.Conv2d(
  234. in_chs,
  235. in_chs,
  236. dw_kernel_size,
  237. stride=stride,
  238. padding=(dw_kernel_size - 1) // 2,
  239. groups=in_chs,
  240. bias=False,
  241. ),
  242. nn.BatchNorm2d(in_chs),
  243. nn.Conv2d(
  244. in_chs, out_chs, 1, stride=1,
  245. padding=0, bias=False,
  246. ),
  247. nn.BatchNorm2d(out_chs),
  248. )
  249. def forward(self, x):
  250. residual = x
  251. x1 = self.ghost1(x)
  252. if self.stride > 1:
  253. x = self.conv_dw(x1)
  254. x = self.bn_dw(x)
  255. else:
  256. x = x1
  257. if self.se is not None:
  258. x = self.se(x)
  259. # 2nd repghost bottleneck mg
  260. x = self.ghost2(x)
  261. if not self.enable_shortcut and self.in_chs == self.out_chs and self.stride == 1:
  262. return x
  263. return x + self.shortcut(residual)
  264. # def repghost_model_convert(model:torch.nn.Module, save_path=None, do_copy=True):
  265. # """
  266. # taken from from https://github.com/DingXiaoH/RepVGG/blob/main/repvgg.py
  267. # """
  268. # if do_copy:
  269. # model = copy.deepcopy(model)
  270. # for module in model.modules():
  271. # if hasattr(module, 'switch_to_deploy'):
  272. # module.switch_to_deploy()
  273. # if save_path is not None:
  274. # torch.save(model.state_dict(), save_path)
  275. # return model
  276. def repghost_model_convert(model:torch.nn.Module, save_path=None, do_copy=True):
  277. """
  278. taken from from https://github.com/DingXiaoH/RepVGG/blob/main/repvgg.py
  279. """
  280. if do_copy:
  281. model = copy.deepcopy(model)
  282. for module in model.modules():
  283. if hasattr(module, 'switch_to_deploy'):
  284. module.switch_to_deploy()
  285. if save_path is not None:
  286. torch.save(model, save_path)
  287. return model



  1. class C2frepghost(nn.Module):
  2. # CSP Bottleneck with 2 convolutions
  3. def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
  4. super().__init__()
  5. self.c = int(c2 * e) # hidden channels
  6. self.cv1 = Conv(c1, 2 * self.c, 1, 1)
  7. self.cv2 = Conv((2 + n) * self.c, c2, 1) #
  8. self.m = nn.ModuleList(RepGhostBottleneck(self.c, self.c, self.c,dw_kernel_size=((3),(3))) for _ in range(n))
  9. def forward(self, x):
  10. y = list(self.cv1(x).split((self.c, self.c), 1))
  11. y.extend(m(y[-1]) for m in self.m)
  12. return self.cv2(torch.cat(y, 1))





  1. # YOLOv5