import torch
import torch.nn as nn
import torch.nn.functional as F


class LaneNet(nn.Module):
    """Encoder-decoder lane segmentation network with a key-point offset branch.

    The encoder halves the resolution between consecutive stages so that each
    stride-2 transposed convolution in the decoder produces a map of exactly
    the same size as the encoder feature it is added to.

    Args:
        num_classes: Number of channels produced by the segmentation head.

    Note:
        Input height and width must be divisible by 8 (three 2x2 poolings).
    """

    def __init__(self, num_classes=7):
        super(LaneNet, self).__init__()
        # encoder
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.conv4 = nn.Conv2d(256, 512, 3, padding=1)
        # decoder
        self.deconv1 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.deconv2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.deconv3 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.classifier = nn.Conv2d(64, num_classes, 1)
        # key points offset regression
        self.offset_conv1 = nn.Conv2d(256, 256, 3, padding=1)
        self.offset_conv2 = nn.Conv2d(256, 256, 3, padding=1)
        self.offset_conv3 = nn.Conv2d(256, 256, 3, padding=1)
        self.offset_conv4 = nn.Conv2d(256, 256, 3, padding=1)
        self.offset_output = nn.Conv2d(256, 2, 1)
        # initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, input):
        """Return ``(score, offset_output)`` for a batch of RGB images.

        Args:
            input: Tensor of shape ``(B, 3, H, W)`` with H and W divisible by 8.

        Returns:
            score: ``(B, num_classes, H, W)`` raw class scores.
            offset_output: ``(B, 2, H, W)`` key-point offsets.
        """
        # encoder: pool before every stage after the first so the decoder's
        # 2x upsampling steps line up with the skip connections
        conv1 = F.relu(self.conv1(input))                   # H
        conv2 = F.relu(self.conv2(F.max_pool2d(conv1, 2)))  # H / 2
        conv3 = F.relu(self.conv3(F.max_pool2d(conv2, 2)))  # H / 4
        conv4 = F.relu(self.conv4(F.max_pool2d(conv3, 2)))  # H / 8

        # key points offset regression (computed on the H / 4 feature map)
        offset = F.relu(self.offset_conv1(conv3))
        offset = F.relu(self.offset_conv2(offset))
        offset = F.relu(self.offset_conv3(offset))
        offset = F.relu(self.offset_conv4(offset))
        offset_output = self.offset_output(offset)
        # bring the offsets back to the resolution of the score map so both
        # outputs can be indexed with the same pixel coordinates
        offset_output = F.interpolate(
            offset_output, size=input.shape[-2:], mode='bilinear', align_corners=False
        )

        # decoder with additive skip connections
        deconv1 = F.relu(self.deconv1(conv4))            # H / 4, 256 channels
        deconv2 = F.relu(self.deconv2(deconv1 + conv3))  # H / 2, 128 channels
        deconv3 = F.relu(self.deconv3(deconv2 + conv2))  # H, 64 channels
        score = self.classifier(deconv3 + conv1)
        return score, offset_output
def generate_keypoints(score_map, offset_map):
    """Build key points from a score map and an offset map.

    Every position whose score is strictly positive becomes one key point.
    The point is the integer position shifted by the two offset channels read
    at that position, followed by the score itself.

    Args:
        score_map: 2-D tensor of scores; it is indexed as ``[row, col]``.
        offset_map: Tensor indexed as ``[0, channel, row, col]``; only the
            first entry of the leading dimension is read.

    Returns:
        Tensor of shape ``(N, 3)`` holding ``(row + off0, col + off1, score)``.
    """
    # positions with a positive score
    positions = torch.nonzero(score_map > 0)
    rows = positions[:, 0]
    cols = positions[:, 1]

    # offsets at those positions, arranged as one (off0, off1) pair per point
    per_point_offsets = offset_map[0][:, rows, cols].t().contiguous()

    # shifted coordinates with the score appended as a third column
    shifted = positions.float() + per_point_offsets
    scores = score_map[rows, cols].unsqueeze(1)
    return torch.cat((shifted, scores), dim=1)
class LaneLoss(nn.Module):
    """Distance loss between predicted key points and ground-truth pixels.

    Args:
        delta_v: Maximum absolute difference allowed on the first compared
            coordinate for a ground-truth pixel to take part in the loss.
        delta_d: Maximum absolute difference allowed on the second compared
            coordinate.
    """

    def __init__(self, delta_v=0.1, delta_d=3.0):
        super(LaneLoss, self).__init__()
        self.delta_v = delta_v
        self.delta_d = delta_d

    def forward(self, score_map, offset_map, gt):
        """Return the averaged distance loss as a 0-dim tensor.

        Args:
            score_map: Scores handed to ``generate_keypoints``.
            offset_map: Offsets handed to ``generate_keypoints``.
            gt: Ground-truth label tensor; entries below 255 are treated as
                labelled positions.

        Returns:
            A scalar tensor. It is zero when no key point has a ground-truth
            position inside both thresholds, so callers can always use
            ``.item()`` and add it to other losses.
        """
        # generate key points
        keypoints = generate_keypoints(score_map, offset_map)

        # labelled positions per ground-truth entry
        gt_keypoints = [torch.nonzero(gt[i] < 255) for i in range(gt.shape[0])]

        # Start from a tensor, not the int 0, so the return type does not
        # depend on whether anything matched.
        loss = keypoints.new_zeros(())
        count = 0
        # NOTE(review): `gt_keypoints` has one entry per element of `gt`, but
        # `i` runs over predicted key points; this raises IndexError as soon as
        # there are more key points than `gt` entries. The intended pairing is
        # not recoverable from this code - confirm before relying on it.
        for i in range(len(keypoints)):
            if gt_keypoints[i].shape[0] > 0:
                v_diff = keypoints[i, 0] - gt_keypoints[i][:, 1:2].float()
                d_diff = keypoints[i, 1] - gt_keypoints[i][:, 0:1].float()
                mask = (torch.abs(v_diff) < self.delta_v) & (torch.abs(d_diff) < self.delta_d)
                if torch.sum(mask) > 0:
                    squared = torch.pow(v_diff[mask], 2) + torch.pow(d_diff[mask], 2)
                    loss = loss + torch.mean(torch.sqrt(torch.sum(squared)))
                    count += 1
        if count > 0:
            loss = loss / count
        return loss
