赞
踩
欢迎来到BERT-BiLSTM中文情感识别项目!我们利用BERT模型提取文本语义特征,结合BiLSTM网络学习时序信息,显著提升中文情感识别性能。为解决训练时间长问题,我们部署在GPU环境,加速模型训练。项目提供可视化中文情感识别系统,欢迎贡献代码、建议或数据,共同优化模型,让中文情感识别技术更上一层楼!
设计一个基于BERT-BILSTM的模型,用于对中文文本进行情感分类。本系统的关键组成部分在于集成了一个经过训练的BERT-BILSTM模型,该模型专门用于对中文文本进行情感倾向的预测。通过此模型的深度学习架构,系统能够有效地识别和分类文本中所蕴含的复杂情感状态。系统流程如图1所示。
图1 系统流程图
首先,用户提交的文本数据被系统接收并传输至服务器后端。然后,数据通过BertTokenizer进行预处理,这个过程包括分词、文本填充以及序列化等步骤。完成预处理后,系统调用预先训练定型的BERT-BiLSTM模型来对数据进行情感分类预测,并生成预测的分类结果。最后,这些分类结果被转换为相应的情感标签,并反馈至系统前端以供用户查看。整个系统架构是基于Flask框架构建。
数据加载使用python自带的JSON库读取对应JSON格式的TXT文件,将读取的数据封装成一个dataset数据集,在使用Dataloader类进行数据加载。具体的步骤有:打开TXT文件并读取其中的文本和标签,之后加载BertTokenizer对文本数据进行切割处理,最后重写getitem方法,使得能通过数组下标方式直接获取文本和标签数据。代码实现如下。
- # 导入调用库
- import json
- import lib
- import torch
- from transformers import BertTokenizer
- from torch.utils.data import Dataset, DataLoader
- # 加载BERT分词工具
- tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
-
- # 定义一个名为dataset的类,继承自PyTorch的Dataset类
- class dataset(Dataset):
-
- # 初始化方法,接收文件路径作为参数
- def __init__(self, file_path):
- # 打开指定路径的文件,编码格式为utf-8
- with open(file_path, 'r', encoding="utf-8") as f:
- lines = f.read()
- # 将读取的JSON字符串转换为列表,列表中包含字典
- self.datas = json.loads(lines)
- # 从列表中中获取文本
- self.contents = [data['content'] for data in self.datas]
- # 从列表中获取标签
- self.labels = [data['label'] for data in self.datas]
- # 对文本进行分词:对每个文本内容进行分词处理,填充序列到最大长度,如果文本长度
- 超过最大长度,则进行截断,返回PyTorch张量
- self.text = [tokenizer(text,padding='max_length',max_length=lib.max_length,
- truncation=True,return_tensors='pt') for text in self.contents]
-
- # 实现getitem方法,用于支持下标索引
- def __getitem__(self, idx):
- # 根据下标获取文本数据
- text = self.text[idx]
- # 根据下标获取标签
- label = self.labels[idx]
- # 将标签转化相应的序号
- label = labels_id[label]
- 返回文本数据和标签
- return text, label
-
- # 实现len方法,返回数据集的大小
- def __len__(self):
- return len(self.text)
-
- # 重写ollate_fn的函数,用实现将多个数据样本组合成一个批次的数据
- def collate_fn(batch):
- #将批次中的数据解包并重新组合成列表
- batch = list(zip(*batch))
- # 将标签列表转换为PyTorch长整数张量
- labels = torch.LongTensor(batch[1])
- # 获取input_ids
- texts = [item['input_ids'] for item in batch[0]]
- # 将列表中的张量堆叠成一个形状为(batch_size, max_seq_length)的张量
- texts = torch.stack(texts)
- # 获取attention_mask
- attention_masks = [item['attention_mask'] for item in batch[0]]
- # 将列表中的张量堆叠成一个形状为(batch_size, max_seq_length)的张量
- attention_masks = torch.stack(attention_masks)
-
- # 删除原始的batch变量
- del batch
-
- # 返回堆叠后的文本、注意力掩码和标签张量
- return texts, attention_masks, labels
-
- # 数据集加载
- if __name__ == '__main__':
-
- # 实例化dataset对象
- dataset = dataset(file_path)
-
- # 使用Dataloader类加载dataset
- dataloader = DataLoader(dataset=dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)
数据预处理包括数据清洗、分词、去停用词等操作。首先,在第2小节已经完成了数据加载工作之后,将数据中的文本单独取出,先判断文本内容是否为空,如果为空,则将该条记录删除;否则,依次使用不同的正则表达式对文本进行URL链接去除、微博@标签去除,微博话题标签去除和emoji表情出去;完成这些步骤之后,再将文本中的繁体中文转化为简体中文;之后,对文本标签进行判断,为空则将该记录删除。清洗完之后,还需要将数据保存到TXT中。接下来对这些操作进行代码实现。
- # 导入调用库
- import re
- import zhconv
- from utils import read_json, save_json
-
- # 实现去除url功能
- def remove_url(text):
- vTEXT = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '', text, flags=re.MULTILINE)
- return vTEXT
-
- # 实现去除微博@标记功能
- def remove_weibo_at_tags(text):
- weibo_at_tag_pattern = r"@[^\s]{1,6}"
- return re.sub(weibo_at_tag_pattern, "", text)
-
- # 实现去除微博话题标签功能
- def remove_weibo_topic_tags(text):
- weibo_topic_tag_pattern = r"#[^\s]+#"
- return re.sub(weibo_topic_tag_pattern, "", text)
-
- # 去除emoji表情符号功能
- def remove_emojis(text):
- # 过滤表情
- try:
- co=re.compile(u'['u'\U0001F300-\U0001F64F' u'\U0001F680-\U0001F6FF'u'\u2600-\u2B55]+')
- except re.error:
- co = re.compile(u'('u'\ud83c[\udf00-\udfff]|'u'\ud83d[\udc00-\ude4f\ude80-\udeff]|'u'[\u2600-\u2B55])+')
- return co.sub("", text)
-
- # 实现繁体中文转简体中文功能
- def traditional_to_simplified(text):
- return zhconv.convert(text, 'zh-cn')
-
- # 完整的数据清洗流程
- def data_clean(file_path='../Data/txt/usual_train.txt', data_path='../Data/txt/data.txt'):
- # 1、读取TXT文件
- datas = read_json(file_path)
- # 2、数据清洗:从文件中取出content,修改,然后写回文件
- for data in datas:
- content = data['content']
- label = data['label']
- # 如果content 为非空字符串
- if content:
- # 去URL
- content = remove_url(content)
- # 去微博@标签
- content = remove_weibo_at_tags(content)
- # 去微博话题标签
- content = remove_weibo_topic_tags(content)
- # 去emoji表情符号
- content = remove_emojis(content)
- # 繁体字转简体字
- content = traditional_to_simplified(content)
- # 将修改后的content写回文件
- data['content'] = content
- else:
- # 如果content为空,则移除相应的数据
- datas.remove(data)
- # 如果标签为空,则移除相应的数据
- if len(label) == 0:
- datas.remove(data)
- # 3、写回TXT文件
- save_json(datas, data_path)
- print("数据清洗完成!")
模型实现基于以下模型网络设计。
图2 模型网络图
首先,定义一个BERT-BILSTM类;然后在init方法中定义网络的各个层级,在后续使用该类进行实例化的时候,即可通过该方法完成对网络参数初始化的操作;最后,根据设定的网络结构进行前向传播(forward)操作,在往该类实例传入数据,即可计算出模型的输出。代码实现如下。
- # 导入调用库
- import torch
- from torch import nn
- from transformers import BertModel
- import torch.nn.functional as F
- import lib
- # 定义模型
- class Bert_BiLstm(nn.Module):
- # 初始化网络
- def __init__(self):
- super(Bert_BiLstm, self).__init__()
- # 使用预训练的BERT模型
- self.bert = BertModel.from_pretrained('bert-base-chinese')
- # 初始化双向LSTM层,输入大小为768(BERT的隐层大小),隐藏层大小和层数 由lib模块定义,设置dropout
- self.lstm = nn.LSTM(768, hidden_size=lib.hidden_size, num_layers=lib.num_layers, batch_first=True,bidirectional=True, dropout=lib.dropout)
- # 初始化层归一化层,输入大小为2倍LSTM的隐藏层大小
- self.ln = nn.LayerNorm(2 * lib.hidden_size)
- # 初始化全连接层,输入大小为2倍LSTM的隐藏层大小,输出大小为6 self.fc = nn.Linear(2 * lib.hidden_size, 6)
- # 前向传播
- def forward(self, input_id, mask):
- # 不进行梯度更新
- with torch.no_grad():
- outputs = self.bert(input_ids=input_id, attention_mask=mask)
- # 获取最后一层bert编码
- hidden_state = outputs.last_hidden_state
- # 将BERT的输出传入LSTM,获取LSTM的输出
- _, (h_n, c_n) = self.lstm(hidden_state)
- # 获取前向LSTM的最后一个时间步的隐状态
- output_fw = h_n[-2, :, :]
- # 获取后向LSTM的最后一个时间步的隐状态
- output_bw = h_n[-1, :, :]
- # 正反两个方向的结果进行拼接
- outputs = torch.cat([output_fw, output_bw], dim=1)
- # 对拼接后的输出进行层归一化
- outputs = self.ln(outputs)
- # 输出传入全连接层
- outputs = self.fc(outputs)
- return F.log_softmax(outputs, dim=-1)
模型训练使用Adam作为模型优化器,CrossEntropyLoss作为损失函数,模型的训练次数通过外部传入的epochs参数决定。首先使用第2小节的实现的数据加载器加载训练数据和测试数据;之后,将加载的数据放入GPU环境中,在GPU环境中能加速模型训练速度;然后,进行梯度清零,防止梯度累加导致梯度爆炸;随后,进行前向传播输出模型预测结果;在随后,将预测结果与实际标签进行比对计算损失值;最后,进行反向传播,更新梯度。在模型评估阶段与训练阶段不同的是,模型评估阶段,不需要进行梯度更新操作。详细代码如下。
- import warnings
-
- import numpy as np
- import torch.nn.functional as F
- from sklearn.metrics import precision_score, recall_score, f1_score
- from torch import optim
- from torch.utils.data import DataLoader
- import torch
-
- import lib
- from dataset_by_bert import dataset, collate_fn
-
- warnings.simplefilter('ignore')
-
-
- def train_eval(model, data_path, epochs, model_path, result_path, is_save=False):
- with open(result_path, "a") as file:
- file.write(model.__class__.__name__ + ' model result:' + '\n')
- print('*********** ' + model.__class__.__name__ + ' model *************')
-
- # 检查是否有可用的GPU
- if torch.cuda.is_available():
- device = torch.device("cuda")
- else:
- device = torch.device("cpu")
- model.to(device)
-
- # 加载训练集和测试集
- datas = dataset(data_path)
- train_dataset, test_dataset = torch.utils.data.random_split(datas,
- [int(len(datas) * 0.8),
- len(datas) - int(len(datas) * 0.8)])
-
- train_dataloader = DataLoader(train_dataset, batch_size=lib.train_batch_size, shuffle=True,
- collate_fn=collate_fn)
- val_dataloader = DataLoader(test_dataset, batch_size=lib.eval_batch_size, collate_fn=collate_fn)
-
- # 定义优化器和损失函数
- optimizer = optim.Adam(model.parameters(), lr=lib.learning_rate)
-
- # 模型训练
- for epoch in range(epochs):
- model.train() # 确保模型处于训练模式
- loss_list_train = []
- acc_list_train = []
- pred_train_list = []
- label_train_list = []
-
- # for idx, (input_id, mask, label) in tqdm(enumerate(train_dataloader), total=len(train_dataloader)):
- for idx, (input_id, mask, label) in enumerate(train_dataloader):
- input_id = input_id.squeeze(1).to(device)
- mask = mask.squeeze(1).to(device)
- label = label.to(device)
-
- optimizer.zero_grad()
- output = model(input_id, mask)
- batch_loss = F.nll_loss(output, label)
- loss_list_train.append(batch_loss.item())
-
- batch_loss.backward()
- optimizer.step()
-
- pred = output.max(dim=-1)[-1]
- cur_acc = pred.eq(label).float().mean()
- acc_list_train.append(cur_acc.item())
-
- # 存储预测和标签
- pred_train_list.append(pred.cpu().numpy())
- label_train_list.append(label.cpu().numpy())
-
- # 计算训练集的精准率、召回率和F1值
- pred_train = np.concatenate(pred_train_list, axis=0)
- label_train = np.concatenate(label_train_list, axis=0)
- precision_train = precision_score(label_train, pred_train, average='weighted')
- recall_train = recall_score(label_train, pred_train, average='weighted')
- f1_train = f1_score(label_train, pred_train, average='weighted')
-
- # 模型保存
- if is_save:
- torch.save(model.state_dict(), model_path)
- print(
- f"epoch{epoch + 1}, 训练集损失={np.mean(loss_list_train):f}, 准确率={np.mean(acc_list_train):f}, 精准率={precision_train:f}, 召回率={recall_train:f}, F1值={f1_train:f}")
-
- model.eval() # 确保模型处于评估模式
- loss_list_eval = []
- acc_list_eval = []
- pred_val_list = []
- label_val_list = []
-
- with torch.no_grad():
- # for _, (input_id, mask, label) in tqdm(enumerate(val_dataloader), total=len(val_dataloader)):
- for _, (input_id, mask, label) in enumerate(val_dataloader):
- input_id = input_id.squeeze(1).to(device)
- mask = mask.squeeze(1).to(device)
- label = label.to(device)
- output = model(input_id, mask)
-
- batch_loss = F.nll_loss(output, label)
- loss_list_eval.append(batch_loss.item())
-
- pred = output.max(dim=-1)[-1]
- cur_acc = pred.eq(label).float().mean()
- acc_list_eval.append(cur_acc.item())
-
- # 存储预测和标签
- pred_val_list.append(pred.cpu().numpy())
- label_val_list.append(label.cpu().numpy())
-
- # 合并预测和标签
- pred_val = np.concatenate(pred_val_list, axis=0)
- label_val = np.concatenate(label_val_list, axis=0)
-
- # 计算验证集的总体精准率、召回率和F1值
- precision_val = precision_score(label_val, pred_val, average='weighted')
- recall_val = recall_score(label_val, pred_val, average='weighted')
- f1_val = f1_score(label_val, pred_val, average='weighted')
-
- file.write(
- f"epoch{epoch + 1}, 测试集损失={np.mean(loss_list_eval):f}, 总体准确率={np.mean(acc_list_eval):f}, 总体精准率={precision_val:f}, 总体召回率={recall_val:f}, 总体F1值={f1_val:f}\n")
-
- print(
- f"epoch{epoch + 1}, 测试集损失={np.mean(loss_list_eval):f}, 总体准确率={np.mean(acc_list_eval):f}, 总体精准率={precision_val:f}, 总体召回率={recall_val:f}, 总体F1值={f1_val:f}")
-
- # 打印每个类别的性能指标
- accuracy_val = []
- unique_labels = np.unique(label_val)
- for label_i in unique_labels:
- pred_i = pred_val[label_val == label_i]
- label_i_true = label_val[label_val == label_i]
- accuracy = (pred_i == label_i_true).mean()
- accuracy_val.append(accuracy)
-
- precision_val = precision_score(label_val, pred_val, average=None)
- recall_val = recall_score(label_val, pred_val, average=None)
- f1_val = f1_score(label_val, pred_val, average=None)
-
- for i in range(len(precision_val)):
- file.write(
- f"标签 {i}: 准确率 = {accuracy_val[i]:.2f}, 精准率 = {precision_val[i]:.2f}, 召回率 = {recall_val[i]:.2f}, F1值 = {f1_val[i]:.2f}\n")
- print(
- f"标签 {i}: 准确率 = {accuracy_val[i]:.2f}, 精准率 = {precision_val[i]:.2f}, 召回率 = {recall_val[i]:.2f}, F1值 = {f1_val[i]:.2f}")
-
- file.close()
根据模型的输入输出进行用户界面设计,用户界面的主要内容是一个文本输入框和一个分析情感的文本提交按钮,用户可以在输入框内输入想要进行中文情感识别的文本,然后点击分析情感的提交按钮即可进行中文文本情感识别。用户输入界面如图所示。
图3 用户输入界面
点击分析情感按钮之后,等待后端处理返回结果。返回的结果为“预测情感”+情感标签。识别结果如图4所示。
图4 情感识别界面
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。