1. Data acquisition: chapter text from the novel 《斗破苍穹》 is used and manually annotated. Data download: 斗破苍穹数据, extraction code: jkzi
2. Data cleaning: remove as much text as possible that is not part of the novel's plot (for example the author's remarks, requests for monthly votes, and so on).
3. Split chapter content into shorter passages or sentences: a full chapter is far too long and greatly slows the model down.
4. Tokenize the text and convert it to vectors with a pre-trained model (BERT, XLNet, etc.) or another method such as Word2Vec: vectors from a pre-trained model are usually better. For Chinese NER the text is normally split into individual characters, with one label per character (see the sketch after this list).
5. Data preprocessing: normalize the vectorized text and the label data. The text tensor has shape (samples per batch, max sequence length, word-vector dimension); the label tensor has shape (samples per batch, max sequence length, number of classes) for one-hot labels, or (samples per batch, max sequence length) when integer class ids are used, as in the code below.
6. Build the model: usually only the encoder part needs to be built, followed by a fully connected layer as the classifier, with a Conditional Random Field (CRF) layer for the loss; the CRF models the dependencies between adjacent labels. For an introduction to CRFs, see 机器学习(有监督)——条件随机场CRF.
7. Hyperparameter tuning, training, and evaluation: the main hyperparameters to tune are the learning rate, the number of layers, and the number of training epochs. Evaluation usually uses accuracy together with precision, recall, and F1.
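As a minimal sketch of step 4 (character-level tokens, one label per character), the snippet below tokenizes a short sentence with a BERT tokenizer and aligns an illustrative BIO label sequence to it, padding the labels with -1 the same way the dataload code later in this post does. The sentence, the tags, and the small max_seq_len are placeholders, and the tokenizer checkpoint is assumed to be available locally or from the Hugging Face hub.

from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')  # assumed checkpoint location

text = '萧炎来到乌坦城'                                          # 7 characters, one token each
labels = ['B-rw', 'I-rw', 'O', 'O', 'B-zw', 'I-zw', 'I-zw']      # illustrative character-level BIO tags
max_seq_len = 16                                                 # small demo value; the project uses 200

encoded = tokenizer(text, padding='max_length', max_length=max_seq_len, truncation=True)
tokens = tokenizer.convert_ids_to_tokens(encoded['input_ids'])   # ['[CLS]', '萧', ..., '[SEP]', '[PAD]', ...]

# Wrap the labels with <START>/<END> (aligned with [CLS]/[SEP]) and pad with -1.
label2id = {'<START>': 0, '<END>': 1, 'O': 2, 'B-rw': 3, 'I-rw': 4, 'B-zw': 5, 'I-zw': 6}
label_ids = [label2id['<START>']] + [label2id[t] for t in labels] + [label2id['<END>']]
label_ids += [-1] * (max_seq_len - len(label_ids))

assert len(tokens) == len(label_ids)
print(list(zip(tokens, label_ids)))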
1. Model: Bert-Att-CRF (composed of BERT, self-attention, and CRF), as shown in the figure below:
2. Project structure, as shown in the figure below (see also the layout sketch after this list): bert-base-chinese (the BERT pre-trained model), Bert_att_crf (files produced during model training), data (data files).
3. Contents of the code files:
Project repository: EntityRecognition · 唯有读书高/Knowledge Graph - 码云 - 开源中国 (gitee.com)
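Based on the directory names above and the import statements in the code that follows, the project layout is roughly as below (the training script's file name does not appear in the imports, so it is listed generically):

EntityRecognition/
├── bert-base-chinese/     # BERT pre-trained model files (config, vocab, weights)
├── Bert_att_crf/          # model_weights.pth, label_dict.pkl, verify/test result logs
├── data/                  # annotation spreadsheets and prediction outputs
├── config.py              # Config: paths and hyperparameters
├── dataload.py            # NERDataset: Excel reading and preprocessing
├── CRF.py                 # hand-written CRF layer
├── BERT_ATT_CRF.py        # BertAttCRF model (BERT + self-attention + CRF)
└── <training script and result-extraction script>

The code files below are listed in this order: config.py, dataload.py, CRF.py, BERT_ATT_CRF.py, the training/prediction script, and the result-extraction script.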
- import os
- import torch
-
-
- class Config(object):
- def __init__(self):
- self.save_file_name = 'Bert_att_crf'
- self.base_path = os.path.abspath('./') # 获取当前目录的绝对路径
- self.min_seq_len = 150
- self.max_seq_len = 200
- self.learning_rate = 1e-5
- self.drop_rate = 1e-2
- self.batch_size = 12
- self.label_num = 23
- self.layer_num = 2
- self.epoch = 20
- self.word_dim = 768
- self.save_model_path = os.path.join(self.base_path, self.save_file_name, 'model_weights.pth')
- self.Bert_path = os.path.join(self.base_path, 'bert-base-chinese')
- self.do_lower_case = True
- self.data_set_path = r'data/斗破苍穹_实体识别模型训练数据.xlsx'
- # 优先使用GPU
- self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
- import pickle
- from torch.utils.data import Dataset
- import pandas as pd
- from config import Config
- import os
-
-
- class NERDataset(Dataset):
- def __init__(self, config, tokenizer):
- """
- :param config: 本项目的参数设置
- :param tokenizer: 预训练模型的分词器
- """
- super(NERDataset, self).__init__()
- self.config = config
- self.tokenizer = tokenizer
-
- def load_pkl(self, path: str):
- """加载pkl文件"""
- with open(path, 'rb') as f:
- data = pickle.load(f)
- return data
-
- def save_pkl(self, data, path: str):
- """保存pkl文件"""
- with open(path, 'wb') as f:
- pickle.dump(data, f)
-
- def save_label_data(self, label: list, label_pkl_path: str):
- """
- :param label: 标签数据
- :param label_pkl_path: 存储路径
- :return:
- """
- label_set = '、'.join(label).split('、')
- label_set = list(set(label_set)) # 去重
- label_set.extend(['<START>', '<END>']) # 加入特殊标签
- label2id = {value: idx for idx, value in enumerate(label_set)}
- id2label = {idx: value for idx, value in enumerate(label_set)}
- label_dict = {'label2id': label2id, 'id2label': id2label}
- self.save_pkl(label_dict, label_pkl_path) # 保存标签文件
-
- return label_dict
-
- def read_excel(self, excel_path: str, sheet_name: str = 'Sheet1',
- train_mode: bool = True) -> (list, dict):
- """
- :param excel_path: 表格文件路径
- :param sheet_name: 表格名字
- :param train_mode: 是否是训练模式
- :return:
- """
- excel_path = os.path.join(self.config.base_path, excel_path)
- data = pd.read_excel(excel_path, sheet_name=sheet_name)
- # 训练模式
- if train_mode:
- text = data['文本'].tolist()
- label = data['标签'].tolist()
- # 获取标签字典
- os.makedirs(f'./{self.config.save_file_name}', exist_ok=True)
- label_pkl_path = os.path.join(self.config.base_path, self.config.save_file_name, "label_dict.pkl")
- # 是否有保存的标签类别信息,并且类别个数符合要求
- if os.path.exists(label_pkl_path):
- label_dict = self.load_pkl(label_pkl_path) # {'label2id': label2id, 'id2label': id2label}
- if len(label_dict['id2label']) != self.config.label_num: # 类别个数不符,重新生成并更新label_dict
- label_dict = self.save_label_data(label, label_pkl_path)
- else: # 没有则从数据集获取
- label_dict = self.save_label_data(label, label_pkl_path)
- """获取文本标注数据"""
- line = [[' '.join(list(text[i])), ' '.join(label[i].split('、'))] for i in range(len(text))]
-
- else: # 预测模式
- text = data['文本'].tolist()
- title = data['标题'].tolist()
- """获取标签字典"""
- label_pkl_path = os.path.join(self.config.base_path, self.config.save_file_name, "label_dict.pkl")
- # {'label2id': label2id, 'id2label':id2label}
- label_dict = self.load_pkl(label_pkl_path)
-
- """获取文本标注数据,并根据标点符号拆分句子到最大序列范围内"""
- line = []
- for i in range(len(text)): # 遍历章节数据
- this_text = text[i].replace('\n', '').replace(' ', '')
- this_title = title[i].replace('\n', '')
-
- start_idx = 0 # 句子的头一个index
- j = start_idx # 正在查询的index(不复用外层循环变量i)
- min_len = self.config.min_seq_len # 句子的最小长度
- while j < len(this_text):
- # 句子最后一个index没有超过文本长度, 并且是结束符号
- if j + min_len < len(this_text) and this_text[j + min_len] in '。?!,':
- end_idx = j + min_len
- this_sentence = this_text[start_idx:end_idx + 1] # 句子提取
- line.append((this_title, this_sentence))
- start_idx = end_idx + 1 # 更新开始index
- j = start_idx # 更新查询index
-
- # 超过文本长度(最后剩下的一点)
- elif j + min_len >= len(this_text):
- this_sentence = this_text[start_idx:] # 截取剩余句子
- line.append((this_title, this_sentence))
- break
- # 以上条件都不满足,查询下一个index
- else:
- j += 1
-
- return line, label_dict
-
- def data_process(self, excel_path: str, sheet_name: str = 'Sheet1',
- train_mode: bool = True) -> (list, dict):
- """
- :param excel_path: 表格文件路径
- :param sheet_name: 表格名字
- :param train_mode: 是否是训练模式
- :return:
- """
- # 读取数据
- pre_proces_line = []
- line, label_dict = self.read_excel(excel_path, sheet_name, train_mode=train_mode)
- # 训练模式
- if train_mode:
- label2id = label_dict['label2id'] # 获取转换字典
- for index, item in enumerate(line):
- text = item[0].split(' ')
- label = item[1].split(' ')
-
- # 使用BERT的tokenizer功能
- # 词嵌入
- max_seq_length = self.config.max_seq_len
- encoded_dict = self.tokenizer(''.join(text), padding='max_length', max_length=max_seq_length,
- truncation=True)
- decoded_text = self.tokenizer.convert_ids_to_tokens(encoded_dict['input_ids'])
- # 短补长截
- label = [label2id[seq] for seq in label]
- if len(label) >= max_seq_length-2: # 截断
- label = [label2id["<START>"]] + label[:max_seq_length-2] + [label2id["<END>"]]
- else: # 补充
- label = [label2id["<START>"]] + label + [label2id["<END>"]]
- while len(label) < max_seq_length:
- label.append(-1)
-
- text = encoded_dict['input_ids'] # 输入序列
- mask = encoded_dict['attention_mask'] # 输入掩码
- token_type_ids = encoded_dict['token_type_ids'] # 输入序列的token类别
- assert len(text) == len(label) == len(mask)
- pre_proces_line.append({'text': text, 'mask': mask, 'label': label,
- 'token_type_ids': token_type_ids, 'str_text': decoded_text})
-
- return pre_proces_line, label_dict
-
- # 预测模式
- else:
- for index, item in enumerate(line):
- title = item[0]
- text = item[1]
-
- # 使用BERT的tokenizer功能"""
- max_seq_length = self.config.max_seq_len
- encoded_dict = self.tokenizer(text, padding='max_length', max_length=max_seq_length,
- truncation=True)
- decoded_text = self.tokenizer.convert_ids_to_tokens(encoded_dict['input_ids'])
-
- text = encoded_dict['input_ids']
- mask = encoded_dict['attention_mask']
- token_type_ids = encoded_dict['token_type_ids']
- pre_proces_line.append({'text': text, 'mask': mask, 'label': title,
- 'token_type_ids': token_type_ids, 'str_text': decoded_text})
- assert len(text) == len(decoded_text)
-
- return pre_proces_line, label_dict
-
-
- if __name__ == '__main__':
- from transformers import BertTokenizer
-
- tokenizer_ = BertTokenizer.from_pretrained(Config().Bert_path, do_lower_case=Config().do_lower_case)
-
- dataset = NERDataset(Config(), tokenizer_)
- pre_processing_line, label_tag_dict = dataset.data_process(r'data/斗破苍穹(标注与未标注数据).xlsx',
- sheet_name='未标注数据', train_mode=False)
- print(label_tag_dict)
- print(len(label_tag_dict['label2id']))
You can also call the CRF from the TorchCRF package directly; the implementation below was written to help understand how a CRF works.
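For reference, a minimal sketch of the drop-in usage of the TorchCRF package (pip install TorchCRF) is shown below; the tensor sizes are arbitrary, and the exact import name and call signatures may differ between versions of the package, so treat this as an assumption to verify against its documentation:

import torch
from TorchCRF import CRF  # assumed import path for the TorchCRF package

batch_size, seq_len, num_labels = 2, 3, 5
emissions = torch.randn(batch_size, seq_len, num_labels)   # e.g. BERT features after a linear layer
tags = torch.LongTensor([[0, 2, 3], [1, 4, 1]])
mask = torch.BoolTensor([[1, 1, 1], [1, 1, 0]])             # False marks padded positions

crf = CRF(num_labels)
log_likelihood = crf.forward(emissions, tags, mask)         # one value per sequence
loss = -log_likelihood.mean()                               # negate to get a loss
best_paths = crf.viterbi_decode(emissions, mask)            # list of label-id sequences
print(loss, best_paths)

The hand-written equivalent follows.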
- import torch.nn as nn
- import torch
- from torch import FloatTensor, Tensor, BoolTensor
- from config import Config
-
- class CRF(nn.Module):
- def __init__(self, num_labels: int):
- super(CRF, self).__init__()
- self.config = Config()
- self.num_labels = num_labels
- # 使用均匀分布初始化一个转移矩阵
- self.transfer_matrix = nn.Parameter(torch.empty(self.num_labels, self.num_labels))
- nn.init.uniform_(self.transfer_matrix, -0.1, 0.1)
- # 使用均匀分布初始化一个开始矩阵
- self.start_matrix = nn.Parameter(torch.empty(self.num_labels))
- nn.init.uniform_(self.start_matrix, -0.1, 0.1)
- # 使用均匀分布初始化一个结束矩阵
- self.end_matrix = nn.Parameter(torch.empty(self.num_labels))
- nn.init.uniform_(self.end_matrix, -0.1, 0.1)
-
- def forward(self, x: FloatTensor, y: Tensor, mask: BoolTensor
- ) -> Tensor:
- """
- 分子除以分母改为相减,希望的概率越大,获取的loss值会负方向趋近于0
- :param x: 特征序列(通常是经过RNN等模型提取到的特征张量)
- :param y: 标签序列
- :param mask: 填充符掩码(特征序列里含有填充符<pad>,对应的标签也有)
- :return: 损失值(负数)
- 公式: 概率 = 标签路径上的边和节点得分之和/所有边和节点得分之和
- 希望概率最大,因此公式转log使概率从负方向趋近于0。再取反便是loss(正数)
- """
-
- molecule = self.formula_molecule(x, y, mask).to(self.config.device)
- denominator = self.formula_denominator(x, mask).to(self.config.device)
- loss = molecule - denominator
-
- return loss
-
- def formula_molecule(self, x: FloatTensor, y: Tensor, mask: BoolTensor
- ) -> Tensor:
- """
- 计算公式的分子部分
- :param x: 特征序列(通常是经过RNN等模型提取到的特征张量)
- :param y: 标签序列
- :param mask: 填充符掩码(特征序列里含有填充符<pad>,对应的标签也有)
- :return: 分子得分
- """
- batch_size, len_seq, _ = x.size()
- batch_idx = torch.arange(batch_size) # tensor([ 0, 1, ...., batch_size])
- first_y = y[:, 0] # 每个序列的第一个类别标签
- last_y = y[:, -1] # 每个序列的最后的类别标签
-
- # 由开始到第一个标签的转移得分
- score = self.start_matrix[first_y]
- # 中间的得分
- for i in range(len_seq-1):
- now_y = y[:, i] # 当前标签的值y1
- next_y = y[:, i + 1] # 下一个标签的值y2
- now_mask = mask[:, i] # 排除掩码部分
- next_mask = mask[:, i + 1]
- transfer = self.transfer_matrix[now_y, next_y] # 当前时刻y1——>y2的转移权重
- now_x = x[batch_idx, i, now_y] # 当前标签的值x1
- score += now_x * now_mask + transfer * next_mask
- # 最后的得分
- score += self.end_matrix[last_y] # 加上最后结束的转移得分
-
- return score
-
- def formula_denominator(self, x: FloatTensor, mask: BoolTensor):
- """
- 计算所有边(转移权重)和节点(类别)的总得分作为分母,与有效序列长度有关,越长越大
- :param x: 特征序列(通常是经过RNN等模型提取到的特征张量)
- :param mask: 填充符掩码(特征序列里含有填充符<pad>,对应的标签也有)
- :return: 分母得分
- """
- batch_size, len_seq, _ = x.size()
- # 设置张量形状
- mask = mask.unsqueeze(-1).expand(batch_size, len_seq, self.num_labels)
- start_matrix = self.start_matrix.unsqueeze(0).expand(batch_size, self.num_labels)
- end_matrix = self.end_matrix.unsqueeze(0).expand(batch_size, self.num_labels)
-
- # 第一个token
- x_0 = x[:, 0]
- score = start_matrix + x_0
- # 中间的token
- for i in range(1, len_seq):
- this_x = x[:, i].unsqueeze(1)
- this_mask = mask[:, i]
- this_score = score.unsqueeze(-1) + self.transfer_matrix + this_x # 当前的结果
- this_score = torch.logsumexp(this_score, dim=1) # label1-->(label1/label2....)维度求和
- score = torch.where(this_mask, this_score, score) # 该位置是True就更新为当前结果
- # 最后的token
- score = score + end_matrix
- score = torch.logsumexp(score, dim=1) # len_seq维度求和
-
- return score
-
- def viterbi_decode(self, x: FloatTensor, mask: BoolTensor):
- """
- 预测时,利用维特比算法进行解码,获取到预测的标签序列
- :param x: 特征序列(通常是经过RNN等模型提取到的特征张量)
- :param mask: 填充符掩码(特征序列里含有填充符<pad>,对应的标签也有)
- :return: 标签结果[tensor(标签值), tensor(标签值)]
- """
- batch_size, len_seq, _ = x.size()
- # 用维特比算法筛选最大的得分路径
- # 将维度都拓展成(batch_size, num_labels, num_labels)
- start_matrix = self.start_matrix.unsqueeze(0).expand(batch_size, self.num_labels)
- x_0 = x[:, 0] # 序列第一个标签
- score = [start_matrix + x_0] # 记录维特比计算的得分
- path = [] # 记录维特比路径最大的id
- for i in range(1, len_seq):
- # 获取当前时刻的标签
- x_i = x[:, i].unsqueeze(1)
- # 对应路径的得分求和
- this_score = score[i-1].unsqueeze(-1) + self.transfer_matrix + x_i
-
- # 获取上个时刻标签分别到当前时刻标签得分的最大值和标签id(当前同一标签里的路径对比,不同的不比)
- # 例如有标签:1、2。获取上个时刻1与2里到当前时刻1(或2)得分的最大值和id,
- # 所以结果形状为(batch_size,num_labels)
- last_score, last_path = this_score.max(1)
- score.append(last_score) # 将更新后的得分添加到列表,用于下一个时刻的相加对比
- path.append(last_path)
-
- # 对筛选出来的得分路径进行解码
- effective_length = mask.sum(dim=1).squeeze(0) # 获取有效序列的长度(去除掩码部分)
- new_path = []
- _, max_index = score[-1].max(1) # 从最后一个筛选结果里进一步获取最好的结果
- # 将结果添加进去(从后面解码,结果是倒序的)
- new_path.append(max_index.tolist())
- for i in range(len(path)):
- rear_path = path[-1-i] # 倒数第i个序列的标签集
- batch_id = torch.arange(batch_size)
- max_index = rear_path[batch_id, max_index] # 根据结果索引max_index查找上一个最好的标签索引
- new_path.append(max_index.tolist())
-
- new_path = torch.tensor(new_path).T
- new_path = torch.flip(new_path, [1]).tolist() # 因为结果是倒序的,所以将每一行元素再进行倒序
- new_path = [new_path[i][:effective_length[i]] for i in range(batch_size)] # 只取有效序列部分
-
- return new_path
-
-
- if __name__ == '__main__':
- labels = ['a', 'b', 'c']
- X = torch.FloatTensor([[[0.1, 0.2, 0.8], [0.3, 0.8, 0.3], [0.5, 0.6, 0.3]],
- [[0.3, 0.2, 0.5], [0.3, 0.2, 0.8], [0.9, 0.1, 0.6]],
- [[0.7, 0.8, 0.8], [0.9, 0.1, 0.8], [0.2, 0.3, 0.6]]])
- Y = torch.LongTensor([[0, 1, 1],
- [2, 0, 1],
- [0, 2, 1]])
- Mask = torch.LongTensor([[1, 1, 1],
- [1, 1, 0],
- [1, 1, 1]])
-
- crf = CRF(len(labels))
- Loss = crf.forward(X, Y, Mask.byte())
- label = crf.viterbi_decode(X, Mask.byte())
- print(Loss)
- print(label)
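In formula form, the forward method above computes the log-probability of the gold label path: formula_molecule (the numerator) is the score of that path built from start_matrix, the emission scores x, the transition matrix T (transfer_matrix), and end_matrix, while formula_denominator is the log-sum-exp of the same score over all possible paths. With notation chosen here to match the code:

\mathrm{score}(x, y) = \mathrm{start}_{y_1} + \sum_{t} x_{t,\,y_t} + \sum_{t} T_{y_t,\,y_{t+1}} + \mathrm{end}_{y_n}

\log p(y \mid x) = \mathrm{score}(x, y) - \log \sum_{y'} \exp\big(\mathrm{score}(x, y')\big)

The BertAttCRF model below multiplies this by -1, so the training loss is the negative log-likelihood.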
- import torch
- import torch.nn as nn
- from transformers import BertModel
- from CRF import CRF
- from torch import Tensor
-
-
- class BertAttCRF(nn.Module):
- def __init__(self, myconfig, pre_config):
- """
- :param myconfig: 本次项目需要传入的参数配置
- :param pre_config: 预训练模型的参数配置
- """
- super(BertAttCRF, self).__init__()
- self.config = myconfig
- self.bert = BertModel.from_pretrained(self.config.Bert_path, config=pre_config)
- self.drop = nn.Dropout(p=self.config.drop_rate) # 随机丢弃一小部分,防止过拟合
-
- # self-attention
- self.attention = nn.MultiheadAttention(embed_dim=self.config.word_dim, num_heads=8, batch_first=True) # 输入形状(batch, seq, dim)
- self.layer_norm = nn.LayerNorm(self.config.word_dim) # 层归一化
- self.linear_layer = nn.Linear(self.config.word_dim, self.config.label_num) # 全连接
-
- self.crf = CRF(num_labels=self.config.label_num)
-
- def forward(self, input_ids: Tensor, attention_mask: Tensor,
- token_type_ids: Tensor, tags: Tensor):
- """
- :param input_ids: torch.Size([batch_size,seq_len]), 代表输入实例的tensor张量
- :param token_type_ids: torch.Size([batch_size,seq_len]), 一个实例可以含有两个句子,相当于标记
- :param attention_mask: torch.Size([batch_size,seq_len]), 指定对哪些词进行self-Attention操作
- :param tags: 标签
- :return:
- """
- output = self.bert(input_ids, token_type_ids=token_type_ids,
- attention_mask=attention_mask)
- sequence_output = output[0] # torch.Size([batch_size,seq_len,hidden_size])
- # attention n_layer
- for _ in range(self.config.layer_num): # 残差结构
- output = self.layer_norm(sequence_output) # LayerNormal归一化
- output = self.attention(output, output, output,
- key_padding_mask=(attention_mask == 0)) # True的位置是填充符,不参与attention
- sequence_output = torch.add(sequence_output, output[0])
-
- sequence_output = self.drop(sequence_output)
- emissions = self.linear_layer(sequence_output) # [batch_size, seq_len, num_labels]
- loss = -1 * self.crf(emissions, tags, mask=attention_mask.byte())
-
- return loss
-
- def predict(self, input_ids: Tensor, attention_mask=None,
- token_type_ids: Tensor = None):
- """
- :param input_ids: torch.Size([batch_size,seq_len]), 代表输入实例的tensor张量
- :param token_type_ids: torch.Size([batch_size,seq_len]), 一个实例可以含有两个句子,相当于标记
- :param attention_mask: torch.Size([batch_size,seq_len]), 指定对哪些词进行self-Attention操作
- :return:
- """
- outputs = self.bert(input_ids, token_type_ids=token_type_ids,
- attention_mask=attention_mask)
- sequence_output = outputs[0]
- for _ in range(self.config.layer_num): # 残差结构
- output = self.layer_norm(sequence_output) # LayerNormal归一化
- output = self.attention(output, output, output,
- key_padding_mask=(attention_mask == 0)) # True的位置是填充符,不参与attention
- sequence_output = torch.add(sequence_output, output[0])
-
- sequence_output = self.drop(sequence_output)
- sequence_output = self.linear_layer(sequence_output)
- # CRF维特比算法解码
- sequence_output = self.crf.viterbi_decode(sequence_output,
- attention_mask.byte())
- return sequence_output
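Each pass of the loop in forward and predict is a pre-norm residual self-attention block applied on top of the BERT output, repeated layer_num times:

h \leftarrow h + \mathrm{SelfAttention}\big(\mathrm{LayerNorm}(h)\big)

followed by dropout, a linear projection to label_num emission scores, and the CRF (loss in forward, Viterbi decoding in predict). The training and evaluation script follows.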
- import pandas
- from tqdm import tqdm
- from config import Config
- from dataload import NERDataset
- from BERT_ATT_CRF import BertAttCRF
- import torch
- from transformers import BertTokenizer, BertConfig
- import time
- import random
- import os
-
-
- class RunBertAttCRF(object):
- def __init__(self, config: Config):
- """
- :param config: 本次项目需要传入的参数配置
- """
- self.config = config
- # 优先使用GPU
- self.device = self.config.device
-
- # Bert
- self.tokenizer = BertTokenizer.from_pretrained(self.config.Bert_path,
- do_lower_case=self.config.do_lower_case)
- self.pre_config = BertConfig.from_pretrained(self.config.Bert_path,
- num_labels=self.config.label_num)
- self.model = BertAttCRF(self.config, pre_config=self.pre_config).to(self.device)
-
- # 初始化模型参数优化器
- self.optimizer = torch.optim.Adam(self.model.parameters(),
- lr=self.config.learning_rate)
-
- def train(self, excel_path: str, sheet_name: str = 'Sheet1', ):
- """
- :param excel_path: 训练数据表格路径
- :param sheet_name: 表格名字
- :return:
- """
- self.model.train()
- data_set = NERDataset(self.config, self.tokenizer) # 实例化数据处理类
- # 获取预处理的数据
- process_line, label_tag_dict = data_set.data_process(excel_path,
- sheet_name=sheet_name)
- # process_line = process_line[:int(len(process_line)*0.02)]
- # 走一遍数据需要的批数
- batch_num = (len(process_line) // self.config.batch_size
- if len(process_line) % self.config.batch_size == 0
- else (len(process_line) // self.config.batch_size) + 1)
-
- random.shuffle(process_line) # 打乱
- max_acc = 0
- for e in range(self.config.epoch):
- all_loss = [] # 汇总一遍数据的损失值
- start_time = time.time() # 记时
- for batch in range(batch_num):
- # 选的批次数据位置没超过最大数据长度
- if (batch + 1) * self.config.batch_size <= len(process_line):
- batch_line = process_line[batch * self.config.batch_size:
- (batch + 1) * self.config.batch_size]
- else:
- batch_line = process_line + process_line
- batch_line = batch_line[batch * self.config.batch_size:
- (batch + 1) * self.config.batch_size]
- text = torch.tensor([item['text'] for item in batch_line], dtype=torch.long)
- mask = torch.tensor([item['mask'] for item in batch_line], dtype=torch.float)
- token_type_ids = torch.tensor([item['token_type_ids'] for item in batch_line],
- dtype=torch.long)
- label_ = torch.tensor([item['label'] for item in batch_line])
-
- # 开始训练,计算梯度
- self.optimizer.zero_grad()
- loss = self.model.forward(text.to(self.device), mask.to(self.device),
- token_type_ids.to(self.device), label_.to(self.device))
- loss.mean().backward() # 损失反传
- self.optimizer.step() # 更新梯度
- all_loss += loss.tolist()
- print(f'\repoch:{e},batch:{(batch + 1)}, '
- f'LOSS:{round(loss.mean().item(), 3)}', end='') #
-
- need_time = (time.time() - start_time) / 60 # 获取一个epoch的运行时间
- mean_loss = round(sum(all_loss) / len(all_loss), 3)
- print(f'\repoch:{e}, mean_LOSS:{mean_loss},'
- f' time:{round(need_time, 3)}m')
-
- if (e + 1) % 2 == 0:
- # 记录参数的验证效果
- verify_result, verify_label, _ = self.test(self.config.data_set_path, sheet_name='verify')
- accuracy_, precision_, recall_, f1_, conf_matrix_ = self.acc_prf1(verify_result,
- verify_label)
- print(f'acc{accuracy_}\np{precision_}\nr{recall_}\nf1{f1_}\n') # {conf_matrix_}\n
-
- os.makedirs(f'./{self.config.save_file_name}', exist_ok=True)
- # 保存训练过程
- file_ = open(f'./{self.config.save_file_name}/verify_result.txt', 'a', encoding='utf-8')
- file_.write(f'参数:epoch:{e}, mean_loss:{mean_loss}, lr:{self.config.learning_rate}, '
- f'drop_rate:{self.config.drop_rate}, '
- f'batch_size:{self.config.batch_size}, layer_num:{self.config.layer_num}\n'
- f'verify评估:acc:{accuracy_}, p:{precision_}, r:{recall_}, f1:{f1_}, '
- f'time:{round(need_time, 3)}\n\n') # , \nconf_matrix:{conf_matrix_}
- # 如果模型效果更好,保存模型
- if accuracy_ - max_acc >= 0:
- # 保存模型
- torch.save(self.model.state_dict(), self.config.save_model_path)
- max_acc = accuracy_ # 准确率更新
- # 加载目前效果最好的权重
- self.model.load_state_dict(torch.load(self.config.save_model_path))
-
- def test(self, excel_path: str, sheet_name: str = 'Sheet1') -> (list, list, dict):
- """
- :param excel_path: 训练数据表格路径
- :param sheet_name: 表格名字
- :return:
- """
- self.model.eval()
- # 获取预处理的数据
- data_set = NERDataset(self.config, self.tokenizer)
- process_line, label_tag_dict = data_set.data_process(excel_path, sheet_name=sheet_name)
- batch_num = len(process_line) // self.config.batch_size
-
- all_result_ = []
- all_label = []
- for batch in range(batch_num):
- # 按顺序取批数据, 多出来不够一个batch_size的不要了
- batch_line = process_line[batch * self.config.batch_size: (batch + 1) * self.config.batch_size]
- text = torch.tensor([item['text'] for item in batch_line], dtype=torch.long)
- mask = torch.tensor([item['mask'] for item in batch_line], dtype=torch.float)
- token_type_ids = torch.tensor([item['token_type_ids'] for item in batch_line], dtype=torch.long)
- label_ = [item['label'] for item in batch_line]
- # 模型预测
- result_ = self.model.predict(text.to(self.device), mask.to(self.device),
- token_type_ids.to(self.device))
- # 结果汇总
- all_result_ += result_
- all_label += label_
- # 将测试结果加上填充符的标签,方便进行评估指标计算
- new_all_result = []
- for item in all_result_:
- if len(item) < self.config.max_seq_len: # 预测结果小于最大长度进行填充
- item = item + [-1] * (self.config.max_seq_len - len(item))
- new_all_result.append(item)
-
- return new_all_result, all_label, label_tag_dict['label2id']
-
- def predict(self, excel_path: str, sheet_name: str = 'Sheet1') -> list:
- """
- :param excel_path: 训练数据表格路径
- :param sheet_name: 表格名字
- :return:
- """
- self.model.eval()
- # 获取预处理的数据
- print('数据加载中···')
- data_set = NERDataset(self.config, self.tokenizer) # 实例化数据处理类
- process_line, label_tag_dict = data_set.data_process(excel_path, sheet_name=sheet_name, train_mode=False)
- batch_num = len(process_line) // self.config.batch_size
-
- all_result_ = []
- for batch in tqdm(range(batch_num + 1)):
- end_id = None # 用于去掉最后凑batch size部分
- # 按顺序取批数据
- if (batch + 1) * self.config.batch_size <= len(process_line): # 选的批次数据位置没超过最大数据长度
- batch_line = process_line[batch * self.config.batch_size: (batch + 1) * self.config.batch_size]
- else: # 最后凑batch size
- batch_line = process_line + process_line
- batch_line = batch_line[batch * self.config.batch_size: (batch + 1) * self.config.batch_size]
- end_id = len(process_line) - batch * self.config.batch_size # 记录数据结束位置
-
- text = torch.tensor([item['text'] for item in batch_line], dtype=torch.long)
- mask = torch.tensor([item['mask'] for item in batch_line], dtype=torch.float)
- token_type_ids = torch.tensor([item['token_type_ids'] for item in batch_line], dtype=torch.long)
- title = [item['label'] for item in batch_line]
- str_text = [item['str_text'] for item in batch_line]
- # 模型预测
- result_ = self.model.predict(text.to(self.device), mask.to(self.device),
- token_type_ids.to(self.device))
- # 如果存在凑batch size,去掉凑的部分
- if end_id is not None:
- result_ = result_[:end_id]
- all_result_ += [(result_[i], title[i], str_text[i]) for i in range(len(result_))]
-
- return all_result_
-
- def acc_prf1(self, result_: list, result_label: list):
- """
- :param result_: 预测结果
- :param result_label: 标签
- :return:
- """
- # 预测值和标签值
- predicted = torch.tensor(result_)
- target = torch.tensor(result_label)
- # acc
- correct = torch.sum((predicted == target).int()).item() # 计算准确预测的样本数量
- accuracy_ = correct / target.numel() # 计算准确率
- # 计算混淆矩阵
- conf_matrix_ = torch.zeros((self.config.label_num, self.config.label_num))
- for t, p in zip(target, predicted):
- for i in range(len(t)):
- conf_matrix_[t[i], p[i]] += 1
-
- p = torch.diag(conf_matrix_) / (conf_matrix_.sum(dim=0) + 1e-8) # 计算精确率
- r = torch.diag(conf_matrix_) / (conf_matrix_.sum(dim=1) + 1e-8) # 计算召回率
- f1_ = 2 * p * r / (p + r + 1e-8) # 计算 F1 值
-
- return accuracy_, p, r, f1_, (conf_matrix_ / conf_matrix_.sum(dim=1, keepdim=True))
-
-
- if __name__ == '__main__':
- # 设置TensorFlow的OneDNN自定义操作环境变量
- os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
- myconfig = Config()
- """设置不同参数组训练,完成后比较效果进行调参"""
- # params = {'drop_rate':[1e-1,1e-2,1e-3,1e-4],
- # 'learning_rate':[1e-1,1e-2,1e-3,1e-4],
- # 'layer_num':[1,2,3,4],}
- # for key in params.keys():
- # for item in params[key]:
- # setattr(myconfig, key, item) # 使用setattr函数将参数值赋给类的属性
- # print(key, item)
- # the_model = RunBertAttCRF(myconfig)
- # the_model.train(myconfig.data_set_path,sheet_name='train')
-
- """训练模型"""
- run = RunBertAttCRF(myconfig)
- run.train(myconfig.data_set_path, sheet_name='train')
-
- # """最终测试模型效果"""
- run = RunBertAttCRF(myconfig)
- run.model.load_state_dict(torch.load(myconfig.save_model_path))
- run.model.eval()
- result, label, label2id = run.test(myconfig.data_set_path, sheet_name='test')
- accuracy, precision, recall, f1, conf_matrix = run.acc_prf1(result, label)
- print(f'acc{accuracy}\np{precision}\nr{recall}\nf1{f1}\n') # {conf_matrix}
- os.makedirs(f'./{myconfig.save_file_name}', exist_ok=True)
- file = open(f'./{myconfig.save_file_name}/test_result.txt', 'a', encoding='utf-8')
- file.write(f'test评估:acc:{accuracy}, p:{precision}, r:{recall}, f1:{f1}\n'
- f'\n\n') # conf_matrix:{conf_matrix}
-
- """模型应用,预测未标注数据"""
- run = RunBertAttCRF(myconfig)
- run.model.load_state_dict(torch.load(myconfig.save_model_path))
- run.model.eval()
- all_result = run.predict(r'data/斗破苍穹(标注与未标注数据).xlsx', sheet_name='未标注数据部分')
- header = ['标签', '标题', '文本']
- all_result = pandas.DataFrame(all_result, columns=header)
- all_result.to_excel("data/斗破苍穹_未标注数据实体预测结果.xlsx")
- import pickle
-
- import pandas as pd
- from config import Config
- import os
-
- # 不同类别实体的标签
- jz_entity_target = ['B-jz', 'I-jz']
- zmsl_entity_target = ['B-zmsl', 'I-zmsl']
- zy_entity_target = ['B-zy', 'I-zy']
- djhj_entity_target = ['B-djhj', 'I-djhj']
- gf_entity_target = ['B-gf', 'I-gf']
- mf_entity_target = ['B-mf', 'I-mf']
- yh_entity_target = ['B-yh', 'I-yh']
- wq_entity_target = ['B-wq', 'I-wq']
- zw_entity_target = ['B-zw', 'I-zw']
- rw_entity_target = ['B-rw', 'I-rw']
- # 方便查找标签属于哪个类别
- entity_data_dict = {'jz': jz_entity_target,
- 'zmsl': zmsl_entity_target,
- 'zy': zy_entity_target,
- 'djhj': djhj_entity_target,
- 'gf': gf_entity_target,
- 'mf': mf_entity_target,
- 'yh': yh_entity_target,
- 'wq': wq_entity_target,
- 'zw': zw_entity_target,
- 'rw': rw_entity_target,
- }
-
- # 加载id2label信息
- config = Config()
- label_pkl_path = os.path.join(config.base_path, config.save_file_name, "label_dict.pkl")
- label_dict = open(label_pkl_path, 'rb')
- label_dict = pickle.load(label_dict)
- id2label = label_dict['id2label']
-
- results = pd.read_excel('data/斗破苍穹_未标注数据实体预测结果.xlsx')
-
- last_title = ''
- last_title_label = []
- last_title_text = ''
- last_title_entity = []
- all_title_label = []
- for index, row in results.iterrows():
- label = row['标签'] # 字符串形式的列表
- label = eval(label) # 转回列表
- text = row['文本']
- text = eval(text)
- text = [item for item in text if item != '[PAD]'] # 去除填充符
- title = row['标题']
- entity = []
- assert len(text) == len(label)
-
- start = None
- start_type = None
- end = None
- label_id_0_type = None
- label_id_1_type = None
- for i in range(len(label)-1):
- # 查看当前标签
- str_label_0 = id2label[label[i]] # 将数字标签转为字符串标签
- if str_label_0 == '<START>' or str_label_0 == '<END>': # 特殊符号跳过
- continue
- elif str_label_0 == 'O': # 非实体标签
- label_id_0 = 9999
- else: # 实体标签
- label_id_0 = str_label_0.split('-')[-1]
- label_id_0_type = label_id_0 # 标签对应的实体类型
- label_id_0 = entity_data_dict[label_id_0] # 该类型的标签列表
- label_id_0 = label_id_0.index(str_label_0) # 获取该字符标签在该列表里的索引
-
- # 查看下一个标签
- str_label_1 = id2label[label[i + 1]] # 将数字标签转为字符串标签
- if str_label_1 == '<START>' or str_label_1 == '<END>':
- label_id_1 = '特殊符号'
- elif str_label_1 == 'O':
- label_id_1 = 9999
- else:
- label_id_1 = str_label_1.split('-')[-1] # 获取标签对应的实体类型
- label_id_1_type = label_id_1
- label_id_1 = entity_data_dict[label_id_1] # 根据类型获取该类型的标签列表
- label_id_1 = label_id_1.index(str_label_1) # 获取该字符标签在该列表里的索引
-
- # 匹配(B,O) {B:0, I:1, O:9999}
- if ((label_id_0 == 0 and label_id_1 == 9999) or # B、O情况
- (label_id_0 == 0 and label_id_1 == 1 and label_id_0_type != label_id_1_type)): # 不同类型的B、I情况
- print("(B,O):", str_label_0)
- # start = i
- # start_type = label_id_0_type
- # end = i + 1
-
- # 本数据没这种情况
- start = None
- start_type = None
- end = None
-
- # 匹配(B,I,···,O)、(B,I,O)
- else:
- if label_id_0 == 0 and label_id_1 == 1 and label_id_0_type == label_id_1_type: # 同类型B、I情况(开始位置)
- print("(B,I)start:", str_label_0)
- start = i
- start_type = label_id_0_type
- elif ((label_id_0 == 1 and label_id_1 == 0) or # I、B情况(结束位置)
- (label_id_0 == 1 and label_id_1 == 9999) # I、O情况(结束位置)
- ): # I、B情况(结束位置)
- print("(I,O)end:", str_label_0)
- end = i + 1
- elif label_id_0 == 9999: # 当前出现O,清空标记 # O、?情况(无用位置, 表示已经获取完成)
- start = None
- start_type = None
- end = None
- else: pass
-
- # 根据start和end截取实体
- if start is not None and end is not None and start_type is not None and int(start) < int(end):
- this_entity = text[start:end]
- this_entity = ''.join(this_entity)
- print('result————>', this_entity)
- entity.append((this_entity, start_type))
- start = None
- start_type = None
- end = None
-
- # 按章节归类
- if title == last_title: # 同一章节名
- last_title_label += label
- last_title_text += text
- last_title_entity += entity
- if int(index) == len(results)-1: # 最后一章最后一条数据,添加最后一章
- all_title_label.append([last_title, list(set(last_title_entity))])
- else: # 不同章节名,则上一个章节提取完成
- if int(index) > 0: # 除去开始,之后的标题不同,说明上一章节处理完成
- all_title_label.append([last_title, list(set(last_title_entity))])
- last_title = title
- last_title_label = label
- last_title_text = text
- last_title_entity = entity
-
- # 添加进对应的章节内容(完整的章节内容)
- text_file = pd.read_excel(r'data/斗破苍穹(标注与未标注数据).xlsx', sheet_name='未标注数据部分')
- for index, row in text_file.iterrows():
- title = row['标题']
- text = row['文本'].replace(' ', '')
-
- if str(title) != str(all_title_label[int(index)][0]):
- print(title, all_title_label[int(index)][0])
- all_title_label[int(index)].append(text)
-
- header = ['标题', '识别结果', '文本']
- data = pd.DataFrame(all_title_label, columns=header)
- data.to_excel('data/斗破苍穹_预测结果提取.xlsx')
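The pairwise label walk above can also be written as a more compact BIO decoder. The sketch below is a separate helper (not part of the project code) that extracts (entity, type) spans from a character list and string labels of the form B-xx / I-xx / O; unlike the extraction above, it also keeps single-character B- spans:

def extract_entities(chars, str_labels):
    """Collect (entity_text, entity_type) spans from character-level BIO labels."""
    entities, start, ent_type = [], None, None
    for i, lab in enumerate(str_labels):
        if lab.startswith('B-'):                     # a new span begins
            if start is not None:
                entities.append((''.join(chars[start:i]), ent_type))
            start, ent_type = i, lab[2:]
        elif lab.startswith('I-') and start is not None and lab[2:] == ent_type:
            continue                                 # the span continues
        else:                                        # 'O', a special tag, or an inconsistent I-: close any open span
            if start is not None:
                entities.append((''.join(chars[start:i]), ent_type))
            start, ent_type = None, None
    if start is not None:                            # a span running to the end of the sequence
        entities.append((''.join(chars[start:]), ent_type))
    return entities

chars = list('萧炎来到乌坦城')
labs = ['B-rw', 'I-rw', 'O', 'O', 'B-zw', 'I-zw', 'I-zw']  # illustrative tags
print(extract_entities(chars, labs))                       # [('萧炎', 'rw'), ('乌坦城', 'zw')]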