# Build the model and both loss terms.
model = LaneNet(num_classes=7)
criterion = nn.CrossEntropyLoss()
lane_loss = LaneLoss(delta_v=0.1, delta_d=3.0)

# Training loop. `num_epochs`, `train_loader`, `optimizer` and `log_interval`
# are expected to be defined by the surrounding script.
for epoch in range(num_epochs):
    for batch_idx, (image, label) in enumerate(train_loader):
        optimizer.zero_grad()

        # forward pass: segmentation loss plus key-point offset loss
        score, offset = model(image)
        loss_cls = criterion(score, label)
        loss_offset = lane_loss(score, offset, label)
        loss = loss_cls + loss_offset

        # backward pass and parameter update
        loss.backward()
        optimizer.step()

        # log only every `log_interval` steps
        if (batch_idx + 1) % log_interval != 0:
            continue
        print(
            'Epoch [{}/{}], Step [{}/{}], Loss: {:.8f}, Loss_cls: {:.8f}, Loss_offset: {:.8f}'.format(
                epoch + 1,
                num_epochs,
                batch_idx + 1,
                len(train_loader),
                loss.item(),
                loss_cls.item(),
                loss_offset.item(),
            )
        )
import torch
import torch.nn as nn


class LaneDetectionModel(nn.Module):
    """U-Net style network with a segmentation head and a key-point head."""

    def __init__(self):
        super(LaneDetectionModel, self).__init__()
        self.inc = DoubleConv(3, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 512)
        self.up1 = Up(1024, 256)
        self.up2 = Up(512, 128)
        self.up3 = Up(256, 64)
        self.up4 = Up(128, 64)
        self.outc = nn.Conv2d(64, 6, 1)  # 6 classes including background
        self.keypoints = nn.Conv2d(64, 2, 1)  # 2 channels for x, y coordinates of keypoints

    def forward(self, x):
        """Return ``(out, keypoints)``.

        ``out`` holds per-pixel class probabilities (softmax over dim 1) and
        ``keypoints`` a 2-channel map, both at the resolution of ``x``.
        """
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        out = torch.softmax(logits, dim=1)
        keypoints = self.keypoints(x)
        return out, keypoints


class DoubleConv(nn.Module):
    """Two 3x3 convolutions, each followed by batch norm and ReLU."""

    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.conv(x)


class Down(nn.Module):
    """2x2 max pooling followed by a ``DoubleConv``."""

    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__()
        self.mpconv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels),
        )

    def forward(self, x):
        return self.mpconv(x)


