赞
踩
# -*- coding: utf-8 -*-
# @Time : 2021/7/21 21:28
# @Email : 2635681517@qq.com
from collections import Counter
from math import sqrt

import numpy as np


class KNNClassifier:
    """k-nearest-neighbors classifier: Euclidean distance, majority vote."""

    def __init__(self, k):
        """Initialize the kNN classifier.

        :param k: number of neighbors to vote over; must be >= 1.
        :raises ValueError: if k < 1.

        BUG FIX: the original did ``return "K值无效"`` for k <= 1 — returning
        a non-None value from __init__ raises TypeError at runtime, and k == 1
        is a perfectly valid choice for kNN.
        """
        if k < 1:
            raise ValueError("k must be a positive integer")
        self.k = k
        self._X_train = None
        self._y_train = None

    def fit(self, _X_train, _y_train):
        """Store the training set (kNN is a lazy learner; no real training).

        :param _X_train: 2-D array of training samples, shape (n_samples, n_features).
        :param _y_train: 1-D array of class labels, one per training sample.
        :return: self, so calls can be chained.
        """
        assert _X_train.shape[0] == _y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        self._X_train = _X_train
        self._y_train = _y_train
        return self

    def predict(self, X_predict):
        """Predict a label for every row of X_predict.

        :param X_predict: 2-D array, same feature count as the training data.
        :return: 1-D numpy array of predicted labels.
        """
        assert self._X_train is not None and self._y_train is not None, \
            "must fit before predict!"
        assert X_predict.shape[1] == self._X_train.shape[1], \
            "the feature number of X_predict must be equal to X_train"
        y_predict = [self._predict(x) for x in X_predict]
        return np.array(y_predict)

    def _predict(self, x):
        """Predict the label of a single sample x.

        BUG FIX: removed the debug ``print`` of every distance list — a
        library predict path must not spam stdout. The per-row sqrt loop is
        replaced by one vectorized expression with identical results.
        """
        assert x.shape[0] == self._X_train.shape[1], \
            "the feature number of x must be equal to X_train"
        # Euclidean distance from x to every training point.
        distances = np.sqrt(np.sum((self._X_train - x) ** 2, axis=1))
        # Indices of training points sorted by increasing distance.
        nearest = np.argsort(distances)
        # Labels of the k closest points; the most common label wins.
        topK_y = [self._y_train[i] for i in nearest[:self.k]]
        votes = Counter(topK_y)
        return votes.most_common(1)[0][0]

    def score(self, X_test, y_test):
        """Return the accuracy of the model on (X_test, y_test).

        BUG FIX: the original called ``accuracy_score``, which is neither
        imported nor defined in this module (NameError at runtime); the
        accuracy is now computed inline.
        """
        y_predict = self.predict(X_test)
        return np.sum(y_predict == y_test) / len(y_test)

    def __repr__(self):
        return "KNN(k=%d)" % self.k
测试函数代码(数据是乱编的):
# -*- coding: utf-8 -*-
# @Time : 2021/7/21 21:53
# @Email : 2635681517@qq.com
import sys

import numpy as np

from sf.KNN1 import KNNClassifier

if __name__ == '__main__':
    # Toy 2-feature samples: the first five rows are class 0, the last five class 1.
    raw_data_X = [[3.393533211, 2.331273381],
                  [3.110073483, 1.781539638],
                  [1.343808831, 3.368360954],
                  [3.582294042, 4.679179110],
                  [2.280362439, 2.866990263],
                  [7.423436942, 4.696522875],
                  [5.745051997, 3.533989803],
                  [9.172168622, 2.511101045],
                  [7.792783481, 3.424088941],
                  [7.939820817, 0.791637231]
                  ]
    # One query point, reshaped to the (1, n_features) matrix predict() expects.
    x = np.array([8.093607318, 3.365731514])
    X_predict = x.reshape(1, -1)
    # Class label for each training row.
    raw_data_y = [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]

    X_train = np.array(raw_data_X)
    y_train = np.array(raw_data_y)

    # Fit a 3-nearest-neighbor classifier and predict the query point's class.
    knn_clf = KNNClassifier(3)
    knn_clf.fit(X_train, y_train)
    y_predict = knn_clf.predict(X_predict)

    print("所属类别")
    print(y_predict)
    print(y_predict[0])
distances.append(sqrt(np.sum((x_train - x) ** 2)))
换成
distances.append((np.sum(abs(x_train - x))))
distances.append((np.sum(abs(x_train - x) ** 2)) ** (1 / 2))  # 注意:写成 **1/2 会被解析为 (…**1)/2,开平方必须写 **(1/2)
# -*- coding: utf-8 -*-
# @Time : 2021/7/26 10:14
# @Email : 2635681517@qq.com
import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets
from KNN1 import KNNClassifier


