Paper: https://arxiv.org/pdf/2105.05003.pdf
Code: https://github.com/aliyun/conditional-lane-detection
What is novel about this lane-detection work is the detection head. Instead of conventional bbox-based object detection, it detects key points and constructs masks from them, so the output format resembles instance segmentation.
The code is built on the mmdetection framework (v2.0.0). Under configs/condlanenet/ there are three folders, corresponding to the author's configurations for the three datasets CurveLanes, CULane, and TuSimple. The biggest difference among them is the RIM module designed for CurveLanes. Below I focus on the modules they share:
The backbone is a ResNet, anywhere from ResNet-18 to ResNet-101 depending on the model size.
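For reference, a minimal mmdetection-style backbone config might look like the sketch below; the specific values are illustrative, not copied from the repo's configs:

```python
# Illustrative mmdetection backbone config; depth would be swapped between
# 18 and 101 across the small/large models (exact values are an assumption).
backbone = dict(
    type='ResNet',
    depth=18,
    num_stages=4,
    out_indices=(0, 1, 2, 3),  # expose all four stages to the neck
    norm_cfg=dict(type='BN', requires_grad=True),
    style='pytorch')
```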
The neck is a TransConvFPN, in mmdet/models/necks/trans_fpn.py.
Its main difference from a plain FPN is an extra transformer operation. The motivation is that lane lines are long and thin, so a non-local structure such as self-attention helps.
In other words, a transformer module is inserted between the ResNet and the FPN.
```python
## TransConvFPN (unimportant parts omitted)
def forward(self, src):
    assert len(src) >= len(self.in_channels)
    src = list(src)
    if self.attention:
        trans_feat = self.trans_head(src[self.trans_idx])
    else:
        trans_feat = src[self.trans_idx]
    inputs = src[:-1]
    inputs.append(trans_feat)
    if len(inputs) > len(self.in_channels):
        for _ in range(len(inputs) - len(self.in_channels)):
            del inputs[0]
    ## everything below matches FPN
    # build laterals
    laterals = [
        lateral_conv(inputs[i + self.start_level])
        for i, lateral_conv in enumerate(self.lateral_convs)
    ]
    ## omitted

## in TransConvFPN's __init__
if self.attention:
    self.trans_head = TransConvEncoderModule(**trans_cfg)


class TransConvEncoderModule(nn.Module):

    def __init__(self, in_dim, attn_in_dims, attn_out_dims, strides, ratios, downscale=True, pos_shape=None):
        super(TransConvEncoderModule, self).__init__()
        if downscale:
            stride = 2
        else:
            stride = 1
        # self.first_conv = ConvModule(in_dim, 2*in_dim, kernel_size=3, stride=stride, padding=1)
        # self.final_conv = ConvModule(attn_out_dims[-1], attn_out_dims[-1], kernel_size=3, stride=1, padding=1)
        attn_layers = []
        for dim1, dim2, stride, ratio in zip(attn_in_dims, attn_out_dims, strides, ratios):
            attn_layers.append(AttentionLayer(dim1, dim2, ratio, stride))
        if pos_shape is not None:
            self.attn_layers = nn.ModuleList(attn_layers)
        else:
            self.attn_layers = nn.Sequential(*attn_layers)
        self.pos_shape = pos_shape
        self.pos_embeds = []
        if pos_shape is not None:
            for dim in attn_out_dims:
                pos_embed = build_position_encoding(dim, pos_shape).cuda()
                self.pos_embeds.append(pos_embed)

    def forward(self, src):
        # src = self.first_conv(src)
        if self.pos_shape is None:
            src = self.attn_layers(src)
        else:
            for layer, pos in zip(self.attn_layers, self.pos_embeds):
                src = layer(src, pos.to(src.device))
        # src = self.final_conv(src)
        return src


class AttentionLayer(nn.Module):
    """Position attention module"""

    def __init__(self, in_dim, out_dim, ratio=4, stride=1):
        super(AttentionLayer, self).__init__()
        self.chanel_in = in_dim
        norm_cfg = dict(type='BN', requires_grad=True)
        act_cfg = dict(type='ReLU')
        self.pre_conv = ConvModule(
            in_dim,
            out_dim,
            kernel_size=3,
            stride=stride,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg,
            inplace=False)
        self.query_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.key_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim // ratio, kernel_size=1)
        self.value_conv = nn.Conv2d(
            in_channels=out_dim, out_channels=out_dim, kernel_size=1)
        self.final_conv = ConvModule(
            out_dim,
            out_dim,
            kernel_size=3,
            padding=1,
            norm_cfg=norm_cfg,
            act_cfg=act_cfg)
        self.softmax = nn.Softmax(dim=-1)
        self.gamma = nn.Parameter(torch.zeros(1))

    def forward(self, x, pos=None):
        """
        inputs:
            x : input feature maps (B x C x H x W)
        returns:
            out : attention value + input feature
            attention : B x (HxW) x (HxW)
        """
        x = self.pre_conv(x)
        m_batchsize, _, height, width = x.size()
        if pos is not None:
            x += pos
        proj_query = self.query_conv(x).view(m_batchsize, -1,
                                             width * height).permute(0, 2, 1)
        proj_key = self.key_conv(x).view(m_batchsize, -1, width * height)

        energy = torch.bmm(proj_query, proj_key)
        attention = self.softmax(energy)
        attention = attention.permute(0, 2, 1)
        proj_value = self.value_conv(x).view(m_batchsize, -1, width * height)
        out = torch.bmm(proj_value, attention)
        out = out.view(m_batchsize, -1, height, width)
        out_feat = self.gamma * out + x  # learnable residual weight, init to 0
        out_feat = self.final_conv(out_feat)
        return out_feat
```
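To make the tensor shapes concrete, here is a minimal sketch running the module above on a dummy feature map. It assumes the repo's classes and their mmcv dependencies (ConvModule, etc.) are importable; the channel sizes, strides, and spatial size are illustrative, not taken from the repo's configs. With pos_shape=None the layers run as a plain nn.Sequential without positional encoding, so it works on CPU:

```python
import torch

# Dummy top-level feature map: batch 1, 512 channels, 10x25 spatial grid.
feat = torch.randn(1, 512, 10, 25)

# Two stacked attention layers, 512 -> 64 -> 64 channels (values illustrative).
encoder = TransConvEncoderModule(
    in_dim=512,
    attn_in_dims=[512, 64],
    attn_out_dims=[64, 64],
    strides=[1, 1],
    ratios=[4, 4],
    pos_shape=None)

out = encoder(feat)
print(out.shape)  # torch.Size([1, 64, 10, 25])
# Inside each AttentionLayer the attention matrix is (B, H*W, H*W) = (1, 250, 250),
# which is why this module is only applied to the smallest feature map.
```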
The head is CondLaneHead, in mmdet/models/dense_heads/condlanenet_head.py.
It deserves a close look, since it differs a lot from a typical detection head.
Note that CondLaneHead's forward simply calls forward_test, so you have to look at the detector to see which head functions are actually invoked on the neck's output:
```python
# mmdet/models/detectors/condlanenet.py
def forward(self, img, img_metas=None, return_loss=True, **kwargs):
    ...
    if img_metas is None:
        return self.test_inference(img)
    elif return_loss:
        return self.forward_train(img, img_metas, **kwargs)
    else:
        return self.forward_test(img, img_metas, **kwargs)

def forward_train(self, img, img_metas, **kwargs):
    ...
    if self.head:
        outputs = self.bbox_head.forward_train(output, poses, num_ins)
    ...

def forward_test(self,
                 img,
                 img_metas,
                 benchmark=False,
                 hack_seeds=None,
                 **kwargs):
    ...
    if self.head:
        seeds, hm = self.bbox_head.forward_test(output, hack_seeds,
                                                kwargs['thr'])
    ...
```
So the head's forward is effectively never used; go straight to the head's forward_train and forward_test.
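In practice, then, inference goes through the detector's forward with return_loss=False, something like the sketch below. Here `model`, `img`, and `img_metas` are assumed to come from mmdet's usual build_detector and test pipeline, and the thr value is illustrative:

```python
import torch

model.eval()
with torch.no_grad():
    # kwargs['thr'] is forwarded to the head as the heatmap threshold
    results = model(img, img_metas, return_loss=False, thr=0.5)
```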
forward_train
```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_train(self, inputs, pos, num_ins):
    # x_list holds the multi-level feature maps from backbone + neck
    x_list = list(inputs)
    # hm_idx selects which level's feature map is used to generate the
    # heatmap; mask_idx does the same for the mask branch
    f_hm = x_list[self.hm_idx]

    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]

    z = self.ctnet_head(f_hm)
    hm, params = z['hm'], z['params']
    h_hm, w_hm = hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    pos_tensor = torch.from_numpy(np.array(pos)).long().to(
        params.device).unsqueeze(1)

    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    if pos_tensor.size()[0] == 0:
        masks = None
        feat_range = None
        regs = None  # otherwise regs is unbound in the return below
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        # regs = regs.view(sum(num_ins), 1, h_mask, w_mask)
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
    return hm, regs, masks, feat_range, [mask_branch, reg_branch]
```
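The trickiest part is how pos indexes into params: each instance's heatmap coordinate (class, row, col) is flattened to label * h * w + r * w + c, and gather along dim 0 then pulls that instance's row of dynamic-conv parameters. A tiny standalone example (shapes and values illustrative, not the repo's actual sizes):

```python
import torch

# One row of dynamic-conv parameters per heatmap cell,
# shape (num_classes * h * w, num_gen_params).
num_classes, h, w, num_gen_params = 1, 10, 25, 134
params = torch.randn(num_classes * h * w, num_gen_params)

# One detected instance at heatmap cell (r=4, c=7) of class 0:
label, r, c = 0, 4, 7
pos = label * h * w + r * w + c

# Expand the flat index across all parameter columns, then gather row `pos`.
pos_tensor = torch.tensor([[pos]]).expand(-1, num_gen_params)
inst_params = params.gather(0, pos_tensor)
print(inst_params.shape)                         # torch.Size([1, 134])
print(torch.equal(inst_params[0], params[pos]))  # True
```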
forward_test
```python
# mmdet/models/dense_heads/condlanenet_head.py
def forward_test(
        self,
        inputs,
        hack_seeds=None,
        hm_thr=0.3,
):

    def parse_pos(seeds, batchsize, num_classes, h, w, device):
        # flatten each seed's (class, row, col) heatmap coordinate into an
        # index into the flattened (num_classes*h*w, num_gen_params) params
        pos_list = [[p['coord'], p['id_class'] - 1] for p in seeds]
        poses = []
        for p in pos_list:
            [c, r], label = p
            pos = label * h * w + r * w + c
            poses.append(pos)
        # the repo uses np.long here, which was removed in NumPy 1.24+
        poses = torch.from_numpy(np.array(
            poses, np.int64)).long().to(device).unsqueeze(1)
        return poses

    # with Timer("Elapsed time in stage1: %f"):  # ignore
    x_list = list(inputs)
    f_hm = x_list[self.hm_idx]
    f_mask = x_list[self.mask_idx]
    m_batchsize = f_hm.size()[0]
    f_deep = f_mask
    m_batchsize = f_deep.size()[0]
    # with Timer("Elapsed time in ctnet_head: %f"):  # 0.3ms
    z = self.ctnet_head(f_hm)
    h_hm, w_hm = f_hm.size()[2:]
    h_mask, w_mask = f_mask.size()[2:]
    hm, params = z['hm'], z['params']
    hm = torch.clamp(hm.sigmoid(), min=1e-4, max=1 - 1e-4)
    params = params.view(m_batchsize, self.num_classes, -1, h_hm, w_hm)
    # with Timer("Elapsed time in two branch: %f"):  # 0.6ms
    mask_branch = self.mask_branch(f_mask)
    reg_branch = mask_branch
    # reg_branch = self.reg_branch(f_mask)
    params = params.permute(0, 1, 3, 4,
                            2).contiguous().view(-1, self.num_gen_params)

    batch_size, num_classes, h, w = hm.size()
    # with Timer("Elapsed time in ct decode: %f"):  # 0.2ms
    seeds = self.ctdet_decode(hm, thr=hm_thr)
    if hack_seeds is not None:
        seeds = hack_seeds
    # with Timer("Elapsed time in stage2: %f"):  # 0.08ms
    pos_tensor = parse_pos(seeds, batch_size, num_classes, h, w, hm.device)
    pos_tensor = pos_tensor.expand(-1, self.num_gen_params)
    num_ins = [pos_tensor.size()[0]]
    mask_pos_tensor = pos_tensor[:, :self.num_mask_params]
    if self.regression:
        reg_pos_tensor = pos_tensor[:, self.num_mask_params:]
    # with Timer("Elapsed time in stage3: %f"):  # 0.8ms
    if pos_tensor.size()[0] == 0:
        return [], hm
    else:
        mask_params = params[:, :self.num_mask_params].gather(
            0, mask_pos_tensor)
        # with Timer("Elapsed time in mask_head: %f"):  # 0.3ms
        masks = self.mask_head(mask_branch, mask_params, num_ins)
        if self.regression:
            reg_params = params[:, self.num_mask_params:].gather(
                0, reg_pos_tensor)
            # with Timer("Elapsed time in reg_head: %f"):  # 0.25ms
            regs = self.reg_head(reg_branch, reg_params, num_ins)
        else:
            regs = masks
        feat_range = masks.permute(0, 1, 3,
                                   2).view(sum(num_ins), w_mask, h_mask)
        feat_range = self.mlp(feat_range)
        for i in range(len(seeds)):
            seeds[i]['reg'] = regs[0, i:i + 1, :, :]
            m = masks[0, i:i + 1, :, :]
            seeds[i]['mask'] = m
            seeds[i]['range'] = feat_range[i:i + 1]
        return seeds, hm
```
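Downstream, the repo's post-processing turns each seed's mask, reg, and range into lane points. Roughly, the paper's row-wise formulation works like the sketch below; this is my simplified reading of it, not the repo's actual post-processor. A softmax over each valid row of the location map gives an expected column, and the offset map refines it:

```python
import torch

def decode_lane(mask, reg, row_valid):
    """Simplified row-wise decoding of a single lane instance.

    mask:      (H, W) location-map logits for one lane
    reg:       (H, W) horizontal offsets
    row_valid: (H,) bool, the predicted vertical range
    """
    H, W = mask.shape
    xs = torch.arange(W, dtype=torch.float32)
    points = []
    for r in range(H):
        if not row_valid[r]:
            continue
        prob = torch.softmax(mask[r], dim=0)  # row-wise softmax
        x = (prob * xs).sum()                 # expected column on this row
        x = x + reg[r, int(x.round().clamp(0, W - 1))]  # add offset
        points.append((x.item(), r))
    return points
```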
As you can see, these operations line up closely with what the paper describes.
(I'll go through the rest in detail when I have more time; things have been busy lately.)