赞
踩
kdtree_node.py
-
- class KDTreeNode:
- """
- KD树结点信息封装
- """
- def __init__(self, instance_node=None, instance_label=None, instance_idx=None,
- split_feature=None, left_child=None, right_child=None, kdt_depth=None):
- """
- 用于封装kd树的结点信息结构
- :param instance_node: 实例点,一个样本
- :param instance_label: 实例点对应的类别标记
- :param instance_idx: 该实例点对应的样本索引,用于kd树的可视化
- :param split_feature: 划分的特征属性,x^(i)
- :param left_child: 左子树,小于划分点的
- :param right_child: 右子树,大于切分点的
- :param kdt_depth: kd树的深度
- """
- self.instance_node = instance_node
- self.instance_label = instance_label
- self.instance_idx = instance_idx
- self.split_feature = split_feature
- self.left_child = left_child
- self.right_child = right_child
- self.kdt_depth = kdt_depth
-
distUtils.py
- import numpy as np
-
-
- class DistanceUtils:
- """
- 距离度量的工具类,此处仅实现闵可夫斯基距离
- """
- def __init__(self, p=2):
- self.p = p # 默认欧式距离,p=1曼哈顿距离,p=np。inf是切比雪夫距离
-
- def distance_func(self, xi, xj):
- """
- 特征空间中两个样本示例的距离计算
- :param xi: k维空间某个样本示例
- :param xj: k维空间某个样本示例
- :return:
- """
- xi, xj = np.asarray(xi), np.asarray(xj)
- if self.p == 1 or self.p == 2:
- return (((np.abs(xi - xj)) ** self.p).sum()) ** (1 / self.p)
- elif self.p == np.inf:
- return np.max(np.abs(xi - xj))
- elif self.p == "cos": # 余弦距离或余弦相似度
- return xi.dot(xj) / np.sqrt((xi ** 2).sum()) / np.sqrt((xj ** 2).sum())
- else:
- raise ValueError("目前仅支持p=1、p=2、p=np.inf或余弦距离四种距离...")
-
knn_kdtree.py
- import numpy as np
- from kdtree_node import KDTreeNode
- from distUtils import DistanceUtils
- import heapq # 堆结构,实现堆排序
- from collections import Counter # 集合中的计数功能
- import networkx as nx # 网络图,可视化
- import matplotlib.pyplot as plt
-
-
- class KNearestNeighborKDTree:
- """
- K近邻算法的实现,基于KD树结构
- 1. fit: 特征向量空间的划分,即构建KD树(建立KNN算法模型)
- 2. predict: 预测,近邻搜索
- 3. 可视化kd树
- """
- def __init__(self, k: int=5, p=2, view_kdt=False):
- """
- KNN算法的初始化必要参数
- :param k: 近邻数
- :param p: 距离度量标准
- :param view_kdt: 是否可视化KD树
- """
- self.k = k # 预测,近邻搜索时,使用的参数,表示近邻树
- self.p = p # 预测,近邻搜索时,使用的参数,表示样本的近邻度
- self.view_kdt = view_kdt
- self.dis_utils = DistanceUtils(self.p) # 距离度量的类对象
- self.kdt_root: KDTreeNode() = None # KD树的根节点
- self.k_dimension = 0 # 特征空间维度,即样本的特征属性数
- self.k_neighbors = [] # 用于记录某个测试样本的近邻实例点
-
- def fit(self, x_train, y_train):
- """
- 递归创建KD树,即对特征向量空间进行划分,递归调用进行创建
- :param x_train: 训练样本集
- :param y_train: 训练样本目标集合
- :return:
- """
- if self.k < 1:
- raise ValueError("k must be greater than 0 and be int.")
- x_train, y_train = np.asarray(x_train), np.asarray(y_train)
- self.k_dimension = x_train.shape[1] # 特征维度
- idx_array = np.arange(x_train.shape[0]) # 训练样本索引编号
- self.kdt_root = self._build_kd_tree(x_train, y_train, idx_array, 0)
- if self.view_kdt:
- self.draw_kd_tree() # 可视化kd树
-
- def _build_kd_tree(self, x_train, y_train, idx_array, kdt_depth):
- """
- 递归创建KD树,KD树是二叉树,严格区分左子树右子树,表示对k维空间的一个划分
- :param x_train: 递归划分的训练样本子集
- :param y_train: 递归划分的训练样本目标子集
- :param idx_array: 递归划分的样本索引
- :param depth: kd树的深度
- :return:
- """
- if x_train.shape[0] == 0: # 递归出口
- return
-
- split_dimension = kdt_depth % self.k_dimension # 数据的划分维度x^(i)
- sorted(x_train, key=lambda x: x[split_dimension]) # 按某个划分维度排序
- median_idx = x_train.shape[0] // 2 # 中位数所对应的数据的索引
- median_node = x_train[median_idx] # 切分点作为当前子树的根节点
- # 划分左右子树区域
- left_instances, right_instances = x_train[:median_idx], x_train[median_idx + 1:]
- left_labels, right_labels = y_train[:median_idx], y_train[median_idx + 1:]
- left_idx, right_idx = idx_array[:median_idx], idx_array[median_idx + 1:]
- # 递归调用
- left_child = self._build_kd_tree(left_instances, left_labels, left_idx, kdt_depth + 1)
- right_child = self._build_kd_tree(right_instances, right_labels, right_idx, kdt_depth + 1)
- kdt_new_node = KDTreeNode(median_node, y_train[median_idx], idx_array[median_idx],
- split_dimension, left_child, right_child, kdt_depth)
- return kdt_new_node
-
- def _search_kd_tree(self, kd_tree: KDTreeNode, x_test):
- """
- kd树的递归搜索算法,后序遍历,搜索k个最近邻实例点
- 数据结构:堆排序,搜索过程中,维护一个小根堆
- :param kd_tree: 已构建的kd树
- :param x_test: 单个测试样本
- :return:
- """
- if kd_tree is None: # 递归出口
- return
-
- # 计算测试样本与当前kd子树的根结点的距离(相似度)
- distance = self.dis_utils.distance_func(kd_tree.instance_node, x_test)
- # 1. 如果不够k个样本,继续递归
- # 2. 如果搜索了k个样本,但是k个样本未必是最近邻的。
- # 当计算的当前实例点的距离小于k个样本的最大距离,则递归,大于最大距离,没必要递归
- if (len(self.k_neighbors) < self.k) or (distance < self.k_neighbors[-1]["distance"]):
- self._search_kd_tree(kd_tree.left_child, x_test) # 递归左子树
- self._search_kd_tree(kd_tree.right_child, x_test) # 递归右子树
- # 在整个搜索路径上的kd树的结点,存储在self.k_neighbors中,包含三个值
- # 当前实例点,类别,距离
- self.k_neighbors.append({
- "node": kd_tree.instance_node, # 结点
- "label": kd_tree.instance_label, # 当前实例的类别
- "distance": distance # 当前实例点与测试样本的距离
- })
- # 按照距离进行排序,选择最小的k个最近邻样本实例,更新最近邻距离
- # 小根堆,k_neighbors中第一个结点是距离测试样本最近的
- self.k_neighbors = heapq.nsmallest(self.k, self.k_neighbors,
- key=lambda d: d["distance"])
-
- def predict(self, x_test):
- """
- KD树的近邻搜索,即测试样本的预测
- :param x_test: 测试样本,ndarray: (n * k)
- :return:
- """
- x_test = np.asarray(x_test)
- if self.kdt_root is None:
- raise ValueError("KDTree is None, Please fit KDTree...")
- elif x_test.shape[1] != self.k_dimension:
- raise ValueError("Test Sample dimension unmatched KDTree's dimension.")
- else:
- y_test_hat = [] # 用于存储测试样本的预测类别
- for i in range(x_test.shape[0]):
- self.k_neighbors = [] # 调用递归搜索,则包含了k个最近邻的实例点
- self._search_kd_tree(self.kdt_root, x_test[i])
- # print(self.k_neighbors)
- y_test_labels = []
- # 取每个近邻样本的类别标签
- for k in range(self.k):
- y_test_labels.append(self.k_neighbors[k]["label"])
- # 按分类规则(多数表决法)
- # print(y_test_labels)
- counter = Counter(y_test_labels)
- idx = int(np.argmax(list(counter.values())))
- y_test_hat.append(list(counter.keys())[idx])
- return np.asarray(y_test_hat)
-
- def _create_kd_tree(self, graph, kdt_node: KDTreeNode, pos=None, x=0, y=0, layer=1):
- """
- 递归可视化KD树,递归构造树的结点、边。
- :param graph: 有向图对象,递归中逐步增加结点和左子树右子树
- :param kdt_node: 递归创建KD树的结点
- :param pos: 可视化中树结点位置,初始化(0, 0)绘制根结点
- :param x: 对应pos中的横坐标,随着递归,更新
- :param y: 对应pos中的纵坐标,随着递归,更新
- :param layer: kd树的层次
- :return:
- """
- if pos is None:
- pos = {}
- pos[str(kdt_node.instance_idx)] = (x, y)
- if kdt_node.left_child:
- # 父结点指向左子树
- graph.add_edge(str(kdt_node.instance_idx), str(kdt_node.left_child.instance_idx))
- l_x, l_y = x - 1 / 2 ** layer, y - 1 # 下一个树结点位置的计算
- l_layer = layer + 1 # 树的层次 + 1
- self._create_kd_tree(graph, kdt_node.left_child, x=l_x, y=l_y, pos=pos, layer=l_layer) # 递归
- if kdt_node.right_child:
- # 父结点指向右子树
- graph.add_edge(str(kdt_node.instance_idx), str(kdt_node.right_child.instance_idx))
- r_x, r_y = x + 1 / 2 ** layer, y - 1
- r_layer = layer + 1
- self._create_kd_tree(graph, kdt_node.right_child, x=r_x, y=r_y, pos=pos, layer=r_layer) # 递归
- return graph, pos
-
- def draw_kd_tree(self):
- """
- 可视化kd树
- :return:
- """
- directed_graph = nx.DiGraph() # 初始化一个有向图,树
- graph, pos = self._create_kd_tree(directed_graph, self.kdt_root)
- fig, ax = plt.subplots(figsize=(20, 10)) # 比例可以根据树的深度适当调节
- nx.draw_networkx(graph, pos, ax=ax, node_size=500, font_color="w", font_size=15,
- arrowsize=20)
- plt.tight_layout()
- plt.show()
test_knn_1.py
- import numpy as np
- from sklearn.datasets import load_iris, load_breast_cancer
- from knn_kdtree import KNearestNeighborKDTree
- from sklearn.model_selection import train_test_split
- from sklearn.metrics import classification_report, accuracy_score
- import matplotlib.pyplot as plt
- from sklearn.model_selection import StratifiedKFold
- from sklearn.preprocessing import StandardScaler
-
-
- iris = load_iris()
- X, y = iris.data, iris.target
-
- # bc_data = load_breast_cancer()
- # X, y = bc_data.data, bc_data.target
- X = StandardScaler().fit_transform(X)
-
- X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0, stratify=y)
- k_neighbors = np.arange(3, 21)
-
- # acc = []
- # for k in k_neighbors:
- # knn = KNearestNeighborKDTree(k=k)
- # knn.fit(X_train, y_train)
- # y_test_hat = knn.predict(X_test)
- # # print(classification_report(y_test, y_test_hat))
- # acc.append(accuracy_score(y_test, y_test_hat))
-
- accuracy_scores = [] # 存储每个alpha阈值下的交叉验证均分
- for k in k_neighbors:
- scores = []
- k_fold = StratifiedKFold(n_splits=10).split(X, y)
- for train_idx, test_idx in k_fold:
- # knn = KNearestNeighborKDTree(k=k, p="cos")
- knn = KNearestNeighborKDTree(k=k)
- knn.fit(X[train_idx], y[train_idx])
- y_test_pred = knn.predict(X[test_idx])
- scores.append(accuracy_score(y[test_idx], y_test_pred))
- del knn
- print("k = %d:" % k, np.mean(scores))
- accuracy_scores.append(np.mean(scores))
-
- plt.figure(figsize=(7, 5))
- plt.plot(k_neighbors, accuracy_scores, "ko-", lw=1)
- plt.grid(ls=":")
- plt.xlabel("K Neighbors", fontdict={"fontsize": 12})
- plt.ylabel("Accuracy Scores", fontdict={"fontsize": 12})
- plt.title("KNN(KDTree) Testing Scores under different K Neighbors", fontdict={"fontsize": 14})
- plt.show()
-
- # knn = KNearestNeighborKDTree(k=3)
- # knn.fit(X_train, y_train)
- # knn.draw_kd_tree()
-
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。