赞
踩
层次聚类试图在不同层次对数据集进行划分,从而形成树形的聚类结构。数据集的划分可采用“自底向上”的聚合策略,也可采用“自顶向下”的分拆策略。
AGENS是一种自底向上聚合策略的层次聚类算法。它先将数据集中的每一个样本看作一个初始聚类,然后在算法运行的每一步找出距离最近的两个聚类簇进行合并,该过程不断重复,直至达到预设的聚类簇的个数。
来源: 知乎 - yyHaker - 层次聚类AGENS算法及其流程
图片来源: 知乎 - yyHaker - 层次聚类AGENS算法及其流程
两个簇的最近样本的距离
d
m
i
n
(
C
i
,
C
j
)
=
min
x
∈
C
i
,
z
∈
C
j
d
i
s
t
(
x
,
z
)
d_{min}(C_i,C_j)=\min_{x \in C_i, z \in C_j} dist(x, z)
dmin(Ci,Cj)=x∈Ci,z∈Cjmindist(x,z)
两个簇的最远样本的距离
d
m
a
x
(
C
i
,
C
j
)
=
max
x
∈
C
i
,
z
∈
C
j
d
i
s
t
(
x
,
z
)
d_{max}(C_i,C_j)=\max_{x \in C_i, z \in C_j} dist(x, z)
dmax(Ci,Cj)=x∈Ci,z∈Cjmaxdist(x,z)
两个簇的所有样本对的距离平均
d
a
v
g
(
C
i
,
C
j
)
=
1
∣
C
i
∣
⋅
∣
C
j
∣
∑
x
∈
C
i
∑
z
∈
C
j
d
i
s
t
(
x
,
z
)
d_{avg}(C_i,C_j)={1 \over {\vert C_i \vert}\cdot {\vert C_j \vert}}\sum_{x \in C_i}\sum_{z \in C_j}dist(x,z)
davg(Ci,Cj)=∣Ci∣⋅∣Cj∣1x∈Ci∑z∈Cj∑dist(x,z)
也称为重心距离, 重心的求法这里不再赘述
假设k取5, 也就是我们假定有5个种类
而eps取100, minPts取2的DB SCAN对相同的数据聚类结果如下:
DB SCAN 可以参考我的另一篇博文 CSDN - Lin Shengfeng - DB SCAN
这里只是粗略贴了两个方法的结果. 不代表哪个方法更好(毕竟参数的调节也至关重要).
当然, 还有更加科学的评价方法, 挖个坑, 以后填.
main.py
from dbscan_cluster import dbScanCluster
from config import *
from agens_cluster import agensCluster
if __name__ == '__main__':
# DB SCAN聚类
x, y, label, maxCluster = dbScanCluster(total_size=100, eps=100, min_pts=2)
scatterDifferentForDB(x, y, label, maxCluster, color_list=['#8E05C2', '#A9333A', '#3E7C17', 'blue', '#F4A442', '#FF9292', '#1DB9C3', '#6D9886'], marker_list=['^', 'o', '1', 'p', 's'])
# 层次聚类 AGENS
agensClusterResult = agensCluster(5, x, y)
scatterDifferentForAgens(agensClusterResult, color_list=['#8E05C2', '#A9333A', '#3E7C17', 'blue', '#F4A442', '#FF9292', '#1DB9C3', '#6D9886'], marker_list=['^', 'o', '1', 'p', 's'])
agens_cluster.py
from Cluster import Cluster from point import point from generate_random_points import * class agens_cluster: def __init__(self, x_total, y_total, k_limit, method='g'): """ 构造函数,初始化agens_cluster对象 :param x: 所有点的x轴坐标列表 :param y: 所有点的y轴坐标列表 :param k_limit: 超参数k, 最后要得到k类 """ self.points = [point(x_total[i], y_total[i]) for i in range(len(x_total))] self.k = k_limit self.d_method = method # 距离度量默认采用质心距离 self.clusters = [Cluster(self.points[i], i) for i in range(len(self.points))] # 初始情况是把所有点单独看成一个簇 def setDistanceMethod(self, method): """ 更改距离度量方式 :param method: 合法取值为 'min', 'max', 'avg', 'g' :return: None """ self.d_method = method def refreshClusterName(self): for i in range(len(self.clusters)): self.clusters[i].setName(i) def mainProcess(self): """ https://zhuanlan.zhihu.com/p/50113029 这篇文章讲述了详细的中间步骤,但是它维护了一个距离矩阵,这在我的demo中是不需要的。 :return: 簇划分 C = {C1, C2, ..., Ck} """ # 循环判断当前簇的个数是否已经跟预期(k)一样了,如果不一样就继续执行,一样则返回结果 while len(self.clusters) > self.k: # 找出距离最近的两个簇Ci和Cj # 初始设置为第一个簇和最后一个簇 closest_cluster_i = 0 closest_cluster_j = len(self.clusters) - 1 minimum_distance = self.clusters[closest_cluster_i].calculateDistance(self.clusters[closest_cluster_j], self.d_method) # 找出距离最小的两个簇 for i in range(len(self.clusters)): for j in range(len(self.clusters) - 1, i, -1): temp_distance = self.clusters[i].calculateDistance(self.clusters[j], self.d_method) # 暂存距离 if temp_distance < minimum_distance: # 找到了更小的距离 closest_cluster_i = i closest_cluster_j = j minimum_distance = temp_distance # 合并这两个簇 # 1. j的元素全部移到i # 2. self.clusters中删除j self.clusters[closest_cluster_i].union(self.clusters[closest_cluster_j]) # del self.clusters[closest_cluster_j] # 这种方式删除并不推荐 self.clusters.pop(closest_cluster_j) # 推荐使用这个方法 self.refreshClusterName() # 清洗目前簇的名称/序号 return self.clusters def agensCluster(k, x, y, method='g', total_size=200, xScale=1000, yScale=1000): if len(x) == 0: x, y = generate(total=total_size, x_scale=xScale, y_scale=yScale) # 生成随机点 ac = agens_cluster(x, y, k, method) return ac.mainProcess()
dbscan_cluster.py
from point import * import random import queue from generate_random_points import * import matplotlib.pyplot as plt class dbscan_cluster: def __init__(self, x_total, y_total, eps: int, min_pts: int) -> object: """ :param eps: int, 表示半径 :param min_pts: int, 最少要包含多少点 """ self.points = [point(x_total[i], y_total[i]) for i in range(len(x_total))] self.wait_for_pick = [i for i in range(len(self.points))] # 可选取的点 self.eps = eps self.min_pts = min_pts self.cur_cluster_num = 0 # 簇序号从0开始 def printAllPoints(self): for p in self.points: print('({0}, {1})'.format(p.x_pos, p.y_pos)) def getClusterNum(self): return self.cur_cluster_num def initPick(self): """ :return: 在wait_for_pick中随机挑选一个点,返回点在points中的序号 """ cur_order = random.randint(0, len(self.wait_for_pick) - 1) # wait_for_pick中的第cur_order个序号 cur_pick = self.wait_for_pick[cur_order] # 存入cur_pick self.wait_for_pick.pop(cur_order) # 再删除 return cur_pick def findNearPoint(self, cur_pick): """ :param cur_pick: int, 表示当前选中的点序号 :return: 与当前点距离在sqrt(eps^2)内的所有点序号 """ near_points = [] for p in range(len(self.points)): if p == cur_pick: # 同一个点, 跳过 continue else: # 其他点, 计算一下 if self.points[cur_pick].calculateDistanceSquare(self.points[p]) <= self.eps ** 2: near_points.append(p) return near_points def mainProcess(self): while len(self.wait_for_pick) > 0: # 先随机选取点, 并且在wait_for_pick里面删除其序号 pre_points = [] # 标记这一个簇已经找到的点 seeds = queue.Queue() # 创建一个种子点队列 cur_pick = self.initPick() # 获取一个随机点 near_points = self.findNearPoint(cur_pick) # 获取随机点的邻近点 # 当前点的邻近点数量小于 min_pts, 标记为噪声点 if len(near_points) < self.min_pts: self.points[cur_pick].setPointType(NOISE) # 当前的点是个噪声点 # 大于 min_pts, 标记为核心点 elif len(near_points) > self.min_pts: self.points[cur_pick].setPointType(CORE) # 当前的点是核心点 self.points[cur_pick].setClusterNum(self.cur_cluster_num) # 设置簇号 pre_points.append(cur_pick) # 当前点后续不必再遍历 for near_point_index in near_points: # 把它的邻近点全部加入到seeds队列 seeds.put(near_point_index) # 把邻近点在points中的下标加入seeds队列 # wait_for_pick队列中需要删除这个邻近点下标 if self.wait_for_pick.count(near_point_index) > 0: # 这个点下标在wait_for_pick队列中 self.wait_for_pick.pop(self.wait_for_pick.index(near_point_index)) # 那就pop掉 while not seeds.empty(): # 只有seeds中还有点数时才继续执行 head_point_index = seeds.get() # 获取队列头部元素, get()方法会在取元素的同时删除队列中的它 pre_points.append(head_point_index) # 后续不必再找它 if self.wait_for_pick.count(head_point_index) > 0: # 这个点下标在wait_for_pick队列中 self.wait_for_pick.pop(self.wait_for_pick.index(head_point_index)) # 那就pop掉 self.points[head_point_index].setClusterNum(self.cur_cluster_num) # 在这个seeds里能找到, 说明簇号还是一样 if self.points[head_point_index].getPointType() == UNVISITED: # 还没标记过 cur_near_pts = self.findNearPoint(head_point_index) # 查看一下它的邻近点 if len(cur_near_pts) > self.min_pts: # 如果邻近点数量超过阈值 self.points[head_point_index].setPointType(CORE) # 核心点 # 把邻近点全部加入seeds for p in cur_near_pts: if p not in pre_points: # 没遍历过的邻近点才加入 seeds.put(p) else: # 邻近点数量没到 self.points[head_point_index].setPointType(BOARD) # 边界点 else: # 已经被标记过 if self.points[head_point_index].getPointType() == NOISE: # 之前是噪声 self.points[head_point_index].setPointType(BOARD) # 现在改成边界点 self.cur_cluster_num = self.cur_cluster_num + 1 # 下一个簇号 def getAllCluster(self): """ :return: 返回所有点的标签 """ return [self.points[i].getClusterNum() + 1 for i in range(len(self.points))] # +1 是为了防止标签为-1的点没有对应的颜色 def printPointAndLabel(self): """ 这个方法打印点信息,主要用来测试 :return: 无返回值 """ cluster_list = [[] for i in range(self.cur_cluster_num)] for pot in range(len(self.points)): print(self.points[pot].getClusterNum(), self.cur_cluster_num) if self.points[pot].getClusterNum() > -1: cluster_list[self.points[pot].getClusterNum()].append(pot) for li in range(len(cluster_list)): if len(cluster_list[li]) > 0: print('cluster #{0}'.format(li)) for i in cluster_list[li]: print('({0}, {1})'.format(self.points[i].x, self.points[i].y)) print('--------------') def dbScanCluster(total_size=200, xScale=1000, yScale=1000, eps=80, min_pts=3): # 目前只支持生成随机点 x, y = generate(total=total_size, x_scale=xScale, y_scale=yScale) # 生成随机点 db = dbscan_cluster(x, y, eps, min_pts) db.mainProcess() label = db.getAllCluster() maxCluster = db.getClusterNum() return x, y, label, maxCluster
Cluster.py
""" Cluster类: 属性: - 簇内点集合 - 簇名 方法: - 合并另一个簇 - 计算质心 - 计算与另一个簇的距离 - 提供点集合 - 计算簇与簇间的最大点距离 - 计算簇与簇间的最小点距离 - 计算簇与簇间的质心距离 - 计算簇与簇间的平均点距离(与质心点距离不一样!!!) """ import point class Cluster: def __init__(self, pt, name): self.points = [pt] self.name = name def setName(self, name): self.name = name def getName(self): return self.name def getPoints(self): return self.points def union(self, another_cluster): self.points.extend(another_cluster.getPoints()) def getCore(self): """ 计算质心坐标 :return: 质心坐标(tuple) """ total_x = 0 total_y = 0 for p in self.points: total_x = total_x + p.x_pos total_y = total_y + p.y_pos core = (total_x / len(self.points), total_y / len(self.points)) return core def calcCoreDistance(self, another_cluster): """ 计算两个簇的质心距离 :param another_cluster: 另一个簇 :return: 两个簇的质心距离 """ myCore = self.getCore() anotherCore = another_cluster.getCore() return ((myCore[0] - anotherCore[0]) ** 2 + (myCore[1] - anotherCore[1]) ** 2) ** 0.5 def calcMinDistance(self, another_cluster): """ 计算两个簇的最小距离(两个簇之间最近的两个点的距离) :param another_cluster: 另一个簇 :return: 两个簇的最小距离 """ minDis = -1 # 初始值 -1(非法值) for x in self.getPoints(): for y in another_cluster.getPoints(): if minDis == -1 or minDis > x.calculateDistanceSquare(y): minDis = x.calculateDistanceSquare(y) return minDis ** 0.5 # 因为获取的是平方 所以要取根号 def calcMaxDistance(self, another_cluster): """ 计算两个簇的最大距离(两个簇之间最远的两个点的距离) :param another_cluster: 另一个簇 :return: 两个簇的最小距离平方!!!平方是为了方便计算和比较 """ maxDis = -1 # 初始值 -1(非法值) for x in self.getPoints(): for y in another_cluster.getPoints(): if maxDis == -1 or maxDis < x.calculateDistanceSquare(y): maxDis = x.calculateDistanceSquare(y) return maxDis ** 0.5 # 因为获取的是平方 所以要取根号 def calcAvgDistance(self, another_cluster): """ 计算两个簇的平均距离 :param another_cluster: 另一个簇 :return: 两个簇的平均距离!!! """ totalDis = 0 myPoints = self.getPoints() # 本簇的点集合 anotherPoints = another_cluster.getPoints() # 另一个簇的点集合 for x in myPoints: for y in anotherPoints: totalDis = totalDis + x.calculateDistanceSquare(y) ** 0.5 # 欧式距离 return totalDis/(len(myPoints)*len(anotherPoints)) def calculateDistance(self, another_cluster, method='g'): if method == 'avg': return self.calcAvgDistance(another_cluster) elif method == 'min': return self.calcMinDistance(another_cluster) elif method == 'max': return self.calcMaxDistance(another_cluster) else: # method == 'g' return self.calcCoreDistance(another_cluster)
point.py
from config import * class point: def __init__(self, x_pos, y_pos): self.x = x_pos self.y = y_pos self.point_type = UNVISITED # 未被访问过 self.cluster = NONE # 还不属于任何簇 @property def x_pos(self): return self.x @property def y_pos(self): return self.y def setPointType(self, t): self.point_type = t def getPointType(self): return self.point_type def setClusterNum(self, num): self.cluster = num def getClusterNum(self): return self.cluster def calculateDistanceSquare(self, another_point): return (self.x - another_point.x)**2 + (self.y - another_point.y)**2
config.py
import matplotlib.pyplot as plt import numpy as np # 点类型参数值: UNVISITED: int = -1 # 未被访问过 NOISE: int = 0 # 噪声点 CORE: int = 1 # 核心点 BOARD: int = 2 # 边缘点 # 点所属簇参数值: NONE: int = -1 # 为不同组别绘图的功能函数(因为有噪声点,所以是专供db scan聚类的方法) def scatterDifferentForDB(x, y, label, cluster_num, color_list, marker_list): # 先根据label给不同的x和y分组 x_group = [[] for i in range(cluster_num + 1)] y_group = [[] for i in range(cluster_num + 1)] for i in range(len(label)): x_group[label[i]].append(x[i]) y_group[label[i]].append(y[i]) fig, ax = plt.subplots() plt.scatter(np.array(x_group[0]), np.array(y_group[0]), c='black', label='NOISE') for cluster_order in range(1, len(x_group)): plt.scatter(np.array(x_group[cluster_order]), np.array(y_group[cluster_order]), c=color_list[cluster_order % len(color_list)], marker=marker_list[cluster_order % len(marker_list)], label=cluster_order) # plt.scatter(x, y, c=label, cmap='jet') plt.legend(bbox_to_anchor=(1.05, 0), loc=3, borderaxespad=0) # 让图例显示完全 fig.subplots_adjust(right=0.8) # 显示图片 plt.show() # 为不同组别绘图的功能函数(无噪声点, 专供层次聚类) def scatterDifferentForAgens(clusters, color_list, marker_list): x_group = [[] for i in range(len(clusters))] y_group = [[] for i in range(len(clusters))] for c in clusters: for i in range(len(c.getPoints())): x_group[c.getName()].append(c.getPoints()[i].x_pos) y_group[c.getName()].append(c.getPoints()[i].y_pos) fig, ax = plt.subplots() for cluster_order in range(len(x_group)): plt.scatter(np.array(x_group[cluster_order]), np.array(y_group[cluster_order]), c=color_list[cluster_order % len(color_list)], marker=marker_list[cluster_order % len(marker_list)], label=cluster_order) # plt.scatter(x, y, c=label, cmap='jet') plt.legend(bbox_to_anchor=(1.05, 0), loc=3, borderaxespad=0) # 让图例显示完全 fig.subplots_adjust(right=0.8) # 显示图片 plt.show()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。