赞
踩
文本情感分类是一种自然语言处理技术,它旨在自动识别一段文本中表达的情感,并将其分类为正面、负面或中性等不同的情感类别。文本情感分类的应用十分广泛,例如在社交媒体舆情分析、产品评论分析、用户满意度调查等领域都有重要的应用。通过文本情感分类,可以自动化地对大量文本数据进行分类和分析,为决策提供有用的信息和参考。
本文利用的数据:weibo_senti_100k.csv
下载地址:
https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/weibo_senti_100k/intro.ipynb
数据展示
data_processing.py:使用结巴分词库统计词频
# 数据来源 https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/weibo_senti_100k/intro.ipynb # 数据概览: 10 万多条,带情感标注 新浪微博,正负向评论约各 5 万条 # 停用词字典 https://github.com/goto456/stopwords import jieba # 导入中文分词的第三方库,jieba分词 data_path = "../sources/weibo_senti_100k.csv" # 数据路径 data_stop_path = "../sources/hit_stopwords.txt" # 停用词数据路径 data_list = open(data_path, encoding='UTF-8').readlines()[1:] # 读出数据并去掉第一行的介绍标签, 每一行为一个大字符串 stops_word = open(data_stop_path, encoding='UTF-8').readlines() # 读取停用词内容 stops_word = [line.strip() for line in stops_word] # 将每行换行符去掉(去掉换行符),并生成停用词列表 stops_word.append(" ") # 可以自己根据需要添加停用词 stops_word.append("\n") voc_dict = {} min_seq = 1 # 用于过滤词频数 top_n = 1000 UNK = "<UNK>" PAD = "<PAD>" print(data_list[0]) # 对data_list进行分词的处理 # for item in data_list[:100]: 使用前100条数据测试,100000条数据太多 for item in data_list: label = item[0] # 字符串的第一个为标签 content = item[2:].strip() # 从第三项开始为文本内容, strip()去掉最后的换行符 seg_list = jieba.cut(content, cut_all=False) # 调用结巴分词对每一行文本内容进行分词 seg_res = [] # 打印分词结果 for seg_item in seg_list: if seg_item in stops_word: # 如果分词字段在停用词列表里,则取出 continue seg_res.append(seg_item) # 如果不在则加入分词结果中 if seg_item in voc_dict.keys(): # 使用字典统计词频seg_item in voc_dict.keys(): voc_dict[seg_item] += 1 else: voc_dict[seg_item] = 1 # print(content) # 打印未分词前的句子 # print(seg_res) # 对字典进行排序,取TOPK词,如果将所有词都要,将会导致字典过大。我们只关注一些高频的词 voc_list = sorted([_ for _ in voc_dict.items() if _[1] > min_seq], key=lambda x: x[1], # key:指定一个参数的函数,该函数用于从每个列表元素中提取比较键 reverse=True)[:top_n] # 取排完序后的前top_n个词, voc_dict = {word_count[0]: idx for idx, word_count in enumerate(voc_list)} # 根据排序后的字典重新字典 voc_dict.update({UNK: len(voc_dict), PAD: len(voc_dict) + 1}) # 将前top_n后面的归类为UNK print(voc_dict) # '泪': 0, '嘻嘻': 1, '都': 2, # 保存字典 ff = open("../sources/dict.txt", "w") for item in voc_dict.keys(): ff.writelines("{},{}\n".format(item, voc_dict[item])) # '泪': 0, '嘻嘻': 1, '都': 2,
这段代码是为了对中文文本数据进行预处理,以便进行情感分析。代码使用中文分词库jieba对微博数据进行处理。代码读取微博数据的csv文件,去除第一行(其中包含每列的标签),然后循环处理每一行数据。对于每一行,提取内容(微博帖子的文本)并使用jieba进行分词。得到的分词结果被过滤以去除任何停用词(常见的不具有很多意义的词,例如“和”或“的”),并计算它们的频率在一个字典中。
处理完所有行之后,将结果字典按单词频率降序排序,并选择前n个单词(在这种情况下,n为1000)。任何出现频率低于给定阈值(在这种情况下为1)的词都将从字典中过滤掉。剩余的单词被分配唯一的索引,并保存到字典文件中,以便将来用于训练情感分析模型。
除了单词字典之外,代码定义了两个特殊标记:“”和“”。"“用于表示模型训练或推理过程中遇到的任何词汇外单词(OOV),而”"用于表示添加的填充标记,以确保模型的所有输入具有相同的长度。这些特殊标记被添加到字典的末尾,其索引比字典中的任何其他单词都要高。
dataset.py
使用PyTorch的提供的dataset的接口,根据项目重写自己的dataset和dataloader
定义dataset必须重写PyTorch中的dataset中的 init,len,__getitem__函数。
import numpy as np import jieba from torch.utils.data import Dataset, DataLoader # 传入字典路径,将文件读入内存 def read_dict(voc_dict_path): voc_dict = {} dict_list = open(voc_dict_path).readlines() print(dict_list[0]) # '泪,0' for item in dict_list: item = item.split(",") # ['泪', '0\n'] voc_dict[item[0]] = int(item[1].strip()) # item[0]值'泪' item[1].strip()值为'0' # print(voc_dict) return voc_dict # 将数据集进行处理(分词,过滤...) def load_data(data_path, data_stop_path): data_list = open(data_path, encoding='utf-8').readlines()[1:] stops_word = open(data_stop_path, encoding='utf-8').readlines() stops_word = [line.strip() for line in stops_word] stops_word.append(" ") stops_word.append("\n") voc_dict = {} data = [] max_len_seq = 0 # 统计最长的句子长度 np.random.shuffle(data_list) for item in data_list[:]: label = item[0] content = item[2:].strip() seg_list = jieba.cut(content, cut_all=False) seg_res = [] for seg_item in seg_list: if seg_item in stops_word: continue seg_res.append(seg_item) if seg_item in voc_dict.keys(): voc_dict[seg_item] = voc_dict[seg_item] + 1 else: voc_dict[seg_item] = 1 if len(seg_res) > max_len_seq: # 以句子分词词语最长为标准 max_len_seq = len(seg_res) data.append([label, seg_res]) # [标签,分词结果的列表] # print(max_len_seq) return data, max_len_seq # 句子分词后,词语最大长度 # 定义Dataset class text_CLS(Dataset): def __init__(self, voc_dict_path, data_path, data_stop_path): self.data_path = data_path self.data_stop_path = data_stop_path self.voc_dict = read_dict(voc_dict_path) # 返回数据[[label,分词词语列表],......] self.data, self.max_len_seq = load_data(self.data_path, self.data_stop_path) np.random.shuffle(self.data) # 将数据的顺序打乱 def __len__(self): # 返回数据集长度 return len(self.data) def __getitem__(self, item): data = self.data[item] label = int(data[0]) word_list = data[1] # 句子分词后的词语列表 input_idx = [] for word in word_list: if word in self.voc_dict.keys(): # 如果词语在自己创建的字典中 input_idx.append(self.voc_dict[word]) # 将这个单词的词频数放进列表 else: input_idx.append(self.voc_dict["<UNK>"]) # 不在则统一归为其他类(词频太低的归为一类) if len(input_idx) < self.max_len_seq: # 词语长度小于最长长度,则需要用PAD填充 input_idx += [self.voc_dict["<PAD>"] for _ in range(self.max_len_seq - len(input_idx))] # input_idx += [1001 for _ in range(self.max_len_seq - len(input_idx))] data = np.array(input_idx) # 将得到的词频数列表,转化为numpy数据 return label, data # 定义DataLoader def data_loader(dataset, config): return DataLoader(dataset, batch_size=config.batch_size, shuffle=config.is_shuffle) # if __name__ == "__main__": # data_path = "../sources/weibo_senti_100k.csv" # data_stop_path = "../sources/hit_stopwords.txt" # dict_path = "../sources/dict" # # train_dataLoader = data_loader(data_path, data_stop_path, dict_path) # for i, batch in enumerate(train_dataLoader): # print(batch[0], batch[1].size()) # print(batch[0], batch[1])
这段代码实现了一个文本分类任务的数据预处理部分,包括读取字典文件和数据文件、分词、过滤停用词、将文本转化为词频数列表、以及构建Dataset和DataLoader。
具体实现包括以下几个函数和类:
read_dict(voc_dict_path):读取字典文件,将文件内容读入内存,并返回一个字典,其中字典的键为词语,值为对应的词频数。
load_data(data_path,
data_stop_path):读取数据文件,对每个文本进行分词、过滤停用词,并将文本转化为词频数列表,返回一个数据列表和词语最大长度。
text_CLS(Dataset):定义了一个Dataset类,用于在PyTorch中加载数据集。在初始化函数中,调用read_dict和load_data函数读取数据和字典,然后打乱数据顺序。在__getitem__函数中,将文本数据转化为词频数列表,并返回标签和列表。__len__函数返回数据集长度。
data_loader(dataset,
config):定义了一个DataLoader函数,用于在PyTorch中批量加载数据。将dataset和config作为参数传入,返回一个DataLoader对象。
这段代码实现了将文本数据转化为数值化的词频数列表,并将其打包成一个PyTorch的Dataset和DataLoader对象,方便进行后续的模型训练。
models.py
import torch import torch.nn as nn import torch.nn.functional as F import numpy as np # padding_idx:padding_idx (python:int, optional) – 填充id,比如,输入长度为100,但是每次的句子长度并不一样,后面就需要用统一的数字填充, # 而这里就是指定这个数字,这样,网络在遇到填充id时,就不会计算其与其它符号的相关性。(初始化为0) class Model(nn.Module): def __init__(self, config): super(Model, self).__init__() # 词嵌入层 self.embeding = nn.Embedding(config.n_vocab, # 字典大小:congif.n_vocab,表示词典中词的数量 embedding_dim=config.embed_size, # 词嵌入的输出大小,就是每个词经过embedding后用多少位向量表示。表示每个词对应的向量维度 padding_idx=config.n_vocab - 1) # padding_idx ,pad # lstm层 self.lstm = nn.LSTM(input_size=config.embed_size, # 输入大小,即每个词的维度 hidden_size=config.hidden_size, # 隐藏层输出大小 num_layers=config.num_layers, # lstm的层数 bidirectional=True, # 双向lstm层 batch_first=True, # 数据结构:[batch_size,seq_len,input_size] dropout=config.dropout) # 防止过拟合 # 卷积层 self.maxpooling = nn.MaxPool1d(config.pad_size) # 一维卷积。积核长度: # 全连接层 self.fc = nn.Linear(config.hidden_size * 2 + config.embed_size, # 因为是双向LSTM,所以要*2, config.num_classes) # 第二个为预测的类别数 self.softmax = nn.Softmax(dim=1) def forward(self, x): embed = self.embeding(x) # 输出为[batchsize, seqlen, embed_size] 标准RNN网络的输入 # print("embed.size:",embed.size()) out, _ = self.lstm(embed) # out的shape:[batch_size, seq_len, hidden_size*2] # print("lstm层的输出size:",out.size()) # torch.cat((x,y),dim) 在dim上拼接,x,y out = torch.cat((embed, out), 2) # 这里解析全连接层输入大小为 config.hidden_size * 2 + cinfig.embed_size。 # [batch_size,seg_len,config.hidden_size * 2 + cinfig.embed_size] # print("cat后的size:",out.size()) out = F.relu(out) # 经过relu层,增加非线性表达能力 # print("relu层的out.size:",out.size()) out = out.permute(0, 2, 1) # 交换维度 # print("交换维度后的out.size:",out.size()) out = self.maxpooling(out).reshape(out.size()[0], -1) # 转化为2维tensor # print("MaxPooling后的out.size:",out.size()) out = self.fc(out) # print("全连接层的out.size:",out.size()) out = self.softmax(out) # print("softmax后的out.size:",out.size()) return out # 测试网络是否正确 if __name__ == '__main__': cfg = Config() cfg.pad_size = 640 model_textcls = Model(config=cfg) input_tensor = torch.tensor([i for i in range(640)]).reshape([1, 640]) out_tensor = model_textcls.forward(input_tensor) print(out_tensor.size()) print(out_tensor)
config.py
import torch # 定义网路的配置类 class Config(): def __init__(self): ''' self.embeding = nn.Embedding(config.n_vocab, config.embed_size, padding_idx=config.n_vocab - 1) self.lstm = nn.LSTM(config.embed_size, config.hidden_size, config.num_layers, bidirectional=True, batch_first=True, dropout=config.dropout) self.maxpool = nn.MaxPool1d(config.pad_size) self.fc = nn.Linear(config.hidden_size * 2 + config.embed_size, config.num_classes) self.softmax = nn.Softmax(dim=1) ''' self.n_vocab = 1002 # 字典长度 self.embed_size = 128 # 词嵌入表达的大小 self.hidden_size = 128 # 隐藏层输出大小 self.num_layers = 3 # lstm网络的层数 self.dropout = 0.8 # self.num_classes = 2 # 二分类问题 self.pad_size = 32 self.batch_size = 128 self.is_shuffle = True self.learn_rate = 0.001 self.num_epochs = 100 self.devices = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
run_train.py
import torch import torch.nn as nn from torch import optim from models import Model from datasets import data_loader, text_CLS from configs import Config cfg = Config() data_path = "weibo_senti_100k.csv" data_stop_path = "hit_stopwords.txt" dict_path = "dict" dataset = text_CLS(dict_path, data_path, data_stop_path) train_dataloader = data_loader(dataset, cfg) cfg.pad_size = dataset.max_len_seq # model_text_cls = Model(cfg) model_text_cls.to(cfg.devices) loss_func = nn.CrossEntropyLoss() # 损失函数。交叉熵损失函数 optimizer = optim.Adam(model_text_cls.parameters(), lr=cfg.learn_rate) # 定义优化器 for epoch in range(cfg.num_epochs): for i, batch in enumerate(train_dataloader): label, data = batch data = torch.tensor(data).to(cfg.devices) label = torch.tensor(label).to(cfg.devices) optimizer.zero_grad() pred = model_text_cls.forward(data) loss_val = loss_func(pred, label) print("epoch is {},ite is {},val is {}".format(epoch, i, loss_val)) loss_val.backward() # 后向传播 optimizer.step() # 更新参数 if epoch % 10 == 0: torch.save(model_text_cls.state_dict(), "../models/{}.pth".format(epoch))
为了简单测试,本文没有数据集拆分为训练集和测试集。
拆分训练集可以将CSV表格中打乱,然后根据需要拆分训练集和测试集。然后构建测试集的dataset和datalaoder。
test.py
import torch import torch.nn as nn from torch import optim from models import Model from datasets import data_loader, text_CLS from configs import Config cfg = Config() #读取数据 data_path = "sources/weibo_senti_100k.csv" data_stop_path = "sources/hit_stopword" dict_path = "sources/dict" dataset = text_CLS(dict_path, data_path, data_stop_path) train_dataloader = data_loader(dataset, cfg) cfg.pad_size = dataset.max_len_seq model_text_cls = Model(cfg) model_text_cls.to(cfg.devices) #加载模型,保存好的模型 model_text_cls.load_state_dict(torch.load("models/10.pth")) for i, batch in enumerate(train_dataloader): label, data = batch data = torch.tensor(data).to(cfg.devices) label = torch.tensor(label,dtype=torch.int64).to(cfg.devices) pred_softmax = model_text_cls.forward(data) #print(pred_softmax) print(label) pred = torch.argmax(pred_softmax, dim=1) print(pred) #统计准确率 out = torch.eq(pred,label) print(out.sum() * 1.0 / pred.size()[0])
可以上github把整个项目download下来
https://github.com/yingzhang123/Text_Sentiment_Classification
Time:2023.4.16 (周日)
如果上面代码对您有帮助,欢迎点个赞!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。