class Up(nn.Module):
    """Upsample, concatenate with the skip tensor, then ``DoubleConv``.

    Args:
        in_channels: Channel count AFTER concatenation, i.e. the channels of
            the upsampled tensor plus those of the skip tensor.
        out_channels: Channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super(Up, self).__init__()
        # The tensor coming from below carries in_channels // 2 channels (the
        # other half arrives through the skip connection), so the transposed
        # convolution must accept in_channels // 2, not in_channels.
        self.up = nn.ConvTranspose2d(in_channels // 2, in_channels // 2, 2, stride=2)
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # pad x1 so odd-sized skip tensors still line up
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]
        x1 = nn.functional.pad(
            x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2]
        )
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


def lane_detection_loss(out, keypoints, labels, coords):
    """Classification loss plus key-point regression loss.

    Args:
        out: Model output of shape (batch_size, num_classes, height, width).
            The model returns softmax probabilities, not logits.
        keypoints: Model output of shape (batch_size, 2, height, width).
        labels: Class-index labels of shape (batch_size, height, width).
        coords: Key-point coordinates of shape (batch_size, num_keypoints, 2).

    Returns:
        Scalar tensor ``loss_cls + loss_kp``.
    """
    batch_size, num_classes, height, width = out.shape
    num_keypoints = coords.shape[1]

    # classification loss: `out` is already normalised, and CrossEntropyLoss
    # would apply log-softmax a second time, so use NLL on the log-probabilities
    log_probs = torch.log(out.clamp_min(1e-8))
    loss_cls = nn.functional.nll_loss(log_probs, labels)

    # keypoint regression loss; reshape (not view) because permute returns a
    # non-contiguous tensor
    keypoints = keypoints.permute(0, 2, 3, 1).reshape(batch_size, height * width, 2)
    coords = coords.reshape(batch_size, num_keypoints, 2)
    # NOTE(review): this subtraction only broadcasts when num_keypoints is 1 or
    # height * width; how dense predictions should be matched to a sparse list
    # of key points is not defined here - confirm the intended pairing.
    loss_kp = torch.sqrt(torch.mean(torch.sum((keypoints - coords) ** 2, dim=2)))
    return loss_cls + loss_kp


# example usage (`num_epochs` and `dataloader` must be provided by the caller)
model = LaneDetectionModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = lane_detection_loss
for epoch in range(num_epochs):
    for batch in dataloader:
        images, labels, keypoints, coords = batch
        optimizer.zero_grad()
        out, kps = model(images)
        loss = criterion(out, kps, labels, coords)
        loss.backward()
        optimizer.step()
import torch
import torch.nn as nn
import torch.nn.functional as F


class LaneNet(nn.Module):
    """Small convolutional network with a classification and a key-point head.

    Args:
        num_classes: Number of channels of the classification output.
    """

    def __init__(self, num_classes=6):
        super(LaneNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.conv4 = nn.Conv2d(256, 512, 3, padding=1)
        self.conv5 = nn.Conv2d(512, 256, 3, padding=1)
        self.conv6 = nn.Conv2d(256, 128, 3, padding=1)
        self.conv7 = nn.Conv2d(128, 32, 3, padding=1)
        self.conv8 = nn.Conv2d(32, num_classes, 3, padding=1)
        # Dedicated key-point head: conv7 expects 128 input channels and cannot
        # be reused on the 32-channel feature map it produced itself.
        self.kp_head = nn.Conv2d(32, 2, 3, padding=1)
        self.pool = nn.MaxPool2d(2, stride=2)

    def forward(self, x):
        """Return ``(out1, out2)`` at the spatial size of ``x``.

        ``out1`` is the softmax over ``num_classes`` channels and ``out2`` the
        2-channel key-point regression map.
        """
        input_size = x.shape[-2:]
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = self.pool(F.relu(self.conv4(x)))
        x = self.pool(F.relu(self.conv5(x)))
        x = F.relu(self.conv6(x))
        x = F.interpolate(x, scale_factor=2, mode="bilinear", align_corners=False)
        x = F.relu(self.conv7(x))
        # Three poolings but only two 2x upsamplings would leave the output at
        # half resolution; resize straight to the input size instead so the
        # predictions align with per-pixel labels.
        x = F.interpolate(x, size=input_size, mode="bilinear", align_corners=False)
        out1 = F.softmax(self.conv8(x), dim=1)  # classification output
        out2 = self.kp_head(x)  # key-point regression output
        return out1, out2


class LaneLoss(nn.Module):
    """Weighted sum of a cross-entropy term and a masked squared-error term.

    Args:
        alpha: Weight of the classification term.
        beta: Weight of the regression term.
    """

    def __init__(self, alpha=1, beta=2):
        super(LaneLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta

    def forward(self, y_pred, y_true):
        """Return ``alpha * loss_cls + beta * loss_reg`` for same-shaped inputs."""
        # classification term: cross entropy against a one-hot style target
        loss_cls = -torch.mean(y_true * torch.log(y_pred + 1e-6))
        # regression term: squared error restricted to positive target entries
        mask = (y_true > 0).float()
        loss_reg = torch.mean(torch.sum(mask * (y_pred - y_true) ** 2, dim=[1, 2]))
        # weighted sum
        return self.alpha * loss_cls + self.beta * loss_reg
import cv2
import numpy as np


def generate_keypoints(label, num_lanes=6):
    """Return one ``[cx, cy]`` centre point per lane id ``1..num_lanes``.

    Args:
        label: 2-D integer array in which pixel value ``i`` marks lane ``i``.
        num_lanes: Number of lane ids to look for (ids start at 1).

    Returns:
        A list with ``num_lanes`` entries. A lane that does not occur in
        ``label`` is reported as ``[-100, -100]``.
    """
    keypoints = []
    for lane_id in range(1, num_lanes + 1):
        mask = (label == lane_id).astype(np.uint8)
        if np.sum(mask) == 0:  # this lane is absent
            keypoints.append([-100, -100])  # mark with negative coordinates
            continue

        # findContours returns (contours, hierarchy) on OpenCV 4 and
        # (image, contours, hierarchy) on OpenCV 3; [-2] is the contour list
        # in both cases.
        contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        cnt = contours[0]
        M = cv2.moments(cnt)
        if M['m00'] != 0:
            cx = int(M['m10'] / M['m00'])
            cy = int(M['m01'] / M['m00'])
        else:
            # A contour without area (single pixel or thin line) has m00 == 0;
            # fall back to the mean of its points instead of dividing by zero.
            cx, cy = (int(v) for v in cnt.reshape(-1, 2).mean(axis=0))
        keypoints.append([cx, cy])  # store the centre point
    return keypoints
model = LaneNet()
criterion = LaneLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    for inputs, labels in data_loader:  # iterate over the dataset
        optimizer.zero_grad()
        outputs = model(inputs)  # forward pass
        y_pred = outputs[0]
        # One-hot encode with as many classes as the network predicts so that
        # y_true and y_pred have the same channel dimension (a fixed value of 7
        # does not match a 6-channel prediction).
        y_true = F.one_hot(labels, num_classes=y_pred.shape[1]).permute(0, 3, 1, 2).float()
        loss = criterion(y_pred, y_true)  # compute the loss
        loss.backward()  # backward pass
        optimizer.step()
    # report the loss of the last batch of the epoch
    print("Epoch: {}, Loss: {:.4f}".format(epoch, loss.item()))
import torch
import torch.nn as nn
import torch.nn.functional as F


class LaneDetection(nn.Module):
    """Template: a backbone followed by a class head and a key-point head.

    The backbone and the ``in_features`` of both linear heads are left as
    ``...`` placeholders and must be filled in before the class can be used.
    """

    def __init__(self, num_classes=6, num_keypoints=12):
        super().__init__()
        self.num_classes = num_classes
        self.num_keypoints = num_keypoints
        self.vit = ...  # ViT backbone
        self.classifier = nn.Linear(in_features=..., out_features=num_classes)
        # each key point has two coordinates, hence num_keypoints * 2 outputs
        self.keypoints = nn.Linear(in_features=..., out_features=num_keypoints * 2)

    def forward(self, x):
        """Return ``(logits, keypoints)`` computed from the backbone features."""
        features = self.vit(x)
        # classification scores and key-point coordinates share the features
        return self.classifier(features), self.keypoints(features)


criterion_cls = nn.CrossEntropyLoss()  # loss for the classification task
criterion_kp = nn.MSELoss()  # loss for the regression task
model = LaneDetection()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

for epoch in range(num_epochs):
    for imgs, labels in dataloader:
        imgs = imgs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        logits, keypoints = model(imgs)
        loss_cls = criterion_cls(logits, labels)
        # the key-point targets come from whichever generation method is chosen
        loss_kp = criterion_kp(keypoints, generate_keypoints(labels))
        loss = loss_cls + loss_kp
        loss.backward()
        optimizer.step()
模型的输出是一个二元组,两个张量的 shape 分别为 (B, num_classes) 和 (B, num_keypoints * 2)。
import torch
import torch.nn as nn


class LaneDetectionNetwork(nn.Module):
    """Template: a shared backbone feeding a key-point head and a class head.

    The backbone and all convolution arguments are ``...`` placeholders.
    """

    def __init__(self):
        super().__init__()
        # backbone
        self.vit = ...
        # key-point head (sigmoid output) and classification head
        self.keypoints = nn.Sequential(
            nn.Conv2d(...),
            nn.ReLU(),
            nn.Conv2d(...),
            nn.Sigmoid(),
        )
        self.classification = nn.Sequential(
            nn.Conv2d(...),
            nn.ReLU(),
            nn.Conv2d(...),
        )

    def forward(self, x):
        """Return ``(keypoints, classification)`` from the shared features."""
        shared = self.vit(x)
        return self.keypoints(shared), self.classification(shared)


# loss function
def loss_function(keypoints_pred, keypoints_gt, classification_pred, classification_gt):
    """Return the key-point MSE plus 0.1 times the classification cross entropy."""
    regression_term = nn.functional.mse_loss(keypoints_pred, keypoints_gt)
    classification_term = nn.functional.cross_entropy(classification_pred, classification_gt)
    return regression_term + 0.1 * classification_term


# train the network
net = LaneDetectionNetwork()
optimizer = torch.optim.Adam(net.parameters())
keypoints_gt = ...  # ground-truth key-point coordinates
classification_gt = ...  # ground-truth class labels
for i in range(num_epochs):
    optimizer.zero_grad()
    keypoints_pred, classification_pred = net(x)
    loss = loss_function(keypoints_pred, keypoints_gt, classification_pred, classification_gt)
    loss.backward()
    optimizer.step()
输出结果应该包括分类结果和关键点的坐标。因此,输出的shape应该是一个元组,包括分类结果和关键点坐标的shape。比如,假设我们有6个车道线点需要分类和回归,那么输出的shape应该是(6, 7),其中第一维是点的数量,第二维是分类结果和关键点坐标的数量加一。
import torch
import torch.nn as nn
import torch.optim as optim


class LaneDetectionModel(nn.Module):
    """Template: encoder, global average pooling and two linear heads.

    The encoder is empty and both ``in_features`` values are ``...``
    placeholders that must be replaced before instantiation.
    """

    def __init__(self, num_classes, num_keypoints):
        super().__init__()
        self.encoder = nn.Sequential(
            # define your encoder layers here
        )
        self.pooling = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Linear(in_features=..., out_features=num_classes)
        self.keypoint_regressor = nn.Linear(in_features=..., out_features=num_keypoints)

    def forward(self, x):
        """Return ``(classifications, keypoint_coordinates)`` for a batch."""
        pooled = self.pooling(self.encoder(x))
        # collapse everything except the batch dimension
        flat = torch.flatten(pooled, start_dim=1)
        return self.classifier(flat), self.keypoint_regressor(flat)

# define your training loop here
import torch
import torch.nn as nn
from timm.models.vision_transformer import Block, Mlp
from timm.models.registry import register_model


class ViT_Line(nn.Module):
    """Convolutional stem + transformer blocks with class and key-point heads.

    Args:
        in_channels: Channels of the input pseudo-image.
        num_classes: Size of the classification output; the regression head
            predicts one ``(x, y)`` offset per class.
    """

    def __init__(self, in_channels=5, num_classes=7):
        super(ViT_Line, self).__init__()
        # stem: two stride-2 convolutions, i.e. a 4x spatial reduction
        self.patch_embed = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.GELU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.GELU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.GELU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.GELU(),
        )
        # nn.Sequential needs modules as positional arguments, not a generator.
        # Only keyword names shared by timm releases are passed; the dropout
        # rates that were 0.0 are simply left at their defaults.
        self.blocks = nn.Sequential(*[
            Block(
                dim=128,
                num_heads=8,
                mlp_ratio=4,
                qkv_bias=True,
                attn_drop=0.0,
                drop_path=0.1,
                norm_layer=nn.LayerNorm,
            )
            for _ in range(4)
        ])
        # Pool to the fixed 9x9 grid the linear heads are sized for, whatever
        # the input resolution (a 144x144 input yields a 36x36 grid).
        self.pool = nn.AdaptiveAvgPool2d((9, 9))
        self.head = nn.Sequential(
            nn.Linear(128 * 9 * 9, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_classes),
        )
        self.reg_head = nn.Sequential(
            nn.Linear(128 * 9 * 9, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 2 * num_classes),
        )

    def forward(self, x, coords):
        """Return ``(cls_pred, keypoint_pred)``.

        Args:
            x: Input of shape ``(B, in_channels, H, W)``.
            coords: Reference coordinates of shape ``(K, 2)`` shared by the
                whole batch, or ``(B, K, 2)``, with ``K >= num_classes``; only
                the first ``num_classes`` rows are used.

        Returns:
            cls_pred: ``(B, num_classes)`` class scores.
            keypoint_pred: ``(B, num_classes, 2)`` reference coordinates plus
                the regressed offsets.
        """
        x = self.patch_embed(x)
        batch, channels, height, width = x.shape
        # transformer blocks operate on (B, tokens, dim)
        tokens = x.flatten(2).transpose(1, 2)
        tokens = self.blocks(tokens)
        x = tokens.transpose(1, 2).reshape(batch, channels, height, width)
        x = self.pool(x).flatten(1)

        cls_pred = self.head(x)
        reg_pred = self.reg_head(x)

        # consecutive (x, y) pairs become one offset per key point; adding the
        # reference coordinates is a single broadcasted operation
        offsets = reg_pred.view(batch, -1, 2)
        anchors = coords[..., : offsets.shape[1], :].to(offsets.dtype)
        keypoint_pred = offsets + anchors
        return cls_pred, keypoint_pred
import numpy as np
def generate_keypoints(label, stride=8, num_lanes=6):
    """Return the scaled centre point of every lane present in ``label``.

    Args:
        label: Integer array in which value ``i`` marks pixels of lane ``i``.
        stride: Factor applied to the (truncated) mean row and column.
        num_lanes: Number of lane ids to look for (ids start at 1).

    Returns:
        Integer array of shape ``(N, 2)`` with one ``[x, y]`` row per lane
        found, in increasing lane-id order. Lanes that do not occur are
        skipped, so ``N`` may be smaller than ``num_lanes``; with no lane at
        all the result has shape ``(0, 2)``.
    """
    keypoints = []
    for lane_id in range(1, num_lanes + 1):
        indices = np.argwhere(label == lane_id)
        if len(indices) == 0:
            continue
        # mean (row, col) truncated to whole pixels before scaling
        row, col = np.mean(indices, axis=0).astype(int)[:2]
        keypoints.append([col * stride, row * stride])
    # reshape keeps the result two-dimensional even when no lane was found,
    # so callers can always index it as keypoints[:, 0] / keypoints[:, 1]
    return np.asarray(keypoints, dtype=int).reshape(-1, 2)
函数找到所有类别为i的点,然后求出这些点的均值,即为该类别的中心点坐标。最后返回的是一个坐标形式为[x, y]的列表。
分类loss采用交叉熵,关键点坐标回归采用L1 loss。
class ViTLoss(nn.Module):
    """Sum of a cross-entropy classification loss and an L1 key-point loss."""

    def __init__(self):
        super().__init__()
        self.cls_loss = nn.CrossEntropyLoss()
        self.reg_loss = nn.L1Loss()

    def forward(self, cls_pred, keypoint_pred, cls_label, keypoint_label):
        """Return the unweighted sum of both loss terms."""
        classification_term = self.cls_loss(cls_pred, cls_label)
        regression_term = self.reg_loss(keypoint_pred, keypoint_label)
        return classification_term + regression_term
import random

import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from timm.models.vision_transformer import Block
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import functional as F
from tqdm import tqdm


class LaneDetectionDataset(Dataset):
    """Dataset of ``(point_cloud, label)`` pairs stored in one ``.npy`` file.

    Args:
        data_path: Path of the ``.npy`` file.
        crop_size: ``(height, width)`` of the random crop applied to a sample.
    """

    def __init__(self, data_path, crop_size=(144, 144)):
        self.data_path = data_path
        # NOTE(security): allow_pickle=True unpickles arbitrary objects and can
        # execute code stored in the file - only load files you trust.
        self.data = np.load(data_path, allow_pickle=True)
        # the transforms are classes and have to be instantiated before use
        self.random_crop = RandomCrop(crop_size)
        self.normalize = Normalize()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(point_cloud [C, H, W], label [H, W], keypoints [N, 2])``."""
        point_cloud, label = self.data[idx]
        point_cloud = point_cloud.astype(np.float32)
        label = label.astype(np.int64)
        keypoints = generate_keypoints(label)
        point_cloud, label, keypoints = self.random_crop(point_cloud, label, keypoints)
        point_cloud, label = self.normalize(point_cloud, label)
        point_cloud = np.ascontiguousarray(point_cloud.transpose(2, 0, 1))  # [C, H, W]
        point_cloud = torch.from_numpy(point_cloud)
        label = torch.from_numpy(np.ascontiguousarray(label))
        # regression targets must be floating point for the L1 loss
        keypoints = torch.from_numpy(keypoints.astype(np.float32))
        return point_cloud, label, keypoints


