# Hyper-parameters shared by every module of the project.

# --- input representation ---
word2vec_size = 768  # dimension of a word vector
max_len = 250  # maximum sentence length
batch_size = 16  # number of samples in one training batch

# --- transformer ---
# Number of attention heads; must be smaller than the word-vector dimension
# (head_dim = word2vec_size // head_num).
head_num = 8
transformer_layer = 1  # number of encoder (decoder) layers

# --- training ---
class_num = 2  # number of target classes
# Learning rate.
# NOTE(review): the model classes pass this value to their Dropout layers as the rate.
learning_rate = 1e-5
steps = 10  # number of training passes
Train = True  # run mode: True = training, False = prediction

# --- CNN ---
cnn_layer = 3  # number of CNN layers
kernel_num = 32  # number of convolution kernels
- import numpy as np
- from all_param import *
def word2vec_index(file_path):
    """
    Load a text word-vector file and build the three lookup tables.

    The first line of the file is skipped as a header. Every following line is expected
    to contain a token followed by its space-separated vector components. Tokens are
    numbered from 1 in file order; index 0 is reserved for the padding token '<pad>',
    whose vector is all zeros.

    :param file_path: path of the word-vector file
    :return word2vector: dict mapping token -> vector (numpy float array)
    :return word2index: dict mapping token -> integer index
    :return index2word: dict mapping integer index -> token
    """
    word2vector = {}
    word2index = {}
    index2word = {}
    vector_dim = None
    with open(file_path, 'r', encoding='utf-8') as file:
        next(file, None)  # skip the header line without loading the whole file
        index = 1
        for line in file:
            fields = line.rstrip('\n').split(' ')
            word = fields[0]
            # A trailing space produces an empty field, which float() cannot parse.
            components = [value for value in fields[1:] if value != '']
            if not components:
                continue  # blank line or token without a vector
            vector = np.array(components, dtype=float)
            if vector_dim is None:
                vector_dim = vector.shape[0]
            # Build the indexes.
            word2vector[word] = vector
            word2index[word] = index
            index2word[index] = word
            index += 1
    # Add the padding token; its vector has the same size as the loaded vectors and
    # falls back to the configured dimension when the file held no vectors.
    pad_dim = vector_dim if vector_dim is not None else word2vec_size
    word2vector['<pad>'] = np.zeros(shape=(pad_dim,))
    word2index['<pad>'] = 0
    index2word[0] = '<pad>'
    return word2vector, word2index, index2word
def data_processing(path, data_len, word2vector, word2index, data_batch, data_start_site):
    """
    Read one batch of labelled comments and convert it to numeric arrays.

    Each line of the data set starts with a one-character integer label; the comment
    tokens start at the third character and are separated by spaces. When the requested
    window runs past ``data_len`` it wraps around to the beginning of the file.

    :param path: path of the data set
    :param data_len: number of samples in the data set
    :param word2vector: dict mapping token -> word vector
    :param word2index: dict mapping token -> integer index
    :param data_batch: number of samples to fetch
    :param data_start_site: position of the first sample to fetch
    :return comment2vector: word-vector representation, shape (batch, max_len, word2vec_size)
    :return comment2index: index representation, shape (batch, max_len); 0 where padded
    :return labels: smoothed one-hot labels, shape (batch, class_num)
    :raises KeyError: if a token is missing from the lookup dictionaries
    """
    with open(path, 'r', encoding='utf-8') as source:
        lines = source.readlines()

    stop = data_start_site + data_batch
    if stop > data_len:
        # The window overruns the data set: take the tail, then continue from the head.
        batch = lines[data_start_site:] + lines[:stop - data_len]
    else:
        batch = lines[data_start_site:stop]

    # Zero-initialised buffers; positions that are never written stay as padding.
    sample_count = len(batch)
    comment2vector = np.zeros(shape=(sample_count, max_len, word2vec_size))
    comment2index = np.zeros(shape=(sample_count, max_len))
    labels = np.zeros(shape=(sample_count, class_num), dtype=float)

    for row, record in enumerate(batch):
        # Tokens of the comment, with the empty strings left by repeated spaces removed.
        tokens = [token for token in record[2:].replace('\n', '').split(' ') if token != '']
        # Comments longer than max_len are truncated.
        for column, token in enumerate(tokens[:max_len]):
            comment2vector[row][column] = word2vector[token]
            comment2index[row][column] = word2index[token]
        # One-hot encode the label, then smooth it so that no entry is exactly 0 or 1.
        labels[row][int(record[:1])] = 1
        labels[row] = np.where(labels[row] == 0, 0.0000001, 0.9999999)
    return comment2vector, comment2index, labels
if __name__ == '__main__':
    # Smoke test: load the word vectors and print the labels of the first batches.
    train_path = 'data_set/douban_comment/balanced/balanced_train.txt'
    word2vector, word2index, index2word = word2vec_index(
        'word2vec/douban_comment/fen_ci128/balanced/balanced_data.vector')  # load word vectors

    # Count the samples of the data set.
    with open(train_path, 'r', encoding='utf-8') as file1:
        datas_len = len(file1.readlines())
    print('一共有{}条数据'.format(datas_len))

    # Feed the data set batch by batch. A full pass would use datas_len // batch_size
    # batches; only batches 0..batch_num are processed here.
    batch_num = 1
    for i in range(batch_num + 1):
        comment_vector, comment_index, labels = data_processing(
            train_path, datas_len, word2vector, word2index, batch_size, i * batch_size)
        print(labels)
