There are many articles reproducing the LaneNet model. The original code for "Towards End-to-End Lane Detection: an Instance Segmentation Approach" is implemented in TensorFlow 1.x. This article reproduces part of the LaneNet model in Keras with a TensorFlow 2.0 backend. The implementation details differ from the original paper, but the goal is the same. Suggestions from readers are very welcome.
The paper proposes an end-to-end lane detection algorithm consisting of two network models, LaneNet and H-Net. LaneNet is a multi-task model that combines semantic segmentation with a per-pixel vector representation, with the goal of separating different lane lines into instances. H-Net is a model built from convolutional and fully connected layers that predicts a transformation matrix H, which is used to regress the pixels belonging to the same lane line. The figure below shows the network structure described in the paper; each part is explained in detail below.
The figure below shows the LaneNet architecture, a multi-task model with two decoder branches. The segmentation branch performs semantic segmentation of the input image (a binary classification of each pixel as lane line or background). The embedding branch produces an embedding for each pixel; the paper sets the embedding dimension to 4, and the trained embedding vectors are used for clustering. Finally, the results of the two branches are combined and clustered with the Mean-Shift algorithm to obtain the instance segmentation result.
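As a minimal sketch of this clustering step (not from the original code; the array names and the bandwidth value are assumptions), the embeddings of the lane pixels can be clustered with scikit-learn's MeanShift:
- # Hedged sketch: cluster the pixel embeddings of lane pixels with Mean-Shift.
- # `binary_mask` (H, W) and `embeddings` (H, W, 4) are assumed outputs of the two branches.
- import numpy as np
- from sklearn.cluster import MeanShift
-
- def cluster_lane_pixels(binary_mask, embeddings, bandwidth=1.5):
-     coords = np.argwhere(binary_mask > 0)           # (N, 2) row/col of lane pixels
-     feats = embeddings[coords[:, 0], coords[:, 1]]  # (N, 4) embedding per lane pixel
-     labels = MeanShift(bandwidth=bandwidth, bin_seeding=True).fit_predict(feats)
-     return coords, labels                           # pixel coordinates + lane-instance ids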
The output of LaneNet is the set of pixels of each lane line; a lane curve still has to be regressed from these pixels. The conventional approach is to project the image into a bird's-eye view and then fit a 2nd- or 3rd-order polynomial. In that approach the transformation matrix H is computed only once and shared by all images, which causes errors when the ground plane changes (mountains, hills).
To solve this problem, the paper trains a neural network, H-Net, that can predict the transformation matrix H. The input of the network is an image and the output is the matrix H:
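The matrix, reproduced here from the paper, has zeros placed so that horizontal lines remain horizontal under the transformation:

$$ H = \begin{bmatrix} a & b & c \\ 0 & d & e \\ 0 & f & 1 \end{bmatrix} $$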
As the equation shows, the transformation matrix H has only 6 free parameters, so the output of H-Net is a 6-dimensional vector. H-Net consists of 6 plain convolutional layers and one fully connected layer; its structure is shown in the figure:
Curve fitting is the process of re-predicting the x coordinate from the y coordinate:
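Reproducing the closed-form fit from the paper: the lane pixels are projected with H, a polynomial f(y') (e.g. 3rd order) is fitted through the transformed points by least squares,

$$ w^{*} = (Y^{\top}Y)^{-1}Y^{\top}x' $$

and the fitted points x^{*\prime} = f(y') are projected back into the original image with the inverse of H.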
Note:
There are many articles explaining the LaneNet paper; here only the core content of the paper is summarized.
Readers who want to learn more about the original paper can refer to:
https://www.jianshu.com/p/c6d38d648509
https://www.cnblogs.com/xuanyuyt/p/11523192.html
This section focuses on experimental details not covered in the paper, reproducing the LaneNet branch of the network (a reproduction of the idea rather than the exact code). The tensorflow_keras implementation covers dataset processing, data preprocessing, network construction, loss functions, and the learning-rate schedule. The backbone does not follow the ENet used in the paper; the reproduction instead uses a U-Net encoder-decoder network.
Note: the code in this article cannot be used as-is; adapt it to your own needs with simple modifications.
The dataset used in the paper is the TuSimple dataset, which contains three files: train_set.zip, test_set.zip, and test_label.json. TuSimple is annotated with lane points, so the annotations must be converted into label images before the data can be fed to the network for training.
The format of the label file is:
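Each line of label_data_*.json is an independent JSON record; an abridged example (coordinate lists shortened for readability):
- {"lanes": [[-2, -2, 632, 625, ...], [-2, -2, 719, 734, ...]],
-  "h_samples": [240, 250, 260, ...],
-  "raw_file": "clips/0313-1/6040/20.jpg"}
Here lanes holds the x coordinate of each lane at every y in h_samples, with -2 where the lane is absent, and raw_file is the image path relative to the dataset root. The following code converts these point annotations into binary and instance label images: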
- import cv2
- import json
- import numpy as np
- import os
- base_path = "F:\\train_set\\"
-
- file = open(base_path + 'label_data_0313.json', 'r')
- image_num = 0
- for line in file.readlines():
-     data = json.loads(line)
-     image = cv2.imread(os.path.join(base_path, data['raw_file']))
-     # One single-channel canvas for the binary mask and one for the instance mask
-     binaryimage = np.zeros((image.shape[0], image.shape[1], 1), np.uint8)
-     instanceimage = binaryimage.copy()
-     arr_width = data['lanes']       # per-lane x coordinates, -2 where the lane is absent
-     arr_height = data['h_samples']  # sampled y coordinates (rows)
-     width_num = len(arr_width)      # number of lanes
-     height_num = len(arr_height)    # number of sampled rows
-     for i in range(height_num):
-         # Start at 50 so the first lane does not collide with the 0-valued background
-         lane_hist = 50
-         for j in range(width_num):
-             if arr_width[j][i] > 0:
-                 binaryimage[int(arr_height[i]), int(arr_width[j][i])] = 255
-                 instanceimage[int(arr_height[i]), int(arr_width[j][i])] = lane_hist
-                 # Connect consecutive valid points of the same lane with a thick line
-                 if i > 0 and arr_width[j][i - 1] > 0:
-                     cv2.line(binaryimage, (int(arr_width[j][i - 1]), int(arr_height[i - 1])),
-                              (int(arr_width[j][i]), int(arr_height[i])), 255, 10)
-                     cv2.line(instanceimage, (int(arr_width[j][i - 1]), int(arr_height[i - 1])),
-                              (int(arr_width[j][i]), int(arr_height[i])), lane_hist, 10)
-             lane_hist += 50  # lane j is always drawn with value 50 * (j + 1)
-
-     string1 = "H:\\unet-keras\\TUsimple\\label_instance\\" + data['raw_file'][6:] + str(image_num) + ".png"
-     string2 = "F:\\train_set\\png1\\" + str(image_num) + ".png"
-     string3 = "F:\\train_set\\image\\" + str(image_num) + ".jpg"
-     cv2.imwrite(string1, binaryimage)
-     cv2.imwrite(string2, instanceimage)
-     cv2.imwrite(string3, image)
-     image_num = image_num + 1
-     # if image_num == 500:  # uncomment to stop early for a quick test
-     #     break
- file.close()
During training, the network reads its training data through a txt file, which is constructed to match what the network needs. Since the network has one input and two outputs, each record consists of three image paths: the ground-truth road image, the lane instance label, and the binary segmentation label.
Code to generate the training data file:
- import os
- import re
- base_path = "H:/unet-keras/TUsimple/Data/"
- img_path = "H:/unet-keras/TUsimple/Data/image"
- label_path = 'H:/unet-keras/TUsimple/Data/label_instance'
- seg_path = 'H:/unet-keras/TUsimple/Data/label_binary'
- with open(base_path + 'train_seg_instance.txt', 'w') as f:
-     images = os.listdir(img_path)
-     for dir in images:  # date folders: 0313-1, 0313-2, ...
-         # images
-         dir1 = img_path + '/' + dir
-         Dir = os.listdir(dir1)
-         Dir.sort(key=lambda i: int(re.match(r'(\d+)', i).group()))
-         # instance labels
-         label_dir1 = label_path + '/' + dir  # e.g. H:/unet-keras/TUsimple/Data/label_instance/0313-1
-         label_dir = os.listdir(label_dir1)
-         label_dir.sort(key=lambda i: int(re.match(r'(\d+)', i).group()))
-         # binary labels
-         seg_label_dir1 = seg_path + '/' + dir
-         seg_label_dir = os.listdir(seg_label_dir1)
-         seg_label_dir.sort(key=lambda i: int(re.match(r'(\d+)', i).group()))
-         for filenames in Dir:  # clip folders: 60..., 20...
-             filenames1 = dir1 + '/' + filenames
-             Filenames = os.listdir(filenames1)
-             Filenames.sort(key=lambda i: int(re.match(r'(\d+)', i).group()))
-             for filename in Filenames[15:]:  # use the last frames of each clip
-                 # one record = image path + instance label path + binary label path
-                 a = []
-                 out_path = filenames1 + '/' + filename  # e.g. .../image/0313-1/60/16.jpg
-                 a.append(out_path)
-                 label_out_path = label_dir1 + '/' + filenames + '.png'
-                 seg_label_out_path = seg_label_dir1 + '/' + filenames + '.png'
-                 a.append(label_out_path)
-                 a.append(seg_label_out_path)
-                 f.write(' '.join(a) + '\n')
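Each line of the resulting train_seg_instance.txt then holds the three space-separated paths; the record below is only illustrative:
- H:/unet-keras/TUsimple/Data/image/0313-1/60/16.jpg H:/unet-keras/TUsimple/Data/label_instance/0313-1/60.png H:/unet-keras/TUsimple/Data/label_binary/0313-1/60.png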
The dataset can be split 8:2 into a training set and a test set:
- '''Split the dataset 8:2'''
- from random import shuffle
- with open(r"H:\unet-keras\TUsimple\Data\train_seg_instance.txt", "r", encoding='UTF-8') as f:
-     with open(r"H:\unet-keras\TUsimple\Data\train_instance_tusimple0.8.txt", "w", encoding='UTF-8') as f1:
-         with open(r"H:\unet-keras\TUsimple\Data\train_instance_tusimple0.2.txt", "w",
-                   encoding='UTF-8') as f2:
-             train = f.readlines()
-             shuffle(train)  # shuffle before splitting
-             split = 0.8
-             num = len(train)
-             train_num = int(num * split)
-             for line in train[:train_num]:
-                 f1.writelines(line)
-             for line in train[train_num:]:
-                 f2.writelines(line)
Data preprocessing prepares the input for the network. This article uses U-Net as the encoder-decoder network, whose input size is (256, 256, 3), i.e. (H, W, 3), while the source images are 256x128x3. To keep the images from being distorted, they are letterboxed with padding bars, as in the following code:
- def letterbox_image(self, image, label, instance_label, size):
-     '''Resize image with unchanged aspect ratio using padding'''
-     label = Image.fromarray(np.array(label))
-     instance_label = Image.fromarray(np.array(instance_label))
-     iw, ih = image.size
-     w, h = size
-     scale = min(w / iw, h / ih)
-     nw = int(iw * scale)
-     nh = int(ih * scale)
-
-     # Pad the input image with near-white (252) bars
-     image = image.resize((nw, nh), Image.BICUBIC)
-     new_image = Image.new('RGB', size, (252, 252, 252))
-     new_image.paste(image, ((w - nw) // 2, (h - nh) // 2))
-
-     # Labels must use nearest-neighbour resizing to keep the class values intact
-     label = label.resize((nw, nh), Image.NEAREST)
-     instance_label = instance_label.resize((nw, nh), Image.NEAREST)
-     new_label = Image.new('L', size, 252)
-     new_label.paste(label, ((w - nw) // 2, (h - nh) // 2))
-
-     new_instance_label = Image.new('L', size, 252)
-     new_instance_label.paste(instance_label, ((w - nw) // 2, (h - nh) // 2))
-     return new_image, new_label, new_instance_label
On the left is the ground-truth image and on the right the binary segmentation label, i.e. the input image of LaneNet (Input) and the output label of the binary segmentation branch (y_true). Note that the instance segmentation label is letterboxed in the same way, but the embedding branch does not output an image; it outputs a matrix of multi-dimensional vectors, one per pixel.
As the letterboxing above shows, label handling differs considerably between the two training tasks. The binary segmentation labels are one-hot encoded, e.g. [0, 1] for a background pixel and [1, 0] for a lane pixel. Every entry is either 0 or 1, so this representation implies no correlation between classes.
- png = np.array(png)
- # Convert to one-hot form
- seg_labels = np.zeros_like(png)
- seg_labels[png <= 127.5] = 1
- seg_labels = np.eye(self.num_classes + 1)[seg_labels.reshape([-1])]
- # The padding bars get their own extra channel; it is removed later in the loss computation
- seg_labels = seg_labels.reshape((int(self.image_size[1]), int(self.image_size[0]), 2 + 1))
The main goal of the embedding branch is to represent each pixel with a multi-dimensional vector that captures the relationships between pixels, so that they can later be clustered. The concrete implementation is covered in the loss-function section; here only the label handling for this branch is explained. For data where the relationships between classes matter, one-hot encoding is no longer suitable, so label encoding is used instead: e.g. with [1, 2, 3, 4], all pixels with value 1 form one class, and 2, 3, 4 likewise each form a class. The advantage of this representation is that pixels of the same class can be grouped directly into one cluster.
With this in mind, the data needs special handling. In the dataset-processing code above, each lane line was drawn with a pixel value 50 apart from the next, so the label encoding can be set up accordingly:
- instance_png = np.array(instance_png)
- instance_label = np.zeros_like(instance_png)
- # Order matters: assign the catch-all class first, then overwrite the lane values
- instance_label[instance_png < 255] = 4   # background (0) and padding (252)
- instance_label[instance_png == 200] = 3
- instance_label[instance_png == 150] = 2
- instance_label[instance_png == 100] = 1
- instance_label[instance_png == 50] = 0
LaneNet is a multi-task network. Accordingly, this article uses U-Net as the base network, with a single input (the road image, 256, 256, 3) and two outputs (the binary segmentation map (256, 256, 2) and the embedding matrix (256, 256, 5)). Once the composition is understood, the network is easy to build:
- from tensorflow.keras.layers import *
- from tensorflow.keras.models import Model
- import tensorflow.keras.backend as K
-
- class UNET_double(object):
-     def __init__(self, input_shape, category_classes, color_classes):
-         # default input_shape = (width, height, channel)
-         self.input_shape = input_shape
-         self.category_classes = category_classes
-         self.color_classes = color_classes
-
-     def vgg(self, inputs):
-         # VGG-style encoder: Conv->BN->ReLU blocks, each scale followed by max pooling
-         bn_axis = 3
-         # Block 1
-         x = Conv2D(64, (3, 3), padding='same')(inputs)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(64, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         feat1 = x
-         x = MaxPooling2D((2, 2), strides=(2, 2))(x)
-
-         # Block 2
-         x = Conv2D(128, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(128, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         feat2 = x
-         x = MaxPooling2D((2, 2), strides=(2, 2))(x)
-
-         # Block 3
-         x = Conv2D(256, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(256, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(256, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         feat3 = x
-         x = MaxPooling2D((2, 2), strides=(2, 2))(x)
-
-         # Block 4
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         feat4 = x
-         x = MaxPooling2D((2, 2), strides=(2, 2))(x)
-
-         # Block 5
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-         x = Conv2D(512, (3, 3), padding='same')(x)
-         x = BatchNormalization(axis=bn_axis)(x)
-         x = Activation('relu')(x)
-
-         return x, feat1, feat2, feat3, feat4
-
-     def binery(self, inputs):
-         # Decoder for the binary segmentation branch: at each scale, upsample,
-         # concatenate the skip feature, then apply two Conv->ReLU->BN blocks
-         x, feat1, feat2, feat3, feat4 = inputs
-         channels = [64, 128, 256, 512]
-
-         P5_up = UpSampling2D(size=(2, 2))(x)
-         P4 = Concatenate(axis=3)([feat4, P5_up])  # 28,28,512 + 28,28,512
-         P4 = Conv2D(channels[3], 3, padding='same', kernel_initializer='he_normal')(P4)
-         P4 = Activation('relu')(P4)
-         P4 = BatchNormalization()(P4)
-         P4 = Conv2D(channels[3], 3, padding='same', kernel_initializer='he_normal')(P4)
-         P4 = Activation('relu')(P4)
-         P4 = BatchNormalization()(P4)
-
-         P4_up = UpSampling2D(size=(2, 2))(P4)  # 56,56,512
-         P3 = Concatenate(axis=3)([feat3, P4_up])
-         P3 = Conv2D(channels[2], 3, padding='same', kernel_initializer='he_normal')(P3)
-         P3 = Activation('relu')(P3)
-         P3 = BatchNormalization()(P3)
-         P3 = Conv2D(channels[2], 3, padding='same', kernel_initializer='he_normal')(P3)
-         P3 = Activation('relu')(P3)
-         P3 = BatchNormalization()(P3)
-
-         P3_up = UpSampling2D(size=(2, 2))(P3)
-         P2 = Concatenate(axis=3)([feat2, P3_up])
-         P2 = Conv2D(channels[1], 3, padding='same', kernel_initializer='he_normal')(P2)
-         P2 = Activation('relu')(P2)
-         P2 = BatchNormalization()(P2)
-         P2 = Conv2D(channels[1], 3, padding='same', kernel_initializer='he_normal')(P2)
-         P2 = Activation('relu')(P2)
-         P2 = BatchNormalization()(P2)
-
-         P2_up = UpSampling2D(size=(2, 2))(P2)
-         P1 = Concatenate(axis=3)([feat1, P2_up])
-         P1 = Conv2D(channels[0], 3, padding='same', kernel_initializer='he_normal')(P1)
-         P1 = Activation('relu')(P1)
-         P1 = BatchNormalization()(P1)
-         P1 = Conv2D(channels[0], 3, padding='same', kernel_initializer='he_normal')(P1)
-         P1 = Activation('relu')(P1)
-         P1 = BatchNormalization()(P1)
-         # softmax classifier
-         out1 = Conv2D(self.category_classes, 1, activation="softmax", name='category_output')(P1)
-
-         return out1
-
-     def instance(self, inputs):
-         # Decoder for the embedding (instance) branch: same structure as binery,
-         # but the output is a linear embedding rather than a classification
-         x, feat1, feat2, feat3, feat4 = inputs
-         channels = [64, 128, 256, 512]
-
-         P5_up = UpSampling2D(size=(2, 2))(x)
-         P4 = Concatenate(axis=3)([feat4, P5_up])  # 28,28,512 + 28,28,512
-         P4 = Conv2D(channels[3], 3, padding='same', kernel_initializer='he_normal')(P4)
-         P4 = Activation('relu')(P4)
-         P4 = BatchNormalization()(P4)
-         P4 = Conv2D(channels[3], 3, padding='same', kernel_initializer='he_normal')(P4)
-         P4 = Activation('relu')(P4)
-         P4 = BatchNormalization()(P4)
-
-         P4_up = UpSampling2D(size=(2, 2))(P4)  # 56,56,512
-         P3 = Concatenate(axis=3)([feat3, P4_up])
-         P3 = Conv2D(channels[2], 3, padding='same', kernel_initializer='he_normal')(P3)
-         P3 = Activation('relu')(P3)
-         P3 = BatchNormalization()(P3)
-         P3 = Conv2D(channels[2], 3, padding='same', kernel_initializer='he_normal')(P3)
-         P3 = Activation('relu')(P3)
-         P3 = BatchNormalization()(P3)
-
-         P3_up = UpSampling2D(size=(2, 2))(P3)
-         P2 = Concatenate(axis=3)([feat2, P3_up])
-         P2 = Conv2D(channels[1], 3, padding='same', kernel_initializer='he_normal')(P2)
-         P2 = Activation('relu')(P2)
-         P2 = BatchNormalization()(P2)
-         P2 = Conv2D(channels[1], 3, padding='same', kernel_initializer='he_normal')(P2)
-         P2 = Activation('relu')(P2)
-         P2 = BatchNormalization()(P2)
-
-         P2_up = UpSampling2D(size=(2, 2))(P2)
-         P1 = Concatenate(axis=3)([feat1, P2_up])
-         P1 = Conv2D(channels[0], 3, padding='same', kernel_initializer='he_normal')(P1)
-         P1 = Activation('relu')(P1)
-         P1 = BatchNormalization()(P1)
-         P1 = Conv2D(channels[0], 3, padding='same', kernel_initializer='he_normal')(P1)
-         P1 = Activation('relu')(P1)
-         P1 = BatchNormalization()(P1)
-         out2 = Conv2D(self.color_classes, 1, name='color_output')(P1)  # 256,256,5
-
-         return out2
-
-     def build_model(self):
-         input_shape = self.input_shape
-         inputs = Input(shape=input_shape)
-         vgg = self.vgg(inputs)
-         binery = self.binery(vgg)
-         instance = self.instance(vgg)
-         model = Model(inputs=inputs, outputs=[binery, instance])
-         # model.summary()
-         return model
- # UNET_double(input_shape=(256,256,3), category_classes=2, color_classes=5).build_model()
The two tasks need two loss functions. In this article the binary segmentation branch uses the dice loss. Lane pixels versus background is an extremely imbalanced classification problem; the dice loss handles this well and requires no class-weight hyperparameters.
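For reference, the loss the code below implements is cross-entropy plus a dice term built from an F_beta score (beta = 1 gives the ordinary dice/F1 form); the padding-bar channel of y_true is dropped, and s is a small smoothing constant:

$$ \mathcal{L} = \mathrm{CE}(y, \hat{y}) + 1 - \frac{(1+\beta^{2})\,TP + s}{(1+\beta^{2})\,TP + \beta^{2}\,FN + FP + s} $$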
- import tensorflow as tf
- import tensorflow.keras.backend as K
-
- def dice_loss(beta=1, smooth=1e-5):
-     def _dice_loss_with_CE(y_true, y_pred):
-         # y_pred: (batch, 256, 256, 2); y_true: (batch, 256, 256, 3), last channel = padding bars
-         y_pred = K.clip(y_pred, K.epsilon(), 1.0 - K.epsilon())
-         # Cross-entropy term; y_true[..., :-1] drops the padding-bar channel
-         CE_loss = - y_true[..., :-1] * K.log(y_pred)
-         CE_loss = K.mean(K.sum(CE_loss, axis=-1))
-         # Soft TP / FP / FN per class, accumulated over the whole batch
-         tp = K.sum(y_true[..., :-1] * y_pred, axis=[0, 1, 2])
-         fp = K.sum(y_pred, axis=[0, 1, 2]) - tp
-         fn = K.sum(y_true[..., :-1], axis=[0, 1, 2]) - tp
-         score = ((1 + beta ** 2) * tp + smooth) / ((1 + beta ** 2) * tp + beta ** 2 * fn + fp + smooth)
-         score = tf.reduce_mean(score)
-         dice_loss = 1 - score
-         return CE_loss + dice_loss
-
-     return _dice_loss_with_CE
The pixel-embedding branch keeps the loss function from the paper:
To distinguish which lane a lane pixel belongs to, the embedding branch initializes an embedding vector for each pixel, and the loss is designed so that embeddings of pixels on the same lane line are close together while embeddings of pixels on different lane lines are far apart.
Variance loss (L_var): when the distance between a pixel embedding x_i and the mean vector μ_c of its lane is larger than δ_v, the model is updated to pull x_i towards μ_c;
Distance loss (L_dist): when the distance between the mean vectors μ_ca and μ_cb of two different lanes is smaller than δ_d, the model is updated to push μ_ca and μ_cb away from each other;
The variance loss (L_var) pulls pixel embeddings towards their lane's mean vector μ_c, while the distance loss (L_dist) pushes the cluster centers away from each other.
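For reference, the discriminative loss terms as given in the paper, with C clusters, N_c pixels in cluster c, and [x]_+ = max(0, x):

$$ L_{var} = \frac{1}{C}\sum_{c=1}^{C}\frac{1}{N_c}\sum_{i=1}^{N_c}\big[\lVert \mu_c - x_i \rVert - \delta_v\big]_+^2 $$

$$ L_{dist} = \frac{1}{C(C-1)}\sum_{c_A=1}^{C}\sum_{\substack{c_B=1 \\ c_B \neq c_A}}^{C}\big[2\delta_d - \lVert \mu_{c_A} - \mu_{c_B} \rVert\big]_+^2 $$

$$ L = \alpha\,L_{var} + \beta\,L_{dist} + \gamma\,L_{reg}, \qquad L_{reg} = \frac{1}{C}\sum_{c=1}^{C}\lVert \mu_c \rVert $$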
- tf.config.experimental_run_functions_eagerly(True)  # for debugging: run the graph eagerly
- def discriminative_loss_single(
-         prediction,
-         correct_label,
-         feature_dim,
-         label_shape,
-         delta_v,
-         delta_d,
-         param_var,
-         param_dist,
-         param_reg):
-     """
-     Discriminative loss for a single image
-     :param prediction: network output
-     :param correct_label: instance label
-     :param feature_dim: feature dimension of the prediction
-     :param label_shape: shape of the label
-     :param delta_v: cut-off variance distance      (defaults used here: 0.5, 3.0, 1.0, 1.0, 0.001)
-     :param delta_d: cut-off cluster distance
-     :param param_var: weight of the intra-cluster variance term
-     :param param_dist: weight of the inter-cluster distance term
-     :param param_reg: weight of the regularization term
-     """
-     correct_label = tf.reshape(
-         correct_label, [label_shape[1] * label_shape[0]]  # the flattened input shape (h*w)
-     )
-     reshaped_pred = tf.reshape(
-         prediction, [label_shape[1] * label_shape[0], feature_dim]  # the output shape (h*w, feature_dim)
-     )
-
-     # Count the instances
-     unique_labels, unique_id, counts = tf.unique_with_counts(correct_label)
-     # unique_with_counts example:
-     # tensor 'x' is [1, 1, 2, 4, 4, 4, 7, 8, 8]
-     # y, idx, count = unique_with_counts(x)
-     # y     ==> [1, 2, 4, 7, 8]
-     # idx   ==> [0, 0, 1, 2, 2, 2, 3, 4, 4]
-     # count ==> [2, 1, 3, 1, 2]
-     # all returned values are tensors
-     counts = tf.cast(counts, tf.float32)  # one entry per instance; the value is that instance's pixel count
-     num_instances = tf.size(unique_labels)  # number of distinct instances
-
-     # Compute the mean embedding vector of each instance
-     segmented_sum = tf.math.unsorted_segment_sum(
-         reshaped_pred, unique_id, num_instances)  # sum the embeddings belonging to each instance
-     # a = [[1 2 3], [4 5 6], [7 8 9]]
-     # tf.math.unsorted_segment_sum(data=a, segment_ids=[0, 1, 0], num_segments=2)
-     # ==> [[8 10 12], [4 5 6]]
-     mu = tf.divide(segmented_sum, tf.reshape(counts, (-1, 1)))  # divide each summed row by its pixel count
-     mu_expand = tf.gather(mu, unique_id)  # broadcast each mean back to its pixels
-
-     # Variance term: hinge on the distance of each pixel to its cluster mean
-     distance = tf.norm(tf.subtract(mu_expand, reshaped_pred), axis=1, ord=1)
-     distance = tf.subtract(distance, delta_v)
-     distance = tf.clip_by_value(distance, 0., distance)
-     distance = tf.square(distance)
-
-     l_var = tf.math.unsorted_segment_sum(distance, unique_id, num_instances)
-     l_var = tf.divide(l_var, counts)
-     l_var = tf.reduce_sum(l_var)
-     l_var = tf.divide(l_var, tf.cast(num_instances, tf.float32))
-
-     # Distance term: hinge on the pairwise distances between cluster means
-     mu_interleaved_rep = tf.tile(mu, [num_instances, 1])
-     mu_band_rep = tf.tile(mu, [1, num_instances])
-     mu_band_rep = tf.reshape(
-         mu_band_rep,
-         (num_instances *
-          num_instances,
-          feature_dim))
-
-     mu_diff = tf.subtract(mu_band_rep, mu_interleaved_rep)
-
-     # Mask out the all-zero rows (a cluster paired with itself)
-     intermediate_tensor = tf.reduce_sum(tf.abs(mu_diff), axis=1)
-     zero_vector = tf.zeros(1, dtype=tf.float32)
-     bool_mask = tf.not_equal(intermediate_tensor, zero_vector)
-     mu_diff_bool = tf.boolean_mask(mu_diff, bool_mask)
-
-     mu_norm = tf.norm(mu_diff_bool, axis=1, ord=1)
-     mu_norm = tf.subtract(2. * delta_d, mu_norm)
-     mu_norm = tf.clip_by_value(mu_norm, 0., mu_norm)
-     mu_norm = tf.square(mu_norm)
-
-     l_dist = tf.reduce_mean(mu_norm)
-
-     # Regularization term: keep the cluster means small
-     l_reg = tf.reduce_mean(tf.norm(mu, axis=1, ord=1))
-
-     param_scale = 1.
-     l_var = param_var * l_var
-     l_dist = param_dist * l_dist
-     l_reg = param_reg * l_reg
-
-     loss = param_scale * (l_var + l_dist + l_reg)
-
-     return loss, l_var, l_dist, l_reg
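As a quick eager-mode sanity check (not in the original article; the tensor values are made up), two well-separated clusters should yield zero variance and distance terms, leaving only the small regularization term:
- # Hypothetical check: 4 pixels, 2 instances, feature_dim=2, label_shape=(2, 2)
- pred = tf.constant([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]])
- label = tf.constant([0, 0, 1, 1])
- loss, l_var, l_dist, l_reg = discriminative_loss_single(
-     pred, label, feature_dim=2, label_shape=(2, 2),
-     delta_v=0.5, delta_d=3.0, param_var=1.0, param_dist=1.0, param_reg=0.001)
- # l_var == 0 (pixels lie within delta_v of their mean),
- # l_dist == 0 (the two means are more than 2*delta_d apart)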
- def instance_loss(feature_dim=5, image_shape=(256, 256),
-                   delta_v=0.5, delta_d=3.0, param_var=1.0, param_dist=1.0, param_reg=0.001):
-     """
-     Batch wrapper around discriminative_loss_single (defaults: 0.5, 3.0, 1.0, 1.0, 0.001).
-     :return: a Keras-compatible loss giving the mean discriminative loss over the batch
-     """
-     def loss(y_true, y_pred):
-         def cond(label, batch, out_loss, out_var, out_dist, out_reg, i):
-             return tf.less(i, tf.shape(batch)[0])
-
-         def body(label, batch, out_loss, out_var, out_dist, out_reg, i):
-             disc_loss, l_var, l_dist, l_reg = discriminative_loss_single(
-                 y_pred[i], y_true[i], feature_dim, image_shape, delta_v, delta_d, param_var, param_dist, param_reg)
-
-             out_loss = out_loss.write(i, disc_loss)
-             out_var = out_var.write(i, l_var)
-             out_dist = out_dist.write(i, l_dist)
-             out_reg = out_reg.write(i, l_reg)
-
-             return label, batch, out_loss, out_var, out_dist, out_reg, i + 1
-
-         # TensorArray is a data structure that supports dynamic writing
-         output_ta_loss = tf.TensorArray(
-             dtype=tf.float32, size=0, dynamic_size=True)
-         output_ta_var = tf.TensorArray(
-             dtype=tf.float32, size=0, dynamic_size=True)
-         output_ta_dist = tf.TensorArray(
-             dtype=tf.float32, size=0, dynamic_size=True)
-         output_ta_reg = tf.TensorArray(
-             dtype=tf.float32, size=0, dynamic_size=True)
-
-         _, _, out_loss_op, out_var_op, out_dist_op, out_reg_op, _ = tf.while_loop(
-             cond, body, [
-                 y_true, y_pred, output_ta_loss, output_ta_var, output_ta_dist, output_ta_reg, 0])
-         out_loss_op = out_loss_op.stack()
-
-         # Average over the batch; a Keras loss function must return a single tensor
-         disc_loss = tf.reduce_mean(out_loss_op)
-         return disc_loss
-     return loss
Because of the background pixels, this is again an extremely imbalanced classification problem; note that in the loss above each instance's variance term is normalized by its own pixel count, so the large background region does not dominate the loss.
This article uses two learning-rate decay strategies.
Step decay on plateau:
- from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
- # Halve the LR when the binary-branch loss stops improving for 3 epochs
- reduce_lr = ReduceLROnPlateau(monitor='category_output_loss', factor=0.5, patience=3, verbose=1)
- # Optional early stopping: when the loss no longer decreases, training is essentially done and can stop
- early_stopping = EarlyStopping(monitor='category_output_loss', min_delta=0, patience=10, verbose=1)
Cosine-annealing decay:
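The schedule below implements the standard cosine-annealing curve with the minimum learning rate taken as 0 (t is the current step, T the total number of steps):

$$ \eta(t) = \frac{1}{2}\,\eta_{base}\left(1 + \cos\left(\pi\,\frac{t - t_{warmup} - t_{hold}}{T - t_{warmup} - t_{hold}}\right)\right) $$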
- import numpy as np
-
- def cosine_decay_with_warmup(global_step,
-                              learning_rate_base,
-                              total_steps,
-                              warmup_learning_rate=0.0,
-                              warmup_steps=0,
-                              hold_base_rate_steps=0,
-                              min_learn_rate=0,
-                              ):
-     """
-     Parameters:
-     global_step: the T_cur defined above, i.e. the current step count.
-     learning_rate_base: the preset learning rate; once warm-up raises the LR to this value, decay begins.
-     total_steps: the total number of training steps, equal to epoch * sample_count / batch_size
-                  (sample_count is the number of samples, epoch the number of passes).
-     warmup_learning_rate: the initial value of the linear warm-up.
-     warmup_steps: the total number of warm-up steps.
-     hold_base_rate_steps: optional; after warm-up ends, hold the learning rate constant for this
-                           many steps before decay starts.
-     """
-     if total_steps < warmup_steps:
-         raise ValueError('total_steps must be larger or equal to '
-                          'warmup_steps.')
-     # Cosine annealing; the minimum learning rate is taken as 0, which simplifies the expression
-     learning_rate = 0.5 * learning_rate_base * (1 + np.cos(np.pi *
-         (global_step - warmup_steps - hold_base_rate_steps) / float(total_steps - warmup_steps - hold_base_rate_steps)))
-     # If hold_base_rate_steps > 0, keep the LR constant for a while after warm-up
-     if hold_base_rate_steps > 0:
-         learning_rate = np.where(global_step > warmup_steps + hold_base_rate_steps,
-                                  learning_rate, learning_rate_base)
-     if warmup_steps > 0:
-         if learning_rate_base < warmup_learning_rate:
-             raise ValueError('learning_rate_base must be larger or equal to '
-                              'warmup_learning_rate.')
-         # Linear warm-up
-         slope = (learning_rate_base - warmup_learning_rate) / warmup_steps
-         warmup_rate = slope * global_step + warmup_learning_rate
-         # Use the linear warm-up rate while still in the warm-up phase, otherwise the cosine rate
-         learning_rate = np.where(global_step < warmup_steps, warmup_rate,
-                                  learning_rate)
-
-     learning_rate = max(learning_rate, min_learn_rate)
-     return learning_rate
- from tensorflow import keras
- import tensorflow.keras.backend as K
-
- class WarmUpCosineDecayScheduler(keras.callbacks.Callback):
-     """
-     Inherits from Callback to schedule the learning rate
-     """
-     def __init__(self,
-                  learning_rate_base,
-                  total_steps,
-                  global_step_init=0,
-                  warmup_learning_rate=0.0,
-                  warmup_steps=0,
-                  hold_base_rate_steps=0,
-                  min_learn_rate=0,
-                  # interval_epoch marks the low points between annealing cycles
-                  interval_epoch=[1],
-                  verbose=0):
-         super(WarmUpCosineDecayScheduler, self).__init__()
-         # base learning rate
-         self.learning_rate_base = learning_rate_base
-         # warm-up parameters
-         self.warmup_learning_rate = warmup_learning_rate
-         # verbosity
-         self.verbose = verbose
-         # learning_rates records the LR after each update, handy for plotting
-         self.min_learn_rate = min_learn_rate
-         self.learning_rates = []
-
-         self.interval_epoch = interval_epoch
-         # global step across the whole run
-         self.global_step_for_interval = global_step_init
-         # total warm-up steps
-         self.warmup_steps_for_interval = warmup_steps
-         # total steps held at the peak
-         self.hold_steps_for_interval = hold_base_rate_steps
-         # total steps of the whole run
-         self.total_steps_for_interval = total_steps
-
-         self.interval_index = 0
-         # compute the gaps between consecutive low points
-         self.interval_reset = [self.interval_epoch[0]]
-         for i in range(len(self.interval_epoch) - 1):
-             self.interval_reset.append(self.interval_epoch[i + 1] - self.interval_epoch[i])
-         self.interval_reset.append(1 - self.interval_epoch[-1])
-
-     # update global_step and record the current learning rate
-     def on_batch_end(self, batch, logs=None):
-         self.global_step = self.global_step + 1
-         self.global_step_for_interval = self.global_step_for_interval + 1
-         lr = K.get_value(self.model.optimizer.lr)
-         self.learning_rates.append(lr)
-
-     # update the learning rate
-     def on_batch_begin(self, batch, logs=None):
-         # reset the cycle parameters each time a low point is reached
-         if self.global_step_for_interval in [0] + [int(i * self.total_steps_for_interval) for i in self.interval_epoch]:
-             self.total_steps = self.total_steps_for_interval * self.interval_reset[self.interval_index]
-             self.warmup_steps = self.warmup_steps_for_interval * self.interval_reset[self.interval_index]
-             self.hold_base_rate_steps = self.hold_steps_for_interval * self.interval_reset[self.interval_index]
-             self.global_step = 0
-             self.interval_index += 1
-
-         lr = cosine_decay_with_warmup(global_step=self.global_step,
-                                       learning_rate_base=self.learning_rate_base,
-                                       total_steps=self.total_steps,
-                                       warmup_learning_rate=self.warmup_learning_rate,
-                                       warmup_steps=self.warmup_steps,
-                                       hold_base_rate_steps=self.hold_base_rate_steps,
-                                       min_learn_rate=self.min_learn_rate)
-         K.set_value(self.model.optimizer.lr, lr)
-         if self.verbose > 0:
-             print('\nBatch %05d: setting learning '
-                   'rate to %s.' % (self.global_step + 1, lr))
- from tensorflow.keras.optimizers import Adam
- # f_score, recall and precision are user-defined metrics (not shown here)
- losses = {'category_output': dice_loss(),
-           'color_output': instance_loss(feature_dim=5, image_shape=(256, 256), delta_v=0.5,
-                                         delta_d=3.0, param_var=1.0, param_dist=1.0, param_reg=0.001)}
- model.compile(loss=losses,
-               optimizer=Adam(lr=learning_rate_base), loss_weights=(0.5, 0.5),
-               metrics=[f_score(), 'acc', recall(), precision()])
Training configuration: learning rate 0.01; epochs 100; batch size 16.
- model.fit_generator(generator=gen,
-                     steps_per_epoch=max(1, len(train_lines) // batch_size),
-                     epochs=epochs, verbose=2,
-                     callbacks=[checkpoint_period, tensorboard, early_stopping,
-                                reduce_lr])
After training, the raw outputs need post-processing before the final images can be produced. This article only post-processes and visualizes the outputs of the two LaneNet branches; the H-Net part is not reproduced (H-Net is a feature-extraction network whose input is the road image and whose output is a coordinate transformation matrix used to handle lane position changes on slopes).
- import copy
- import matplotlib.pyplot as plt
- import numpy as np
- from PIL import Image
- from Model.model import UNET_double as lanenet
- from dataload import postprocess
-
- import os
- # os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # uncomment to force CPU inference
-
- def minmax_scale(input_arr):
-     # Stretch one embedding channel to the 0-255 range for visualization
-     min_val = np.min(input_arr)
-     max_val = np.max(input_arr)
-     output_arr = (input_arr - min_val) * 255.0 / (max_val - min_val)
-     return output_arr
-
- def letterbox_image(image, size):
-     image = image.convert("RGB")
-     iw, ih = image.size
-     w, h = size
-     scale = min(w / iw, h / ih)
-     nw = int(iw * scale)
-     nh = int(ih * scale)
-
-     image = image.resize((nw, nh), Image.BICUBIC)
-     new_image = Image.new('RGB', size, (128, 128, 128))
-     new_image.paste(image, ((w - nw) // 2, (h - nh) // 2))
-     return new_image, nw, nh
-
- def test_lanenet(image_path, weights_path, input_size=(256, 256, 3)):
-     model = lanenet(input_shape=input_size, category_classes=2, color_classes=5).build_model()
-     model.load_weights(weights_path)
-
-     image = Image.open(image_path)
-     old_img = copy.deepcopy(image)
-     image_vis = old_img
-
-     img, nw, nh = letterbox_image(image, (input_size[0], input_size[1]))
-     image_vis1 = img
-     img = [np.array(img) / 255]
-     image = np.asarray(img)
-
-     binary_seg_image, instance_seg_image = model.predict(image)
-     # Post-process the binary segmentation map
-     binary_seg_image = binary_seg_image.argmax(axis=-1).reshape([input_size[0], input_size[1]])
-     # Remove the padding bars
-     pr = binary_seg_image[int((input_size[0] - nh) // 2):int((input_size[0] - nh) // 2 + nh),
-                           int((input_size[1] - nw) // 2):int((input_size[1] - nw) // 2 + nw)]
-
-     seg_img = np.zeros((np.shape(pr)[0], np.shape(pr)[1], 3))
-     colors = [(255, 255, 255), (0, 0, 0)]
-
-     for c in range(2):
-         seg_img[:, :, 0] += ((pr[:, :] == c) * (colors[c][0])).astype('uint8')
-         seg_img[:, :, 1] += ((pr[:, :] == c) * (colors[c][1])).astype('uint8')
-         seg_img[:, :, 2] += ((pr[:, :] == c) * (colors[c][2])).astype('uint8')
-     seg_img = np.array(Image.fromarray(np.uint8(seg_img)).resize((256, 128)))
-     postprocessor = postprocess.LaneNetPostProcessor()
-     postprocess_result = postprocessor.postprocess(
-         binary_seg_result=binary_seg_image,
-         instance_seg_result=instance_seg_image[0],
-         source_image=image_vis1
-     )
-     mask_image = postprocess_result['mask_image']
-
-     # Stretch each embedding channel for visualization
-     for i in range(5):
-         instance_seg_image[0][:, :, i] = minmax_scale(instance_seg_image[0][:, :, i])
-     embedding_image = np.array(instance_seg_image[0], np.uint8)
-     # Remove the padding bars
-     embedding_image = embedding_image[int((input_size[0] - nh) // 2):int((input_size[0] - nh) // 2 + nh),
-                                       int((input_size[1] - nw) // 2):int((input_size[1] - nw) // 2 + nw)]
-     plt.figure('mask_image')
-     plt.imshow(mask_image)
-     plt.figure('src_image')
-     plt.imshow(image_vis)
-     plt.figure('instance_image')
-     plt.imshow(embedding_image[:, :, (3, 1, 0)])  # pick three embedding channels as RGB
-     plt.figure('seg_image')
-     plt.imshow(seg_img)
-     plt.show()
-
-     return
-
- if __name__ == '__main__':
-     image_path = r'D:\dataset\trainset\image\origin\0601\1494452579506899721\9.jpg'
-     weights_path = r'H:\unet-keras\多任务训练\aspp_convlstm_cos\lanenet_loss0.14.h5'
-     test_lanenet(image_path, weights_path)
The final output images:
When I first started this reproduction I was completely lost, because I had not studied the paper carefully. To reproduce a paper, first clarify its research goal and get familiar with the overall framework, then work through and understand each part in turn.