class RandomCrop:
    """Crop a random ``crop_size`` window and shift the key points with it."""

    def __init__(self, crop_size):
        self.crop_size = crop_size

    def __call__(self, point_cloud, label, keypoints):
        H, W, C = point_cloud.shape
        crop_h, crop_w = self.crop_size[0], self.crop_size[1]
        y_start = random.randint(0, H - crop_h)
        x_start = random.randint(0, W - crop_w)
        point_cloud = point_cloud[y_start:y_start + crop_h, x_start:x_start + crop_w, :]
        label = label[y_start:y_start + crop_h, x_start:x_start + crop_w]
        # shift a copy so the caller's array is not modified
        keypoints = keypoints - np.array([x_start, y_start])
        return point_cloud, label, keypoints


class Normalize:
    """Standardise every channel of the point cloud to zero mean, unit std."""

    def __call__(self, point_cloud, label):
        mean = np.mean(point_cloud, axis=(0, 1))
        std = np.std(point_cloud, axis=(0, 1))
        # the epsilon avoids a division by zero for constant channels
        point_cloud = (point_cloud - mean) / (std + 1e-6)
        return point_cloud, label


def collate_fn(batch_data):
    """Stack the samples of a batch into three tensors."""
    pcs, labels, keypoints = zip(*batch_data)
    # NOTE(review): stacking requires every sample to have the same number of
    # key points, but generate_keypoints skips absent lanes - confirm how
    # missing lanes should be represented.
    return torch.stack(pcs, dim=0), torch.stack(labels, dim=0), torch.stack(keypoints, dim=0)