- """
- 定义transformer模块
- """
- from all_param import *
- from tensorflow import keras
- import numpy as np
- import tensorflow as tf
class transformer(keras.Model):
    """
    Transformer encoder block.

    ``calls`` adds the positional encoding to the word vectors and passes the result
    through ``layer_num`` encoder layers. Each layer applies LayerNorm, a sub-layer
    (multi-head attention, then the feed-forward network) and a residual addition.

    NOTE(review): every encoder layer reuses the same Dense / LayerNorm / Dropout objects,
    so all layers share one set of weights - confirm that this is intended.
    NOTE(review): ``learning_rate`` is used as the Dropout rate - confirm that this is intended.
    """

    def __init__(self, max_len, word_dim, head_num, class_num, learning_rate, Train):
        """
        :param max_len: maximum sentence length (number of encoded positions)
        :param word_dim: word-vector dimension
        :param head_num: number of attention heads
        :param class_num: number of classes (output size of ``self.linear``)
        :param learning_rate: value passed to the Dropout layers as their rate
        :param Train: True for training mode (dropout active), False for prediction mode
        """
        super(transformer, self).__init__()
        self.Train = Train
        # Keep the head count on the instance so attention does not depend on a global.
        self.head_num = head_num
        self.pe = self.positional_encoding(word_dim, max_len)  # positional encoding
        self.head_dim = word_dim // head_num  # dimension of one head
        projection_size = self.head_dim * head_num
        # Q, K and V projections.
        self.Wq = keras.layers.Dense(projection_size, kernel_initializer='RandomUniform')
        self.Wk = keras.layers.Dense(projection_size, kernel_initializer='RandomUniform')
        self.Wv = keras.layers.Dense(projection_size, kernel_initializer='RandomUniform')
        # Feed-forward network.
        self.feed_forward_network = keras.layers.Dense(word_dim * head_num, kernel_initializer='RandomUniform',
                                                       activation=keras.activations.relu)
        # Project the attention output [0] and the feed-forward output [1] back to word_dim.
        self.adjust_shape = [keras.layers.Dense(word_dim, kernel_initializer='RandomUniform') for _ in range(2)]
        # Dropout against over-fitting: units are dropped with probability ``rate``.
        self.drop = [keras.layers.Dropout(rate=learning_rate) for _ in range(2)]
        self.layer_norm = [keras.layers.LayerNormalization(axis=-1) for _ in range(2)]  # Norm
        # Fully connected classification layer; not applied inside ``calls``.
        self.linear = keras.layers.Dense(class_num, kernel_initializer='RandomUniform')

    # Positional encoding
    def positional_encoding(self, word_dim, max_len):
        """
        Build the sinusoidal positional encoding.

        :param word_dim: word-vector dimension
        :param max_len: maximum sentence length
        :return pe: positional encoding, shape (max_len, word_dim)
        """
        positions = np.arange(max_len)[:, np.newaxis]
        dims = np.arange(word_dim)[np.newaxis, :]
        # An odd dimension shares the frequency of the even dimension just before it.
        even_dims = dims - dims % 2
        pe = positions / np.power(10000, even_dims / word_dim)
        pe[:, 0::2] = np.sin(pe[:, 0::2])
        pe[:, 1::2] = np.cos(pe[:, 1::2])
        return pe

    def _split_heads(self, projected):
        """Split [batch, seq, head_num * head_dim] into [batch, head_num, seq, head_dim]."""
        batch, seq_len = projected.shape[0], projected.shape[1]
        # Separate the feature axis into heads first, then move the head axis forward.
        # Reshaping directly to [batch, head_num, seq, head_dim] would mix different
        # tokens into one head and would not be the inverse of the merge further below.
        projected = tf.reshape(projected, (batch, seq_len, self.head_num, self.head_dim))
        return tf.transpose(projected, perm=[0, 2, 1, 3])

    # Multi-head attention
    def multi_head_attention(self, x_embedding, x_index, this_layer):
        """
        :param x_embedding: word-vector representation
        :param x_index: index representation; 0 marks a padded position
        :param this_layer: index of the current encoder layer (unused)
        :return output: word vectors carrying the attention information
        """
        q, k, v = self.Wq(x_embedding), self.Wk(x_embedding), self.Wv(x_embedding)
        h_q, h_k, h_v = self._split_heads(q), self._split_heads(k), self._split_heads(v)
        dk = self.head_dim
        attention = tf.matmul(h_q, h_k, transpose_b=True) / np.sqrt(dk)  # unmasked scores
        # Push the scores of padded key positions towards minus infinity so that the
        # softmax gives them a weight close to 0.
        attention += self.mask(x_index) * -1e10
        self.attention = tf.nn.softmax(attention, axis=-1)
        att_massage = tf.matmul(self.attention, h_v)  # attention-weighted word vectors
        # Merge the heads again: [batch, seq, head_num, head_dim] -> [batch, seq, head_num * head_dim].
        att_massage = tf.transpose(att_massage, perm=[0, 2, 1, 3])
        att_massage = tf.reshape(att_massage, (att_massage.shape[0], att_massage.shape[1], -1))
        # head_num * head_dim is not necessarily equal to word_dim, so project back.
        output = self.adjust_shape[0](att_massage)
        output = self.drop[0](output, training=self.Train)
        return output

    # Mask used by the multi-head attention
    def mask(self, x_index):
        """
        :param x_index: index representation, shape (batch, seq)
        :return attention_mask: float mask of shape (batch, 1, 1, seq); 1.0 where the index is 0
        """
        mask = tf.math.equal(x_index, np.zeros(shape=x_index.shape))  # True at padded positions
        attention_mask = mask[:, np.newaxis, np.newaxis, :]
        attention_mask = tf.cast(attention_mask, dtype=tf.float32)
        return attention_mask

    # Feed-forward network
    def feed_forward(self, attention, this_layer):
        """
        :param attention: word vectors carrying the attention information
        :param this_layer: index of the current encoder layer (unused)
        :return output: transformed word vectors with last dimension word_dim
        """
        output = self.feed_forward_network(attention)
        output = self.adjust_shape[1](output)
        output = self.drop[1](output, training=self.Train)
        return output

    # One encoder layer
    def encoder_layer(self, x_embedding, x_index, this_layer):
        """
        :param x_embedding: word vectors including the positional encoding
        :param x_index: index representation
        :param this_layer: index of the current encoder layer
        :return x_message: output of the encoder layer
        """
        x_attention = self.layer_norm[0](x_embedding)  # Norm (LayerNorm)
        x_attention = self.multi_head_attention(x_attention, x_index, this_layer)
        x_attention += x_embedding  # Add
        x_message = self.layer_norm[1](x_attention)  # Norm (LayerNorm)
        x_message = self.feed_forward(x_message, this_layer)
        x_message += x_attention  # Add
        return x_message

    # The whole encoder
    def encoder(self, x_embedding, x_index, layer_num):
        """
        :param x_embedding: word vectors including the positional encoding
        :param x_index: index representation
        :param layer_num: number of encoder layers to apply
        :return x_message: information extracted by the encoder
        """
        x_message = x_embedding
        for i in range(layer_num):
            x_message = self.encoder_layer(x_message, x_index, i)
        return x_message

    # The whole transformer model
    def calls(self, x_vector, x_index, layer_num):
        """
        :param x_vector: word-vector representation
        :param x_index: index representation
        :param layer_num: number of encoder layers to apply
        :return: encoder output, one vector per token
        """
        x_embedding = x_vector + self.pe  # add the positional encoding
        scores = self.encoder(x_embedding, x_index, layer_num)
        # Mean pooling over max_len, ``self.linear`` and the softmax are deliberately not
        # applied here; a caller that needs class probabilities has to add them.
        return scores
