赞
踩
网上写DBSCAN的文章很多,但是问多文章写了无数字,但是看完还是让人觉得云里雾里的。DBSCAN设计其实非常简单,该文同样力争以简洁的方式讲述DBSCAN,尽量让大家能轻易看懂。
DBSCAN(Density-Based Spatial Clustering of Applications with Noise,基于密度的聚类算法)是一种用于聚类分析的无监督学习算法。
DBSCAN 的核心概念包括核心点、边界点和噪声点,理解了这三个核心概念DBSCAN就算懂了:
DBSCAN 算法的工作原理如下:
初始化参数:DBSCAN算法需要两个参数,eps
(邻域半径)和min_samples
(一个簇中所需的最小点数)。eps
定义了两个点被认为是邻域内的距离,min_samples
定义了形成稠密区域所需的最小点数。
标记访问状态:为数据集中的每个点设置一个访问状态,初始时所有点都未被访问。
选择起始点:从数据集中选择一个未访问过的点作为起始点。这个点可以是随机选择的,也可以是按照某种顺序(如索引顺序)选择的第一个点。选择起始点的目的是为了开始探索数据点的邻域。
探索邻域:对于当前选择的点,检查其eps
邻域内的所有点。如果邻域内的点数大于或等于min_samples
,则当前点被认为是一个核心点。
形成簇:如果当前点是核心点,那么它和它的所有邻域点一起形成一个簇。将这些点标记为已访问,并为它们分配一个相同的簇标识符。
递归扩展:对于新加入簇的点,如果它们也是核心点,那么继续递归地探索它们的邻域点,并将满足条件的点加入到当前簇中。
处理边界点:如果一个点不是核心点,但它的邻域内至少有一个核心点,那么这个点被认为是边界点,也被加入到簇中,并标记为已访问。
识别噪声点:如果一个点既不是核心点也不是边界点,那么它被认为是噪声点,通常被标记为-1。
重复处理:重复步骤3到8,直到所有未访问的点都被处理完毕。
结束聚类:当所有点都被访问过,并且每个点都被分配到了一个簇或被标记为噪声点时,聚类过程结束。
通过上述步骤,DBSCAN算法能够识别出数据中的簇和噪声点,而不需要预先指定簇的数量。这种方法特别适用于那些簇形状不规则或数据中包含噪声的情况。
在实际应用中,DBSCAN 常用于数据分析、图像处理、地理信息系统等领域,帮助我们发现数据中的自然分组和异常点。
import numpy as np
def euclidean_distance(point1, point2):
"""
计算两点之间的欧几里得距离
参数:
point1 (numpy.ndarray):第一个点的坐标
point2 (numpy.ndarray):第二个点的坐标
返回:
float:两点之间的欧几里得距离
"""
return np.sqrt(np.sum((point1 - point2) ** 2))
class DBSCAN:
def __init__(self, eps, min_samples):
"""
DBSCAN 类的初始化函数
参数:
eps (float):邻域半径
min_samples (int):形成一个聚类所需的最小样本数
"""
self.eps = eps
self.min_samples = min_samples
self.labels = None # 用于存储每个数据点的聚类标签
def fit(self, X):
"""
执行 DBSCAN 聚类算法
参数:
X (numpy.ndarray):输入的数据集
"""
n_samples = X.shape[0] # 获取数据点的数量
self.labels = np.full(n_samples, -1) # 初始化所有标签为 -1,表示未分类
cluster_id = 0 # 聚类标签的初始值
for i in range(n_samples): # 遍历每个数据点
if self.labels[i]!= -1: # 如果已经分类,跳过
continue
neighbors = self.find_neighbors(X, i) # 找到当前点的邻域点
if len(neighbors) < self.min_samples: # 如果邻域点数量少于最小样本数,标记为噪声点
self.labels[i] = -1
else: # 否则,开始扩展聚类
cluster_id += 1
self.expand_cluster(X, i, neighbors, cluster_id) # 扩展聚类
def find_neighbors(self, X, index):
"""
找到给定索引的数据点的邻域点
参数:
X (numpy.ndarray):输入的数据集
index (int):数据点的索引
返回:
list:邻域点的索引列表
"""
neighbors = []
for i in range(X.shape[0]): # 遍历所有数据点
if euclidean_distance(X[index], X[i]) <= self.eps: # 如果距离小于邻域半径,加入邻域列表
neighbors.append(i)
return neighbors
def expand_cluster(self, X, index, neighbors, cluster_id):
"""
扩展聚类
参数:
X (numpy.ndarray):输入的数据集
index (int):当前点的索引
neighbors (list):当前点的邻域点索引列表
cluster_id (int):当前聚类的标签
"""
self.labels[index] = cluster_id # 标记当前点为当前聚类
i = 0
while i < len(neighbors): # 遍历邻域点
neighbor_index = neighbors[i]
if self.labels[neighbor_index] == -1: # 如果邻域点未分类
new_neighbors = self.find_neighbors(X, neighbor_index) # 找到其邻域点
if len(new_neighbors) >= self.min_samples: # 如果邻域点的邻域满足最小样本数要求
neighbors.extend(new_neighbors) # 扩展邻域列表
if self.labels[neighbor_index] == -1: # 如果邻域点未分类,标记为当前聚类
self.labels[neighbor_index] = cluster_id
i += 1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。