
CondLaneNet for Lane Detection: Paper and Source Code Walkthrough


CondLaneNet: a Top-to-down Lane Detection Framework Based on Conditional Convolution

Paper: https://arxiv.org/pdf/2105.05003.pdf

Code: https://github.com/aliyun/conditional-lane-detection

Paper Walkthrough

1. Abstract

What is novel about this lane-detection work is the detection head. Instead of the usual bbox-based object detection, it detects keypoints and builds a mask per lane instance, so the output format resembles instance segmentation.

2. Network Architecture

  • backbone: an ordinary CNN, e.g. ResNet;
  • neck: TransformerFPN. Since lanes are long and thin, global attention helps, so before the plain FPN builds its pyramid, a Transformer self-attention operation is applied to the backbone's output feature;
  • head, split into two parts:
    • the Proposal head detects lane instances and generates dynamic convolution-kernel parameters for each instance;
    • the Conditional shape head combines those dynamic kernel parameters with conditional convolution to predict each lane's point set; the points are then connected to form the final lanes (a minimal sketch of this conditional-convolution mechanism follows the list).
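To make the conditional mechanism concrete before reading the code, here is a minimal sketch of dynamic 1×1 convolution, with every size invented for illustration (nothing below comes from the repo): a proposal head predicts a flat parameter vector per instance, which is reshaped into kernel weights plus a bias and applied to a shared mask feature.

```python
import torch
import torch.nn.functional as F

# Shared mask-branch feature for one image: (B, C, H, W); sizes are made up.
feat = torch.randn(1, 64, 80, 200)

# Per-instance dynamic parameters: a 1x1 kernel (64 weights) plus one bias.
# In the real model these come from the proposal head, not randn.
num_gen_params = 64 + 1
dyn_params = torch.randn(num_gen_params)

weight = dyn_params[:64].view(1, 64, 1, 1)   # (out_ch, in_ch, kH, kW)
bias = dyn_params[64:]                       # (out_ch,)

# Conditional convolution: the instance-specific kernel scans the shared feature.
inst_mask = F.conv2d(feat, weight, bias)     # (1, 1, 80, 200) mask logits
```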

Code Walkthrough

The code is built on the mmdetection framework (v2.0.0). Under configs/condlanenet/ there are three folders, one for each of the author's setups on the CurveLanes, CULane, and TuSimple datasets. The biggest difference between them is the RIM (Recurrent Instance Module) designed for CurveLanes. Below I focus on the modules they share:

backbone

The backbone is a ResNet; depending on the model size, anything from ResNet-18 up to ResNet-101 is used.

neck

The neck is TransConvFPN, defined in mmdet/models/necks/trans_fpn.py.

Its main difference from a standard FPN is the extra transformer stage. The motivation: lanes are long and thin, so a non-local structure such as self-attention is needed.

In other words, a transformer module is inserted between the ResNet and the FPN.

```python
## TransConvFPN (irrelevant code omitted)
def forward(self, src):
    assert len(src) >= len(self.in_channels)
    src = list(src)
    if self.attention:
        trans_feat = self.trans_head(src[self.trans_idx])
    else:
        trans_feat = src[self.trans_idx]
    # Replace the last backbone level with the transformer-enhanced feature
    inputs = src[:-1]
    inputs.append(trans_feat)
    if len(inputs) > len(self.in_channels):
        for _ in range(len(inputs) - len(self.in_channels)):
            del inputs[0]
    ## From here on, identical to FPN
    # build laterals
    laterals = [
        lateral_conv(inputs[i + self.start_level])
        for i, lateral_conv in enumerate(self.lateral_convs)
    ]
    ## omitted
```
```python
## In TransConvFPN's __init__
if self.attention:
    self.trans_head = TransConvEncoderModule(**trans_cfg)


class TransConvEncoderModule(nn.Module):

    def __init__(self, in_dim, attn_in_dims, attn_out_dims, strides, ratios,
                 downscale=True, pos_shape=None):
        super(TransConvEncoderModule, self).__init__()
        if downscale:
            stride = 2
        else:
            stride = 1
        # self.first_conv = ConvModule(in_dim, 2*in_dim, kernel_size=3, stride=stride, padding=1)
        # self.final_conv = ConvModule(attn_out_dims[-1], attn_out_dims[-1], kernel_size=3, stride=1, padding=1)
        attn_layers = []
        for dim1, dim2, stride, ratio in zip(attn_in_dims, attn_out_dims,
                                             strides, ratios):
            attn_layers.append(AttentionLayer(dim1, dim2, ratio, stride))
        if pos_shape is not None:
            self.attn_layers = nn.ModuleList(attn_layers)
        else:
            self.attn_layers = nn.Sequential(*attn_layers)
        self.pos_shape = pos_shape
        self.pos_embeds = []
        if pos_shape is not None:
            for dim in attn_out_dims:
                pos_embed = build_position_encoding(dim, pos_shape).cuda()
                self.pos_embeds.append(pos_embed)

    def forward(self, src):
        # src = self.first_conv(src)
        if self.pos_shape is None:
            src = self.attn_layers(src)
        else:
            for layer, pos in zip(self.attn_layers, self.pos_embeds):
                src = layer(src, pos.to(src.device))
        # src = self.final_conv(src)
        return src


class AttentionLayer(nn.Module):
    """Position attention module"""

    def __init__(self, in_dim, out_dim, ratio=4, stride=1):
        super(AttentionLayer, self).__init__()
        self.chanel_in = in_dim
        norm_cfg = dict(type='BN', requires_grad=True)
        act_cfg = dict(type='ReLU')
        self.pre_conv = ConvModule(
            in_dim,
            out_dim,
            kernel_size=3,
            stride=stride,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg,
            inplace=False)
        self.query_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.key_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.value_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim, kernel_size=1)
        self.final_conv = ConvModule(
            out_dim,
            out_dim,
            kernel_size=3,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.softmax = nn.Softmax(dim=-1)
        self.gamma = nn.Parameter(torch.zeros(1))

    def forward(self, x, pos=None):
        """
        inputs :
            x : input feature maps (B x C x H x W)
        returns :
            out : attention value + input feature
            attention : B x (HxW) x (HxW)
        """
        x = self.pre_conv(x)
        m_batchsize, _, height, width = x.size()
        if pos is not None:
            x += pos
        proj_query = self.query_conv(x).view(m_batchsize, -1,
                                             width * height).permute(0, 2, 1)
        proj_key = self.key_conv(x).view(m_batchsize, -1, width * height)
        energy = torch.bmm(proj_query, proj_key)
        attention = self.softmax(energy)
        attention = attention.permute(0, 2, 1)
        proj_value = self.value_conv(x).view(m_batchsize, -1, width * height)
        out = torch.bmm(proj_value, attention)
        out = out.view(m_batchsize, -1, height, width)
        proj_value = proj_value.view(m_batchsize, -1, height, width)
        out_feat = self.gamma * out + x
        out_feat = self.final_conv(out_feat)
        return out_feat
```
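The AttentionLayer is the familiar position-attention pattern: 1×1 query/key/value convolutions over the H×W positions flattened into a sequence, with a learned residual weight gamma. A quick shape check, assuming the AttentionLayer above (and mmcv's ConvModule it depends on) is importable; the sizes below are made up:

```python
import torch

layer = AttentionLayer(in_dim=512, out_dim=64, ratio=4, stride=2)
x = torch.randn(2, 512, 10, 25)   # B x C x H x W backbone feature
out = layer(x)                    # pre_conv with stride 2 halves H and W
print(out.shape)                  # torch.Size([2, 64, 5, 13])
```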

head

The head is CondLaneHead, in mmdet/models/dense_heads/condlanenet_head.py.

This is worth a close look, since it differs substantially from a typical detection head.

First, note that CondLaneHead's forward simply calls forward_test, so you have to look at the detector to see which head functions are actually invoked on the neck output:

```python
# mmdet/models/detectors/condlanenet.py
def forward(self, img, img_metas=None, return_loss=True, **kwargs):
    ...
    if img_metas is None:
        return self.test_inference(img)
    elif return_loss:
        return self.forward_train(img, img_metas, **kwargs)
    else:
        return self.forward_test(img, img_metas, **kwargs)

def forward_train(self, img, img_metas, **kwargs):
    ...
    if self.head:
        outputs = self.bbox_head.forward_train(output, poses, num_ins)
    ...

def forward_test(self,
                 img,
                 img_metas,
                 benchmark=False,
                 hack_seeds=None,
                 **kwargs):
    ...
    if self.head:
        seeds, hm = self.bbox_head.forward_test(output, hack_seeds,
                                                kwargs['thr'])
    ...
```

So the head's forward is effectively unused; just read the head's forward_train and forward_test.

forward_train

```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_train(self, inputs, pos, num_ins):
    # x_list holds the multi-level feature maps from backbone + neck
    x_list = list(inputs)
    # hm_idx selects which level's feature map generates the heatmap;
    # mask_idx does the same for the mask branch
    f_hm = x_list[self.hm_idx]
    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]

    # f_mask
    z = self.ctnet_head(f_hm)
    hm, params = z['hm'], z['params']
    h_hm, w_hm = hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    pos_tensor = torch.from_numpy(np.array(pos)).long().to(
        params.device).unsqueeze(1)

    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    if pos_tensor.size()[0] == 0:
        masks = None
        feat_range = None
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        # regs = regs.view(sum(num_ins), 1, h_mask, w_mask)
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
    return hm, regs, masks, feat_range, [mask_branch, reg_branch]
```
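The least obvious part above is the indexing. After the permute/view, params has one row of num_gen_params dynamic parameters for every (class, row, col) cell of the heatmap, and pos holds the flattened indices label * h * w + r * w + c of the instance positions, so gather pulls out each instance's parameter row. A toy reproduction of just that indexing, with invented sizes:

```python
import torch

b, num_classes, num_gen_params, h, w = 1, 2, 6, 4, 5
params = torch.randn(b, num_classes, num_gen_params, h, w)

# Same reshape as forward_train: one row of parameters per (class, r, c) cell.
params = params.permute(0, 1, 3, 4, 2).contiguous().view(-1, num_gen_params)

# An instance proposed at class label 1, heatmap row r=2, col c=3.
label, r, c = 1, 2, 3
pos = label * h * w + r * w + c
pos_tensor = torch.tensor([[pos]]).expand(-1, num_gen_params)

inst_params = params.gather(0, pos_tensor)   # (1, num_gen_params)
print(inst_params.shape)                     # torch.Size([1, 6])
```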

forward_test

```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_test(
        self,
        inputs,
        hack_seeds=None,
        hm_thr=0.3,
):

    def parse_pos(seeds, batchsize, num_classes, h, w, device):
        pos_list = [[p['coord'], p['id_class'] - 1] for p in seeds]
        poses = []
        for p in pos_list:
            [c, r], label = p
            pos = label * h * w + r * w + c
            poses.append(pos)
        poses = torch.from_numpy(np.array(
            poses, np.long)).long().to(device).unsqueeze(1)
        return poses

    # with Timer("Elapsed time in stage1: %f"):  # ignore
    x_list = list(inputs)
    f_hm = x_list[self.hm_idx]

    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]

    f_deep = f_mask
    m_batchsize = f_deep.size()[0]
    # with Timer("Elapsed time in ctnet_head: %f"):  # 0.3ms
    z = self.ctnet_head(f_hm)
    h_hm, w_hm = f_hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    hm, params = z['hm'], z['params']
    hm = torch.clamp(hm.sigmoid(), min=1e-4, max=1 - 1e-4)
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)

    # with Timer("Elapsed time in two branch: %f"):  # 0.6ms
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    batch_size, num_classes, h, w = hm.size()
    # with Timer("Elapsed time in ct decode: %f"):  # 0.2ms
    seeds = self.ctdet_decode(hm, thr=hm_thr)
    if hack_seeds is not None:
        seeds = hack_seeds
    # with Timer("Elapsed time in stage2: %f"):  # 0.08ms
    pos_tensor = parse_pos(seeds, batch_size, num_classes, h, w, hm.device)
    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    num_ins = [pos_tensor.size()[0]]
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    if self.regression:
        reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    # with Timer("Elapsed time in stage3: %f"):  # 0.8ms
    if pos_tensor.size()[0] == 0:
        return [], hm
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        # with Timer("Elapsed time in mask_head: %f"):  # 0.3ms
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            # with Timer("Elapsed time in reg_head: %f"):  # 0.25ms
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
        for i in range(len(seeds)):
            seeds[i]['reg'] = regs[0, i:i + 1, :, :]
            m = masks[0, i:i + 1, :, :]
            seeds[i]['mask'] = m
            seeds[i]['range'] = feat_range[i:i + 1]
        return seeds, hm
```
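Note that forward_test only attaches the raw per-instance mask, reg, and range maps to each seed; turning them into lane points happens in post-processing. The paper formulates this as row-wise location: softmax each mask row over the x axis and take the expected column index as a sub-pixel coordinate (the offset map then refines it, and the range prediction decides which rows are valid). A hedged sketch of just the expectation step, not the repo's actual decode:

```python
import torch

mask = torch.randn(3, 40, 100)                 # (instances, h_mask, w_mask)
probs = torch.softmax(mask, dim=-1)            # per-row distribution over columns
cols = torch.arange(100, dtype=torch.float32)  # column indices 0..w-1
x_coords = (probs * cols).sum(dim=-1)          # expected column per row: (3, 40)
```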

As you can see, these operations line up with what the paper describes.

(I'll flesh this out when I have more time; things have been busy lately.)