def generate_keypoints(label, stride=8):
    """Return an ``(N, 2)`` array with the scaled ``[x, y]`` centre of each lane.

    Lane ids 1 to 6 are searched; lanes that do not occur are skipped.
    """
    keypoints = []
    for i in range(1, 7):
        indices = np.argwhere(label == i)
        if len(indices) > 0:
            indices = np.mean(indices, axis=0).astype(int)
            x, y = indices[1] * stride, indices[0] * stride
            keypoints.append([x, y])
    # keep the result two-dimensional even when no lane is present
    return np.asarray(keypoints, dtype=int).reshape(-1, 2)


class ViT_Line(nn.Module):
    """Convolutional stem + transformer blocks with class and key-point heads."""

    def __init__(self, in_channels=5, num_classes=7):
        super(ViT_Line, self).__init__()
        self.patch_embed = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.GELU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.GELU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.GELU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.GELU(),
        )
        # nn.Sequential needs modules as positional arguments, not a generator;
        # only keyword names shared by timm releases are passed to Block
        self.blocks = nn.Sequential(*[
            Block(
                dim=128,
                num_heads=8,
                mlp_ratio=4,
                qkv_bias=True,
                attn_drop=0.0,
                drop_path=0.1,
                norm_layer=nn.LayerNorm,
            )
            for _ in range(4)
        ])
        # pool to the fixed 9x9 grid the linear heads are sized for
        self.pool = nn.AdaptiveAvgPool2d((9, 9))
        self.head = nn.Sequential(
            nn.Linear(128 * 9 * 9, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_classes),
        )
        self.reg_head = nn.Sequential(
            nn.Linear(128 * 9 * 9, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 2 * num_classes),
        )

    def forward(self, x, coords):
        """Return ``(cls_pred [B, num_classes], keypoint_pred [B, num_classes, 2])``.

        ``coords`` holds reference coordinates of shape ``(K, 2)`` or
        ``(B, K, 2)`` with ``K >= num_classes``.
        """
        x = self.patch_embed(x)
        batch, channels, height, width = x.shape
        # transformer blocks operate on (B, tokens, dim)
        tokens = x.flatten(2).transpose(1, 2)
        tokens = self.blocks(tokens)
        x = tokens.transpose(1, 2).reshape(batch, channels, height, width)
        x = self.pool(x).flatten(1)
        cls_pred = self.head(x)
        reg_pred = self.reg_head(x)
        offsets = reg_pred.view(batch, -1, 2)
        anchors = coords[..., : offsets.shape[1], :].to(offsets.dtype)
        keypoint_pred = offsets + anchors
        return cls_pred, keypoint_pred


