"""Train an LSTM sentiment classifier on the airline Tweets CSV.

Pipeline: load the CSV, tokenize each tweet with a regex, build an integer
vocabulary, right-pad every tweet to the longest length, split into train and
test sets, then train an Embedding -> LSTM -> MLP classifier and print
per-epoch loss/accuracy followed by the total elapsed time in seconds.
"""
import re
import time

import torch
import torchtext
import torch.nn as nn
import torch.nn.functional as F
from torchtext.vocab import GloVe
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

start = time.time()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

DATA_PATH = 'D:/pycharmworkspace/ISLR-master/Tweets.csv'

# Load the file, keep only the label and text columns, and drop duplicate rows.
# The explicit copy makes `data` an independent frame so the column assignment
# below does not operate on a slice of the frame returned by read_csv.
data = pd.read_csv(DATA_PATH)
data = data[['airline_sentiment', 'text']].drop_duplicates().copy()

# Add an integer class-id column named `review`. pd.factorize numbers the
# distinct sentiment values in order of first appearance in the column.
data['review'] = pd.factorize(data.airline_sentiment)[0]
del data['airline_sentiment']

# Token rule: runs of ASCII letters, or a single one of the marks ! ? , . ( )
token = re.compile(r'[A-Za-z]+|[!?,.()]')


def reg_text(text):
    """Split *text* into tokens matched by `token` and lower-case them.

    Args:
        text: Raw tweet string.

    Returns:
        List of lower-cased word / punctuation tokens, in order of appearance.
    """
    return [word.lower() for word in token.findall(text)]


data['text'] = data.text.apply(reg_text)

# Build the vocabulary: every distinct token gets an id starting at 1.
# Id 0 is reserved for padding (and for tokens missing from the vocabulary).
word_set = set()
for text in data.text:
    word_set.update(text)

# One extra slot for the reserved id 0.
max_word = len(word_set) + 1
word_list = list(word_set)
# enumerate gives the same ids as word_list.index(word) + 1 (all entries are
# unique) without the quadratic list scan.
word_index = {word: idx for idx, word in enumerate(word_list, start=1)}

# Convert each token list to a list of ids; unknown tokens map to 0.
text = data.text.apply(lambda x: [word_index.get(word, 0) for word in x])

# Tweets differ in length, so right-pad every id list with 0 up to the longest.
maxlen = max(len(x) for x in text)
pad_text = np.array([seq + (maxlen - len(seq)) * [0] for seq in text])

# Class-id labels. CrossEntropyLoss needs int64 targets, so cast explicitly
# rather than rely on the platform's default integer width.
labels = data.review.values.astype(np.int64)

# Split into train and test sets (default split ratio, unseeded shuffle).
x_train, x_test, y_train, y_test = train_test_split(pad_text, labels)


class Mydataset(torch.utils.data.Dataset):
    """Dataset pairing padded id sequences with their integer labels."""

    def __init__(self, text_list, label_list):
        """Store the sequences and labels; both are indexed by position."""
        self.text_list = text_list
        self.label_list = label_list

    def __getitem__(self, index):
        """Return (LongTensor of token ids, label) for the sample at *index*."""
        text = torch.LongTensor(self.text_list[index])
        label = self.label_list[index]
        return text, label

    def __len__(self):
        """Return the number of samples."""
        return len(self.text_list)


train_ds = Mydataset(x_train, y_train)
test_ds = Mydataset(x_test, y_test)

BATCH_SIZE = 16
# Original (misspelled) name kept so existing references keep working.
BTACH_SIZE = BATCH_SIZE

train_dl = torch.utils.data.DataLoader(
    train_ds,
    batch_size=BTACH_SIZE,
    shuffle=True,
)
test_dl = torch.utils.data.DataLoader(
    test_ds,
    batch_size=BTACH_SIZE,
)

em_dim = 100
hidden_size = 200


class Net(nn.Module):
    """Embedding -> LSTM -> two linear layers producing 3 class logits."""

    def __init__(self):
        super().__init__()
        # Maps each of the max_word ids to an em_dim-sized vector:
        # (batch, maxlen) -> (batch, maxlen, em_dim).
        self.em = nn.Embedding(max_word, em_dim)
        # Input features em_dim, hidden_size hidden units; the batch is the
        # first dimension, hence batch_first=True.
        self.lstm = nn.LSTM(em_dim, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 256)
        self.fc2 = nn.Linear(256, 3)

    def forward(self, x):
        """Return class logits of shape (batch, 3) for id sequences *x*."""
        x = self.em(x)
        # The LSTM returns (per-step outputs, final states); only the per-step
        # outputs, shaped (batch, time_step, hidden_size), are used.
        x, _ = self.lstm(x)
        # Keep every batch row and feature, but only the last time step.
        # NOTE(review): sequences are right-padded with 0, so for short tweets
        # this is the state after the trailing padding, not after the last
        # real token. Consider packing the sequences or left-padding.
        x = F.relu(self.fc1(x[:, -1, :]))
        x = self.fc2(x)
        return x


model = Net()
model = model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


def _run_epoch(model, loader, training):
    """Run one pass over *loader*; return (mean per-sample loss, accuracy).

    When *training* is true the model is put in train mode and the
    module-level optimizer is stepped after every batch; otherwise the model
    is put in eval mode and gradients are disabled.
    """
    correct = 0
    total = 0
    loss_sum = 0.0
    model.train(training)
    with torch.set_grad_enabled(training):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            batch_size = y.size(0)
            correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
            total += batch_size
            # loss_fn averages over the batch; weight by the batch size so
            # the epoch figure is a true per-sample mean even when the last
            # batch is smaller than the others.
            loss_sum += loss.item() * batch_size
    return loss_sum / total, correct / total


def fit(epoch, model, trainloader, testloader):
    """Train for one epoch, evaluate on the test set, and print the metrics.

    Args:
        epoch: Epoch number, used only in the printed summary.
        model: Network to train; updated in place through the module-level
            optimizer.
        trainloader: DataLoader yielding (ids, label) training batches.
        testloader: DataLoader yielding (ids, label) evaluation batches.

    Returns:
        Tuple (epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc), where
        the losses are mean per-sample cross-entropy values and the
        accuracies are fractions in [0, 1].
    """
    epoch_loss, epoch_acc = _run_epoch(model, trainloader, True)
    epoch_test_loss, epoch_test_acc = _run_epoch(model, testloader, False)

    print('epoch: ', epoch,
          'loss: ', round(epoch_loss, 3),
          'accuracy:', round(epoch_acc, 3),
          'test_loss: ', round(epoch_test_loss, 3),
          'test_accuracy:', round(epoch_test_acc, 3)
          )

    return epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc


epochs = 30
train_loss = []
train_acc = []
test_loss = []
test_acc = []

for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(
        epoch, model, train_dl, test_dl
    )
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_acc)

end = time.time()
print(end - start)
# Result note: slight improvement, but overfitting is still severe.
