
Text classification with keras_bert using CNN, LSTM, and BiLSTM

Dataset: the cnews news corpus (the data\cnews.*.txt files read below); 10 of its categories are used.

train.py

'''
Import the required libraries
'''
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.layers import Input, Dense, LSTM, Conv1D, Concatenate,MaxPool1D,Flatten,Dropout,GlobalMaxPooling1D,Bidirectional,Lambda
from keras.models import Model
from keras.optimizers import Adam,RMSprop
from keras.utils.np_utils import to_categorical
import codecs
import numpy as np
from random import shuffle
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing import sequence
from keras.engine import Layer
from keras.callbacks import *
'''
Paths to the pre-trained BERT files
'''
maxlen = 128  # maximum number of characters taken from each document
config_path = "chinese_L-12_H-768_A-12\\bert_config.json"
checkpoint_path = "chinese_L-12_H-768_A-12\\bert_model.ckpt"
dict_path = "chinese_L-12_H-768_A-12\\vocab.txt"
# After tokenization, the Tokenizer adds [CLS] at the start and [SEP] at the end of the sentence.
# The output vector at the [CLS] position can serve as a representation of the whole sentence,
# [SEP] acts as the sentence separator, and the remaining positions are per-character outputs (for Chinese).
# We override the Tokenizer's _tokenize method so that the tokenized result has the same length
# as the original string (plus 2 once the two special tokens are counted).
# The built-in _tokenize drops spaces and can merge some characters in its output, so the tokenized
# list no longer matches the original string length, which is a problem for sequence-labelling tasks.
# The [unused*] tokens are untrained (randomly initialized) slots that BERT reserves for
# incrementally adding vocabulary, so we can use them to stand for any new character.
class OurTokenizer(Tokenizer):
    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # map whitespace characters to [unused1]
            else:
                R.append('[UNK]')  # everything else becomes [UNK]
        return R
'''
:param dict_path: path to the BERT model's vocab.txt
:return: a dict mapping each token in the file to an integer id
'''
def get_token_dict(dict_path):
    print("Building the token dictionary")
    token_dict = {}
    with codecs.open(dict_path, 'r', 'utf8') as reader:
        for line in reader:
            token = line.strip()
            token_dict[token] = len(token_dict)
    return token_dict
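A quick sanity check of the custom tokenizer (a minimal sketch; the sample sentence and the max_len value are only illustrative, and it assumes vocab.txt is at dict_path):

token_dict = get_token_dict(dict_path)
tokenizer = OurTokenizer(token_dict)
print(tokenizer.tokenize(u'今天 天气不错'))
# expected: ['[CLS]', '今', '天', '[unused1]', '天', '气', '不', '错', '[SEP]']
indices, segments = tokenizer.encode(first=u'今天 天气不错', max_len=12)
# indices are the token ids (zero-padded to max_len); segments are all 0 for a single sentence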
'''
Read the texts of one data split.
:return: list of document strings
'''
def get_data(datatype):
    print("Reading " + datatype + " data")
    path = 'data\\cnews.' + datatype + '.txt'
    all_data = []
    with codecs.open(path, 'r', 'utf-8') as reader:
        for line in reader:
            all_data.append(line[3:].strip())   # each line is "<2-char label>\t<text>", so the text starts at index 3
    return all_data
# Read the labels of one data split
def readLable(datatype):
    print("Reading " + datatype + " labels")
    path = 'data\\cnews.' + datatype + '.txt'
    all_data = []
    with codecs.open(path, 'r', 'utf-8') as reader:
        for line in reader:
            all_data.append(line[:3].strip())   # the first 3 characters are "<label>\t"; strip() drops the tab
    return all_data
# Integer-encode the labels (not yet one-hot at this point)
def encodeLable(data):
    le = LabelEncoder()
    resultLable = le.fit_transform(data)
    return resultLable
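For intuition, a minimal sketch of what the label encoding produces (the example labels are illustrative):

labels = encodeLable(['体育', '财经', '体育'])     # -> array([0, 1, 0])
onehot = to_categorical(labels, num_classes=10)    # each row becomes a 10-dim one-hot vector, e.g. [1, 0, 0, ...]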
# Pad every sequence in a batch to the same length with zeros
def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])
## Batch generator
class data_generator:
    def __init__(self, data, tokenizer, batch_size=8):
        self.data = data
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1
    def __len__(self):
        return self.steps
    def __iter__(self):
        while True:
            # the data is traversed in order (no shuffling); this also keeps predictions
            # aligned with the labels when this class is reused in predict.py
            idxs = range(len(self.data))
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = self.tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append(y)
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []
                    # yield makes this method a generator: it acts like return, but execution
                    # pauses here and resumes from this point on the next call
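A minimal check of the padding helper, plus the shapes one generator step yields (batch size and sequence length are illustrative):

print(seq_padding([[1, 2, 3], [4, 5]]))
# -> [[1 2 3]
#     [4 5 0]]
# One step of data_generator yields [X1, X2], Y where, for batch_size=8:
# X1 (token ids) and X2 (segment ids) have shape (8, longest_sequence_in_batch),
# and Y (one-hot labels) has shape (8, 10).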

BERT-only model

# x[:, n] takes the n-th position along the sequence axis for every sample in the batch;
# x[:, 0] therefore picks the output vector at the [CLS] position.
def build_model_BERT_Only():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    x = bert_model([x1_in, x2_in]) 
    cls_layer = Lambda(lambda x: x[:, 0])(x)  ## take the [CLS] vector, which represents the whole sentence, for classification
    output = Dense(10, activation='softmax')(cls_layer)
    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model
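For intuition on the Lambda slice above, a NumPy analogue (a sketch with dummy values; 768 is the hidden size of chinese_L-12_H-768_A-12, and np is the import at the top of the file):

x_dummy = np.zeros((2, 5, 768))   # (batch_size, seq_len, hidden_size), like the BERT output
cls_dummy = x_dummy[:, 0]         # shape (2, 768): the position-0 ([CLS]) vector of every sample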

BERT + LSTM

def build_model_LSTM():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    print("加载bert模型")
    x = bert_model([x1_in, x2_in])          # cls_layer = Lambda(lambda x: x[:, 0])(x) ## 取出[CLS]对应的向量用来做分类
    T = LSTM(128, return_sequences=False)(x) 
    T = Dropout(0.3)(T)
    output = Dense(10, activation='softmax')(T)
    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5),
        metrics=['accuracy']
    )
    model.summary()
    return model

BERT + BiLSTM

def build_model_BiLSTM():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    x = bert_model([x1_in, x2_in])
    T = Bidirectional(LSTM(128, return_sequences=False))(x)  # forward and backward LSTMs; their final states are concatenated
    T = Dropout(0.3)(T)
    output = Dense(10, activation='softmax')(T)
    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5), 
        metrics=['accuracy']
    )
    model.summary()
    return model

BERT + CNN

def build_model_CNN():
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    for l in bert_model.layers:
        l.trainable = True
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    x = bert_model([x1_in, x2_in])
    c = Conv1D(128, 3, activation='relu')(x)  # 128 filters of width 3 over the token sequence
    c = GlobalMaxPooling1D()(c)               # max-pool over time to get a fixed-size vector
    c = Dropout(0.3)(c)
    output = Dense(10, activation='softmax')(c)
    model = Model([x1_in, x2_in], output)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(1e-5), 
        metrics=['accuracy']
    )
    model.summary()
    return model

Training

def train_model(allTrainData, allValData, tokenizer,modelName):
    if modelName == 'LSTM':
        model = build_model_LSTM()
    elif modelName == 'CNN':
        model = build_model_CNN()
    elif modelName == 'BiLSTM':
        model = build_model_BiLSTM() 
    else: 
        model = build_model_BERT_Only()   
    filepath = '1\\' + 'BertNoTrain_' + modelName + '_{epoch:02d}-{accuracy:.4f}-{val_accuracy:.4f}.h5'
    early_stopping = EarlyStopping(monitor='loss', patience=3, verbose=1)  # early stopping to limit overfitting
    plateau = ReduceLROnPlateau(monitor="loss", verbose=1, mode='min', factor=0.5,
                                patience=2)  # halve the learning rate when the loss stops improving (mode must be 'min' for a loss)
    checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=1, period=1,
                                 save_best_only=True, mode='min', save_weights_only=False)  # keep the best model so far
    train_D = data_generator( allTrainData,tokenizer)
    valid_D = data_generator(allValData,tokenizer)
    history = model.fit_generator(
        train_D.__iter__(),
        steps_per_epoch=len(train_D),
        epochs=10,
        validation_data=valid_D.__iter__(),
        validation_steps=len(valid_D),
        callbacks=[early_stopping, plateau, checkpoint]
    )
    model.save_weights('keras_bert_' + modelName + '.h5')  # note: the original '\keras_bert_...' path would point at the drive root on Windows
    return history

Main

if __name__ == "__main__":
    token_dict = get_token_dict(dict_path)
    tokenizer = OurTokenizer(token_dict)
    trainlable = encodeLable(readLable("trains"))  ## integer-encode the labels
    ## one-hot encode the labels
    trainCate = to_categorical(trainlable, num_classes=10)
    traindata = get_data("trains")
    allTrainData = []
    for i in range(len(traindata)):
        allTrainData.insert(i, (traindata[i], trainCate[i]))  # pair each text with its one-hot label
    # build the validation data the same way
    vallable = encodeLable(readLable("vals"))  ## integer-encode the labels
    valCate = to_categorical(vallable, num_classes=10)
    valdata = get_data("vals")
    allValData = []
    for i in range(len(valdata)):
        allValData.insert(i, (valdata[i], valCate[i]))

    train_model(allTrainData, allValData, tokenizer, "LSTM")
    train_model(allTrainData, allValData, tokenizer, "CNN")
    train_model(allTrainData, allValData, tokenizer, "BiLSTM")
    train_model(allTrainData, allValData, tokenizer, "BERT")

Prediction: predict.py

from keras.models import load_model
from keras_bert import get_custom_objects
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.utils.np_utils import to_categorical
import codecs
import numpy as np
from sklearn import metrics
import train as BL  ## import train.py (reuses its data helpers and data_generator)

def BertModelPridect(modelName):
    dict_path = "chinese_L-12_H-768_A-12\\vocab.txt"
    def get_token_dict(dict_path):
        print("Building the token dictionary")
        token_dict = {}
        with open(dict_path, 'r', encoding='utf8') as reader:
            for line in reader:
                token = line.strip()
                token_dict[token] = len(token_dict)
        return token_dict
    class OurTokenizer(Tokenizer):
        def _tokenize(self, text):
            R = []
            for c in text:
                if c in self._token_dict:
                    R.append(c)
                elif self._is_space(c):
                    R.append('[unused1]')  # map whitespace characters to [unused1]
                else:
                    R.append('[UNK]')  # everything else becomes [UNK]
            return R
    token_dict = get_token_dict(dict_path)
    tokenizer = OurTokenizer(token_dict)  
    # Build the test data (1000 samples)
    testlable = BL.encodeLable(BL.readLable("tests"))  ## integer-encode the labels
    valCate = to_categorical(testlable, num_classes=10)
    testdata = BL.get_data("tests")
    # pair each text with its one-hot label, the format data_generator expects
    allTestData = []        
    for i in range(len(testdata)):
        allTestData.insert(i,(testdata[i],valCate[i]))
    # print(len(allTestData))
    test_D = BL.data_generator( allTestData,tokenizer,batch_size=16)
    print("加载训练"+modelName+"好的模型") 
    basePath = '1\\'    
    modelpath = basePath + modelName
    # # 保存的model中包含了自定义的层(Custom Layer)
    model = load_model(modelpath, custom_objects=get_custom_objects())
    result = model.predict_generator(test_D.__iter__(),steps=len(test_D), verbose=1)   
    return testlable, result
if __name__ == '__main__':
	modelName = 'BERT_06-1.0000-0.9360.h5' 
	# modelName = 'LSTM_10-1.0000-0.9840.h5'  #
	# modelName = 'BiLSTM_06-1.0000-0.9680.h5' 
	# modelName = 'CNN_07-0.9990-0.9520.h5' 
	testlable, result = BertModelPridect(modelName)
	resultlable = []
	for each in result:
		resultlable.append(np.argmax(each))

	report = metrics.classification_report(testlable, resultlable)
	confusion_matrix = metrics.confusion_matrix(testlable, resultlable)
	accuracy_score = metrics.accuracy_score(testlable, resultlable)
	precision_score = metrics.precision_score(testlable, resultlable,average = "weighted")
	f1_score = metrics.f1_score(testlable, resultlable,average ="weighted")
	recall_score= metrics.recall_score(testlable, resultlable,average ="weighted")
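	# print the evaluation results so the metrics computed above are actually reported
	print(report)
	print(confusion_matrix)
	print("accuracy: %.4f  precision: %.4f  recall: %.4f  f1: %.4f" % (accuracy_score, precision_score, recall_score, f1_score))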