This post continues the series with the second ensemble method, Bagging. The training results are presented first; the CNN used is the same as in the previous post.
Bootstrap Aggregating, better known as Bagging, is the most famous representative of parallel ensemble learning methods.
The idea behind Bagging is simple: apply bootstrap sampling to the original training set to obtain several new training sets. Each new training set is used to train one learner (classifier); because the training sets have different sample distributions, the resulting learners are more diverse. Finally, a combination strategy (such as voting) aggregates the predictions of the individual classifiers.
Since each learner (classifier) is trained on a different set of samples, the resulting learners differ from one another (even when they share the same architecture), which yields a diverse set of classifiers. Of course, to get good ensemble results, each individual learner should also perform as well as possible.
Suppose the original training set has n samples: (x1, y1), (x2, y2), …, (xn, yn), where x denotes the data and y the label.
Bagging samples the original training set at random with replacement, drawing n times, so the new dataset has the same number of samples as the original.
It is worth noting that when n is large enough (n = training set size = number of draws), roughly 36.8% of the original samples do not appear in the new dataset, since each sample is missed with probability (1 - 1/n)^n ≈ 1/e ≈ 0.368. The new dataset therefore covers only about 63.2% of the distinct original samples, and the remainder consists of duplicates.
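These figures are easy to verify empirically: simulate one round of bootstrap sampling with NumPy and count how many distinct indices were drawn. This is a small illustrative check, not part of the original code:

import numpy as np

n = 100000                              # pretend training-set size
picks = np.random.choice(n, n)          # n draws with replacement
unique_fraction = len(np.unique(picks)) / n
# covered ≈ 0.632, missed ≈ 0.368 (≈ 1/e)
print("covered: %.3f, missed: %.3f" % (unique_fraction, 1 - unique_fraction))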
At its core, implementing Bagging comes down to implementing bootstrap sampling, for example:
import numpy as np

# Draw N indices with replacement; duplicate indices are expected
Sampling_point = np.random.choice(N, N)
New_Xtrain = Xtrain[Sampling_point, :]
New_Ytrain = Ytrain[Sampling_point, :]
Here Xtrain holds the data of the original training set, Ytrain its labels, and N is the total number of samples. Likewise, Xtest and Ytest are the test data and labels. Each round of sampling produces a new dataset New_Xtrain, New_Ytrain.
In the training loop, each iteration bootstrap-samples a fresh dataset in this way and trains one model on it (one new dataset per classifier); the final prediction is then obtained by voting.
def save_final_models(models):
    for i, model in enumerate(models):
        model.save(str(i) + ".final.hdf5")

train_accuracy_records = []
test_accuracy_records = []
models = []

# Train three classifiers
for i in range(3):
    # Bootstrap sampling: draw N indices with replacement
    Sampling_point = np.random.choice(N, N)
    New_Xtrain = Xtrain[Sampling_point, :]
    New_Ytrain = Ytrain[Sampling_point, :]

    # ********************************************************
    # Place your own model here and feed it New_Xtrain and New_Ytrain.
    # In Keras this part boils down to fitting the data and reloading
    # the best saved weights:
    #     history = model.fit(New_Xtrain, New_Ytrain, ...)
    #     model.load_weights(filepath)
    # ********************************************************

    scores = model.evaluate(Xtrain, Ytrain, verbose=1)
    train_accuracy_records.append(scores[1])
    scores = model.evaluate(Xtest, Ytest, verbose=1)
    test_accuracy_records.append(scores[1])
    # models stores each trained model
    models.append(model)

# Save every learner so it can be reused later without retraining from scratch
save_final_models(models)
At this point the core of the Bagging algorithm has been covered. What remains is to apply a combination strategy to make the final decision. Combination strategies are broadly similar and there are many of them; for Bagging we can reuse the weighted voting scheme from the previous Adaboost post.
def weighted_vote(x_test, models, accuracy_records, num_classes=10):
    n_learners = len(models)
    n_tests = x_test.shape[0]
    probs = np.zeros((n_tests, num_classes))
    # Accumulate each learner's predicted probabilities, weighted by its training accuracy
    for i in range(n_learners):
        accuracy = accuracy_records[i]
        model = models[i]
        probs = probs + accuracy * model.predict(x_test)
    return np.argmax(probs, axis=1)
final_predict = weighted_vote(Xtest, models, train_accuracy_records)
The implementation is even simpler than that of Adaboost.
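The combination strategy itself is interchangeable. As a point of comparison, here is a minimal sketch of plain (unweighted) hard voting over the same models list; hard_vote is a hypothetical helper, not part of the original post:

import numpy as np

def hard_vote(x_test, models, num_classes=10):
    # Each learner casts one vote for its argmax class
    n_tests = x_test.shape[0]
    votes = np.zeros((n_tests, num_classes))
    for model in models:
        pred = np.argmax(model.predict(x_test), axis=1)
        votes[np.arange(n_tests), pred] += 1
    return np.argmax(votes, axis=1)

With accuracy-weighted soft voting, a strong learner can outvote several weak ones; plain hard voting treats every learner equally. The complete training and evaluation script follows.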
import tensorflow as tf
import numpy as np
import sys
import os
from tensorflow.keras.layers import *
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
import random


def load_data():
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()
    return (x_train, y_train), (x_test, y_test)


def preprocess(x_train, y_train, x_test, y_test):
    # Normalize data.
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')
    print('y_train shape:', y_train.shape)
    # One-hot encoding
    num_classes = 10
    y_train = tf.keras.utils.to_categorical(y_train, num_classes)
    y_test = tf.keras.utils.to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)


def save_final_models(models, name_prefix):
    for i, model in enumerate(models):
        model.save(name_prefix + "." + str(i) + ".final.hdf5")


def weighted_vote(x_test, models, accuracy_records, num_classes=10):
    n_learners = len(models)
    n_tests = x_test.shape[0]
    probs = np.zeros((n_tests, num_classes))
    for i in range(n_learners):
        accuracy = accuracy_records[i]
        model = models[i]
        probs = probs + accuracy * model.predict(x_test)
    return np.argmax(probs, axis=1)


def predict(model, x_test):
    test_classes = model.predict(x_test, verbose=0)
    test_classes = np.argmax(test_classes, axis=1)
    # print(test_classes.shape)
    return test_classes


def build_CNN(x_train, y_train, x_test, y_test, batch_size, epochs, n, name_prefix):
    num_classes = 10
    input_shape = x_train.shape[1:]
    inputs = Input(shape=input_shape)
    input_x_padding = ZeroPadding2D((0, 0), data_format="channels_first")(inputs)
    layer1 = Conv2D(30, (5, 5), padding='valid', data_format='channels_last',
                    activation="relu", name="conv1")(input_x_padding)
    BN1 = BatchNormalization(name='bn_1')(layer1)
    layer2 = Conv2D(30, (5, 5), padding="valid", activation="relu",
                    data_format='channels_last', name="conv2")(BN1)
    BN2 = BatchNormalization(name='bn_2')(layer2)
    layer3 = Conv2D(30, (5, 5), padding='valid', activation="relu",
                    data_format='channels_last', name="conv3")(BN2)
    x = AveragePooling2D(pool_size=2)(layer3)
    y = Flatten()(x)
    outputs = Dense(num_classes, activation='softmax')(y)
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()

    save_dir = os.path.join(os.getcwd(), 'save_path')
    if not os.path.isdir(save_dir):
        os.makedirs(save_dir)
    filepath = name_prefix + ".weights.h5"
    filepath = os.path.join(save_dir, filepath)
    callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath,
                                                    monitor='val_accuracy',
                                                    verbose=0,
                                                    save_best_only=True,
                                                    mode='max'),
                 # Reduce the learning rate by a factor of 0.6 when val_loss plateaus
                 tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                                      factor=0.6,
                                                      patience=4,
                                                      verbose=1),
                 tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                  patience=10,
                                                  verbose=0,
                                                  mode='auto')
                 ]
    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=epochs,
                        validation_data=(x_test, y_test),
                        shuffle=True,
                        callbacks=callbacks)
    model.load_weights(filepath)
    return model, history


n = 3
n_learners = 3
batch_size = 32
epochs_lst = [10, 10, 10]
filename = "cnn_bagging.txt"
file_prefix = ""
num_classes = 10

(x_train, y_train), (x_test, y_test) = load_data()
y_test_old = y_test[:]  # keep the original integer labels for the final error count
(x_train, y_train), (x_test, y_test) = preprocess(x_train, y_train, x_test, y_test)

models = []
n_trains = x_train.shape[0]
n_tests = x_test.shape[0]
train_accuracy_records = []
test_accuracy_records = []
for i in range(n_learners):
    epochs = epochs_lst[i]
    # Bootstrap sampling: draw n_trains indices with replacement
    train_picks = np.random.choice(n_trains, n_trains)
    x_train_i = x_train[train_picks, :]
    y_train_i = y_train[train_picks, :]
    model, history = build_CNN(x_train_i, y_train_i, x_test, y_test,
                               batch_size, epochs, n, "bagging-model-" + str(i))
    print("model %d finished training" % (i))
    scores = model.evaluate(x_train, y_train, verbose=1)
    train_accuracy_records.append(scores[1])
    scores = model.evaluate(x_test, y_test, verbose=1)
    test_accuracy_records.append(scores[1])
    models.append(model)

save_final_models(models, "bagging")

filename = file_prefix + filename
print(filename)
out_file = open(filename, "a")
out_file.write("--------------------------------------------\n")
out_file.write("Random = " + str(random) + "\n")
print("Random = " + str(random))

# Weighted voting over the ensemble
final_predict = weighted_vote(x_test, models, train_accuracy_records)
print(final_predict.shape)
errors = np.count_nonzero(final_predict.reshape((n_tests,)) - y_test_old.reshape((n_tests,)))
out_file.write("votefun is\n")
out_file.write(str(weighted_vote) + "\n")
out_file.write('Ensemble test accuracy: %0.6f \n' % ((n_tests - errors) / float(n_tests)))
print('Ensemble test accuracy: %0.6f' % ((n_tests - errors) / float(n_tests)))
for i in range(n_learners):
    print("Learner %d (epochs = %d): %0.6f" % (i, epochs_lst[i], test_accuracy_records[i]))
    out_file.write("Learner %d (epochs = %d): %0.6f\n" % (i, epochs_lst[i], test_accuracy_records[i]))
out_file.close()
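Because save_final_models writes every learner to disk, a later session can reload the ensemble and vote again without retraining. A minimal sketch, assuming the file names produced by save_final_models(models, "bagging") above and that the preprocessed data and weighted_vote are still available; the accuracy weights are not stored on disk, so they are re-evaluated here:

import tensorflow as tf

# Reload the ensemble members saved as bagging.<i>.final.hdf5
loaded_models = [tf.keras.models.load_model("bagging." + str(i) + ".final.hdf5")
                 for i in range(n_learners)]

# Re-evaluate each learner's training accuracy to use as its voting weight
weights = [m.evaluate(x_train, y_train, verbose=0)[1] for m in loaded_models]

# Vote with the reloaded ensemble
final_predict = weighted_vote(x_test, loaded_models, weights)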
How to get a good Bagging ensemble? (a bit of experience)
Intuitively, a Bagging ensemble should do well when the following conditions hold:
① each classifier performs reasonably well;
② there are enough training samples, otherwise the classifiers cannot adequately learn the data distribution and will fit poorly;
③ there are enough classifiers (the number of training rounds equals the number of classifiers to train), so that the union of the samples drawn across the bootstrap rounds covers as much of the original training set as possible.
For example, suppose the original training set is D = {A, B, C, D, E, F}.
One bootstrap sample gives Data1 = {A, A, A, B, D, F}, so samples C and E are never seen during that learner's training.
Another gives Data2 = {B, B, C, C, D, F}, so samples A and E are never seen.
Together Data1 and Data2 cover A, B, C, D and F but still miss E; at decision time the classifiers have never encountered anything like E, so on such samples they are essentially guessing blindly.
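To check coverage in code, take the union of the index arrays produced by the bootstrap rounds and compare it with the full index range. A small illustrative snippet (the variable names here are made up for this example):

import numpy as np

N = 6                      # size of the original training set, as in the example above
n_learners = 2
covered = set()
for _ in range(n_learners):
    picks = np.random.choice(N, N)      # one bootstrap sample
    covered |= set(picks.tolist())

missing = set(range(N)) - covered
print("covered %d/%d samples, missing indices: %s" % (len(covered), N, sorted(missing)))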