Dataset: the cnews news-classification dataset; 10 of its categories are used.
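Each line of the cnews files is assumed to start with the category label followed by the article text, which is why the scripts below slice `line[:3]` for the label and `line[3:]` for the content. A minimal sketch (the file name `data\\cnews.trains.txt` mirrors the paths used in train.py and may differ for your copy of the dataset):

```python
# Minimal sketch: peek at the assumed "label + text" line format of the cnews files.
# The path and the 3-character label slice mirror train.py below; adjust as needed.
import codecs

with codecs.open('data\\cnews.trains.txt', 'r', 'utf-8') as reader:
    first_line = next(reader)

label = first_line[:3].strip()  # e.g. a two-character category name plus the separator
text = first_line[3:].strip()   # the article body
print(label, text[:30])
```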
## train.py

```python
'''
Import the required libraries
'''
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.layers import Input, Dense, LSTM, Conv1D, Concatenate, MaxPool1D, Flatten, Dropout, \
    GlobalMaxPooling1D, Bidirectional, Lambda
from keras.models import Model
from keras.optimizers import Adam, RMSprop
from keras.utils.np_utils import to_categorical
import codecs
import numpy as np
from random import shuffle
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing import sequence
from keras.engine import Layer
from keras.callbacks import *

'''
Paths to the pre-trained BERT files
'''
maxlen = 128  # maximum number of characters kept per text
config_path = "chinese_L-12_H-768_A-12\\bert_config.json"
checkpoint_path = "chinese_L-12_H-768_A-12\\bert_model.ckpt"
dict_path = "chinese_L-12_H-768_A-12\\vocab.txt"
```
After tokenization, the Tokenizer adds a [CLS] marker at the start of the sentence and a [SEP] marker at the end. The output vector at the [CLS] position can serve as a sentence vector for the whole text, [SEP] is the separator between sentences, and the remaining positions are per-character outputs (for Chinese). The `_tokenize` method is overridden so that the tokenized result has the same length as the original string (plus 2 if the two markers are counted). The built-in `_tokenize` silently drops spaces and can merge some characters in its output, so the token list no longer matches the string length, which is a problem for sequence-labelling tasks. The [unused*] tokens are untrained (randomly initialized) slots that BERT reserves for adding new vocabulary, so we can use them to stand for any new character.

```python
class OurTokenizer(Tokenizer):
    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # use [unused1] for space-like characters
            else:
                R.append('[UNK]')      # everything else becomes [UNK]
        return R


def get_token_dict(dict_path):
    '''
    :param dict_path: the vocab.txt file of the BERT model
    :return: a dict mapping each token in the file to an index
    '''
    print("Loading the token dictionary")
    token_dict = {}
    with codecs.open(dict_path, 'r', 'utf8') as reader:
        for line in reader:
            token = line.strip()
            token_dict[token] = len(token_dict)
    return token_dict
```
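A quick sanity check of the custom tokenizer (a hypothetical input string; assumes `vocab.txt` is in place): spaces map to `[unused1]`, unknown characters to `[UNK]`, and the token list is exactly `len(text) + 2` long because of the added `[CLS]` and `[SEP]` markers.

```python
# Hypothetical usage of OurTokenizer with the helpers defined above.
token_dict = get_token_dict(dict_path)
tokenizer = OurTokenizer(token_dict)

sample = "体育 新闻"
tokens = tokenizer.tokenize(sample)
print(tokens)                       # ['[CLS]', '体', '育', '[unused1]', '新', '闻', '[SEP]']
assert len(tokens) == len(sample) + 2

indices, segments = tokenizer.encode(first=sample)
print(len(indices), len(segments))  # both equal len(tokens)
```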
```python
def get_data(datatype):
    '''
    Read the texts of the given split.
    :return: list of texts
    '''
    print("Reading " + datatype + " data")
    path = 'data\\cnews.' + datatype + '.txt'
    all_data = []
    with codecs.open(path, 'r', 'utf-8') as reader:
        for line in reader:
            all_data.append(line[3:].strip())  # everything after the label is the text
    return all_data


# Read the labels of the given split
def readLable(datatype):
    print("Reading " + datatype + " labels")
    path = 'data\\cnews.' + datatype + '.txt'
    all_data = []
    with codecs.open(path, 'r', 'utf-8') as reader:
        for line in reader:
            all_data.append(line[:3].strip())  # the first characters of each line are the label
    return all_data


# Encode the labels as integers (not yet one-hot at this point)
def encodeLable(data):
    le = LabelEncoder()
    resultLable = le.fit_transform(data)
    return resultLable
```
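As a small illustration with made-up labels (using the helpers and imports defined above), `encodeLable` maps each class name to an integer, and `to_categorical` in the main script then turns those integers into one-hot rows:

```python
# Hypothetical labels: LabelEncoder assigns one integer per class,
# to_categorical then expands them to one-hot vectors over the 10 classes.
labels = ['体育', '财经', '体育', '娱乐']
encoded = encodeLable(labels)            # e.g. array([0, 2, 0, 1])
one_hot = to_categorical(encoded, num_classes=10)
print(one_hot.shape)                     # (4, 10)
```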
```python
# Pad every text in a batch to the same length with 0
def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])


# Data generator
class data_generator:
    def __init__(self, data, tokenizer, batch_size=8):
        self.data = data
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = range(len(self.data))
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = self.tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append(y)
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []
# yield turns __iter__ into a generator: it behaves like return, but execution is
# suspended at the yield statement and resumes from that point on the next call.
```
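To illustrate the padding step used by the generator, `seq_padding` right-pads every sequence in a batch with 0 up to the longest length in that batch (the token ids below are made up):

```python
# Made-up token id sequences of different lengths.
batch = [[101, 2769, 102], [101, 2769, 3221, 4511, 102]]
padded = seq_padding(batch)
print(padded.shape)   # (2, 5)
print(padded[0])      # [ 101 2769  102    0    0]
```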
### BERT-only model
```python
# x[:, n] takes the n-th element along the second axis, i.e. the n-th token of every sample
def build_model_BERT_Only():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True

    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))

    x = bert_model([x1_in, x2_in])
    cls_layer = Lambda(lambda x: x[:, 0])(x)  # take the vector at the [CLS] position, which represents the whole sentence
    output = Dense(10, activation='softmax')(cls_layer)

    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model
```
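The `Lambda(lambda x: x[:, 0])` layer simply slices out the first time step of BERT's output, i.e. the vector at the `[CLS]` position. In plain NumPy terms (shapes are illustrative only):

```python
# Illustration of the x[:, 0] slice: from a (batch, seq_len, hidden) array it keeps
# the first token ([CLS]) of every sample, giving a (batch, hidden) matrix.
import numpy as np

x = np.random.rand(8, 128, 768)   # pretend BERT output: batch=8, seq_len=128, hidden=768
cls_vectors = x[:, 0]
print(cls_vectors.shape)          # (8, 768)
```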
### BERT + LSTM
```python
def build_model_LSTM():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True

    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    print("Loading the BERT model")

    x = bert_model([x1_in, x2_in])
    # cls_layer = Lambda(lambda x: x[:, 0])(x)  # take the [CLS] vector for classification
    T = LSTM(128, return_sequences=False)(x)
    T = Dropout(0.3)(T)
    output = Dense(10, activation='softmax')(T)

    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model
```
### BERT + BiLSTM
```python
def build_model_BiLSTM():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True

    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))

    x = bert_model([x1_in, x2_in])
    T = Bidirectional(LSTM(128, return_sequences=False))(x)
    T = Dropout(0.3)(T)
    output = Dense(10, activation='softmax')(T)

    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model
```
### BERT + CNN
```python
def build_model_CNN():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True

    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))

    x = bert_model([x1_in, x2_in])
    c = Conv1D(128, 3, activation='relu')(x)
    c = GlobalMaxPooling1D()(c)
    c = Dropout(0.3)(c)
    output = Dense(10, activation='softmax')(c)

    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model
```
### Training
```python
def train_model(allTrainData, allValData, tokenizer, modelName):
    if modelName == 'LSTM':
        model = build_model_LSTM()
    elif modelName == 'CNN':
        model = build_model_CNN()
    elif modelName == 'BiLSTM':
        model = build_model_BiLSTM()
    else:
        model = build_model_BERT_Only()

    filepath = '1\\' + 'BertNoTrain_' + modelName + '_{epoch:02d}-{accuracy:.4f}-{val_accuracy:.4f}.h5'
    early_stopping = EarlyStopping(monitor='loss', patience=3, verbose=1)  # early stopping to limit overfitting
    plateau = ReduceLROnPlateau(monitor="loss", verbose=1, mode='min', factor=0.5,
                                patience=2)  # halve the learning rate when the loss stops improving
    checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, period=1, save_best_only=True,
                                 mode='min', save_weights_only=False)  # keep only the best model

    train_D = data_generator(allTrainData, tokenizer)
    valid_D = data_generator(allValData, tokenizer)

    history = model.fit_generator(
        train_D.__iter__(),
        steps_per_epoch=len(train_D),
        epochs=10,
        validation_data=valid_D.__iter__(),
        validation_steps=len(valid_D),
        callbacks=[early_stopping, plateau, checkpoint]
    )
    model.save_weights('keras_bert_' + modelName + '.h5')
    return history
```
### Main
```python
if __name__ == "__main__":
    token_dict = get_token_dict(dict_path)
    tokenizer = OurTokenizer(token_dict)

    trainlable = encodeLable(readLable("trains"))            # integer-encoded training labels
    trainCate = to_categorical(trainlable, num_classes=10)   # one-hot encoding
    traindata = get_data("trains")
    allTrainData = []
    for i in range(len(traindata)):
        allTrainData.insert(i, (traindata[i], trainCate[i]))

    # Validation data
    vallable = encodeLable(readLable("vals"))                # integer-encoded validation labels
    valCate = to_categorical(vallable, num_classes=10)
    valdata = get_data("vals")
    allValData = []
    for i in range(len(valdata)):
        allValData.insert(i, (valdata[i], valCate[i]))

    train_model(allTrainData, allValData, tokenizer, "LSTM")
    train_model(allTrainData, allValData, tokenizer, "CNN")
    train_model(allTrainData, allValData, tokenizer, "BiLSTM")
    train_model(allTrainData, allValData, tokenizer, "BERT")
```
## predict.py
```python
from keras.models import load_model
from keras_bert import get_custom_objects
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.utils.np_utils import to_categorical
import codecs
import numpy as np
from sklearn import metrics
import train as BL  # import train.py


def BertModelPridect(modelName):
    dict_path = "chinese_L-12_H-768_A-12\\vocab.txt"

    def get_token_dict(dict_path):
        print("Loading the token dictionary")
        token_dict = {}
        with open(dict_path, 'r', encoding='utf8') as reader:
            for line in reader:
                token = line.strip()
                token_dict[token] = len(token_dict)
        return token_dict

    class OurTokenizer(Tokenizer):
        def _tokenize(self, text):
            R = []
            for c in text:
                if c in self._token_dict:
                    R.append(c)
                elif self._is_space(c):
                    R.append('[unused1]')  # use [unused1] for space-like characters
                else:
                    R.append('[UNK]')      # everything else becomes [UNK]
            return R

    token_dict = get_token_dict(dict_path)
    tokenizer = OurTokenizer(token_dict)

    # Read the test data (1000 samples) and its labels
    testlable = BL.encodeLable(BL.readLable("tests"))        # integer-encoded test labels
    valCate = to_categorical(testlable, num_classes=10)
    testdata = BL.get_data("tests")

    # Put the test data into the format the generator expects
    allTestData = []
    for i in range(len(testdata)):
        allTestData.insert(i, (testdata[i], valCate[i]))
    test_D = BL.data_generator(allTestData, tokenizer, batch_size=16)

    print("Loading the trained " + modelName + " model")
    basePath = '1\\'
    modelpath = basePath + modelName
    # The saved model contains custom layers, so keras_bert's custom objects are needed
    model = load_model(modelpath, custom_objects=get_custom_objects())
    result = model.predict_generator(test_D.__iter__(), steps=len(test_D), verbose=1)
    return testlable, result


if __name__ == '__main__':
    modelName = 'BERT_06-1.0000-0.9360.h5'
    # modelName = 'LSTM_10-1.0000-0.9840.h5'
    # modelName = 'BiLSTM_06-1.0000-0.9680.h5'
    # modelName = 'CNN_07-0.9990-0.9520.h5'
    testlable, result = BertModelPridect(modelName)

    resultlable = []
    for each in result:
        resultlable.append(np.argmax(each))

    report = metrics.classification_report(testlable, resultlable)
    confusion_matrix = metrics.confusion_matrix(testlable, resultlable)
    accuracy_score = metrics.accuracy_score(testlable, resultlable)
    precision_score = metrics.precision_score(testlable, resultlable, average="weighted")
    f1_score = metrics.f1_score(testlable, resultlable, average="weighted")
    recall_score = metrics.recall_score(testlable, resultlable, average="weighted")
    print(report)
    print(confusion_matrix)
    print("accuracy:", accuracy_score, "precision:", precision_score,
          "recall:", recall_score, "f1:", f1_score)
```