赞
踩
这个算法原型非常简单,有很多博主都有写,大家自己去看看就好了,也不用花太多时间,顶多十分钟就能了解个大概。
由于该算法对Eps和Minpts参数十分敏感,所以如何确定这两个参数对于DBSCAN来说是很重要的一步,这篇博文是基于李文杰老师的论文《自适应确定DBSCAN算法参数的算法研究》,通过这篇论文,输入数据集即可大致确定这两个参数,从而可以直接在DBSCAN中应用。
参考论文中提出的方法,先根据数据集提取出Eps候选项(按从小到大排列),然后再提取出Minpts候选项,随后用这些候选项尝试使用DBSCAN算法进行聚类,如果连续的候选项聚类的类别数目相同,那么选择Eps相对较大的那个作为最终参数输入到DBSCAN算法中去。
论文中认为如果连续3个Eps候选项聚类的类别数目相同,那么可以认为数据集在这些参数下逐渐收敛,但是我觉得具体最好看图像是否收敛,所以我就简单地在程序里将聚类数目打印出来,读者可以自行选择聚类时使用的Eps和Minpts参数。
- import math
- import copy
- import numpy as np
- from sklearn.cluster import DBSCAN
def dist(a, b):
    """
    Compute the Euclidean distance between two sample points.

    :param a: first sample point, a sequence of numeric coordinates
    :param b: second sample point, a sequence of numeric coordinates
    :return: Euclidean distance over all paired coordinates of a and b
    """
    # Pair the coordinates with zip so points of any dimensionality are
    # supported instead of only the first two components.
    return math.sqrt(sum(math.pow(x - y, 2) for x, y in zip(a, b)))
def returnDk(matrix, k):
    """
    Extract column k from a distance matrix.

    :param matrix: distance matrix given as a sequence of rows
    :param k: column index to read from every row
    :return: list holding entry k of each row, in row order
    """
    return [row[k] for row in matrix]
-
-
def returnDkAverage(Dk):
    """
    Compute the arithmetic mean of a list of distances.

    :param Dk: non-empty sequence of numeric distances
    :return: mean of the values in Dk
    :raises ZeroDivisionError: if Dk is empty
    """
    # Use the builtin sum directly rather than a local accumulator that
    # shadows it.
    return sum(Dk) / len(Dk)
-
-
def CalculateDistMatrix(dataset):
    """
    Build the full pairwise distance matrix of a data set.

    :param dataset: sequence of sample points
    :return: square list of lists where entry [i][j] is dist(dataset[i], dataset[j])
    """
    return [[dist(p, q) for q in dataset] for p in dataset]
-
-
def returnEpsCandidate(dataSet):
    """
    Derive the list of Eps candidates for a data set.

    :param dataSet: sequence of sample points
    :return: list whose entry for k = 1 .. len(dataSet) - 1 is the mean,
             over all points, of the k-th smallest non-self distance
    """
    # Sorting each row ascending puts the zero self-distance at index 0,
    # so index k holds the distance to the k-th nearest neighbour.
    sorted_rows = [sorted(row) for row in CalculateDistMatrix(dataSet)]
    return [
        returnDkAverage(returnDk(sorted_rows, k))
        for k in range(1, len(dataSet))
    ]
-
-
def returnMinptsCandidate(DistMatrix, EpsCandidate):
    """
    Derive one MinPts candidate for every Eps candidate.

    For each Eps value the candidate is the number of matrix entries that
    are <= Eps (self-distances included) divided by the number of points,
    i.e. the average neighbourhood size at that radius.

    :param DistMatrix: square pairwise distance matrix (list of rows)
    :param EpsCandidate: list of Eps candidates
    :return: list of MinPts candidates (floats), aligned with EpsCandidate
    :raises ZeroDivisionError: if DistMatrix is empty while EpsCandidate is not
    """
    from bisect import bisect_right

    # The point count comes from the matrix itself rather than from a
    # module-level variable, so the function works on any input.
    n_points = len(DistMatrix)
    # Sort all distances once; bisect_right then yields the count of
    # entries <= eps without rescanning the matrix for every candidate.
    ordered = sorted(d for row in DistMatrix for d in row)
    return [bisect_right(ordered, eps) / n_points for eps in EpsCandidate]
def returnClusterNumberList(dataset, EpsCandidate, MinptsCandidate):
    """
    Run DBSCAN once per (Eps, MinPts) candidate pair and count the clusters.

    :param dataset: sequence of sample points
    :param EpsCandidate: list of Eps candidates
    :param MinptsCandidate: list of MinPts candidates, aligned with EpsCandidate
    :return: list with the number of clusters found for each candidate pair
             (noise points, labelled -1, are not counted as a cluster)
    """
    np_dataset = np.array(dataset)  # DBSCAN expects an array-like of samples
    ClusterNumberList = []
    for eps, minpts in zip(EpsCandidate, MinptsCandidate):
        # min_samples must be an integer; rounding the fractional candidate
        # up leaves the "neighbours >= MinPts" core-point test unchanged.
        min_samples = max(1, math.ceil(minpts))
        clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(np_dataset)
        # Count distinct non-noise labels: the largest label alone is one
        # less than the cluster count and is -1 when every point is noise.
        found = set(clustering.labels_.tolist())
        found.discard(-1)
        ClusterNumberList.append(len(found))
    return ClusterNumberList
- import math
- import copy
- import numpy as np
- from sklearn.cluster import DBSCAN
-
-
def loadDataSet(fileName, splitChar='\t'):
    """
    Read a delimited text file into a list of float rows.

    :param fileName: path of the text file to read
    :param splitChar: field separator used on every line (tab by default)
    :return: list of rows, each a list of floats
    :raises ValueError: if a field cannot be parsed as a float
    """
    dataSet = []
    with open(fileName) as fr:
        # Iterate the file lazily instead of loading every line up front.
        for line in fr:
            stripped = line.strip()
            # Skip blank lines (e.g. a trailing newline at end of file),
            # which would otherwise fail in float('').
            if not stripped:
                continue
            dataSet.append([float(field) for field in stripped.split(splitChar)])
    return dataSet
