赞
踩
使用传统算法来滤除地面点,具体思路把点云划分为BEV上面的一个个seg和bin,每个bin里面取z_min一个点作为代表。
- import numpy as np
-
- class Processor:
- '''
- Module: Processor
- Args:
- n_segments(int): The number of fan-shaped regions divided by 360 degrees
- n_bins(int): The number of bins divided in a segment.
- r_max(float): The max boundary of lidar point.(meters)
- r_min(float): The min boundary of lidar point.(meters)
- line_search_angle(float): The angle for relative search in nearby segments.
- max_dist_to_line(float): The distance threshold of the non-ground object to the ground.(meters)
- max_slope(float): Local maximum slope of the ground.
- max_error(float): The max MSE to fit the ground.(meters)
- long_threshold(int): The max threshold of ground wave interval.
- max_start_height(float): The max height difference between hillside and ground.(meters)
- sensor_height(float): The distance from the lidar sensor to the ground.(meters)
- Call:
- Arg:
- vel_msg(numpy.ndarray): The raw local LiDAR cloud points in 3D(x,y,z).
- For example:
- vel_msg shapes [n_point, 3], with `n_point` refers to the number of cloud points,
- while `3` is the number of 3D(x,y,z) axis.
- vel_msg = array([[0.3, 0.1, 0.7],
- [0.6, 0.6, 0.5],
- [0.1, 0.4, 0.8],
- ... ... ...
- [0.5, 0.3, 0.6],
- [0.6, 0.3, 0.4]]
- Returns:
- vel_non_ground(numpy.ndarray): The local LiDAR cloud points after filter out ground info.
- '''
-
- def __init__(self, n_segments=60, n_bins=80, r_max=150, r_min=0.3,
- line_search_angle=0.2, max_dist_to_line=0.25,
- max_slope=2.0, max_error=0.1, long_threshold=3,
- max_start_height=0.2, sensor_height=1.73):
- self.n_segments = n_segments # number of segments
- self.n_bins = n_bins # number of bins in a segment
- self.r_max = r_max
- self.r_min = r_min
- self.line_search_angle = line_search_angle
- self.max_dist_to_line = max_dist_to_line
-
- self.max_slope = max_slope
- self.max_error = max_error
- self.long_threshold = long_threshold # int
- self.max_start_height = max_start_height
- self.sensor_height = sensor_height
-
- self.segment_step = 2 * np.pi / self.n_segments
- self.bin_step = (self.r_max - self.r_min) / self.n_bins
-
- self.segments = []
- self.seg_list = []
-
- def __call__(self, vel_msg):
- point5D = self.Model_Ground(vel_msg)
- vel_non_ground = self.Segment_Vel(point5D)
-
- return vel_non_ground
-
- def Model_Ground(self, vel_msg):
- point5D = self.project_5D(vel_msg)
- point5D = self.filter_out_range(point5D)
- # 根据seg模块排序。
- point5D = point5D[np.argsort(point5D[:, 3])]
- # np.unique去除数组中重复的元素,并从小到大返回一个新的数组
- self.seg_list = np.int16(np.unique(point5D[:, 3]))
- # 每个seg单独处理
- for seg_idx in self.seg_list:
- # 创建一个segmentation的实例
- segment = Segmentation(self.max_slope, self.max_error, self.long_threshold,
- self.max_start_height, self.sensor_height)
- # 取出当前segment的点进行处理
- point5D_seg = point5D[point5D[:, 3] == seg_idx]
- # min_z表示现在seg里的每一个bin的z的最小值
- min_z = segment.get_min_z(point5D_seg) # checked
- segment.fitSegmentLines(min_z) # checked
- # 对应当前segment的实例,一个segment实例里面有一个self.lines的数组属性
- self.segments.append(segment)
-
- return point5D
-
- def Segment_Vel(self, point5D):
- #point5D按seg顺序排好的,并且过滤了不在r范围内的点
- # 每个点的初始label
- label = np.zeros([point5D.shape[0]])
- #np.diff表示数组中相邻元素做减法,如10列向量,运算完得到9列。np.r_表示上下拼接两个矩阵,np.nonzero用来获取数组中非零元素的索引
- #这个slice_list是将point5D里每个seg的起点终点索引建立一个列表,根据列表便可以分开每个seg中的点
- slice_list = np.r_[np.nonzero(np.r_[1, np.diff(point5D[:, 3])])[0], len(point5D)]
-
- for i, seg_idx in enumerate(self.seg_list):
- #得到当前seg的实例以及seg里面的点
- segment = self.segments[i]
- point5D_seg = point5D[point5D[:, 3] == seg_idx]
- #得到当前seg里点的label
- non_ground = segment.verticalDistanceToLine(point5D_seg[:, [4, 2]]) # x,y -> d,z
- #将计算距离大于阈值的点设置为0
- non_ground[non_ground > self.max_dist_to_line] = 0
-
- step = 1
- #下面定义了一个函数,根据i和step可以得到一个下标值
- idx_search = lambda i, step: (i % len(self.seg_list) + step) % len(self.seg_list)
- #判断当前seg左右的几个seg,如果当前seg的bin与z满足隔壁的segline曲线,也定义为地面点。
- while step * self.segment_step < self.line_search_angle:
- segment_f = self.segments[idx_search(i, -step)]
- segment_b = self.segments[idx_search(i, step)]
-
- non_ground_b = segment_f.verticalDistanceToLine(point5D_seg[:, [4, 2]])
- non_ground_b[non_ground_b > self.max_dist_to_line] = 0
- non_ground_f = segment_b.verticalDistanceToLine(point5D_seg[:, [4, 2]])
- non_ground_f[non_ground_f > self.max_dist_to_line] = 0
-
- non_ground += non_ground_b + non_ground_f
-
- step += 1
- #当前seg的点赋值label,label为1表示是地面点
- label[slice_list[i]:slice_list[i + 1]] = non_ground == 0
-
- # vel_non_ground = point5D[label == 1][:, :3]
- vel_non_ground = point5D[label != 1][:, :3]
-
- return vel_non_ground
-
- def project_5D(self, point3D):
- '''
- Args:
- point3D: shapes (n_row, 3), while 3 represent x,y,z axis in order.
- Returns:
- point5D: shapes (n_row, 3+2), while 5 represent x,y,z,seg,bin axis in order.
- '''
- x = point3D[:, 0]
- y = point3D[:, 1]
- z = point3D[:, 2]
-
- # index mapping
- # arctan2输入的是两个点,输出范围为正负派。而arctan是输入一个,输出范围二分之一正负派
- angle = np.arctan2(y, x)
- # 因为输出是正负派,所以加上pai范围是0-2派。除以每份的角度。得到在哪一份
- segment_index = np.int32(np.floor((angle + np.pi) / self.segment_step)) # segment
- # 求每个点的平面距离,也就是半径
- radius = np.sqrt(x ** 2 + y ** 2)
- # 在rmin和rmax之间分为n份,看这个点落在哪一份
- bin_index = np.int32(np.floor((radius - self.r_min) / self.bin_step)) # bin
- # segment_index是一行n列,所以要先转置再按行叠加
- point5D = np.vstack([point3D.T, segment_index, bin_index]).T
-
- return point5D
-
- def filter_out_range(self, point5D):
- '''
- Args:
- point5D: shapes (n_row, 3+2), while 5 represent x,y,z,seg,bin axis in order.
- Returns:
- point5D: shapes (n_row_filtered, 5), while 5 represent x,y,z,seg,bin axis in order.
- '''
- radius = point5D[:, 4] # [x,y,z,seg,bin]
- # np.logical_and滤除掉不在半径范围里面的点。
- condition = np.logical_and(radius < self.r_max, radius > self.r_min)
- point5D = point5D[condition]
-
- return point5D
-
-
- class Segmentation:
- '''
- Args:
- max_slope(float): Local maximum slope of the ground.
- max_error(float): The max MSE to fit the ground.
- long_threshold(int): The max threshold of ground wave interval.
- max_start_height(float): The max height difference between hillside and ground.
- sensor_height(float): The distance from the lidar sensor to the ground.
- '''
-
- def __init__(self, max_slope=2.0, max_error=0.1, long_threshold=3,
- max_start_height=0.2, sensor_height=1.73):
- self.max_slope_ = max_slope
- self.max_error_ = max_error
- self.long_threshold_ = long_threshold # int
- self.max_start_height_ = max_start_height
- self.sensor_height_ = sensor_height
-
- self.matrix_new = np.array([[1, 0, 0], [0, 1, 0]])
- self.matrix_one = np.array([[0, 0, 1]])
-
- self.lines = []
-
- def get_min_z(self, point5D_seg):
- '''
- Args:
- point5D: shapes (n_row, 5), while 5 represent x,y,z,seg,bin axis in order.
- Returns:
- pointSBZ: shapes (n_row, 2), while 3 represent bin,z axis in order. Bin order sorted.
- '''
- # 当前seg里面的bin值
- bin_ = point5D_seg[:, 4]
- # 得到当前seg里面每个bin的z的最小值,返回的是[num_bin,2]2分别是bin_idx和z_min
- pointBZ = np.array([point5D_seg[bin_ == bin_idx].min(axis=0)[2:] for bin_idx in np.unique(bin_)])[:, [2, 0]]
- return pointBZ
-
- def fitLocalLine(self, cur_line_points, error=False):
- # @在numpy中表示矩阵相乘,第二个元素是向量的话会自动转置
- xy1 = np.array(cur_line_points) @ self.matrix_new + self.matrix_one
- # 此时的xy1表示(:(bin,z_min,1))
- A = xy1[:, [0, 2]]
- y = xy1[:, [1]]
- # 自变量是bin,因变量是z_min。np.linalg.lstsqyong'zui'xiao'er'chneg
- [[m], [b]] = np.linalg.lstsq(A, y, rcond=None)[0]
- if error:
- mse = (A @ np.array([[m], [b]]) - y) ** 2
- return [m, b], mse
- else:
- return [m, b]
-
- def verticalDistanceToLine(self, xy): # checked
- #传入的xy是当前seg里面的点,包含bin,z的属性
- kMargin = 0.1
- #创建当前seg点的label
- label = np.zeros(len(xy))
- #遍历当前seg实例里面的线
- for d_l, d_r, m, b in self.lines:
- #计算当前seg里面的点到拟合的线的距离
- distance = np.abs(m * xy[:, 0] + b - xy[:, 1])
- #得到在当前line范围里的bin
- con = (xy[:, 0] > d_l - kMargin) & (xy[:, 0] < d_r + kMargin)
- #将这些bin的distance赋值到label里
- label[con] = distance[con]
-
- return label.flatten()
-
- def fitSegmentLines(self, min_z):
- # 最小bin的点
- cur_line_points = [min_z[0]]
- long_line = False
- # 传感器的高度
- cur_ground_height = self.sensor_height_
- d_i = 1
- while d_i < len(min_z):
- # 存储的bin列表的最后一个
- lst_point = cur_line_points[-1]
- # 循环遍历中当前的bin
- cur_point = min_z[d_i]
- # 如果两者的bin离得远,则为长线
- if cur_point[0] - lst_point[0] > self.long_threshold_:
- long_line = True
-
- if len(cur_line_points) < 2:
- # 如果两者的bin之差小于阈值,同时保存的bin的z与地面高度小于非地面阈值
- # 意思就是地面点是连续的,如果两个bin里离得远或者第一个点离地面远,都重新保存第一个点
- if (cur_point[0] - lst_point[0] < self.long_threshold_) and abs(
- lst_point[1] - cur_ground_height) < self.max_start_height_:
- cur_line_points.append(cur_point)
- else:
- cur_line_points = [cur_point]
- else:
- # 直接把第三个点保存进去
- cur_line_points.append(cur_point)
- # 根据已有的点拟合bin和z之间的直线方程
- cur_line, mse = self.fitLocalLine(cur_line_points, True)
- # 如果最小二乘拟合的误差大于阈值或者斜率大于坡度阈值,或者是长线的话
- if (mse.max() > self.max_error_ or cur_line[0] > self.max_slope_ or long_line):
- #说明刚放入的这个点不是平面里面的,pop出来
- cur_line_points.pop()
- if len(cur_line_points) >= 3:
- #如果里面大于等于三个点,则直接拟合参数,放到self.lines里面,起始bin,结束bin,直线参数
- new_line = self.fitLocalLine(cur_line_points)
- self.lines.append([cur_line_points[0][0], cur_line_points[-1][0], *new_line]) # b boundary
- #根据拟合出来的直线方程更新当前的z也就是地面高度。其中的自变量选择的是最后一个bin
- cur_ground_height = new_line[0] * cur_line_points[-1][0] + new_line[1] # m*x+b
- long_line = False
- cur_line_points = [cur_line_points[-1]]
- #因为前面弹出了,所以d_i并没有参与这次计算,所以减一,让d_i与保留的上一个末尾点再判断一次
- d_i -= 1
- d_i += 1
- #当退出循环后,如果保存的点数大于2,说明也是一个线,保留下来。
- if len(cur_line_points) > 2:
- new_line = self.fitLocalLine(cur_line_points)
- self.lines.append([cur_line_points[0][0], cur_line_points[-1][0], *new_line])
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。