class ViTLoss(nn.Module):
    """Sum of a cross-entropy classification loss and an L1 key-point loss."""

    def __init__(self):
        super(ViTLoss, self).__init__()
        self.cls_loss = nn.CrossEntropyLoss()
        self.reg_loss = nn.L1Loss()

    def forward(self, cls_pred, keypoint_pred, cls_label, keypoint_label):
        cls_loss = self.cls_loss(cls_pred, cls_label)
        reg_loss = self.reg_loss(keypoint_pred, keypoint_label)
        loss = cls_loss + reg_loss
        return loss


# dataset
data_path = "data.npy"
dataset = LaneDetectionDataset(data_path)
# data loader
batch_size = 32
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# model
model = ViT_Line()
model = model.cuda()
# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# loss function
criterion = ViTLoss()
# training
# NOTE(review): `labels` is a per-pixel map while `cls_pred` has one score
# vector per image, and generate_keypoints yields at most 6 points while the
# model regresses 7; both mismatches must be resolved before this loop can
# run - confirm the intended targets.
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (points, labels, keypoints) in enumerate(tqdm(data_loader)):
        points = points.cuda()
        labels = labels.cuda()
        keypoints = keypoints.cuda()
        cls_pred, keypoint_pred = model(points, keypoints)
        loss = criterion(cls_pred, keypoint_pred, labels, keypoints)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print("Epoch [{}/{}], Loss: {:.4f}".format(epoch+1, num_epochs, running_loss / len(data_loader)))
# test
model.eval()
with torch.no_grad():
    points, labels, keypoints = dataset[0]
    points = points.unsqueeze(0)
    labels = labels.unsqueeze(0)
    keypoints = keypoints.unsqueeze(0)
    points = points.cuda()
    labels = labels.cuda()
    keypoints = keypoints.cuda()
    cls_pred, keypoint_pred = model(points, keypoints)
    print("Classification output shape:", cls_pred.shape)
    print("Keypoint output shape:", keypoint_pred.shape)
在测试时,构造一个形状为[1, 5, 144, 144]的输入点云数据,使用模型进行推断,得到分类输出和关键点输出的形状。
首先,我们需要将原始点云转化成伪图像形式。这里可以使用Pillar-based方法(参考论文:PointPillars)。Pillar-based方法的思路是将点云分割成小的体素(pillar),每个pillar内部的点云信息再转化为一个 $f \times f$ 的矩阵,其中 $f$ 为特征图分辨率。
接下来,我们可以使用一个基于ViT(Vision Transformer)的骨干网络来提取特征。ViT是一种非常优秀的图像分类模型,可以将输入的图像转化为一组向量。如果我们将伪图像看做是一张图像,那么我们同样可以使用ViT来提取相应的特征。在这里,我们可以使用开源的PyTorch实现:pytorch-image-models。
在特征提取完毕后,我们需要使用检测头网络来对车道线点进行分类和关键点坐标回归。这里可以使用一种常见的检测头设计:Anchor-based方法(参考论文:Faster R-CNN)。Anchor-based方法的核心思想是在特征图上放置一组预定义的anchor框,对于每个anchor框,我们可以计算它和每个车道线点的IoU(Intersection over Union),并将其分配给IoU最大的车道线点。然后,我们可以根据分配的车道线点来计算分类损失和回归损失,从而优化整个网络。
最后,对于loss函数的设计,可以使用交叉熵损失和Smooth L1损失来计算分类损失和回归损失。最终的loss可以由这两部分损失加权求和。关于权重的选择,可以通过调节各部分损失的系数来进行实验。具体实现可以参考以下代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from torchvision.models.detection import anchor_utils
from torchvision.models.detection.transform import GeneralizedRCNNTransform