-
-
def dist(a, b):
    """
    Compute the Euclidean distance between two sample points.

    :param a: first sample point, a sequence of numeric coordinates
    :param b: second sample point, a sequence of numeric coordinates
    :return: Euclidean distance over all paired coordinates of a and b
    """
    # Walk the coordinates pairwise so every dimension contributes, not
    # just the first two components.
    squared = sum(math.pow(x - y, 2) for x, y in zip(a, b))
    return math.sqrt(squared)
-
-
def returnDk(matrix, k):
    """
    Extract column k from a distance matrix.

    :param matrix: distance matrix given as a sequence of rows
    :param k: column index to read from every row
    :return: list holding entry k of each row, in row order
    """
    return [row[k] for row in matrix]
-
-
def returnDkAverage(Dk):
    """
    Compute the arithmetic mean of a list of distances.

    :param Dk: non-empty sequence of numeric distances
    :return: mean of the values in Dk
    :raises ZeroDivisionError: if Dk is empty
    """
    # Rely on the builtin sum; no local variable shadows it any more.
    return sum(Dk) / len(Dk)
-
-
def CalculateDistMatrix(dataset):
    """
    Build the full pairwise distance matrix of a data set.

    :param dataset: sequence of sample points
    :return: square list of lists where entry [i][j] is dist(dataset[i], dataset[j])
    """
    return [[dist(p, q) for q in dataset] for p in dataset]
-
-
def returnEpsCandidate(dataSet):
    """
    Derive the list of Eps candidates for a data set.

    :param dataSet: sequence of sample points
    :return: list whose entry for k = 1 .. len(dataSet) - 1 is the mean,
             over all points, of the k-th smallest non-self distance
    """
    # Sorting each row ascending puts the zero self-distance at index 0,
    # so index k holds the distance to the k-th nearest neighbour.
    sorted_rows = [sorted(row) for row in CalculateDistMatrix(dataSet)]
    return [
        returnDkAverage(returnDk(sorted_rows, k))
        for k in range(1, len(dataSet))
    ]
-
-
def returnMinptsCandidate(DistMatrix, EpsCandidate):
    """
    Derive one MinPts candidate for every Eps candidate.

    For each Eps value the candidate is the number of matrix entries that
    are <= Eps (self-distances included) divided by the number of points,
    i.e. the average neighbourhood size at that radius.

    :param DistMatrix: square pairwise distance matrix (list of rows)
    :param EpsCandidate: list of Eps candidates
    :return: list of MinPts candidates (floats), aligned with EpsCandidate
    :raises ZeroDivisionError: if DistMatrix is empty while EpsCandidate is not
    """
    from bisect import bisect_right

    # The point count comes from the matrix itself rather than from a
    # module-level variable, so the function works on any input.
    n_points = len(DistMatrix)
    # Sort all distances once; bisect_right then yields the count of
    # entries <= eps without rescanning the matrix for every candidate.
    ordered = sorted(d for row in DistMatrix for d in row)
    return [bisect_right(ordered, eps) / n_points for eps in EpsCandidate]
-
-
def returnClusterNumberList(dataset, EpsCandidate, MinptsCandidate):
    """
    Run DBSCAN once per (Eps, MinPts) candidate pair and count the clusters.

    :param dataset: sequence of sample points
    :param EpsCandidate: list of Eps candidates
    :param MinptsCandidate: list of MinPts candidates, aligned with EpsCandidate
    :return: list with the number of clusters found for each candidate pair
             (noise points, labelled -1, are not counted as a cluster)
    """
    np_dataset = np.array(dataset)  # DBSCAN expects an array-like of samples
    ClusterNumberList = []
    for eps, minpts in zip(EpsCandidate, MinptsCandidate):
        # min_samples must be an integer; rounding the fractional candidate
        # up leaves the "neighbours >= MinPts" core-point test unchanged.
        min_samples = max(1, math.ceil(minpts))
        clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(np_dataset)
        # Count distinct non-noise labels: the largest label alone is one
        # less than the cluster count and is -1 when every point is noise.
        found = set(clustering.labels_.tolist())
        found.discard(-1)
        ClusterNumberList.append(len(found))
    return ClusterNumberList
-
if __name__ == '__main__':
    # Load the sample points; the module-level names below are kept as-is
    # because other code in this file may read them as globals.
    dataSet = loadDataSet('788points.txt', splitChar=',')

    # The distance matrix and the Eps candidates depend only on the data.
    DistMatrix = CalculateDistMatrix(dataSet)
    EpsCandidate = returnEpsCandidate(dataSet)

    # MinPts candidates follow from the Eps candidates; then cluster with
    # every candidate pair.
    MinptsCandidate = returnMinptsCandidate(DistMatrix, EpsCandidate)
    ClusterNumberList = returnClusterNumberList(dataSet, EpsCandidate, MinptsCandidate)

    # Print the candidates and the per-candidate cluster counts so the
    # reader can pick the parameters where the count stabilises.
    for report in (EpsCandidate, MinptsCandidate, 'cluster number list is', ClusterNumberList):
        print(report)
我会将txt文件和程序放在以下位置:
https://download.csdn.net/download/liyihao17/11125093
https://download.csdn.net/download/liyihao17/11125098
另外我也在github上放了
https://github.com/liyihao17/KANN-DBSCAN
需要的读者可以自行下载,相关论文读者可以自行去知网搜索下载
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。