def train_test_split(X, y, test_ratio=0.2, seed=None):
    """Randomly split (X, y) into a training part and a test part.

    :param X: 2-D sample array, one row per sample.
    :param y: 1-D label array, same length as X.
    :param test_ratio: fraction of samples placed in the test split (default 0.2).
    :param seed: optional random seed for a reproducible shuffle.
    :return: X_train, y_train, X_test, y_test
    """
    if X.shape[0] != y.shape[0]:
        print("X的行数需要和y的列数相同")
    # BUG FIX: the original condition was `0.0 >= test_ratio >= 1.0`, which
    # can never be true, so an invalid ratio was silently accepted.
    if not 0.0 <= test_ratio <= 1.0:
        print("测试比例需要在0.0-1.0之间")
    # BUG FIX: `if seed:` ignored seed=0; test identity against None instead.
    if seed is not None:
        np.random.seed(seed)

    # Shuffle the sample indices, then split them by the requested ratio.
    shuffled_indexes = np.random.permutation(len(X))
    test_size = int(len(X) * test_ratio)          # number of test samples
    train_indexes = shuffled_indexes[test_size:]  # remaining (e.g. 80%) for training
    test_indexes = shuffled_indexes[:test_size]   # first test_size (e.g. 20%) for testing

    # Materialize the training split from its indices.
    X_train = X[train_indexes]
    y_train = y[train_indexes]
    # Materialize the test split from its indices.
    X_test = X[test_indexes]
    y_test = y[test_indexes]
    return X_train, y_train, X_test, y_test
def accuracy_score(y_true, y_predict):
    """Return the fraction of predictions that match the ground truth.

    :param y_true: 1-D array of true labels (e.g. y_test).
    :param y_predict: 1-D array of predicted labels, same length as y_true.
    :return: accuracy in [0.0, 1.0]
    :raises ValueError: if the two arrays have different lengths.
    """
    if y_true.shape[0] != y_predict.shape[0]:
        # BUG FIX: the original only printed a warning and fell through to a
        # meaningless comparison of mismatched arrays; a length mismatch is a
        # caller error, so fail fast instead.
        raise ValueError("预测值和训练数据的行数要相同")
    return sum(y_true == y_predict) / len(y_true)
5、超参数:在kNN算法中存在一个k值,k值选取得适当与否直接关系到KNN算法的预测是否准确,那么我们怎么确定这个参数呢?一个方法是在一定范围内用不同的k值分别计算准确率,准确率最高的那个k值就是我们所需要的k值。查看源码可以看到KNeighborsClassifier算法的参数有如下这些,这些参数的合适值我们都可以用同样的方式找到,在不同的应用场景下参数的取值可能会不一样。
def __init__(self, n_neighbors=5, *,
weights='uniform', algorithm='auto', leaf_size=30,
p=2, metric='minkowski', metric_params=None, n_jobs=None,
**kwargs):
下面用代码进行找到这样一个k值,(说明:这里也可以直接调用sklearn的网格化GridSearchCV方法得出k值)。
start1 = time.perf_counter()
# Exhaustively try every (weights, k) pair and keep the combination with the
# highest accuracy on the held-out test split.
best_score = 0.0
best_k = -1
best_method = ""
for method in ["uniform", "distance"]:
    for k in range(1, 11):
        # Build a candidate classifier for this (k, weights) pair.
        knn_clf = KNeighborsClassifier(n_neighbors=k, weights=method)
        # Fit it on the training split.
        knn_clf.fit(X_train, y_train)
        # Score it on the test split.
        score = knn_clf.score(X_test, y_test)
        # Remember the best combination seen so far.
        if score > best_score:
            best_k = k
            best_score = score
            best_method = method
end1 = time.perf_counter()

print("best_method =", best_method)
print("best_k =", best_k)
print("best_score =", best_score)
print("final is in : %s Seconds " % (end1 - start1))
结果:k值为3,
best_method = uniform
best_k = 3
best_score = 0.9916666666666667
final is in : 0.4128426999999999 Seconds
# BUG FIX: the original never reset start1 for this experiment, so the
# printed elapsed time also included everything since the previous search.
start1 = time.perf_counter()
# Search over k and the Minkowski exponent p (weights fixed to "distance").
best_score = 0.0
best_k = -1
best_p = ""
for k in range(1, 11):
    for p in range(1, 6):
        # Candidate classifier for this (k, p) pair.
        knn_clf = KNeighborsClassifier(n_neighbors=k, weights="distance", p=p)
        # Fit on the training split.
        knn_clf.fit(X_train, y_train)
        # Accuracy on the test split.
        score = knn_clf.score(X_test, y_test)
        if score > best_score:
            best_k = k
            best_score = score
            best_p = p