class LaneDetection(nn.Module):
    """Sketch of an anchor-based lane detector on top of a ViT backbone.

    In training mode with labels, ``forward`` returns a scalar loss; otherwise
    it returns stacked top-k boxes and scores.

    NOTE(review): several calls below (``models.vit_base_patch16_224``,
    ``transform.get_ground_truth``, ``anchor_utils.decode``,
    ``anchor_utils.clip_boxes_to_image``, ``anchor_utils.remove_small_boxes``,
    ``anchor_utils.label_to_targets``, ``anchor_utils.get_regression_targets``)
    could not be matched to the torchvision API - verify that they exist in the
    installed version before using this class.
    """

    def __init__(self, num_classes=6, num_keypoints=2):
        super().__init__()
        # define pillar-based encoder
        # ...
        # define ViT backbone
        # NOTE(review): this looks like a timm model name rather than a
        # torchvision one - confirm.
        self.backbone = models.vit_base_patch16_224(pretrained=True)
        # modify ViT output layer: one vector of num_classes + 4 * num_keypoints values
        self.backbone.head = nn.Sequential(
            nn.LayerNorm(768),
            nn.Linear(768, num_classes + 4 * num_keypoints)
        )
        # define detection head
        self.anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
        self.aspect_ratios = ((0.5, 1.0, 2.0),) * len(self.anchor_sizes)
        self.num_anchors = len(self.aspect_ratios[0]) * len(self.anchor_sizes)
        self.transform = GeneralizedRCNNTransform(min_size=224, max_size=224)
        self.anchor_generator = anchor_utils.AnchorGenerator(
            sizes=self.anchor_sizes,
            aspect_ratios=self.aspect_ratios
        )
        # rpn_head and rpn_bbox are created here but not used anywhere in this class
        self.rpn_head = nn.Conv2d(768, self.num_anchors * 2, kernel_size=1)
        self.rpn_bbox = nn.Conv2d(768, self.num_anchors * 4, kernel_size=1)
        self.classification_loss = nn.CrossEntropyLoss()
        self.regression_loss = nn.SmoothL1Loss(beta=1.0)

    def forward(self, x, labels=None):
        """Return a loss (training with labels) or ``(boxes, scores)`` otherwise."""
        # encode input point cloud into pillar-based representation
        # ...
        # generate pseudo image and resize to 224x224
        # NOTE(review): bilinear interpolation presumably needs a 4-D
        # (B, C, H, W) input, while the example below passes a 3-D tensor - confirm.
        x = F.interpolate(x, size=224, mode='bilinear')
        # pass through ViT backbone
        features = self.backbone(x)
        # split output into classification and regression logits
        # (the split index 6 is hard-coded and equals the default num_classes)
        classification, regression = features[..., :6], features[..., 6:]
        # compute anchor boxes and apply regression
        image_sizes = [x.shape[-2:]] * x.shape[0]
        anchors = self.anchor_generator(image_sizes)
        anchors = anchors.to(x.device)
        # compute classification and regression losses
        if self.training and labels is not None:
            # generate ground-truth anchor targets
            targets = self.transform.get_ground_truth(image_sizes, labels)
            targets = [{k: v.to(x.device) for k, v in t.items()} for t in targets]
            # compute RPN losses
            classification_loss, regression_loss = self.compute_rpn_loss(
                classification, regression, anchors, targets)
            loss = classification_loss + regression_loss
            return loss
        else:
            # decode anchor boxes and apply nms
            boxes = anchor_utils.decode(anchors, regression)
            scores = F.softmax(classification, dim=-1)
            boxes, scores = anchor_utils.clip_boxes_to_image(boxes, scores, image_sizes)
            boxes, scores = anchor_utils.remove_small_boxes(boxes, scores)
            # select top-k boxes per image
            pred_boxes = []
            pred_scores = []
            for b, s in zip(boxes, scores):
                # indices of the two highest scores, ignoring the last score column
                topk_idx = torch.topk(s[:, :-1], k=2)[1].flatten()
                b = b[topk_idx]
                s = s[topk_idx]
                pred_boxes.append(b)
                pred_scores.append(s)
            return torch.stack(pred_boxes), torch.stack(pred_scores)

    def compute_rpn_loss(self, classification, regression, anchors, targets):
        """Sum the classification and regression losses over the zipped inputs."""
        classification_loss = 0
        regression_loss = 0
        for c, r, a, t in zip(classification, regression, anchors, targets):
            # compute classification loss
            # NOTE(review): permute(0, 2, 3, 1) needs a 4-D tensor, but `c` is
            # one element of `classification` - confirm the expected shapes.
            c = c.permute(0, 2, 3, 1).flatten(end_dim=-2)
            c_targets = anchor_utils.label_to_targets(t['labels'], self.num_anchors)
            classification_loss += self.classification_loss(c, c_targets)
            # compute regression loss
            r = r.permute(0, 2, 3, 1).reshape(-1, 4)
            a = a.reshape(-1, 4)
            r_targets = anchor_utils.get_regression_targets(a, t['boxes'], self.aspect_ratios)
            regression_loss += self.regression_loss(r, r_targets)
        return classification_loss, regression_loss

# example usage
model = LaneDetection()
inputs = torch.randn(2, 144, 144)
labels = torch.randint(1, 7, (2, 144, 144))
loss = model(inputs, labels)  # train the model with this loss
outputs = model(inputs)  # inference mode, output shape: (2, 2, 4)
首先,需要对原始点云进行 pillar 化处理,将其转换为三维稀疏张量。这里可以使用 PointPillars 等方法,将点云分割成不同的 pillar,并利用编码器将每个 pillar 编码成一个向量。这些编码向量将作为模型输入。
接下来,可以使用 ViT 等模型作为骨干网络,用于将 pillar 编码图像化。首先,将 pillar 编码向量扩展为类似图像的形状,然后将它们输入到 ViT 中,以获得图像特征。
接下来,可以使用检测头网络进行车道线点的分类和关键点坐标回归。这里建议使用 FCOS 等目标检测方法,对各级特征图进行预测。每个预测包括类别分数和关键点坐标。
针对这一任务,也可以使用关键点监督方法进行训练。可以通过将每个车道线点的标签转换为其对应的关键点标签,来生成关键点。然后可以使用 MSE 损失或者 Smooth L1 损失函数来优化关键点坐标的回归,同时使用交叉熵损失函数来优化分类。
最后,输出结果应该包括分类结果和关键点坐标。分类结果的 shape 为 N×H×W×C(N 为 batch 大小,H 和 W 分别为特征图的高和宽,C 为类别数加背景类别数),关键点坐标的 shape 为 N×H×W×K×2(K 为关键点数,这里为 6)。
以下是一份用 PyTorch 实现的汽车车道线关键点监督网络的示例代码:
import torch
import torch.nn as nn
from torch.nn.parameter import Parameter
import torch.nn.functional as F


class PointPillar(nn.Module):
    """Per-pillar MLP encoder.

    Args:
        num_pillars: Number of pillars per sample.
        encoder_size: Width of the hidden layers and of the output vector.
        num_input_features: Number of features describing one pillar.
    """

    def __init__(self, num_pillars, encoder_size, num_input_features):
        super(PointPillar, self).__init__()
        self.num_pillars = num_pillars
        self.encoder_size = encoder_size
        self.num_input_features = num_input_features
        self.encoder = nn.Sequential(
            nn.Linear(num_input_features, encoder_size),
            nn.ReLU(),
            nn.Linear(encoder_size, encoder_size),
            nn.ReLU(),
            nn.Linear(encoder_size, encoder_size),
            nn.ReLU(),
            nn.Linear(encoder_size, encoder_size),
        )

    def forward(self, x):
        """Encode pillar features into ``(B, num_pillars, encoder_size)``."""
        x = x.contiguous().view(-1, self.num_input_features)
        x = self.encoder(x)
        return x.view(-1, self.num_pillars, self.encoder_size)


