输入: n n n个样本的集合 X X X
输出:样本集合的聚类 C ∗ C^* C∗
初始化,令 t = 0 t=0 t=0,随机选择 k k k个样本点作为初始聚类中心
m ( 0 ) = ( m 1 ( 0 ) , … , m l ( 0 ) , … , m k ( 0 ) ) m^{(0)}=(m_1^{(0)},\dots,m_l^{(0)},\dots,m_k^{(0)}) m(0)=(m1(0),…,ml(0),…,mk(0))
对样本进行聚类,对固定的类中心 m ( t ) = ( m 1 ( t ) , … , m l ( t ) , … , m k ( t ) ) m^{(t)}=(m_1^{(t)},\dots,m_l^{(t)},\dots,m_k^{(t)}) m(t)=(m1(t),…,ml(t),…,mk(t)),其中 m k ( t ) m_k^{(t)} mk(t)为类 C l C_l Cl的中心,
计算每个样本到类中心的距离,将每个样本指派到与其最近的中心的类中,构成聚类结果 C ( t ) C^(t) C(t)
计算新的类中心,对聚类结果 C ( t ) C^{(t)} C(t),计算当前各个类中的样本的均值,作为新的类中心
m ( t + 1 ) = ( m 1 ( t + 1 ) , … , m l ( t + 1 ) , … , m k ( t + 1 ) ) m^{(t+1)}=(m_1^{(t+1)},\dots,m_l^{(t+1)},\dots,m_k^{(t+1)}) m(t+1)=(m1(t+1),…,ml(t+1),…,mk(t+1))
如果迭代收敛或符合停止条件,输出 C ∗ = C ( t ) C^*=C^{(t)} C∗=C(t),后者令 t = t + 1 t=t+1 t=t+1
C ( i ) = j t h a t m i n i z i z e s ∣ ∣ x ( i ) − μ j ∣ ∣ 2 C^{(i)}=j\ that \ minizizes \ ||x^{(i)}-\mu_j||^2 C(i)=j that minizizes ∣∣x(i)−μj∣∣2
import re import scipy.io as sio import numpy as np import matplotlib.pyplot as plt from time import * def find_closet_centroids(X, centroids): """ 寻找所属簇,centroids重心的意思,意思就是这个点离所有的中心点最近 :param X: ndarray,所有点 :param centroids: ndarray,上一步计算出或初始化的簇中心 :return: ndarray,每个点所属于的簇 """ # 多加了一个0 res = np.zeros((1,)) for x in X: res = np.append(res, np.argmin(np.sqrt(np.sum((centroids - x) ** 2, axis=1)))) # 返回结果时,去掉多加的一个0 return res[1:] if __name__ == '__main__': begin_time = time() print("========程序开始============") data = sio.loadmat("data\\ex7data2.mat") X = np.array(data['X']) # (300,2) # 随机初始化中心店 init_centroids = np.array([[3, 3], [6, 2], [8, 5]]) idx = find_closet_centroids(X, init_centroids) print(idx[0:10]) # [0. 2. 1.] end_time = time() run_time = end_time - begin_time print("========程序结束============") print('该循环程序运行时间:', run_time)
μ l = 1 C k ∑ i ∈ C k x ( i ) \mu_l=\frac{1}{C_k}\sum_{i\in C_k}x^{(i)} μl=Ck1∑i∈Ckx(i)
def compute_centroids(X, idx): """ 计算新的簇中心 :param X: ndarray,所有点 :param idx: ndarray,每个点对应的簇号 :return: ndarray,所有新簇中心 """ # K是一共分成的类数 K = int(np.max(idx)) + 1 # m为X的行数 m = X.shape[0] n = X.shape[-1] centroids = np.zeros((K, n)) counts = np.zeros((K, n)) for i in range(m): centroids[int(idx[i])] += X[i] counts[int(idx[i])] += 1 centroids = centroids / counts return centroids
def cost(X, idx, centrodis):
c = 0
for i in range(len(X)):
c += np.sum((X[i] - centrodis[int(idx[i])]) ** 2)
c /= len(X)
return c
def random_initialization(X, K): """ 随机选择K组数据,作为簇中心 :param X: ndarray,所有点 :param K: int,聚类的类数 :return: ndarray,簇中心 """ res = np.zeros((1, X.shape[-1])) m = X.shape[0] rl = [] while True: index = random.randint(0, m) if index not in rl: rl.append(index) if len(rl) >= K: break for index in rl: res = np.concatenate((res, X[index].reshape(1, -1)), axis=0) return res[1:] def k_means(X, K): """ k-means聚类算法, :param X: ndarray,所有的数据 :param K: int,分成聚类的类数 :return: tuple,(idx, centroids_all) idx,ndarray 为每个数据所属类标签 centroids_all,[ndarray,...]计算过程中每轮的簇中心 """ centroids = random_initialization(X, K) centroids_all = [centroids] idx = np.zeros((1,)) last_c = -1 now_c = -2 # iterations = 200 # for i in range(iterations): while now_c != last_c: # 当收敛时结束算法,或者可以利用指定迭代轮数 # 算出每个点所属簇类 idx = find_closet_centroids(X, centroids) last_c = now_c # 计算损失函数 now_c = cost(X, idx, centroids) # 根据重新规划后的簇类,重新规划簇类中心点 centroids = compute_centroids(X, idx) # 记录训练过程中所有的中心点 centroids_all.append(centroids) return idx, centroids_all
data = sio.loadmat("data\\ex7data2.mat")
X = data['X'] # (300,2)
idx, centroids_all = k_means(X, 3)
def visualizing(X, idx, centroids_all): """ 可视化聚类结果和簇中心的移动过程 :param X: ndarray,所有的数据 :param idx: ndarray,每个数据所属类标签 :param centroids_all: [ndarray,...]计算过程中每轮的簇中心 :return: None """ plt.scatter(X[..., 0], X[..., 1], c=idx) xx = [] yy = [] for c in centroids_all: xx.append(c[..., 0]) yy.append(c[..., 1]) plt.plot(xx, yy, 'rx--') plt.show()
import random import re import scipy.io as sio import numpy as np import matplotlib.pyplot as plt from time import * def find_closet_centroids(X, centroids): """ 寻找所属簇,centroids重心的意思,意思就是这个点离所有的中心点最近 :param X: ndarray,所有点 :param centroids: ndarray,上一步计算出或初始化的簇中心 :return: ndarray,每个点所属于的簇 """ # 多加了一个0 res = np.zeros((1,)) for x in X: res = np.append(res, np.argmin(np.sqrt(np.sum((centroids - x) ** 2, axis=1)))) # 返回结果时,去掉多加的一个0 return res[1:] def compute_centroids(X, idx): """ 计算新的簇中心 :param X: ndarray,所有点 :param idx: ndarray,每个点对应的簇号 :return: ndarray,所有新簇中心 """ # K是一共分成的类数 K = int(np.max(idx)) + 1 # m为X的行数 m = X.shape[0] n = X.shape[-1] centroids = np.zeros((K, n)) counts = np.zeros((K, n)) for i in range(m): centroids[int(idx[i])] += X[i] counts[int(idx[i])] += 1 centroids = centroids / counts return centroids def cost(X, idx, centrodis): """ 计算损失函数 :param X: :param idx: :param centrodis: :return: """ c = 0 for i in range(len(X)): c += np.sum((X[i] - centrodis[int(idx[i])]) ** 2) c /= len(X) return c def random_initialization(X, K): """ 随机选择K组数据,作为簇中心 :param X: ndarray,所有点 :param K: int,聚类的类数 :return: ndarray,簇中心 """ res = np.zeros((1, X.shape[-1])) m = X.shape[0] rl = [] while True: index = random.randint(0, m) if index not in rl: rl.append(index) if len(rl) >= K: break for index in rl: res = np.concatenate((res, X[index].reshape(1, -1)), axis=0) return res[1:] def k_means(X, K): """ k-means聚类算法, :param X: ndarray,所有的数据 :param K: int,分成聚类的类数 :return: tuple,(idx, centroids_all) idx,ndarray 为每个数据所属类标签 centroids_all,[ndarray,...]计算过程中每轮的簇中心 """ centroids = random_initialization(X, K) centroids_all = [centroids] idx = np.zeros((1,)) last_c = -1 now_c = -2 # iterations = 200 # for i in range(iterations): while now_c != last_c: # 当收敛时结束算法,或者可以利用指定迭代轮数 # 算出每个点所属簇类 idx = find_closet_centroids(X, centroids) last_c = now_c # 计算损失函数 now_c = cost(X, idx, centroids) # 根据重新规划后的簇类,重新规划簇类中心点 centroids = compute_centroids(X, idx) # 记录训练过程中所有的中心点 centroids_all.append(centroids) return idx, centroids_all def visualizing(X, idx, centroids_all): """ 可视化聚类结果和簇中心的移动过程 :param X: ndarray,所有的数据 :param idx: ndarray,每个数据所属类标签 :param centroids_all: [ndarray,...]计算过程中每轮的簇中心 :return: None """ plt.scatter(X[..., 0], X[..., 1], c=idx) xx = [] yy = [] for c in centroids_all: xx.append(c[..., 0]) yy.append(c[..., 1]) plt.plot(xx, yy, 'rx--') plt.show() if __name__ == '__main__': begin_time = time() print("========程序开始============") data = sio.loadmat("data\\ex7data2.mat") X = np.array(data['X']) # (300,2) idx, centroids_all = k_means(X, 3) visualizing(X, idx, centroids_all) end_time = time() run_time = end_time - begin_time print("========程序结束============") print('该循环程序运行时间:', run_time)
def random_initialization(X, K): """ 随机选择K组数据,作为簇中心 :param X: ndarray,所有点 :param K: int,聚类的类数 :return: ndarray,簇中心 """ res = np.zeros((1, X.shape[-1])) m = X.shape[0] rl = [] while True: index = random.randint(0, m) if index not in rl: rl.append(index) if len(rl) >= K: break for index in rl: res = np.concatenate((res, X[index].reshape(1, -1)), axis=0) return res[1:]
def compress(image, colors_num): """ 压缩图片 :param image: ndarray,原始图片 :param colors_num: int,压缩后的颜色数量 :return: (ndarray,ndarray),第一个每个像素点存储一个值,第二个为颜色矩阵 """ # _代表3,d1和d2代表图片的长和宽 d1, d2, _ = image.shape # 变成二维数组 raw_image = image.reshape(d1 * d2, -1) # 展开成二维数组 print(raw_image.shape) # idx, centroids_all = k_means(raw_image, colors_num) # colors 表示最后一次的中心点 colors = centroids_all[-1] # 构造压缩后的图片格式 compressed_image = np.zeros((1, 1)) for i in range(d1 * d2): compressed_image = np.concatenate((compressed_image, idx[i].reshape(1, -1)), axis=0) compressed_image = compressed_image[1:].reshape(d1, d2, -1) return compressed_image, colors
def compressed_format_to_normal_format(compressed_image, colors):
:param compressed_image: ndarray,压缩后的图片,存储颜色序号
:param colors: ndarray,颜色列表
:return: ndarray,正常的rgb格式图片
d1, d2, _ = compressed_image.shape
normal_format_image = np.zeros((1, len(colors[0])))
compressed_image = compressed_image.reshape(d1 * d2, -1)
for i in range(d1 * d2):
normal_format_image = np.concatenate((normal_format_image, colors[int(compressed_image[i][0])].reshape(1, -1)),
normal_format_image = normal_format_image[1:].reshape(d1, d2, -1)
return normal_format_image
image = plt.imread("data\\bird_small.png") # (128,128,3)
plt.subplot(1, 2, 1)
plt.title("raw image")
plt.subplot(1, 2, 2)
compressed_image, colors = compress(image, 16)
print(compressed_image.shape, colors.shape)
plt.imshow(compressed_format_to_normal_format(compressed_image, colors))
plt.title("compressed image")
(128, 128, 3)
(16384, 3)
(128, 128, 1) (16, 3)
该循环程序运行时间: 94.86905074119568
import random import re import scipy.io as sio import numpy as np import matplotlib.pyplot as plt from time import * def find_closet_centroids(X, centroids): """ 寻找所属簇,centroids重心的意思,意思就是这个点离所有的中心点最近 :param X: ndarray,所有点 :param centroids: ndarray,上一步计算出或初始化的簇中心 :return: ndarray,每个点所属于的簇 """ # 多加了一个0 res = np.zeros((1,)) for x in X: res = np.append(res, np.argmin(np.sqrt(np.sum((centroids - x) ** 2, axis=1)))) # 返回结果时,去掉多加的一个0 return res[1:] def compute_centroids(X, idx): """ 计算新的簇中心 :param X: ndarray,所有点 :param idx: ndarray,每个点对应的簇号 :return: ndarray,所有新簇中心 """ # K是一共分成的类数 K = int(np.max(idx)) + 1 # m为X的行数 m = X.shape[0] n = X.shape[-1] centroids = np.zeros((K, n)) counts = np.zeros((K, n)) for i in range(m): centroids[int(idx[i])] += X[i] counts[int(idx[i])] += 1 centroids = centroids / counts return centroids def cost(X, idx, centrodis): """ 计算损失函数 :param X: :param idx: :param centrodis: :return: """ c = 0 for i in range(len(X)): c += np.sum((X[i] - centrodis[int(idx[i])]) ** 2) c /= len(X) return c def random_initialization(X, K): """ 随机选择K组数据,作为簇中心 :param X: ndarray,所有点 :param K: int,聚类的类数 :return: ndarray,簇中心 """ res = np.zeros((1, X.shape[-1])) m = X.shape[0] rl = [] while True: index = random.randint(0, m) if index not in rl: rl.append(index) if len(rl) >= K: break for index in rl: res = np.concatenate((res, X[index].reshape(1, -1)), axis=0) return res[1:] def compress(image, colors_num): """ 压缩图片 :param image: ndarray,原始图片 :param colors_num: int,压缩后的颜色数量 :return: (ndarray,ndarray),第一个每个像素点存储一个值,第二个为颜色矩阵 """ # _代表3,d1和d2代表图片的长和宽 d1, d2, _ = image.shape print(image.shape) # 变成二维数组, # (a,b) # #改变维度为m行、d列 (-1表示列数自动计算,d= a*b /m ) raw_image = image.reshape(d1 * d2, -1) # 展开成二维数组 print(raw_image.shape) # idx, centroids_all = k_means(raw_image, colors_num) # colors 表示最后一次的中心点 colors = centroids_all[-1] # 构造压缩后的图片格式 compressed_image = np.zeros((1, 1)) for i in range(d1 * d2): compressed_image = np.concatenate((compressed_image, idx[i].reshape(1, -1)), axis=0) compressed_image = compressed_image[1:].reshape(d1, d2, -1) return compressed_image, colors def compressed_format_to_normal_format(compressed_image, colors): """ 将压缩后的图片转为正常可以显示的图片格式 :param compressed_image: ndarray,压缩后的图片,存储颜色序号 :param colors: ndarray,颜色列表 :return: ndarray,正常的rgb格式图片 """ d1, d2, _ = compressed_image.shape normal_format_image = np.zeros((1, len(colors[0]))) compressed_image = compressed_image.reshape(d1 * d2, -1) for i in range(d1 * d2): normal_format_image = np.concatenate((normal_format_image, colors[int(compressed_image[i][0])].reshape(1, -1)), axis=0) normal_format_image = normal_format_image[1:].reshape(d1, d2, -1) return normal_format_image def k_means(X, K): """ k-means聚类算法, :param X: ndarray,所有的数据 :param K: int,分成聚类的类数 :return: tuple,(idx, centroids_all) idx,ndarray 为每个数据所属类标签 centroids_all,[ndarray,...]计算过程中每轮的簇中心 """ centroids = random_initialization(X, K) centroids_all = [centroids] idx = np.zeros((1,)) last_c = -1 now_c = -2 # iterations = 200 # for i in range(iterations): while now_c != last_c: # 当收敛时结束算法,或者可以利用指定迭代轮数 # 算出每个点所属簇类 idx = find_closet_centroids(X, centroids) last_c = now_c # 计算损失函数 now_c = cost(X, idx, centroids) # 根据重新规划后的簇类,重新规划簇类中心点 centroids = compute_centroids(X, idx) # 记录训练过程中所有的中心点 centroids_all.append(centroids) return idx, centroids_all def visualizing(X, idx, centroids_all): """ 可视化聚类结果和簇中心的移动过程 :param X: ndarray,所有的数据 :param idx: ndarray,每个数据所属类标签 :param centroids_all: [ndarray,...]计算过程中每轮的簇中心 :return: None """ plt.scatter(X[..., 0], X[..., 1], c=idx) xx = [] yy = [] for c in centroids_all: xx.append(c[..., 0]) yy.append(c[..., 1]) plt.plot(xx, yy, 'rx--') plt.show() if __name__ == '__main__': begin_time = time() print("========程序开始============") # data = sio.loadmat("data\\ex7data2.mat") # X = np.array(data['X']) # (300,2) image = plt.imread("data\\bird_small.png") # (128,128,3) plt.subplot(1, 2, 1) plt.imshow(image) plt.axis('off') plt.title("raw image") plt.subplot(1, 2, 2) compressed_image, colors = compress(image, 16) print(compressed_image.shape, colors.shape) plt.imshow(compressed_format_to_normal_format(compressed_image, colors)) plt.axis('off') plt.title("compressed image") plt.show() end_time = time() run_time = end_time - begin_time print("========程序结束============") print('该循环程序运行时间:', run_time)
首先对数据进行feature scaling/mean normalization,也就是归一化
∑ = 1 m X T X \sum=\frac{1}{m}X^TX ∑=m1XTX
接着计算sigma矩阵的“特征向量”,这里使用奇异值分解(single value decomposition)。
[ U , S , V ] = s v d ( ∑ ) [U,S,V]=svd(\sum) [U,S,V]=svd(∑)
Z = U r e d u c e T X Z=U_{reduce}^TX Z=UreduceTX
data = sio.loadmat("data\\ex7data1.mat")
X = np.array(data['X']) # (50,2)
plt.scatter(X[..., 0], X[..., 1], marker='x', c='b')
def data_preprocess(X):
:param X: ndarray,原始数据
:return: (ndarray.ndarray,ndarray),处理后的数据,每个特征均值,每个特征方差
mean = np.mean(X, axis=0)
std = np.std(X, axis=0, ddof=1) # 默认ddof=0, 这里一定要修改
return (X - mean) / std, mean, std
def data_preprocess(X):
:param X: ndarray,原始数据
:return: (ndarray.ndarray,ndarray),处理后的数据,每个特征均值,每个特征方差
mean = np.mean(X, axis=0)
std = np.std(X, axis=0, ddof=1) # 默认ddof=0, 这里一定要修改
return (X - mean) / std, mean, std
def project_data(X, U, K):
:param X: ndarray,原始数据
:param U: ndarray,奇异值分解后的U
:param K: int,目标维度
:return: ndarray,降维后的数据
return X.dot(U[..., :K])
def reconstruct_data(Z, U, K):
:param Z: ndarray,降维后的数据
:param U: ndarray,奇异值分解后的U
:param K: int,降维的维度
:return: ndarray,原始数据
return Z.dot(U[..., :K].T)
data = sio.loadmat("data\\ex7data1.mat")
X = data['X'] # (50,2)
normalized_X, _, _ = data_preprocess(X)
u, _, _ = pca(normalized_X) # (2,2)
Z = project_data(normalized_X, u, 1)
print(Z[0]) # [1.48127391]
rec_X = reconstruct_data(Z, u, 1)
print(rec_X[0]) # [-1.04741883 -1.04741883]
plt.scatter(normalized_X[..., 0], normalized_X[..., 1], marker='x', c='b', label='normalized x')
plt.scatter(rec_X[..., 0], rec_X[..., 1], marker='x', c='r', label='reconstructed x')
plt.title("Visualizing the projections")
for i in range(len(normalized_X)):
plt.plot([normalized_X[i][0], rec_X[i][0]], [normalized_X[i][1], rec_X[i][1]], 'k--')
plt.xlim((-3, 2))
plt.ylim((-3, 2))
data = sio.loadmat("data\\ex7faces.mat")
X = data['X'] # (5000,1024)
nor_X, _, _ = data_preprocess(X)
u, _, _ = pca(nor_X)
Z = project_data(nor_X, u, 36)
rec_X = reconstruct_data(Z, u, 36)
def visualizing_images(X, d): """ 可视化图片 :param X: ndarray,图片 :param d: int,一行展示多少张图片 :return: None """ m = len(X) n = X.shape[-1] s = int(np.sqrt(n)) for i in range(1, m + 1): plt.subplot(m / d, d, i) plt.axis('off') plt.imshow(X[i - 1].reshape(s, s).T, cmap='Greys_r') # 要把脸摆正需要转置 plt.show()
import random import re import scipy.io as sio import numpy as np import matplotlib.pyplot as plt from time import * def data_preprocess(X): """ 数据归一化 :param X: ndarray,原始数据 :return: (ndarray.ndarray,ndarray),处理后的数据,每个特征均值,每个特征方差 """ mean = np.mean(X, axis=0) std = np.std(X, axis=0, ddof=1) # 默认ddof=0, 这里一定要修改 return (X - mean) / std, mean, std def pca(X): """ numpy中有奇异值分解的功能,直接使用 :param X: :return: """ sigma = X.T.dot(X) / len(X) # (n,m)x(m,n) (n,n) u, s, v = np.linalg.svd(sigma) # u(n,n) s(n,), v(n,n) return u, s, v def project_data(X, U, K): """ 数据降维 :param X: ndarray,原始数据 :param U: ndarray,奇异值分解后的U :param K: int,目标维度 :return: ndarray,降维后的数据 """ return X.dot(U[..., :K]) def reconstruct_data(Z, U, K): """ 数据升维 :param Z: ndarray,降维后的数据 :param U: ndarray,奇异值分解后的U :param K: int,降维的维度 :return: ndarray,原始数据 """ return Z.dot(U[..., :K].T) def visualizing_images(X, d): """ 可视化图片 :param X: ndarray,图片 :param d: int,一行展示多少张图片 :return: None """ m = len(X) n = X.shape[-1] s = int(np.sqrt(n)) for i in range(1, m + 1): plt.subplot(m / d, d, i) plt.axis('off') plt.imshow(X[i - 1].reshape(s, s).T, cmap='Greys_r') # 要把脸摆正需要转置 plt.show() if __name__ == '__main__': begin_time = time() print("========程序开始============") data = sio.loadmat("data\\ex7faces.mat") X = data['X'] # (5000,1024) nor_X, _, _ = data_preprocess(X) u, _, _ = pca(nor_X) Z = project_data(nor_X, u, 36) rec_X = reconstruct_data(Z, u, 36) visualizing_images(X[:25], 5) visualizing_images(rec_X[:25], 5) end_time = time() run_time = end_time - begin_time print("========程序结束============") print('该循环程序运行时间:', run_time)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。