end1 = time.perf_counter()

print("best_p =", best_p)
print("best_k =", best_k)
print("best_score =", best_score)
print("final is in : %s Seconds " % (end1 - start1))
结果:
best_p = 2
best_k = 3
best_score = 0.9916666666666667
final is in : 24.7262921 Seconds
# BUG FIX: the original never reset start1 for this experiment, so the
# printed elapsed time also included everything since the previous search.
start1 = time.perf_counter()
# Search over leaf_size and k (weights fixed to "distance").
best_score = 0.0
best_k = -1
best_p = ""
best_leaf_size = -1
for leaf_size in range(20, 40):
    for k in range(1, 11):
        # for p in range(1, 6):  # adding p here would make the search O(n^3)
        # Candidate classifier for this (k, leaf_size) pair.
        knn_clf = KNeighborsClassifier(n_neighbors=k, weights="distance",
                                       leaf_size=leaf_size)
        # Fit on the training split.
        knn_clf.fit(X_train, y_train)
        # Accuracy on the test split.
        score = knn_clf.score(X_test, y_test)
        if score > best_score:
            best_score = score
            best_k = k
            best_leaf_size = leaf_size
end1 = time.perf_counter()

print("best_k =", best_k)
print("best_score =", best_score)
print("best_leaf_size =", best_leaf_size)
print("final is in : %s Seconds " % (end1 - start1))
结果如下:
best_k = 3
best_score = 0.9916666666666667
best_leaf_size = 20
final is in : 27.014338 Seconds
6 、前面提到过的网格化搜索GridSearchCV也可以找到最合适的值,但是我还是习惯自己写算法来进行操作:GridSearchCV的使用代码如下:
# -*- coding: utf-8 -*-
# @Time : 2021/7/27 10:20
# @Email : 2635681517@qq.com
import time

import numpy as np
from sklearn import datasets
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV

# Handwritten-digits data set.
digits = datasets.load_digits()
X = digits.data
y = digits.target

# Baseline: a single kNN fit/score with fixed hyperparameters, timed.
start1 = time.perf_counter()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    random_state=666)
sk_knn_clf = KNeighborsClassifier(n_neighbors=3, weights="uniform")
sk_knn_clf.fit(X_train, y_train)
res = sk_knn_clf.score(X_test, y_test)
end1 = time.perf_counter()
print(res)
print("final is in : %s Seconds " % (end1 - start1))

# Grid search: "uniform" has no p parameter, so the grid is split in two.
parm_grid = [
    {
        'weights': ['uniform'],
        'n_neighbors': [i for i in range(1, 11)]
    },
    {
        'weights': ['distance'],
        'n_neighbors': [i for i in range(1, 11)],
        'p': [i for i in range(1, 6)],
        # 'leaf_size': [i for i in range(10, 40)]
    }
]
# Adding leaf_size as a third axis would make the search O(n^3), which is far
# too slow; most hand-written searches stay O(n) and build up from there.
knn_clf = KNeighborsClassifier()
grid_search = GridSearchCV(knn_clf, parm_grid)

# Time the single-process grid-search fit.
start1 = time.perf_counter()
grid_search.fit(X_train, y_train)
end1 = time.perf_counter()
print("final is in : %s Seconds " % (end1 - start1))

start1 = time.perf_counter()
print(grid_search.best_estimator_)
end1 = time.perf_counter()
print("final is in : %s Seconds " % (end1 - start1))
print("grid_search.best_score_:")
print(grid_search.best_score_)
print("========================")

# Rerun with n_jobs=-1 (use every CPU core) for a much faster search.
grid_search = GridSearchCV(knn_clf, parm_grid, n_jobs=-1, verbose=2)
start1 = time.perf_counter()
grid_search.fit(X_train, y_train)
end1 = time.perf_counter()
print("final is in : %s Seconds " % (end1 - start1))
print("==========================")
print(grid_search.best_estimator_)
print(str(grid_search.best_score_*100)+'%')
结果:
0.9916666666666667
final is in : 0.01844010000000007 Seconds
final is in : 79.1339263 Seconds
KNeighborsClassifier(n_neighbors=1)
final is in : 0.0012061999999986028 Seconds
grid_search.best_score_:
0.9860820751064653
==========================
加入cpu核数后搜索时间明显比前面的79.1339263少,这里的k值是1,在不同的环境下k值会不一样,准确度也会不一样
final is in : 22.741624200000004 Seconds
==========================
KNeighborsClassifier(n_neighbors=1)
98.60820751064652%
# -*- coding: utf-8 -*-
# @Time : 2021/7/27 11:11
# @Email : 2635681517@qq.com
import numpy as np
import matplotlib.pyplot as plt