class ViT(nn.Module):
    """Minimal vision transformer over square images.

    ``forward`` returns class logits from the class token; ``forward_features``
    returns all tokens so the patch tokens can be reused as a feature map.
    """

    def __init__(self, image_size, patch_size, num_classes, num_input_channels,
                 dim, depth, heads, mlp_dim, dropout):
        super(ViT, self).__init__()
        self.image_size = image_size
        self.patch_size = patch_size
        self.num_classes = num_classes
        self.num_input_channels = num_input_channels
        self.dim = dim
        self.depth = depth
        self.heads = heads
        self.mlp_dim = mlp_dim
        self.dropout = dropout
        num_patches = (image_size // patch_size) ** 2
        # A strided convolution embeds each patch directly into `dim` channels;
        # flattening the spatial grid afterwards gives one token per patch.
        self.to_patch_embedding = nn.Conv2d(
            num_input_channels, dim, kernel_size=patch_size, stride=patch_size
        )
        self.pos_embedding = Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = Parameter(torch.randn(1, 1, dim))
        self.dropout_layer = nn.Dropout(dropout)
        # batch_first=True because tokens are laid out as (B, N, dim)
        self.transformer = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=dim, nhead=heads, dim_feedforward=mlp_dim,
                dropout=dropout, batch_first=True,
            )
            for _ in range(depth)
        ])
        self.fc = nn.Linear(dim, num_classes)

    def forward_features(self, x):
        """Return tokens ``(B, 1 + num_patches, dim)``; index 0 is the class token."""
        B, C, H, W = x.size()
        assert H == W == self.image_size
        tokens = self.to_patch_embedding(x).flatten(2).transpose(1, 2)
        cls_tokens = self.cls_token.expand(B, -1, -1)
        tokens = torch.cat((cls_tokens, tokens), dim=1)
        tokens = tokens + self.pos_embedding[:, :tokens.size(1)]
        tokens = self.dropout_layer(tokens)
        for layer in self.transformer:
            tokens = layer(tokens)
        return tokens

    def forward(self, x):
        """Return class logits of shape ``(B, num_classes)``."""
        return self.fc(self.forward_features(x)[:, 0])


class FCOS(nn.Module):
    """Per-level convolutional heads for class, box and key-point prediction.

    One set of heads is created per entry of ``feature_maps``; every input
    feature map must have ``base_size`` channels.
    """

    def __init__(self, num_classes, num_keypoints, feature_maps, strides, base_size):
        super(FCOS, self).__init__()
        self.num_classes = num_classes
        self.feature_maps = feature_maps
        self.strides = strides
        self.base_size = base_size
        self.num_keypoints = num_keypoints
        num_levels = len(feature_maps)
        self.cls_heads = nn.ModuleList([
            nn.Conv2d(base_size, num_classes, kernel_size=3, stride=1, padding=1)
            for _ in range(num_levels)
        ])
        self.reg_heads = nn.ModuleList([
            nn.Conv2d(base_size, 4, kernel_size=3, stride=1, padding=1)
            for _ in range(num_levels)
        ])
        self.kps_heads = nn.ModuleList([
            nn.Conv2d(base_size, 2 * num_keypoints, kernel_size=3, stride=1, padding=1)
            for _ in range(num_levels)
        ])

    def forward(self, features):
        """Apply the heads of level ``i`` to ``features[i]``; return three lists."""
        cls_preds = []
        reg_preds = []
        kps_preds = []
        for idx, feature in enumerate(features):
            cls_preds.append(self.cls_heads[idx](feature))
            reg_preds.append(self.reg_heads[idx](feature))
            kps_preds.append(self.kps_heads[idx](feature))
        return cls_preds, reg_preds, kps_preds


class LaneDetector(nn.Module):
    """Pillar encoder -> ViT backbone -> FCOS-style heads.

    The encoded pillars are arranged on a square grid, resized to the backbone
    input size, and the backbone's patch tokens are reshaped into a feature
    map. The map is average-pooled to half the size for every further head
    level. ``feature_maps`` is only used for its length (number of levels).
    """

    NUM_PILLARS = 144   # arranged as a 12 x 12 grid
    IMAGE_SIZE = 128    # backbone input resolution
    PATCH_SIZE = 32     # backbone patch size -> 4 x 4 patch tokens

    def __init__(self, num_classes, num_keypoints, pillar_encoder_size, backbone_dim,
                 backbone_depth, backbone_heads, backbone_mlp_dim, backbone_dropout,
                 feature_maps, strides, num_input_channels, base_size):
        super(LaneDetector, self).__init__()
        self.pillar_encoder = PointPillar(
            num_pillars=self.NUM_PILLARS,
            encoder_size=pillar_encoder_size,
            num_input_features=num_input_channels,
        )
        # the pseudo-image has one channel per pillar-encoding dimension
        self.vit_backbone = ViT(
            image_size=self.IMAGE_SIZE,
            patch_size=self.PATCH_SIZE,
            num_classes=num_classes,
            num_input_channels=pillar_encoder_size,
            dim=backbone_dim,
            depth=backbone_depth,
            heads=backbone_heads,
            mlp_dim=backbone_mlp_dim,
            dropout=backbone_dropout,
        )
        # the heads expect `base_size` channels; project only when necessary
        if backbone_dim == base_size:
            self.neck = nn.Identity()
        else:
            self.neck = nn.Conv2d(backbone_dim, base_size, kernel_size=1)
        self.fcos_head = FCOS(
            num_classes=num_classes,
            num_keypoints=num_keypoints,
            feature_maps=feature_maps,
            strides=strides,
            base_size=base_size,
        )

    def forward(self, x):
        """Return ``(cls_preds, reg_preds, kps_preds)``, one tensor per level.

        Args:
            x: Pillar features of shape ``(B, 144, num_input_channels)``.
        """
        pillar_features = self.pillar_encoder(x)  # (B, 144, E)
        batch = pillar_features.size(0)
        grid = int(round(self.NUM_PILLARS ** 0.5))
        pseudo_image = pillar_features.transpose(1, 2).reshape(batch, -1, grid, grid)
        pseudo_image = F.interpolate(
            pseudo_image, size=(self.IMAGE_SIZE, self.IMAGE_SIZE),
            mode="bilinear", align_corners=False,
        )
        # drop the class token and fold the patch tokens back into a 2-D map
        patch_tokens = self.vit_backbone.forward_features(pseudo_image)[:, 1:]
        side = self.IMAGE_SIZE // self.PATCH_SIZE
        feature_map = patch_tokens.transpose(1, 2).reshape(batch, -1, side, side)
        feature_map = self.neck(feature_map)
        # simple pyramid: halve the resolution for every additional level
        features = [
            feature_map if level == 0
            else F.adaptive_avg_pool2d(feature_map, max(1, side >> level))
            for level in range(len(self.fcos_head.cls_heads))
        ]
        cls_preds, reg_preds, kps_preds = self.fcos_head(features)
        return cls_preds, reg_preds, kps_preds
模型用于对图像进行分类和关键点回归。在 LaneDetector
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。