赞
踩
本文参考论文:
平时没怎么用py,命名什么的可能不太规范..
目录
DBSCAN聚类算法是一种基于空间密度有传递性质的聚类算法,将簇定义为密度相连的点的最大的集合,可以将高密度点区域划分为簇,并有效地过滤低密度点区域,可以在含有噪声的数据集中识别任意形状和数量的簇。
DBSCAN算法的关键思想是:对于聚类的每个点,给定半径的邻域必须包含至少最小数量的点,即邻域内的密度必须超过某个阈值。数据集中特定点的密度通过该点的Eps半径之内包含的点数来计算。
Eps邻域:给定对象半径Eps内的邻域,用NEps(p)表示点p的Eps半径内的点的集合。
MinPts:给定邻域NEps(p)需要包含的点的最小点数,用于决定点p是簇的核心部分、边界点或噪声。
核心点:如果点p的Eps邻域内包含至少MinPts个点,则该点为核心点。
边界点:不是核心点,但在某个核心点的Eps邻域内。
噪声点:即不是核心点,也不是边界点的点。
直接密度可达:如果p在q的Eps邻域内,且q是一个核心点,则p从q出发是直接密度可达的。
密度可达:如果存在一个对象链p1,p2,……,pn,如果pi+1是从pi出发关于Eps和MinPts直接密度可达的,那么pn是从p1关于Eps和MinPts密度可达的。
密度相连:如果p和q都是从O关于Eps和MinPts直接密度可达的,那么p和q是密度相连的。
1.选择一个未访问的数据点p作为起始点;
2.计算点p的Eps邻域内的数据点数量,如果数量大于等于MinPts,则将p标记为核心点,并创建一个新的簇;
3.将点p加入到当前簇中,并将p的Eps邻域内的所有未访问数据点加入到当前簇中;
4.对当前簇中的每个数据点q进行以下操作:
(1)如果q是核心点,则将q的Eps邻域内的所有未访问数据点加入到当前簇中;
(2)如果q不是核心点,但是位于其他簇的Eps邻域内,则将q标记为边界点,并将其加入到当前簇中;
5.当没有更多的数据点可以加入到当前簇时,当前簇被认为是一个完整的簇;
6.选择下一个未访问的数据点作为起始点,重复步骤2到步骤5,直到所有的数据点都被访问过;
7.标记剩余未分配的数据点为噪声点。
图片来自开头提到的第二篇论文
(1)DBSCAN算法可以发现任意形状的簇。
(2)DBSCAN算法相比其他聚类算法,无需实现指定簇的数量,而通过设置合适的邻域半径和簇最小点数来自动确定簇的数量。
(3)DBSCAN算法将低密度区域视为噪声,不将其归入任何簇中,对于噪声不敏感,具有鲁棒性。
(1)DBSCAN算法对邻域半径、簇最小点数两个参数敏感,不同的参数选择可能会导致完全不同的聚类结果。
(2)如果数据集的密度不均匀,聚类结果可能会受影响,簇的形状可能会不连续或断裂。
这部分用python实现了文章2的方式,利用图标可视化结果确定参数。
计算每个数据点的距离,生成距离矩阵Distn*n,然后将距离矩阵按行升序排序,这样矩阵的每一行就是相应数据点到其他所有点距离的排序,每一列就是距离每个数据点最近的第i个距离值的集合;然后按列绘制距离值的概率密度分布曲线。
1.dist.py
- import math
- import numpy as np
-
-
- # 计算两点间距离
- def calculate_dist(t1, t2):
- dis = math.sqrt(np.power((t1[0] - t2[0]), 2) + np.power((t1[1] - t2[1]), 2))
- return dis
2.caculate_point_dist.py
- import dist
-
-
- # 得到每个点之间的距离矩阵
- def calculate_point_dist(data):
- length = len(data)
- point_dist = dist.np.zeros((length, length))
-
- for i in range(0, length):
- for j in range(i + 1, length):
- distance = dist.calculate_dist(data[i], data[j])
- point_dist[i][j] = distance
- point_dist[j][i] = distance
-
- dist.np.save('point_dist.npy', point_dist)
- dist.np.savetxt('point_dist.csv', point_dist)
-
- return point_dist
3.search_eps.py
- import calculate_point_dist as cpd
- import csv
- import numpy as np
- import matplotlib.pyplot as plt
- import seaborn as sn
-
-
- if __name__ == '__main__':
- # 创建一个空列表来存储数据
- data = []
- # 打开 CSV 文件
- with open('random_data.csv', 'r') as file:
- # 创建 CSV 读取
- reader = csv.reader(file)
- next(reader)
- # 逐行读取数据并添加到列表中
- for row in reader:
- data.append([int(value) for value in row])
-
- data = np.array(data)
- np.save('random_data.npy', data)
-
- distance = cpd.calculate_point_dist(data)
- # 每一行升序排序
- distance = np.sort(distance, axis=1)
-
- plt.figure()
- # 遍历每一列
- for i in range(1, distance.shape[1]):
- column_data = distance[:, i]
- # 绘制概率密度分布曲线
- sn.kdeplot(column_data)
- # 显示图形
- plt.xlabel('Distance')
- plt.ylabel('Density')
- plt.title('Probability Density Distribution')
- plt.show()
生成的概率密度曲线:
由图可见,距离为70的密度已经变得非常小,这些比例很小却和其他点距离明显远很多的点,是噪声的可能性很大,因此可以选取Eps为70。
计算每个数据点的局部密度值(即Eps为70的邻域范围内的点数)ρi,然后计算每个点距离更高密度点的最近距离,若是最高密度点,则取到数据集中最远点的距离δi。然后以局部密度值为横坐标,到更高密度点的最近距离为纵坐标进行绘图。
1.count_points.py
- import dist
-
-
- # 计算每个点邻域范围内的点数
- def count_points(data, distance, eps):
- length = len(data)
- points = dist.np.ones(length)
- for i in range(0, length):
- # marked = -1
- for j in range(i + 1, length):
- # if data[j][0] != marked:
- if distance[i][j] <= eps:
- points[i] = points[i] + 1
- points[j] = points[j] + 1
- # else:
- # marked = data[j][0]
- return points
-
-
- def count_points_2(data, eps):
- length = len(data)
- points = dist.np.ones(length)
- for i in range(0, length):
- marked = -1
- for j in range(i + 1, length):
- if data[j][0] != marked:
- distance = dist.calculate_dist(data[i], data[j])
- if distance <= eps:
- points[i] = points[i] + 1
- points[j] = points[j] + 1
- else:
- marked = data[j][0]
- return points
2.find_minPts.py
- import dist
-
-
- # 计算每个点与更高密度点的最近距离
- def find_minPts(sorted_points, distance):
- length = len(sorted_points)
- min_dist = dist.np.zeros(length)
- for i in range(0, length):
- index_i = sorted_points[i][0]
- temp_min_distance = max(distance[index_i, :])
- for j in range(i + 1, length):
- index_j = sorted_points[j][0]
- if distance[index_i][index_j] < temp_min_distance:
- temp_min_distance = distance[index_i][index_j]
- min_dist[index_i] = temp_min_distance
- return min_dist
-
-
- def find_minPts_2(sorted_points, data):
- length = len(sorted_points)
- min_dist = dist.np.zeros(length)
- for i in range(0, length):
- index_i = sorted_points[i][0]
- temp = -1
- distance = 1000000000
- for j in range(i + 1, length):
- if sorted_points[j][1] > sorted_points[i][1]:
- index_j = sorted_points[j][0]
- current_distance = dist.calculate_dist(data[index_i], data[index_j])
- if current_distance < distance:
- temp = index_j
- distance = current_distance
- if temp == -1:
- min_dist[index_i] = dist.calculate_dist(data[index_i], data[0])
- else:
- min_dist[index_i] = distance
- return min_dist
3.search_minPts.py
- import numpy as np
- import calculate_point_dist
- import count_points
- import find_minPts
- import csv
- import matplotlib.pyplot as plt
-
- if __name__ == '__main__':
- eps = 70
- data = np.load('random_data.npy')
- distance = np.load('point_dist.npy')
-
- points = count_points.count_points(data, distance, eps)
- sorted_data = sorted(enumerate(points), key=lambda x: x[1])
- min_dist = find_minPts.find_minPts(sorted_data, distance)
-
- # points = count_points.count_points_2(data, 20)
- # sorted_data = sorted(enumerate(points), key=lambda x: x[1])
- # min_dist = find_minPts.find_minPts_2(sorted_data, data)
-
- np.savetxt('pts.csv', points, delimiter=',', fmt='%d')
- np.savetxt('min_dist.csv', min_dist, delimiter=',', fmt='%d')
-
- plt.scatter(points, min_dist, 0.5)
- plt.xlabel('pts')
- plt.ylabel('min_dist')
- # 以eps绘制水平线
- plt.axhline(y=eps, color='r', linestyle='-')
- plt.show()
每个点的ρi与δi的函数关系
由图所示,δi<70(即图中红线以下)的数据点,皆为与更高密度点距离小于Eps的数据点。根据图选取MinPts值,ρi>=MinPts的数据点皆为核心点,ρi<MinPts且δi<=70的数据点可能为边界点或噪声点,而ρi<MinPts且δi>70的数据点皆为噪声点。
本文选取Eps为70,MinPts范围为20-70,并使用预先计算得到的欧氏距离矩阵,在python环境下调用sklearn库进行实验。
1.调用sklearn库中的DBSCAN函数进行聚类得到标签;
2.根据标签计算噪声比,绘制聚类结果图像;
3.获取非噪声点的标签,筛选距离矩阵和标签,调用sklearn库中的silhouette_samples函数计算聚类结果中每个非噪声点的轮廓系数,并求其均值得到聚类整体轮廓系数。
聚类结果的评价可转化为对于簇内和簇见相似度的衡量,簇的凝聚性度量簇内样本密切程度,而分离性度量簇间相异程度,较小的簇内紧密度和较大的簇间分离度可以使聚类结果更加理想化。轮廓系数结合了簇内紧密度和簇间分离度,是类的密集与分散程度的评价指标。轮廓系数的范围为[-1,1],轮廓系数越接近1表示聚类效果越好。
聚类总体的轮廓系数为所有样本轮廓系数的平均值。
聚类结果一般都采用轮廓系数评估,但是针对实际解决的问题,也可以结合噪声比、簇数量等进行评估。
1.generate_colors.py
- import colorsys
- import numpy as np
-
-
- def generate_colors(labels):
- unique_labels = np.unique(labels)
- num_colors = np.size(unique_labels)
- colors = []
- for i in range(num_colors):
- hue = i / num_colors # 在0到1之间均匀分布色相值
- saturation = 1.0 # 最大饱和度
- value = 1.0 # 最大亮度
- rgb = colorsys.hsv_to_rgb(hue, saturation, value)
- colors.append(rgb)
-
- result = []
- size = len(labels)
- for i in range(0, size):
- index = np.where(unique_labels == labels[i])
- result.append(colors[index[0][0]])
- return result
2.dbscan.py
- import numpy as np
- from sklearn.cluster import DBSCAN
- from sklearn import metrics
- import matplotlib.pyplot as plt
- import generate_colors
-
-
- def dbscan(data, distance, eps, min_pts):
-
- # 使用 DBSCAN 进行聚类
- dbscan_object = DBSCAN(eps=eps, min_samples=min_pts, metric='precomputed')
- labels = dbscan_object.fit_predict(distance)
- # 噪声数
- noise_num = np.sum(labels == -1)
- noise_ratio = noise_num / len(labels)
- cluster_number = np.size(np.unique(labels))
- if noise_num > 0:
- cluster_number = cluster_number - 1
-
- # 保存标签
- labels_file_path = ('result/labels(eps=' + str(eps) + ',minPts=' + str(min_pts) +
- ',kinds=' + str(cluster_number) + ',noiseRatio=' + str(noise_ratio) + ').csv')
- np.savetxt(labels_file_path, labels, delimiter=',', fmt='%d')
-
- # 创建一个新的图形,并设置尺寸
- fig, ax = plt.subplots(figsize=(10, 8))
- colors = generate_colors.generate_colors(labels)
- ax.scatter(data[:, 0], data[:, 1], c=colors, s=0.5)
- ax.set_xlabel('x')
- ax.set_ylabel('y')
- ax.set_title('DBSCAN Clustering')
- # plt.show()
- pic_file_path = 'result/result(eps=' + str(eps) + ',min_pts=' + str(min_pts) + ').png'
- fig.savefig(fname=pic_file_path, dpi=300)
-
- # 计算轮廓系数
- # 获取非噪声点的索引
- non_noise_indices = np.where(labels != -1)[0]
- # 筛选距离矩阵和标签,只保留非噪声点
- filtered_distance = distance[non_noise_indices][:, non_noise_indices]
- filtered_labels = labels[non_noise_indices]
- silhouette_sample = metrics.silhouette_samples(filtered_distance, filtered_labels, metric='precomputed')
- silhouette_score = np.mean(silhouette_sample)
- silhouette_score_file_path = ('result/silhouette_sample(eps=' + str(eps) + ',minPts=' + str(min_pts)
- + ',score=' + str(silhouette_score) + ').csv')
- np.savetxt(silhouette_score_file_path, silhouette_sample, delimiter=',')
3.main.py
- import numpy as np
- import dbscan
-
-
- if __name__ == '__main__':
-
- data = np.load('random_data.npy')
- dist = np.load('point_dist.npy')
-
- dbscan.dbscan(data, dist, 70, 25)
- dbscan.dbscan(data, dist, 70, 20)
- dbscan.dbscan(data, dist, 70, 30)
- dbscan.dbscan(data, dist, 70, 40)
- dbscan.dbscan(data, dist, 70, 50)
- dbscan.dbscan(data, dist, 70, 60)
- dbscan.dbscan(data, dist, 70, 70)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。