# --- Min-max normalization: rescale each value into [0, 1] ---
x = np.random.randint(0, 100, 100)
print(np.min(x))
print(np.max(x))
array = (x - np.min(x)) / (np.max(x) - np.min(x))

# A 50x2 integer matrix, converted to float so the scaled values fit.
X = np.random.randint(0, 100, (50, 2))
print("=======")
X = np.array(X, dtype=float)
# Normalize each column independently with its own min and max.
X[:, 0] = (X[:, 0] - np.min(X[:, 0])) / (np.max(X[:, 0]) - np.min(X[:, 0]))
X[:, 1] = (X[:, 1] - np.min(X[:, 1])) / (np.max(X[:, 1]) - np.min(X[:, 1]))
print(X[:10, :])
plt.scatter(X[:, 0], X[:, 1])
plt.show()
print(np.mean(X[:, 0]))
print(np.std(X[:, 0]))
print(np.mean(X[:, 1]))
print(np.std(X[:, 1]))

# --- Mean/std standardization: zero mean, unit variance per column ---
X2 = np.random.randint(0, 100, (50, 2))
X2 = np.array(X2, dtype=float)
X2[:, 0] = (X2[:, 0] - np.mean(X2[:, 0])) / np.std(X2[:, 0])
X2[:, 1] = (X2[:, 1] - np.mean(X2[:, 1])) / np.std(X2[:, 1])
plt.scatter(X2[:, 0], X2[:, 1])
plt.show()
print("============")
print(np.mean(X2[:, 0]))
print(np.std(X2[:, 0]))
print(np.mean(X2[:, 1]))
print(np.std(X2[:, 1]))
# -*- coding: utf-8 -*-
# @Time : 2021/7/27 16:45
# @Email : 2635681517@qq.com
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier

# Iris data set, split 80/20 with a fixed random state for reproducibility.
iris = datasets.load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target,
                                                    test_size=0.2,
                                                    random_state=666)

# Learn the per-column mean and scale from the TRAINING data only.
standardScalar = StandardScaler()
print(y_test)
standardScalar.fit(X_train)
# Apply the same transformation to both splits.
X_train = standardScalar.transform(X_train)
X_test_standard = standardScalar.transform(X_test)
print(X_test_standard)
print('===============================')

# Train kNN on the standardized training data.
knn_clf = KNeighborsClassifier(n_neighbors=3)
knn_clf.fit(X_train, y_train)
print(y_test)
# X_train was standardized, so scoring must also use the standardized test
# set (X_test_standard) to get a meaningful accuracy.
print(knn_clf.score(X_test_standard, y_test))
# Scoring with the raw X_test mixes scaled and unscaled data — either
# standardize both sides or neither.
print(knn_clf.score(X_test, y_test))
# -*- coding: utf-8 -*-
# @Time : 2021/7/27 18:02
# @Email : 2635681517@qq.com
import numpy as np


class StandardScaler:
    """Per-column standardization: z = (x - mean) / std, fitted on training data."""

    def __init__(self):
        # Per-column mean and standard deviation; filled in by fit().
        self.mean_ = None
        self.scale_ = None

    def fit(self, X):
        """Learn the per-column mean and standard deviation of X.

        :param X: 2-D array of shape (n_samples, n_features).
        :return: self, to allow fit(...).transform(...) chaining.
        :raises Exception: if X is not 2-dimensional.
        """
        if X.ndim != 2:
            raise Exception("维度需要是2维")
        # Vectorized over axis 0: identical to the original per-column loop,
        # but computed in one pass. Note scale_ is the standard deviation
        # (np.std), not the variance as the original docstring claimed.
        self.mean_ = np.mean(X, axis=0)
        self.scale_ = np.std(X, axis=0)
        return self

    def transform(self, X):
        """Standardize X column-wise using the statistics learned by fit().

        :param X: 2-D array with the same column count as the fit data.
        :return: a new float array of the same shape; X itself is not modified.
        :raises Exception: if X is not 2-D, fit() has not been called, or the
            column count differs from the fit data.
        """
        if X.ndim != 2:
            raise Exception("维度需要是2维")
        # BUG FIX: the original guard used `and`, so a half-initialized
        # scaler (only one attribute still None) slipped past the check and
        # crashed later with a confusing error.
        if self.mean_ is None or self.scale_ is None:
            raise Exception("mean_和scale_的特征值不能为空")
        if X.shape[1] != len(self.mean_):
            raise Exception("特征数需要和fit时一致")
        # Broadcast subtraction/division over all columns at once; equivalent
        # to the original column-by-column loop.
        return (X - self.mean_) / self.scale_
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。