import numpy as np

def i2i_euclidean(ins1, ins2):
    """Euclidean distance between two instances."""
    return np.sqrt(np.sum((ins1 - ins2) ** 2))
def ave_hausdorff(bag1, bag2):
    """Average Hausdorff distance between two bags."""
    # Accumulate the total distance
    sum_dis = 0
    for ins1 in bag1:
        # Distance from the current instance to its nearest instance in the other bag
        temp_min = np.inf
        for ins2 in bag2:
            temp_min = min(i2i_euclidean(ins1, ins2), temp_min)
        sum_dis += temp_min
    for ins2 in bag2:
        temp_min = np.inf
        for ins1 in bag1:
            temp_min = min(i2i_euclidean(ins2, ins1), temp_min)
        sum_dis += temp_min
    return sum_dis / (len(bag1) + len(bag2))
np.average(bag, 0)
axis=0: column means (the mean of each column, one value per feature), as illustrated below
axis=1: row means (the mean of each row, one value per instance)
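A quick check of the axis semantics with a made-up 2 × 3 bag:

import numpy as np

bag = np.array([[1.0, 2.0, 3.0],
                [3.0, 4.0, 5.0]])
print(np.average(bag, 0))   # column means -> [2. 3. 4.]
print(np.average(bag, 1))   # row means    -> [2. 4.]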
def simple_dis(bag1, bag2):
    return i2i_euclidean(np.average(bag1, 0), np.average(bag2, 0))
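A small sanity check with two hand-made bags (the values are invented purely for illustration):

bag_a = np.array([[0.0, 0.0],
                  [1.0, 0.0]])
bag_b = np.array([[0.0, 1.0],
                  [1.0, 1.0]])
print(ave_hausdorff(bag_a, bag_b))   # every nearest neighbour is 1 away -> 1.0
print(simple_dis(bag_a, bag_b))      # mean vectors [0.5, 0] and [0.5, 1] -> 1.0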
The distance-computation function
np.savez(filename, arra=arra, arrb=arrb): saves multiple arrays into a single .npz file
np.load(filename): loads numpy arrays from a .npy or .npz file
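A minimal sketch of the save/load round trip (the file name is arbitrary):

import numpy as np

a = np.arange(3)
b = np.ones((2, 2))
np.savez("demo.npz", arra=a, arrb=b)    # the keyword names become the keys
loaded = np.load("demo.npz")
print(loaded["arra"], loaded["arrb"])   # arrays are recovered by key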
Getting a bag's instance data (the last column holds the instance labels, so it is dropped):
self._bags[i][0][:, : -1]
This yields an N × N distance matrix:
def __compute_dis(self):
    """
    Compute the bag-to-bag distance matrix.
    """
    if not os.path.exists(self._save_b2b_path):
        # Number of bags
        N = len(self._bags)
        dis = np.zeros((N, N))
        print("Computing the distance matrix with the %s distance..." % self._b2b_name[self._b2b_type])
        for i in range(N):
            # Print the progress bar
            print_progress_bar(i, N)
            # The distance between bags i and j equals the distance between j and i
            for j in range(i, N):
                if self._b2b_type == 'ave':
                    dis[i, j] = dis[j, i] = ave_hausdorff(self._bags[i][0][:, : -1],
                                                          self._bags[j][0][:, : -1])
                else:
                    dis[i, j] = dis[j, i] = simple_dis(self._bags[i][0][:, : -1],
                                                       self._bags[j][0][:, : -1])
        # A newline is needed once the progress bar finishes
        print()
        np.savez(self._save_b2b_path, dis=dis)
    self._dis = np.load(self._save_b2b_path)['dis']
bag_space: the bag space loaded from the data file; each entry holds one bag (see the access sketch after this list)
bag_space[i][0] is the bag's data, containing several instances; bag_space[i][1] is the bag's label; bag_space[i, 0][:, :self.d] is the bag data without instance labels; bag_space[i, 0][:, -1] holds the instance labels
bag_size: the size of each bag, i.e. the number of instances in bag_space[i][0]
ins_idx: the position of each bag's instances in the instance space; bag i owns the instances from index ins_idx[i] to ins_idx[i+1], left-closed and right-open
ins_bag_idx: the index of the bag that each instance belongs to
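A minimal access sketch for these fields, assuming bag_space has already been loaded by load_file (defined at the end of this post); the bag index 0 is arbitrary:

bag0 = bag_space[0, 0]          # instance matrix of bag 0
bag0_label = bag_space[0][1]    # label of bag 0
bag0_data = bag0[:, :-1]        # instance features, with the label column dropped
bag0_ins_lab = bag0[:, -1]      # per-instance labels stored in the last column
print(bag0_data.shape, bag0_label)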
strip() removes the specified characters (whitespace and newlines by default) or character sequence from both ends of a string
split() splits a string from the left; the default separator is whitespace, and a list is returned (illustrated below)
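This is exactly how __init_mil extracts the dataset name from the file path; a stand-alone sketch with a made-up path:

data_path = " ../data/musk1.mat \n"                        # hypothetical path
data_name = data_path.strip().split("/")[-1].split(".")[0]
print(data_name)                                           # -> musk1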
def __init_mil(self):
    """
    Initialization function.
    """
    if self.bag_space is None:
        self.bag_space = load_file(self.data_path)
    self.N = len(self.bag_space)
    self.bag_size = np.zeros(self.N, dtype=int)
    self.bag_lab = np.zeros_like(self.bag_size, dtype=int)
    self.bag_idx = np.arange(self.N)
    for i in range(self.N):
        self.bag_size[i] = len(self.bag_space[i][0])
        self.bag_lab[i] = self.bag_space[i][1]
    # Map all bag labels into the range [0, C - 1], where C is the number of classes
    self.__bag_lab_map()
    self.n = sum(self.bag_size)
    self.d = len(self.bag_space[0, 0][0]) - 1
    self.C = len(list(set(self.bag_lab)))
    self.ins_space = np.zeros((self.n, self.d))
    self.ins_idx = np.zeros(self.N + 1, dtype=int)
    self.ins_lab = np.zeros(self.n)
    self.ins_bag_idx = np.zeros(self.n, dtype=int)
    for i in range(self.N):
        self.ins_idx[i + 1] = self.bag_size[i] + self.ins_idx[i]
        self.ins_space[self.ins_idx[i]: self.ins_idx[i + 1]] = self.bag_space[i, 0][:, :self.d]
        self.ins_lab[self.ins_idx[i]: self.ins_idx[i + 1]] = self.bag_space[i, 0][:, -1]
        self.ins_bag_idx[self.ins_idx[i]: self.ins_idx[i + 1]] = np.ones(self.bag_size[i]) * i
    self.data_name = self.data_path.strip().split("/")[-1].split(".")[0]
    self.zero_ratio = len(self.ins_space[self.ins_space == 0]) / (self.n * self.d)
    self.__generate_save_home()
enumerate() iterates over a sequence while yielding both the index and the value
A dict stores data as key-value pairs
def __bag_lab_map(self):
    """
    Map the label of the bag to a class in [0, 1, 2, ...]
    """
    lab_list = list(set(self.bag_lab))
    lab_dict = {}
    for i, lab in enumerate(lab_list):
        lab_dict[lab] = i
    for i in range(self.N):
        self.bag_lab[i] = lab_dict[self.bag_lab[i]]
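The same enumerate-and-dict mapping pattern in isolation, with made-up labels {-1, 1}:

bag_lab = [1, -1, 1, -1, -1]
lab_dict = {lab: i for i, lab in enumerate(set(bag_lab))}
mapped = [lab_dict[lab] for lab in bag_lab]
print(lab_dict, mapped)   # e.g. {1: 0, -1: 1} and [0, 1, 0, 1, 1]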
Given an array of bag indices, return the corresponding subset of the instance space:
def get_sub_ins_space(self, bag_idx):
    """
    Given a bag index array, return the corresponding subset of the instance space.
    """
    n = sum(self.bag_size[bag_idx])
    ret_ins_space = np.zeros((n, self.d))
    ret_ins_label = np.zeros(n)
    ret_ins_bag_idx = np.zeros(n, dtype=int)
    count = 0
    for i in bag_idx:
        bag_size = self.bag_size[i]
        ret_ins_space[count: count + bag_size] = self.bag_space[i, 0][:, :-1]
        ret_ins_label[count: count + bag_size] = self.bag_lab[i]
        ret_ins_bag_idx[count: count + bag_size] = i
        count += bag_size
    return ret_ins_space, ret_ins_label, ret_ins_bag_idx
Arguments: classifier_type and performance_type
This step mainly initializes the classifiers and the evaluation metrics.
The configured classifier models are appended to the self.__classifier list, which then looks like:
[KNeighborsClassifier(n_neighbors=3), SVC(max_iter=10000), DecisionTreeClassifier()]
The metric functions are put into the __performance_er list, which looks like:
[<function accuracy_score at 0x0000029908720940>, <function f1_score at 0x0000029908729160>]
def __init_classify(self):
    """
    Classifier initialization.
    """
    self.__classifier = []
    self.__performance_er = []
    if self.__classifier_type is None:
        self.__classifier_type = ["knn"]
    for classifier_type in self.__classifier_type:
        if classifier_type == "knn":
            from sklearn.neighbors import KNeighborsClassifier
            self.__classifier.append(KNeighborsClassifier(n_neighbors=3))
        elif classifier_type == "svm":
            from sklearn.svm import SVC
            self.__classifier.append(SVC(max_iter=10000))
        elif classifier_type == "j48":
            from sklearn.tree import DecisionTreeClassifier
            self.__classifier.append(DecisionTreeClassifier())
    if self.__performance_type is None:
        self.__performance_type = ["f1_score"]
    for performance_type in self.__performance_type:
        if performance_type == "f1_score":
            from sklearn.metrics import f1_score
            self.__performance_er.append(f1_score)
        elif performance_type == "acc":
            from sklearn.metrics import accuracy_score
            self.__performance_er.append(accuracy_score)
        elif performance_type == "roc":
            from sklearn.metrics import roc_auc_score
            self.__performance_er.append(roc_auc_score)
Keys are created for the true labels, predicted labels, and classification performance of both the training set and the test set.
The output looks like:
{'knn': [], 'svm': [], 'j48': []} {'knn': [], 'svm': [], 'j48': []} {'knn': [], 'svm': [], 'j48': []}
def __reset_record(self):
    """
    Reset the record containers.
    """
    for classifier_type in self.__classifier_type:
        self.tr_predict_arr[classifier_type], self.tr_true_label_arr[classifier_type] = [], []
        self.tr_per[classifier_type] = []
        self.te_predict_arr[classifier_type], self.te_true_label_arr[classifier_type] = [], []
        self.te_per[classifier_type] = []
zip() pairs up the corresponding elements of the given iterables into tuples; iterating over the result yields these tuples one by one
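A quick illustration of the pairing used in test() below, with two made-up placeholder lists:

classifiers = ["knn-model", "svm-model"]       # placeholders for fitted estimators
classifier_types = ["knn", "svm"]
for classifier, classifier_type in zip(classifiers, classifier_types):
    print(classifier_type, "->", classifier)   # knn -> knn-model, svm -> svm-model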
The predictions of each classifier are appended to the list stored under that classifier's key:
self.te_predict_arr[classifier_type].extend(predict)
The performance of each classifier under each metric is then appended to the corresponding list (sklearn metrics expect the order (y_true, y_pred)):
self.te_per[classifier_type].append(per_er(
    self.te_true_label_arr[classifier_type],
    self.te_predict_arr[classifier_type]
))
def test(self, data_iter, is_pre_tr=False):
    """
    :param data_iter: data iterator
    :param is_pre_tr: whether the training set should also be predicted
    """
    self.__reset_record()
    for tr_data, tr_label, te_data, te_label in data_iter:
        for classifier, classifier_type in zip(self.__classifier, self.__classifier_type):
            model = classifier.fit(tr_data, tr_label)
            if is_pre_tr:
                predict = model.predict(tr_data)
                self.tr_predict_arr[classifier_type].extend(predict)
                self.tr_true_label_arr[classifier_type].extend(tr_label)
            predict = model.predict(te_data)
            self.te_predict_arr[classifier_type].extend(predict)
            self.te_true_label_arr[classifier_type].extend(te_label)
    for classifier_type in self.__classifier_type:
        for per_er in self.__performance_er:
            try:
                # sklearn metrics take (y_true, y_pred)
                self.tr_per[classifier_type].append(per_er(
                    self.tr_true_label_arr[classifier_type],
                    self.tr_predict_arr[classifier_type]
                ))
                self.te_per[classifier_type].append(per_er(
                    self.te_true_label_arr[classifier_type],
                    self.te_predict_arr[classifier_type]
                ))
            except ValueError:
                self.tr_per[classifier_type].append(0)
                self.te_per[classifier_type].append(0)
    if is_pre_tr:
        return self.tr_per, self.te_per
    return self.te_per
A function that contains yield is a generator rather than an ordinary function: calling it returns an iterator. Each call to next() resumes execution from where the generator last stopped, runs until the next yield, hands back the value after yield, and then that step ends.
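A tiny generator to make the execution order concrete (purely illustrative):

def count_up(limit):
    i = 0
    while i < limit:
        yield i          # execution pauses here until the next call to next()
        i += 1

gen = count_up(3)
print(next(gen), next(gen), next(gen))   # -> 0 1 2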
def get_iter(tr, tr_lab, te, te_lab):
    """
    Get a single-split iterator.
    :param tr: training set
    :param tr_lab: training set labels
    :param te: test set
    :param te_lab: test set labels
    :return: the corresponding iterator
    """
    yield tr, tr_lab, te, te_lab
np.random.permutation() randomly permutes a sequence or array
The function below returns the training-set and test-set index lists for each of the k (default 10) folds:
def get_k_cv_idx(num_x, k=10):
    """
    Get the indices for k-fold cross validation.
    :param num_x: size of the data set
    :param k: number of folds to use
    :return: training set indices, test set indices
    """
    # Randomly shuffle the indices
    rand_idx = np.random.permutation(num_x)
    # Size of each fold
    fold = int(np.floor(num_x / k))
    ret_tr_idx = []
    ret_te_idx = []
    for i in range(k):
        # Training indices of the current fold
        tr_idx = rand_idx[0: i * fold].tolist()
        tr_idx.extend(rand_idx[(i + 1) * fold:])
        ret_tr_idx.append(tr_idx)
        # Test indices of the current fold
        ret_te_idx.append(rand_idx[i * fold: (i + 1) * fold].tolist())
    return ret_tr_idx, ret_te_idx
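A minimal check of the fold sizes (the numbers are arbitrary):

tr_idx_list, te_idx_list = get_k_cv_idx(num_x=25, k=5)
for tr_idx, te_idx in zip(tr_idx_list, te_idx_list):
    print(len(tr_idx), len(te_idx))   # 20 training indices and 5 test indices per fold

Each (tr_idx, te_idx) pair can then be used to slice the data, and the slices wrapped with get_iter to build the data_iter consumed by test().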
loadmat() reads a .mat file so that its data can be extracted
data = loadmat(path): data is a dict
data = loadmat(path)['data']: data is the data part, a numpy array
from scipy.io import loadmat

def load_file(data_path):
    """
    Load a .mat multi-instance data set.
    :param data_path: path where the data set is stored
    """
    return loadmat(data_path)['data']
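Typical usage, with a hypothetical file path; the 'data' key is the convention these .mat data sets follow:

bag_space = load_file("../data/musk1.mat")   # hypothetical path
print(len(bag_space))                        # number of bags N
print(bag_space[0, 0].shape)                 # instances x (features + 1 label column)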