if __name__ == '__main__':
    # Stand-alone check that the transformer block can be trained and evaluated.
    import os
    import data2vector
    import pickle
    import time

    def predict(model, x_vector, x_index, layer_num):
        """
        Turn the encoder output into class probabilities.

        ``model.calls`` returns one vector per token, so the sentence is mean-pooled over
        the sequence axis and passed through ``model.linear`` and a softmax.

        :return: class probabilities, shape (batch, class_num)
        """
        encoded = model.calls(x_vector, x_index, layer_num)
        pooled = tf.reduce_mean(encoded, axis=1)  # sentence vector, drops the max_len axis
        return tf.math.softmax(model.linear(pooled), axis=-1)

    # Training
    def train(model, data_path, batch_size, steps, word2vector, word2index, class_num, layer_num,
              cross_entropy, optimizer, save_path, writing_mode, index2word=None):
        """
        Train the model and log/save the result after every pass over the data.

        :param data_path: path of the training set
        :param batch_size: number of samples per batch
        :param steps: number of passes over the training set
        :param index2word: index -> token dict stored with the model; derived from
                           ``word2index`` when omitted
        """
        if index2word is None:
            index2word = {index: word for word, index in word2index.items()}
        # Number of samples in the data set.
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        data_copies = datas_len // batch_size  # number of full batches
        all_time = 0  # total training time
        for step in range(steps):
            start_time = time.time()
            # Results of every batch of this pass. The buffers hold exactly the processed
            # samples; spare all-zero rows would be counted as correct class-0 predictions.
            all_loss = []
            all_scores = np.zeros(shape=(data_copies * batch_size, class_num))
            all_labels = np.zeros(shape=(data_copies * batch_size, class_num))
            for i in range(data_copies):
                x_vector, x_index, labels = data2vector.data_processing(
                    data_path, datas_len, word2vector, word2index,
                    batch_size, i * batch_size)
                with tf.GradientTape() as tape:
                    scores = predict(model, x_vector, x_index, layer_num)  # model prediction
                    loss = cross_entropy(labels, scores)  # cross-entropy loss
                derivative = tape.gradient(loss, model.trainable_variables)
                optimizer.apply_gradients(zip(derivative, model.trainable_variables))
                all_loss.append(loss)
                all_scores[i * batch_size: (i + 1) * batch_size, :] = scores
                all_labels[i * batch_size: (i + 1) * batch_size, :] = labels
                print('\r共有{}批数据,第 {:3} 批数据,当前损失: {:4f} '.format(data_copies, i + 1, loss), end='')
            # Print and save the result of this pass.
            this_time = time.time() - start_time
            all_time += this_time
            predict_value = np.argmax(all_scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(all_labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)  # [predicted, actual]
            look_and_save_data(model, result, this_time, save_path, writing_mode, word2vector=word2vector,
                               word2index=word2index, index2word=index2word, step=step,
                               loss=np.array(all_loss).mean(), all_time=all_time)
            writing_mode = 'a'  # append after the first write

    # Testing
    def test(model, data_path, batch_size, layer_num, class_num, save_path, writing_mode):
        """
        Evaluate the saved model on a test set.

        :param data_path: path of the test set
        :param batch_size: number of samples per batch
        """
        # Load the dictionaries stored at training time.
        # NOTE: pickle must only be used on files written by this script itself.
        with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "rb") as f:
            dic = pickle.load(f)
        word2vector = dic['word2vector']
        word2index = dic['word2idx']
        model.load_weights(save_path + "/model.ckpt")
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        start_time = time.time()
        batch_num = datas_len // batch_size  # number of full batches
        # One [predicted, actual] pair per sample, independent of class_num.
        results = np.zeros(shape=(batch_num * batch_size, 2))
        for i in range(batch_num):
            x_vector, x_index, labels = data2vector.data_processing(
                data_path, datas_len, word2vector, word2index,
                batch_size, i * batch_size)
            scores = predict(model, x_vector, x_index, layer_num)
            predict_value = np.argmax(scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)
            results[i * batch_size: (i + 1) * batch_size, :] = result
            print('\r第 {:3} 批数据,共有{}批数据'.format(i + 1, batch_num), end='')
        times = time.time() - start_time
        look_and_save_data(model, results, times, save_path, writing_mode)

    # Print and save the training progress or the prediction result
    def look_and_save_data(model, result, this_time, save_path, writing_mode, word2vector=None, word2index=None,
                           index2word=None, step=None, loss=None, all_time=None):
        """
        :param result: array of [predicted, actual] label pairs (binary labels)
        :param this_time: duration of this run in seconds
        :param step: index of the training pass
        :param loss: mean loss
        :param all_time: total duration in seconds
        """
        # Precision, recall, F1 and accuracy for binary labels.
        pair_sum = result[:, 0] + result[:, 1]
        pair_diff = result[:, 0] - result[:, 1]
        TP = int(np.sum(pair_sum == 2))
        TN = int(np.sum(pair_sum == 0))
        FP = int(np.sum(pair_diff == 1))
        FN = int(np.sum(pair_diff == -1))
        # The small constants keep the ratios defined when a denominator is 0.
        P = (TP + 0.0001) / (TP + FP + 0.0001)
        R = (TP + 0.0001) / (TP + FN + 0.0001)
        F1 = (2 * P * R + 0.00001) / (P + R + 0.00001)
        Accuracy = (TP + TN) / len(result) if len(result) else 0.0
        os.makedirs(save_path, exist_ok=True)
        if Train == True:  # training mode
            print("\tstep: {:3} | mean_loss: {:3f} | time: {:3f}m | Accuracy: {:3f} |".format(
                step, loss, this_time / 60, Accuracy))
            with open(save_path + '/train_process.txt', writing_mode, encoding='utf-8') as file:
                file.write(
                    "step: {:3} | mean_loss: {:3f} | time: {:3f} | P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} |\n".format(
                        step, loss, all_time, P, R, F1, Accuracy))
            # Save the model weights and the dictionaries needed to use them.
            model.save_weights(save_path + "/model.ckpt")
            os.makedirs(save_path + "/tmp", exist_ok=True)
            with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "wb") as f:
                pickle.dump({"word2vector": word2vector, "word2idx": word2index, "idx2word": index2word}, f)
        else:  # prediction mode
            print("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                P, R, F1, Accuracy, this_time))
            with open(save_path + '/test_result.txt', writing_mode, encoding='utf-8') as file:
                file.write("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                    P, R, F1, Accuracy, this_time))

    # Loss and optimizer.
    cross_entropy = keras.losses.CategoricalCrossentropy(from_logits=False)
    optimizer = keras.optimizers.Adam()
    writing_mode = 'w'  # the first write overwrites the log file
    save_path = './model_data/cg'
    model = transformer(max_len, word2vec_size, head_num, class_num, learning_rate, Train)
    if Train == True:  # train the model
        word2vector, word2index, index2word = data2vector.word2vec_index(
            'word2vec/douban_comment/fen_ci128/balanced/balanced_data.vector')
        train(model, 'data_set/douban_comment/balanced/balanced_train.txt', batch_size, steps,
              word2vector, word2index, class_num, transformer_layer, cross_entropy, optimizer, save_path,
              writing_mode, index2word=index2word)
    else:  # test the model
        test(model, 'data_set/douban_comment/balanced/balanced_test.txt', batch_size, transformer_layer,
             class_num, save_path, writing_mode)
- import tensorflow as tf
- from all_param import *
- import numpy as np
class TextCNN(tf.keras.Model):
    """
    Text CNN with three parallel branches (kernel heights 2, 3 and 4).

    Each branch applies its own Conv2D followed by max pooling and then the shared stack
    of deeper Conv1D layers. The flattened branch outputs are concatenated, passed through
    dropout and projected to ``word2vec_size`` features by ``line0``.

    NOTE(review): ``learning_rate`` is used as the Dropout rate - confirm that this is intended.
    """

    def __init__(self, word2vec_size, kernel_num, cnn_layer, learning_rate, class_num, Train):
        """
        :param word2vec_size: word-vector dimension (width of the first-layer kernels)
        :param kernel_num: number of kernels of the first convolution layer
        :param cnn_layer: total number of convolution layers per branch
        :param learning_rate: value passed to the Dropout layer as its rate
        :param class_num: number of classes (output size of ``self.line1``)
        :param Train: True for training mode (dropout active), False for prediction mode
        """
        super(TextCNN, self).__init__()
        self.Train = Train
        # Keep the depth on the instance so the forward pass does not depend on a global.
        self.cnn_layer = cnn_layer
        # First layer: kernels of size (2, embed_dim), (3, embed_dim) and (4, embed_dim).
        self.conv = [tf.keras.layers.Conv2D(kernel_num, (height, word2vec_size), strides=(1, 1), padding='valid',
                                            kernel_initializer='RandomUniform', activation='relu')
                     for height in range(2, 5)]
        self.max_pool = tf.keras.layers.MaxPool1D(pool_size=2, padding='same')
        self.drop = tf.keras.layers.Dropout(rate=learning_rate)
        self.line0 = tf.keras.layers.Dense(word2vec_size, kernel_initializer='RandomUniform')  # fully connected
        # Classification layer; not applied inside ``calls``.
        self.line1 = tf.keras.layers.Dense(class_num, kernel_initializer='RandomUniform')
        # Deeper layers: layer i has 2 ** (i + 1) * kernel_num filters. The count is a plain
        # int, and the list is empty (but defined) when cnn_layer <= 1.
        self.conv_add = [tf.keras.layers.Conv1D(2 ** (depth + 1) * kernel_num, 2, strides=1,
                                                kernel_initializer='RandomUniform', activation='relu',
                                                padding='valid')
                         for depth in range(cnn_layer - 1)]

    # One CNN branch
    def conv_and_pool(self, input, conv):
        """
        :param input: input data; assumed to be [batch, max_len, word2vec_size, 1] - TODO confirm
        :param conv: first-layer convolution of this branch
        :return: flattened features of the branch, shape [batch, features]
        """
        data = conv(input)  # convolution, e.g. [batch, max_len-1, 1, kernel_num] for height 2
        data = tf.reshape(data, (data.shape[0], data.shape[1], -1))  # drop the width axis
        data = self.max_pool(data)  # pooling halves the sequence axis
        for deeper_conv in self.conv_add:  # deeper convolution layers, each followed by pooling
            data = self.max_pool(deeper_conv(data))
        data = tf.reshape(data, (data.shape[0], -1))  # flatten everything except the batch axis
        return data

    # Combine the branches with kernel heights 2, 3 and 4
    def calls(self, input):
        """
        :param input: input data, passed unchanged to every branch
        :return: ``line0`` output, shape [batch, word2vec_size]
        """
        branches = [self.conv_and_pool(input, conv) for conv in self.conv]
        merged = tf.concat(branches, 1)  # concatenate the branch features
        output = self.drop(merged, training=self.Train)  # against over-fitting
        output = self.line0(output)
        # ``self.line1`` and the softmax are deliberately not applied here; a caller that
        # needs class probabilities has to add them.
        return output
if __name__ == '__main__':
    # Stand-alone check that the CNN block can be trained and evaluated.
    import os
    import data2vector
    import pickle
    import time

    def predict(model, x_vector):
        """
        Turn the CNN features into class probabilities.

        ``model.calls`` returns the ``line0`` features, so ``model.line1`` and a softmax
        are applied on top of them.

        :param x_vector: word vectors, shape (batch, max_len, word2vec_size)
        :return: class probabilities, shape (batch, class_num)
        """
        features = model.calls(x_vector[:, :, :, np.newaxis])  # add the channel axis
        return tf.math.softmax(model.line1(features), axis=-1)

    # Training
    def train(model, data_path, batch_size, steps, word2vector, word2index, index2word, class_num,
              cross_entropy, optimizer, save_path, writing_mode):
        """
        Train the model and log/save the result after every pass over the data.

        :param data_path: path of the training set
        :param batch_size: number of samples per batch
        :param steps: number of passes over the training set
        """
        # Number of samples in the data set.
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        data_copies = datas_len // batch_size  # number of full batches
        all_time = 0  # total training time
        for step in range(steps):
            start_time = time.time()
            # Results of every batch of this pass.
            all_loss = []
            all_scores = np.zeros(shape=(data_copies * batch_size, class_num))
            all_labels = np.zeros(shape=(data_copies * batch_size, class_num))
            for i in range(data_copies):
                x_vector, x_index, labels = data2vector.data_processing(
                    data_path, datas_len, word2vector, word2index,
                    batch_size, i * batch_size)
                with tf.GradientTape() as tape:
                    scores = predict(model, x_vector)  # model prediction
                    loss = cross_entropy(labels, scores)  # cross-entropy loss
                derivative = tape.gradient(loss, model.trainable_variables)
                optimizer.apply_gradients(zip(derivative, model.trainable_variables))
                all_loss.append(loss)
                all_scores[i * batch_size: (i + 1) * batch_size, :] = scores
                all_labels[i * batch_size: (i + 1) * batch_size, :] = labels
                print('\r共有{}批数据,第 {:3} 批数据,当前损失: {:4f} '.format(data_copies, i + 1, loss), end='')
            # Print and save the result of this pass.
            this_time = time.time() - start_time
            all_time += this_time
            predict_value = np.argmax(all_scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(all_labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)  # [predicted, actual]
            mean_loss = np.array(all_loss).mean()
            look_and_save_data(model, result, this_time, save_path, writing_mode, word2vector=word2vector,
                               word2index=word2index, index2word=index2word, step=step,
                               loss=mean_loss, all_time=all_time)
            writing_mode = 'a'  # append after the first write

    # Testing
    def test(model, data_path, batch_size, class_num, save_path, writing_mode):
        """
        Evaluate the saved model on a test set.

        :param data_path: path of the test set
        :param batch_size: number of samples per batch
        """
        # Load the dictionaries stored at training time.
        # NOTE: pickle must only be used on files written by this script itself.
        with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "rb") as f:
            dic = pickle.load(f)
        word2vector = dic['word2vector']
        word2index = dic['word2idx']
        model.load_weights(save_path + "/model.ckpt")
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        start_time = time.time()
        batch_num = datas_len // batch_size  # number of full batches
        # One [predicted, actual] pair per sample, independent of class_num.
        results = np.zeros(shape=(batch_num * batch_size, 2))
        for i in range(batch_num):
            x_vector, x_index, labels = data2vector.data_processing(
                data_path, datas_len, word2vector, word2index,
                batch_size, i * batch_size)
            scores = predict(model, x_vector)
            predict_value = np.argmax(scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)
            results[i * batch_size: (i + 1) * batch_size, :] = result
            print('\r第 {:3} 批数据,共有{}批数据'.format(i + 1, batch_num), end='')
        times = time.time() - start_time
        look_and_save_data(model, results, times, save_path, writing_mode)

    # Print and save the training progress or the prediction result
    def look_and_save_data(model, result, this_time, save_path, writing_mode, word2vector=None, word2index=None,
                           index2word=None, step=None, loss=None, all_time=None):
        """
        :param result: array of [predicted, actual] label pairs (binary labels)
        :param this_time: duration of this run in seconds
        :param step: index of the training pass
        :param loss: mean loss
        :param all_time: total duration in seconds
        """
        # Precision, recall, F1 and accuracy for binary labels.
        pair_sum = result[:, 0] + result[:, 1]
        pair_diff = result[:, 0] - result[:, 1]
        TP = int(np.sum(pair_sum == 2))
        TN = int(np.sum(pair_sum == 0))
        FP = int(np.sum(pair_diff == 1))
        FN = int(np.sum(pair_diff == -1))
        # The small constants keep the ratios defined when a denominator is 0.
        P = (TP + 0.0001) / (TP + FP + 0.0001)
        R = (TP + 0.0001) / (TP + FN + 0.0001)
        F1 = (2 * P * R + 0.00001) / (P + R + 0.00001)
        Accuracy = (TP + TN) / len(result) if len(result) else 0.0
        os.makedirs(save_path, exist_ok=True)
        if Train == True:  # training mode
            print("\tstep: {:3} | mean_loss: {:3f} | time: {:3f}m | Accuracy: {:3f} |".format(
                step, loss, this_time / 60, Accuracy))
            with open(save_path + '/train_process.txt', writing_mode, encoding='utf-8') as file:
                file.write(
                    "step: {:3} | mean_loss: {:3f} | time: {:3f} | P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} |\n".format(
                        step, loss, all_time, P, R, F1, Accuracy))
            # Save the model weights and the dictionaries needed to use them.
            model.save_weights(save_path + "/model.ckpt")
            os.makedirs(save_path + "/tmp", exist_ok=True)
            with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "wb") as f:
                pickle.dump({"word2vector": word2vector, "word2idx": word2index, "idx2word": index2word}, f)
        else:  # prediction mode
            print("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                P, R, F1, Accuracy, this_time))
            with open(save_path + '/test_result.txt', writing_mode, encoding='utf-8') as file:
                file.write("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                    P, R, F1, Accuracy, this_time))

    # Loss and optimizer.
    cross_entropy = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
    optimizer = tf.keras.optimizers.Adam()
    writing_mode = 'w'  # the first write overwrites the log file
    save_path = './model_data/balanced_CNN_4_128'
    model = TextCNN(word2vec_size, kernel_num, cnn_layer, learning_rate, class_num, Train)
    if Train == True:  # train the model
        word2vector, word2index, index2word = data2vector.word2vec_index(
            'word2vec/douban_comment/fen_ci128/balanced/balanced_data.vector')
        train(model, 'data_set/douban_comment/balanced/balanced_train.txt', batch_size, steps,
              word2vector, word2index, index2word, class_num, cross_entropy, optimizer, save_path, writing_mode)
    else:  # test the model
        test(model, 'data_set/douban_comment/balanced/balanced_test.txt', batch_size,
             class_num, save_path, writing_mode)
- import os
- import pickle
- from all_param import *
- import data2vector
- import numpy as np
- import tensorflow as tf
- import time
- import block_CNN,block_transformer
class TransformerCNN(tf.keras.Model):
    """
    Combined classifier: transformer encoder -> TextCNN -> fully connected layer.

    The encoder output gets a trailing channel axis and is fed to the CNN; ``self.linear``
    maps the CNN features to ``class_num`` scores.
    """

    def __init__(self, max_len, word2vec_size, head_num, class_num, learning_rate, Train, kernel_num, cnn_layer):
        """
        :param max_len: maximum sentence length
        :param word2vec_size: word-vector dimension
        :param head_num: number of attention heads
        :param class_num: number of classes
        :param learning_rate: forwarded to both sub-models
        :param Train: True for training mode, False for prediction mode
        :param kernel_num: number of kernels of the first CNN layer
        :param cnn_layer: number of CNN layers
        """
        super(TransformerCNN, self).__init__()
        self.transformer = block_transformer.transformer(max_len, word2vec_size, head_num, class_num,
                                                         learning_rate, Train)
        self.CNN = block_CNN.TextCNN(word2vec_size, kernel_num, cnn_layer, learning_rate, class_num, Train)
        self.Train = Train
        # Loss and optimizer.
        self.cross_entropy = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
        self.optimizer = tf.keras.optimizers.Adam()
        self.tf_linear = tf.keras.layers.Dense(word2vec_size, kernel_initializer='RandomUniform')
        self.cnn_linear = tf.keras.layers.Dense(word2vec_size, kernel_initializer='RandomUniform')
        self.linear = tf.keras.layers.Dense(class_num, kernel_initializer='RandomUniform')  # classification layer
        self.writing_mode = 'w'  # the first write overwrites the log file
        self.attention = None

    def _class_scores(self, x_vector, x_index, layer_num):
        """Run transformer, CNN and the final linear layer; returns unnormalised class scores."""
        encoded = self.transformer.calls(x_vector, x_index, layer_num)
        features = self.CNN.calls(encoded[:, :, :, np.newaxis])  # add the channel axis
        return self.linear(features)

    # Training
    def train(self, data_path, batch_size, steps, word2vector, word2index, class_num, save_path, layer_num,
              index2word=None):
        """
        Train the model and log/save the result after every pass over the data.

        :param data_path: path of the training set
        :param batch_size: number of samples per batch
        :param steps: number of passes over the training set
        :param word2vector: dict mapping token -> word vector
        :param word2index: dict mapping token -> integer index
        :param class_num: number of classes
        :param save_path: directory for the log, the weights and the dictionaries
        :param layer_num: number of encoder layers
        :param index2word: index -> token dict stored with the model; derived from
                           ``word2index`` when omitted, so no module-level variable is needed
        """
        if index2word is None:
            index2word = {index: word for word, index in word2index.items()}
        # Number of samples in the data set.
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        data_copies = datas_len // batch_size  # number of full batches
        all_time = 0  # total training time
        for step in range(steps):
            start_time = time.time()
            # Results of every batch of this pass.
            all_loss = []
            all_scores = np.zeros(shape=(data_copies * batch_size, class_num))
            all_labels = np.zeros(shape=(data_copies * batch_size, class_num), dtype=float)
            for i in range(data_copies):
                x_vector, x_index, labels = data2vector.data_processing(
                    data_path, datas_len, word2vector, word2index,
                    batch_size, i * batch_size)
                with tf.GradientTape() as tape:
                    scores = self._class_scores(x_vector, x_index, layer_num)
                    scores = tf.math.softmax(scores, axis=-1)  # class probabilities
                    loss = self.cross_entropy(labels, scores)  # cross-entropy loss
                derivative = tape.gradient(loss, self.trainable_variables)
                self.optimizer.apply_gradients(zip(derivative, self.trainable_variables))
                all_loss.append(loss)
                all_scores[i * batch_size: (i + 1) * batch_size, :] = scores
                all_labels[i * batch_size: (i + 1) * batch_size, :] = labels
                print('\r共有{}批数据,第 {:3} 批数据,当前损失: {:4f} '.format(data_copies, i + 1, loss), end='')
            # Print and save the result of this pass.
            this_time = time.time() - start_time
            all_time += this_time
            predict_value = np.argmax(all_scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(all_labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)  # [predicted, actual]
            mean_loss = np.array(all_loss).mean()
            self.look_and_save_data(result, this_time, save_path, word2vector=word2vector,
                                    word2index=word2index, index2word=index2word, step=step,
                                    loss=mean_loss, all_time=all_time)
            self.writing_mode = 'a'  # append after the first write

    # Testing
    def test(self, data_path, batch_size, layer_num, class_num, save_path):
        """
        Evaluate the saved model on a test set.

        :param data_path: path of the test set
        :param batch_size: number of samples per batch
        :param layer_num: number of encoder layers
        :param class_num: number of classes (kept for interface compatibility)
        :param save_path: directory holding the saved weights and dictionaries
        """
        # Load the dictionaries stored at training time.
        # NOTE: pickle must only be used on files written by this class itself.
        with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "rb") as f:
            dic = pickle.load(f)
        word2vector = dic['word2vector']
        word2index = dic['word2idx']
        self.load_weights(save_path + "/model.ckpt")
        with open(data_path, 'r', encoding='utf-8') as file1:
            datas_len = len(file1.readlines())
        print('共有{}条数据'.format(datas_len))
        start_time = time.time()
        batch_num = datas_len // batch_size  # number of full batches
        # One [predicted, actual] pair per sample, independent of class_num.
        results = np.zeros(shape=(batch_num * batch_size, 2))
        for i in range(batch_num):
            x_vector, x_index, labels = data2vector.data_processing(
                data_path, datas_len, word2vector, word2index,
                batch_size, i * batch_size)
            # The softmax is skipped: it does not change the argmax.
            scores = self._class_scores(x_vector, x_index, layer_num)
            predict_value = np.argmax(scores, axis=-1)[:, None]  # predicted label (0 or 1)
            actual_value = np.argmax(labels, axis=-1)[:, None]  # actual label
            result = np.concatenate((predict_value, actual_value), axis=1)
            results[i * batch_size: (i + 1) * batch_size, :] = result
            print('\r第 {:3} 批数据,共有{}批数据'.format(i + 1, batch_num), end='')
        times = time.time() - start_time
        self.look_and_save_data(results, times, save_path)

    # Print and save the training progress or the prediction result
    def look_and_save_data(self, result, this_time, save_path, word2vector=None, word2index=None, index2word=None,
                           step=None, loss=None, all_time=None):
        """
        :param result: array of [predicted, actual] label pairs (binary labels)
        :param this_time: duration of this run in seconds
        :param save_path: output directory
        :param step: index of the training pass
        :param loss: mean loss
        :param all_time: total duration in seconds
        """
        # Precision, recall, F1 and accuracy for binary labels.
        pair_sum = result[:, 0] + result[:, 1]
        pair_diff = result[:, 0] - result[:, 1]
        TP = int(np.sum(pair_sum == 2))
        TN = int(np.sum(pair_sum == 0))
        FP = int(np.sum(pair_diff == 1))
        FN = int(np.sum(pair_diff == -1))
        # The small constants keep the ratios defined when a denominator is 0.
        P = (TP + 0.0001) / (TP + FP + 0.0001)
        R = (TP + 0.0001) / (TP + FN + 0.0001)
        F1 = (2 * P * R + 0.00001) / (P + R + 0.00001)
        Accuracy = (TP + TN) / len(result) if len(result) else 0.0
        os.makedirs(save_path, exist_ok=True)
        if self.Train == True:  # training mode
            print("\tstep: {:3} | mean_loss: {:3f} | time: {:3f}m | Accuracy: {:3f} |".format(
                step, loss, this_time / 60, Accuracy))
            with open(save_path + '/train_process.txt', self.writing_mode, encoding='utf-8') as file:
                file.write(
                    "step: {:3} | mean_loss: {:3f} | time: {:3f} | P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} |\n".format(
                        step, loss, all_time, P, R, F1, Accuracy))
            # Save the model weights and the dictionaries needed to use them.
            self.save_weights(save_path + "/model.ckpt")
            os.makedirs(save_path + "/tmp", exist_ok=True)
            with open(save_path + "/tmp/transformer_word2idx_idx2word.pkl", "wb") as f:
                pickle.dump({"word2vector": word2vector, "word2idx": word2index, "idx2word": index2word}, f)
        else:  # prediction mode
            print("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                P, R, F1, Accuracy, this_time))
            with open(save_path + '/test_result.txt', self.writing_mode, encoding='utf-8') as file:
                file.write("P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f} |\n".format(
                    P, R, F1, Accuracy, this_time))
if __name__ == '__main__':
    # Entry point: build the combined model, then train or evaluate it depending on Train.
    save_path = './model_data/balanced_RU_1_64_CNN_4_64_label'
    model = TransformerCNN(max_len, word2vec_size, head_num, class_num, learning_rate, Train,
                           kernel_num, cnn_layer)
    if Train == True:
        # Training: load the word vectors first, then fit on the training set.
        word2vector, word2index, index2word = data2vector.word2vec_index(
            'word2vec/douban_comment/fen_ci128/balanced/balanced_data.vector')
        model.train('data_set/douban_comment/balanced/balanced_train.txt', batch_size, steps,
                    word2vector, word2index, class_num, save_path, transformer_layer)
    else:
        # Evaluation: the saved weights and dictionaries are loaded inside test().
        model.test('data_set/douban_comment/balanced/balanced_test.txt', batch_size,
                   transformer_layer, class_num, save_path)
- import numpy as np
# Data loading and pre-processing
def get_input(path, data_num, data_batch, data_start_site):
    """
    Read one batch of labelled comments as raw text.

    Each line is expected to be ``<label> <token> <token> ...``. The tokens of a comment
    are joined without separators. When the requested window runs past ``data_num`` it
    wraps around to the beginning of the file.

    :param path: path of the data set
    :param data_num: number of samples in the data set
    :param data_batch: number of samples to fetch
    :param data_start_site: position of the first sample to fetch
    :return labels: float array; label 0 is stored as 0.001, label 1 as 1.001 and any
                    other label as 0
    :return comments: list of comment strings, one per fetched line
    """
    with open(path, 'r', encoding='utf-8') as handle:
        rows = handle.readlines()

    stop = data_start_site + data_batch
    if stop > data_num:
        # The window overruns the data set: take the tail, then continue from the head.
        rows = rows[data_start_site:] + rows[:stop - data_num]
    else:
        rows = rows[data_start_site:stop]

    label_values = {0: 0.001, 1: 1.001}
    labels = np.zeros(shape=(len(rows)))
    comments = []
    for position, row in enumerate(rows):
        fields = row.replace('\n', '').split(' ')
        # A label other than 0 or 1 is stored as 0; the comment itself is still kept.
        labels[position] = label_values.get(int(fields[0]), 0)
        comments.append(''.join(fields[1:]))
    return labels, comments
if __name__ == '__main__':
    # Smoke test: print the labels of the first 10 training samples.
    train_file = 'data_set/douban_comment/balanced/balanced_train.txt'
    with open(train_file, 'r', encoding='utf-8') as file:
        data_len = sum(1 for _ in file)  # number of lines in the data set
    labels, comments = get_input(train_file, data_len, 10, 0)
    print(labels)
- from transformers import BertModel, BertTokenizer
- import torch
- #print(torch.cuda.is_available()) # 查看GPU是否可用
- #print(torch.cuda.device_count()) # 查看GPU数量
- #print(torch.cuda.current_device()) # 查看GPU索引号
- #print(torch.cuda.get_device_name(0)) # 根据索引号得到GPU名称
class bert(torch.nn.Module):
    """
    Wrapper around the pretrained 'hfl/chinese-bert-wwm' tokenizer and BERT model.
    """

    def __init__(self, max_length=None):
        """
        :param max_length: number of tokens every input is padded/truncated to; when
                           omitted, ``max_len`` from ``all_param`` is used
        """
        super(bert, self).__init__()
        if max_length is None:
            # Resolve the default here so that ``calls`` does not depend on a name that
            # only exists when this file is run as a script.
            from all_param import max_len as max_length
        self.max_length = max_length
        self.tokenizer = BertTokenizer.from_pretrained('hfl/chinese-bert-wwm')  # BERT tokenizer
        self.BERT = BertModel.from_pretrained('hfl/chinese-bert-wwm')  # BERT model

    def calls(self, input_list):
        """
        Encode a batch of texts with BERT.

        Gradients are not disabled, so the BERT weights take part in back-propagation.

        :param input_list: list of input strings
        :return outputs: first element of the model output (last_hidden_state)
        :return cls: the slice of ``outputs`` at position 0, i.e. the [CLS] token
        """
        batch_tokenized = self.tokenizer.batch_encode_plus(input_list, add_special_tokens=True,
                                                           max_length=self.max_length, padding='max_length',
                                                           truncation=True)
        # Create the inputs on the device that holds the model so that the forward pass
        # also works after the module has been moved to a GPU.
        device = next(self.BERT.parameters()).device
        input_ids = torch.tensor(batch_tokenized['input_ids'], device=device)
        attention_mask = torch.tensor(batch_tokenized['attention_mask'], device=device)
        hidden_outputs = self.BERT(input_ids, attention_mask=attention_mask)
        outputs = hidden_outputs[0]  # [0] is the last_hidden_state part of the output
        cls = outputs[:, 0, :]  # [:, 0, :] is the result for [CLS]
        return outputs, cls
- if __name__ == '__main__':
- import get_data
- import numpy as np
- import os
- import time
- from all_param import *
def train(BERT, data_path, epoch, batch_size, class_num, optimizer, line, cross_entropy, save_path, writing_mode, Train):
    """Fine-tune ``BERT`` plus the linear head ``line`` and log every epoch.

    :param BERT: model exposing ``calls(list_of_str) -> (outputs, cls)``
    :param data_path: training file, one ``<label> <tokens...>`` sample per line
    :param epoch: number of passes over the data
    :param batch_size: samples per batch
    :param class_num: number of classes (kept for interface compatibility)
    :param optimizer: optimizer stepped once per batch
    :param line: classification head applied to the ``cls`` vector
    :param cross_entropy: loss taking ``(logits, long_labels)``
    :param save_path: directory handed to ``look_and_save_data``
    :param writing_mode: file mode for the first epoch's log; later epochs append
    :param Train: forwarded to ``look_and_save_data``
    :raises ValueError: if the file holds fewer samples than one batch
    """
    # Count the samples.
    with open(data_path, 'r', encoding='utf-8') as file1:
        datas_len = sum(1 for _ in file1)
    print('一共有{}条数据'.format(datas_len))

    # Number of full batches. The original overwrote this with a hard-coded
    # debug value (2), so only the first samples were ever trained on.
    batch_num = datas_len // batch_size
    if batch_num == 0:
        raise ValueError('data set ({} samples) is smaller than one batch ({})'.format(
            datas_len, batch_size))

    all_time_start = time.time()
    torch.cuda.empty_cache()
    for e in range(epoch):
        this_time_start = time.time()
        all_loss = []
        epoch_outputs = []
        epoch_labels = []
        for batch in range(batch_num):
            # get_input expects a row offset, not a batch index; passing the
            # bare index made consecutive batches overlap almost entirely.
            labels, comments = get_data.get_input(data_path, datas_len, batch_size, batch * batch_size)
            # get_input encodes classes as 0.001 / 1.001; .long() truncates to 0 / 1.
            labels = torch.tensor(labels, dtype=torch.float32).long()
            optimizer.zero_grad()              # 1. reset gradients
            _, cls = BERT.calls(comments)      # 2. forward pass
            cls = line(cls)
            loss = cross_entropy(cls, labels)  # 3. loss
            loss.backward()                    # 4. back-propagation
            optimizer.step()                   # 5. parameter update
            print('\r共有{}批数据,第 {:3} 批数据,当前损失: {:4f} '.format(batch_num, batch, loss.item()), end='')
            all_loss.append(loss.item())
            # Detach before storing so the autograd graph of every batch is
            # not kept alive for the whole epoch.
            epoch_outputs.append(cls.detach().cpu())
            epoch_labels.append(labels)

        # Report and save the result of this epoch.
        this_time = time.time() - this_time_start  # duration of this epoch
        all_time = time.time() - all_time_start    # total elapsed time
        predict_value = np.argmax(torch.cat(epoch_outputs, dim=0).numpy(), axis=-1)[:, None]
        actual_value = torch.cat(epoch_labels, dim=0).numpy()[:, None]
        result = np.concatenate((predict_value, actual_value), axis=1)  # rows of [predicted, actual]
        look_and_save_data(BERT, result, this_time, save_path, writing_mode, Train, step=e,
                           loss=np.array(all_loss).mean(), all_time=all_time)
        writing_mode = 'a'  # append after the first epoch
def test(BERT, data_path, batch_size, class_num, save_path, writing_mode, Train):
    """Evaluate the checkpoint stored under ``save_path`` on ``data_path``.

    NOTE(review): the classification head is read from the module-level name
    ``line``. Only ``BERT``'s state dict is restored from ``model.ckpt``, so the
    head holds whatever weights the module-level object has — confirm this is
    intended.

    :param BERT: model exposing ``calls(list_of_str) -> (outputs, cls)``
    :param data_path: test file, one ``<label> <tokens...>`` sample per line
    :param batch_size: samples per batch
    :param class_num: number of classes (kept for interface compatibility)
    :param save_path: directory containing ``model.ckpt``; results are written there
    :param writing_mode: file mode for the result file
    :param Train: forwarded to ``look_and_save_data``
    :raises ValueError: if the file holds fewer samples than one batch
    """
    with open(data_path, 'r', encoding='utf-8') as file1:
        datas_len = sum(1 for _ in file1)
    print('一共有{}条数据'.format(datas_len))

    BERT.load_state_dict(torch.load(save_path+"/model.ckpt"))
    BERT.eval()
    this_time_start = time.time()
    batch_num = datas_len // batch_size  # number of full batches
    if batch_num == 0:
        raise ValueError('data set ({} samples) is smaller than one batch ({})'.format(
            datas_len, batch_size))

    batch_outputs = []
    batch_labels = []
    with torch.no_grad():  # inference only: no autograd bookkeeping needed
        for batch in range(batch_num):
            # Row offset of this batch (the bare batch index made batches overlap).
            labels, comments = get_data.get_input(data_path, datas_len, batch_size, batch * batch_size)
            # Labels arrive as a 1-D array of 0.001 / 1.001; truncate to 0 / 1.
            labels = torch.tensor(labels, dtype=torch.float32).long()
            # The model's method is `calls`; `call` does not exist.
            _, cls = BERT.calls(comments)
            cls = torch.softmax(line(cls), dim=-1)
            batch_outputs.append(cls.cpu())
            batch_labels.append(labels)
            print('\r共有{}批数据, 第 {:3} 批数据'.format(batch_num, batch+1), end='')

    this_time = time.time() - this_time_start
    predict_value = np.argmax(torch.cat(batch_outputs, dim=0).numpy(), axis=-1)[:, None]
    # Labels are class indices, not one-hot rows, so no argmax here.
    actual_value = torch.cat(batch_labels, dim=0).numpy()[:, None]
    result = np.concatenate((predict_value, actual_value), axis=1)  # rows of [predicted, actual]
    look_and_save_data(BERT, result, this_time, save_path, writing_mode, Train)
- # 打印和保存训练过程或预测结果
def look_and_save_data(model, result, this_time, save_path,writing_mode, Train,
                       step=None, loss=None, all_time=None):
    """Compute binary-classification metrics, print them and write them to disk.

    In training mode a line is written to ``train_process.txt`` and the model's
    state dict is saved as ``model.ckpt``; otherwise a line is written to
    ``test_result.txt``. ``save_path`` is created when missing.

    :param model: module whose ``state_dict()`` is saved (training mode only)
    :param result: 2-D array with one ``[predicted, actual]`` row per sample
    :param this_time: duration in seconds of the current epoch / test run
    :param save_path: output directory
    :param writing_mode: mode used to open the log file (``'w'`` or ``'a'``)
    :param Train: truthy for training mode, falsy for prediction mode
    :param step: epoch index (training mode)
    :param loss: mean loss of the epoch (training mode)
    :param all_time: total elapsed seconds (training mode)
    :return: accuracy; 0.0 when ``result`` is empty
    """
    result = np.asarray(result)
    total = len(result)
    if total:
        row_sum = result.sum(axis=1)
        row_diff = result[:, 0] - result[:, 1]
        TP = int(np.count_nonzero(row_sum == 2))    # predicted 1, actual 1
        TN = int(np.count_nonzero(row_sum == 0))    # predicted 0, actual 0
        FP = int(np.count_nonzero(row_diff == 1))   # predicted 1, actual 0
        FN = int(np.count_nonzero(row_diff == -1))  # predicted 0, actual 1
    else:
        TP = TN = FP = FN = 0

    # The small constants keep the ratios defined when a denominator is zero.
    P = (TP + 0.0001) / (TP + FP + 0.0001)
    R = (TP + 0.0001) / (TP + FN + 0.0001)
    F1 = (2 * P * R + 0.00001) / (P + R + 0.00001)
    # An empty result used to raise ZeroDivisionError here.
    Accuracy = (TP + TN) / total if total else 0.0

    os.makedirs(save_path, exist_ok=True)
    if Train:  # training mode
        print("\tstep: {:3} | mean_loss: {:3f} | time: {:3f}m | Accuracy: {:3f} |".format(
            step, loss, this_time / 60, Accuracy))
        with open(save_path+'/train_process.txt', writing_mode, encoding='utf-8') as file:
            file.write(
                "step: {:3} | mean_loss: {:3f} | time: {:3f}m | P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} |\n".format(
                    step, loss, all_time / 60, P, R, F1, Accuracy))
        # Save the model weights.
        torch.save(model.state_dict(), save_path+"/model.ckpt")
    else:  # prediction mode
        summary = "P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f}m |\n".format(
            P, R, F1, Accuracy, this_time / 60)
        print(summary)
        with open(save_path+'/test_result.txt', writing_mode, encoding='utf-8') as file:
            file.write(summary)
    return Accuracy
# Build the encoder, the classification head, the loss and the optimizer.
bert = bert()  # NOTE: rebinds the class name `bert` to the instance
line = torch.nn.Linear(768, class_num)
cross_entropy = torch.nn.CrossEntropyLoss()  # cross-entropy loss
# Optimize the head together with the encoder. The original passed only
# bert.parameters(), so the `line` layer kept its random initial weights.
optimizer = torch.optim.Adam(list(bert.parameters()) + list(line.parameters()), lr=learning_rate)
writing_mode = 'w'  # the first write truncates the log file
save_path = './model_data/cg'
if Train:  # train the model
    train(bert, 'data_set/douban_comment/balanced/balanced_train.txt', steps, batch_size, class_num,
          optimizer, line, cross_entropy, save_path, writing_mode, Train)
else:  # evaluate the model
    test(bert, 'data_set/douban_comment/balanced/balanced_test.txt', batch_size, class_num,
         save_path, writing_mode, Train)
- import os
- import time
- import numpy as np
- import get_data
- from all_param import *
- import bert_torch
- import torch
- import math
- #from transformers import BertTokenizer, BertModel
class TextCNN(torch.nn.Module):
    """Three parallel convolution branches (window sizes 2, 3, 4) plus a linear head.

    Each branch applies a ``Conv2d`` spanning the full embedding width, then
    ReLU and max-pooling, then the shared stack of deeper ``Conv1d`` layers.
    The flattened branch outputs are concatenated, passed through dropout and
    projected to ``class_num`` logits.
    """

    def __init__(self,embed_dim,kernel_num,cnn_layer,learning_rate,class_num,DEVICE):
        """
        :param embed_dim: width of every input token vector
        :param kernel_num: output channels of the first convolution
        :param cnn_layer: total number of convolution layers per branch
        :param learning_rate: used as the dropout probability
            (NOTE(review): the name suggests a learning rate — confirm intent)
        :param class_num: number of output classes
        :param DEVICE: device the convolution layers are moved to
        """
        super(TextCNN, self).__init__()
        # ModuleList registers the layers as sub-modules. In a plain Python
        # list they were missing from parameters(), state_dict() and .to(),
        # so an optimizer built from parameters() never updated them.
        self.conv = torch.nn.ModuleList(
            [torch.nn.Conv2d(1, kernel_num, (k, embed_dim)) for k in range(2, 5)]).to(DEVICE)
        self.relu = torch.nn.ReLU()
        self.max_pool = torch.nn.MaxPool1d(2,ceil_mode=True)  # max pooling
        self.drop = torch.nn.Dropout(learning_rate)
        # Deeper layers: each doubles the channel count and uses window size 2.
        # Always defined (possibly empty) so forward code needs no special case.
        self.conv_add = torch.nn.ModuleList(
            [torch.nn.Conv1d(2 ** i * kernel_num, 2 ** (i + 1) * kernel_num, 2)
             for i in range(cnn_layer - 1)]).to(DEVICE)

        # Size of the flattened features, tracing the sequence length of every
        # branch through the layers (input length is the module-level max_len).
        out_channels = 2 ** max(cnn_layer - 1, 0) * kernel_num
        line_dim = 0
        for k in range(2, 5):
            length = (max_len - k + 1 + 1) // 2  # first conv, then ceil-mode pool
            for _ in range(cnn_layer - 1):
                length = length // 2             # conv (window 2) shortens by 1, then ceil-mode pool
            if length < 1:
                raise ValueError('max_len={} is too short for {} convolution layers'.format(
                    max_len, cnn_layer))
            line_dim += out_channels * length
        # Fully connected output layer.
        self.line = torch.nn.Linear(line_dim, class_num)

    def conv_and_pool(self,input,conv):
        """Run one branch and return its flattened features.

        :param input: tensor of shape [batch, 1, seq_len, embed_dim]
        :param conv: first-layer ``Conv2d`` of the branch
        :return: tensor of shape [batch, features]
        """
        data = conv(input).squeeze(3)            # [batch, kernel_num, seq_len - k + 1]
        data = self.max_pool(self.relu(data))    # length roughly halved
        for extra_conv in self.conv_add:
            data = self.max_pool(self.relu(extra_conv(data)))
        return data.reshape(data.shape[0], -1)   # flatten channels and positions

    def calls(self,input):
        """Combine the three branches and return class logits.

        :param input: tensor of shape [batch, 1, seq_len, embed_dim]
        :return: tensor of shape [batch, class_num]
        """
        branches = [self.conv_and_pool(input, conv) for conv in self.conv]
        features = torch.cat(branches, dim=1)
        features = self.drop(features)  # regularisation
        return self.line(features)      # fully connected layer
class mymodel(torch.nn.Module):
    """BERT encoder followed by a TextCNN classifier, with its own train/test loops."""

    def __init__(self, embed_dim, kernel_num, cnn_layer, learning_rate, class_num, Train, DEVICE):
        """
        :param embed_dim: width of the encoder's token vectors
        :param kernel_num: output channels of the first CNN layer
        :param cnn_layer: number of CNN layers
        :param learning_rate: learning rate of the Adam optimizer (also forwarded to TextCNN)
        :param class_num: number of classes
        :param Train: stored and forwarded to ``look_and_save_data`` during training
        :param DEVICE: device forwarded to TextCNN
        """
        super(mymodel, self).__init__()
        self.bert = bert_torch.bert(class_num)
        self.cnn = TextCNN(embed_dim,kernel_num,cnn_layer,learning_rate,class_num,DEVICE)
        self.cross_entropy = torch.nn.CrossEntropyLoss()  # cross-entropy loss
        self.optimizer = torch.optim.Adam(self.parameters(),lr=learning_rate)
        self.writing_mode = 'w'  # the first log write truncates, later ones append
        self.Train = Train

    @staticmethod
    def _count_lines(path):
        """Return the number of lines (samples) in ``path``."""
        with open(path, 'r', encoding='utf-8') as file1:
            return sum(1 for _ in file1)

    def _current_device(self):
        """Return the device of the model's parameters (CPU when there are none)."""
        for param in self.parameters():
            return param.device
        return torch.device('cpu')

    @staticmethod
    def _build_result(outputs, labels):
        """Stack per-batch logits and labels into rows of ``[predicted, actual]``."""
        if not outputs:
            return np.zeros((0, 2))
        predicted = np.argmax(torch.cat(outputs, dim=0).numpy(), axis=-1)[:, None]
        actual = torch.cat(labels, dim=0).numpy()[:, None]
        return np.concatenate((predicted, actual), axis=1)

    def Training(self, data_path, verify_path, max_len, DEVICE, epoch, batch_size, class_num, save_path):
        """Train for ``epoch`` passes, saving the whole model and a log line each epoch.

        :param data_path: training file, one ``<label> <tokens...>`` sample per line
        :param verify_path: validation file (currently unused)
        :param max_len: sequence length forwarded to the encoder
        :param DEVICE: device forwarded to the encoder
        :param epoch: number of passes over the data
        :param batch_size: samples per batch
        :param class_num: number of classes (kept for interface compatibility)
        :param save_path: existing directory for ``model.pth`` and ``train_process.txt``
        :raises ValueError: if the file holds fewer samples than one batch
        """
        self.train()
        datas_len = self._count_lines(data_path)
        print('一共有{}条数据'.format(datas_len))
        # Number of full batches. The original overwrote this with a debug
        # value (10), so training only ever saw the first few samples.
        batch_num = datas_len // batch_size
        if batch_num == 0:
            raise ValueError('data set ({} samples) is smaller than one batch ({})'.format(
                datas_len, batch_size))

        all_time_start = time.time()
        for e in range(epoch):
            this_time_start = time.time()
            all_loss = []
            epoch_outputs = []
            epoch_labels = []
            for batch in range(batch_num):
                # get_input expects a row offset; the bare batch index made
                # consecutive batches overlap almost entirely.
                labels, comments = get_data.get_input(data_path, datas_len, batch_size, batch * batch_size)
                # Classes arrive encoded as 0.001 / 1.001 (0 = unknown);
                # .long() truncates them to 0 / 1.
                long_labels = torch.tensor(labels, dtype=torch.float32).long()
                self.optimizer.zero_grad()                                  # 1. reset gradients
                outputs, _ = self.bert.calls(comments, max_len, DEVICE)     # 2. forward pass
                outputs = self.cnn.calls(outputs.unsqueeze(1))
                loss = self.cross_entropy(outputs.to('cpu'), long_labels)   # 3. loss
                loss.backward()                                             # 4. back-propagation
                self.optimizer.step()                                       # 5. parameter update
                all_loss.append(loss.item())
                # Detach so the autograd graph of each batch can be freed.
                epoch_outputs.append(outputs.detach().to('cpu'))
                # Metrics use the same 0/1 targets as the loss (unknown labels
                # were previously counted as class 1 here but class 0 in the loss).
                epoch_labels.append(long_labels)
                print('\r训练进度{:2d}%, 共有{}批数据, 已完成{:2d}%, 当前损失: {:4f}, ACC: {} '.format(
                    int((e) / epoch * 100), batch_num, int((batch + 1) / batch_num * 100),loss.item(), 'None'), end='')

            # Save the model and report this epoch.
            torch.save(self,save_path + "/model.pth")
            this_time = time.time() - this_time_start  # duration of this epoch
            all_time = time.time() - all_time_start    # total elapsed time
            result = self._build_result(epoch_outputs, epoch_labels)
            mean_loss = np.array(all_loss).mean()
            self.look_and_save_data(result, this_time, save_path, self.writing_mode, self.Train, step=e,
                                    loss=mean_loss, all_time=all_time)
            self.writing_mode = 'a'  # append from now on

    def test(self, data_path, batch_size, class_num, save_path, test_data_save=False):
        """Evaluate the model on ``data_path`` and return its accuracy.

        Sequence length comes from the module-level ``max_len``.

        :param data_path: test file, one ``<label> <tokens...>`` sample per line
        :param batch_size: samples per batch
        :param class_num: number of classes (kept for interface compatibility)
        :param save_path: directory for ``test_result.txt``
        :param test_data_save: when truthy, print progress and write the result file
        :return: accuracy over all full batches
        """
        self.eval()
        datas_len = self._count_lines(data_path)
        print('一共有{}条数据'.format(datas_len))
        this_time_start = time.time()
        # Number of full batches (a hard-coded debug value of 30 used to override this).
        batch_num = datas_len // batch_size
        # Use the device the parameters live on rather than a global DEVICE,
        # which only exists when this file is run as a script.
        device = self._current_device()
        batch_outputs = []
        batch_labels = []
        with torch.no_grad():  # inference only
            for batch in range(batch_num):
                labels, comments = get_data.get_input(data_path, datas_len, batch_size, batch * batch_size)
                long_labels = torch.tensor(labels, dtype=torch.float32).long()
                outputs, _ = self.bert.calls(comments, max_len, device)
                outputs = self.cnn.calls(outputs.unsqueeze(1))
                batch_outputs.append(outputs.to('cpu'))
                batch_labels.append(long_labels)
                if test_data_save:
                    print('\r共有{}批数据, 测试进度{:2d}% '.format(batch_num, int((batch + 1) / batch_num * 100)), end='')

        this_time = time.time() - this_time_start
        result = self._build_result(batch_outputs, batch_labels)
        # Compute the metrics and optionally store them.
        Acc = self.look_and_save_data(result, this_time, save_path, self.writing_mode, test_data_save=test_data_save)
        return Acc

    def look_and_save_data(self, result, this_time, save_path, writing_mode, Train=False, step=None, loss=None,
                           all_time=None, test_data_save=False):
        """Compute P / R / F1 / accuracy from ``result`` and report them.

        :param result: 2-D array with one ``[predicted, actual]`` row per sample
        :param this_time: duration in seconds of the epoch / test run
        :param save_path: directory of the log files
        :param writing_mode: mode used to open the log file
        :param Train: truthy to print and log a training line
        :param step: epoch index (training mode)
        :param loss: mean loss (training mode)
        :param all_time: total elapsed seconds (training mode)
        :param test_data_save: in prediction mode, print and write only when ``True``
        :return: accuracy; 0.0 when ``result`` is empty
        """
        result = np.asarray(result)
        total = len(result)
        if total:
            row_sum = result.sum(axis=1)
            row_diff = result[:, 0] - result[:, 1]
            TP = int(np.count_nonzero(row_sum == 2))    # predicted 1, actual 1
            TN = int(np.count_nonzero(row_sum == 0))    # predicted 0, actual 0
            FP = int(np.count_nonzero(row_diff == 1))   # predicted 1, actual 0
            FN = int(np.count_nonzero(row_diff == -1))  # predicted 0, actual 1
        else:
            TP = TN = FP = FN = 0
        # The small constants keep the ratios defined when a denominator is zero.
        P = (TP + 0.0001) / (TP + FP + 0.0001)
        R = (TP + 0.0001) / (TP + FN + 0.0001)
        F1 = (2 * P * R + 0.00001) / (P + R + 0.00001)
        # An empty result used to raise ZeroDivisionError here.
        Accuracy = (TP + TN) / total if total else 0.0

        if Train:  # training mode
            print("\tstep: {:3} | mean_loss: {:3f} | time: {:3f}m | train_data_Acc: {:3f} |".format(
                step, loss, this_time / 60, Accuracy))
            with open(save_path + '/train_process.txt', writing_mode, encoding='utf-8') as file:
                file.write(
                    "step: {:3} | mean_loss: {:3f} | time: {:3f}m | P: {:3f} | R: {:3f} | F1: {:3f} | train_data_Acc: {:3f} |\n".format(
                        step, loss, all_time / 60, P, R, F1, Accuracy))
        elif test_data_save == True:  # prediction mode, reporting requested
            summary = "P: {:3f} | R: {:3f} | F1: {:3f} | Accuracy: {:3f} | time: {:3f}m |\n".format(
                P, R, F1, Accuracy, this_time / 60)
            print(summary)
            with open(save_path + '/test_result.txt', writing_mode, encoding='utf-8') as file:
                file.write(summary)
        return Accuracy
if __name__ == '__main__':
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print('GPU: ', '可用' if str(DEVICE) == "cuda" else "不可用")  # is a GPU available?
    print('torch版本: ', torch.__version__)                       # torch version
    print('GPU数量: ', torch.cuda.device_count())                 # number of GPUs
    if torch.cuda.is_available():
        # These queries raise on CPU-only machines, so only run them when
        # CUDA is present; otherwise the CPU fallback above could never be used.
        print('GPU索引号: ', torch.cuda.current_device())         # current GPU index
        print('GPU名称: ', torch.cuda.get_device_name(0))         # name of GPU 0

    save_path = 'model_data/balanced_bert_output_CNN_in_50_3_label'
    os.makedirs(save_path, exist_ok=True)  # create the output directory
    train_path = 'data_set/douban_comment/balanced/balanced_train.txt'
    test_path = 'data_set/douban_comment/balanced/balanced_test.txt'
    verify_path = 'data_set/douban_comment/balanced/balanced_verify.txt'
    if Train:
        model = mymodel(word2vec_size, kernel_num, cnn_layer, learning_rate, class_num, Train, DEVICE).to(DEVICE)
        model.Training(train_path, verify_path, max_len, DEVICE, steps, batch_size, class_num, save_path)
        # Evaluate once right after training (the original ran the identical
        # evaluation twice, appending the same line to the result file).
        Train = False
        model.test(test_path, batch_size, class_num, save_path, test_data_save=True)
    else:
        # NOTE(security): torch.load unpickles a whole model object; only load
        # files from a trusted source. map_location lets a model saved on GPU
        # be restored on a CPU-only machine.
        model = torch.load(save_path + "/model.pth", map_location=DEVICE)
        model.test(test_path, batch_size, class_num, save_path, test_data_save=True)
