Complete code for text classification with an LSTM + EmbeddingBag
You need to download the AG_NEWS data (train.csv / test.csv) in advance; it is easy to find online.
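Each CSV row has three fields and no header: a label from 1 to 4, a title, and a description. If you would rather not hunt for the files yourself, newer torchtext versions can also download the dataset; this optional alternative yields (label, text) pairs directly:

# Optional: let torchtext fetch AG_NEWS instead of loading local CSVs.
# (Assumes a torchtext version that ships the AG_NEWS dataset loader.)
# from torchtext.datasets import AG_NEWS
# raw_train_iter = AG_NEWS(split='train')   # iterator of (label, text)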
import pandas as pd

# AG_NEWS ships as headerless CSVs: label (1-4), title, description.
# Merge title and description into a single 'text' column.
train_iter = pd.read_csv('train.csv', names=['label', 'title', 'description'], header=None)
train_iter['text'] = train_iter['title'] + ' ' + train_iter['description']
train_iter.drop(['title', 'description'], axis=1, inplace=True)

test_iter = pd.read_csv('test.csv', names=['label', 'title', 'description'], header=None)
test_iter['text'] = test_iter['title'] + ' ' + test_iter['description']
test_iter.drop(['title', 'description'], axis=1, inplace=True)
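# Quick sanity check (optional): each frame should now have just 'label' and
# 'text'; the standard AG_NEWS splits have 120,000 train and 7,600 test rows.
print(train_iter.head())
print(train_iter.shape, test_iter.shape)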
import torch
from torch.utils.data import Dataset

class textDataset(Dataset):
    """Map-style wrapper: returns a (label, text) pair so the DataLoader can index samples."""
    def __init__(self, pre_dataset):
        super().__init__()
        self.install_data = pre_dataset

    def __len__(self):
        return len(self.install_data)

    def __getitem__(self, idx):
        label = self.install_data.iloc[idx, 0]
        input_data = self.install_data.iloc[idx, 1]
        return label, input_data

train_iter = textDataset(train_iter)
test_iter = textDataset(test_iter)
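# Optional check that the wrapper behaves as the DataLoader expects:
# __len__ gives the sample count, __getitem__ returns a (label, text) pair.
label0, text0 = train_iter[0]
print(len(train_iter), label0, text0[:60])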
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

tokenizer = get_tokenizer('basic_english')  # splits text into lower-cased word tokens

def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)

print(type(yield_tokens(iter(train_iter))))  # yield_tokens returns a generator of token lists
# Build the vocabulary: every token gets an integer id; "<unk>" catches unseen words.
vocab = build_vocab_from_iterator(yield_tokens(iter(train_iter)), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
print(type(vocab))
print(vocab(['here', 'is', 'an', 'example']))

# tokenizer splits the raw string, then vocab maps each token to its id
text_pipeline = lambda x: vocab(tokenizer(x))
# labels are 1-4 in the first CSV column; shift them to 0-3 for CrossEntropyLoss
label_pipeline = lambda x: int(x) - 1
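# Example of the two pipelines in action (the exact ids depend on your
# vocabulary, so the values shown are only illustrative):
print(text_pipeline('here is an example'))  # e.g. [475, 21, 30, 5297]
print(label_pipeline('3'))                  # 2, since labels 1-4 become 0-3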
from torch.utils.data import DataLoader  # draws batches by index, using the dataset's __len__ and __getitem__
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer the GPU when available

def collate_batch(batch):
    # Convert a list of (label, text) samples into flat tensors for nn.EmbeddingBag
    label_list, text_list, offsets = [], [], [0]
    for (_label, _text) in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0))
    label_list = torch.tensor(label_list, dtype=torch.int64)
    # The texts have different lengths, so they are concatenated into one flat
    # tensor; offsets records where each text starts (the cumulative sum of the
    # lengths, e.g. lengths [10, 20, 30, 40] give offsets [0, 10, 30, 60]).
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    text_list = torch.cat(text_list)
    # .to(device) copies each tensor to the GPU (or keeps it on the CPU)
    return label_list.to(device), text_list.to(device), offsets.to(device)

# The Dataset itself (not an iterator over it) is what DataLoader expects
dataloader = DataLoader(train_iter, batch_size=8, shuffle=False, collate_fn=collate_batch)
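# Peek at one batch (optional). All token ids in the batch are concatenated
# into a single 1-D tensor; offsets holds each text's start index.
labels_b, texts_b, offsets_b = next(iter(dataloader))
print(labels_b.shape)   # torch.Size([8])
print(texts_b.shape)    # total number of tokens across these 8 texts
print(offsets_b)        # 8 start positions, beginning at 0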
from torch import nn
class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, hidden_size, num_layers, bidirectional):
        super(TextClassificationModel, self).__init__()
        self.embedding_dim = embed_dim
        self.vocab_size = vocab_size
        self.num_classes = num_classes
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        # EmbeddingBag (unlike plain nn.Embedding) takes offsets, so it can pool
        # variable-length texts; its default pooling mode is the mean.
        self.embedding = nn.EmbeddingBag(self.vocab_size, self.embedding_dim, sparse=True)
        # LSTM: input size embed_dim (64), hidden size 20, 2 stacked layers
        self.lstm = nn.LSTM(input_size=self.embedding_dim,
                            hidden_size=self.hidden_size,
                            batch_first=True,
                            num_layers=self.num_layers,
                            bidirectional=self.bidirectional)
        # A bidirectional LSTM concatenates both directions, doubling the feature size
        if self.bidirectional:
            self.fc = nn.Linear(hidden_size * 2, num_classes)
        else:
            self.fc = nn.Linear(hidden_size, num_classes)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        # Pool each text into one vector: [batch_size, embed_dim], e.g. 16 x 64
        text = self.embedding(text, offsets)
        batch_size = text.size(0)
        # Unlike a vanilla RNN cell, an LSTM cell keeps separate hidden (h) and
        # cell (c) states, so both must be initialized with shape
        # [num_layers * num_directions, batch_size, hidden_size].
        num_directions = 2 if self.bidirectional else 1
        h0 = torch.randn(self.num_layers * num_directions, batch_size, self.hidden_size).to(device)
        c0 = torch.randn(self.num_layers * num_directions, batch_size, self.hidden_size).to(device)
        # Treat each pooled text as a sequence of length 1: [batch_size, 1, embed_dim]
        text = text.view(batch_size, 1, -1)
        out, (_, _) = self.lstm(text, (h0, c0))
        # Take the last time step and project it to the class scores
        output = self.fc(out[:, -1, :]).squeeze(0)
        return output
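# A minimal shape walk-through with made-up numbers: two "texts" of lengths
# 3 and 2, flattened to five token ids plus their start offsets. With
# bidirectional=True the scores come out as [batch_size, num_classes].
_tmp = TextClassificationModel(vocab_size=100, embed_dim=64, num_classes=4,
                               hidden_size=20, num_layers=2, bidirectional=True).to(device)
_ids = torch.tensor([1, 2, 3, 4, 5], dtype=torch.int64).to(device)
_off = torch.tensor([0, 3]).to(device)
print(_tmp(_ids, _off).shape)  # torch.Size([2, 4])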
num_classes = len(set([label for (label, text) in train_iter]))  # AG_NEWS has 4 classes
# print(num_classes)
vocab_size = len(vocab)  # number of rows in the embedding table
# print(vocab_size)
emsize = 64  # embedding dimension (number of columns in the embedding table)
# signature: __init__(self, vocab_size, embed_dim, num_classes, hidden_size, num_layers, bidirectional)
model = TextClassificationModel(vocab_size, emsize, num_classes,
                                hidden_size=20, num_layers=2, bidirectional=True).to(device)
import time

def train(dataloader):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 500
    start_time = time.time()
    for idx, (label, text, offsets) in enumerate(dataloader):  # labels, texts, offsets per batch
        optimizer.zero_grad()  # reset the gradients from the previous batch
        predicted_label = model(text, offsets)
        loss = criterion(predicted_label, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)  # guard against exploding gradients
        optimizer.step()
        # Track running accuracy; only print every log_interval batches,
        # since printing every batch slows training considerably.
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches '
                  '| accuracy {:8.3f}'.format(epoch, idx, len(dataloader),
                                              total_acc / total_count))
            total_acc, total_count = 0, 0
            start_time = time.time()
def evaluate(dataloader):
    model.eval()
    total_acc, total_count = 0, 0
    with torch.no_grad():
        for idx, (label, text, offsets) in enumerate(dataloader):
            predicted_label = model(text, offsets)
            loss = criterion(predicted_label, label)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
    return total_acc / total_count
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset

# Hyperparameters
EPOCHS = 10       # number of passes over the training set
LR = 0.01         # learning rate
BATCH_SIZE = 16   # batch size for training

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adagrad(model.parameters(), lr=LR)
# Multiplies the learning rate by 0.1 each time scheduler.step() is called
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
total_accu = None

train_dataset = to_map_style_dataset(iter(train_iter))
test_dataset = to_map_style_dataset(iter(test_iter))
# Hold out 5% of the training data for validation
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = \
    random_split(train_dataset, [num_train, len(train_dataset) - num_train])

train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                             shuffle=True, collate_fn=collate_batch)
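# Sanity check on the split sizes (95% train / 5% validation); with the
# standard AG_NEWS files this prints 114000 6000 7600.
print(len(split_train_), len(split_valid_), len(test_dataset))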
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader)
    accu_val = evaluate(valid_dataloader)
    # If validation accuracy stopped improving, decay the learning rate
    if total_accu is not None and total_accu > accu_val:
        scheduler.step()
    else:
        total_accu = accu_val
    print('-' * 59)
    print('| end of epoch {:3d} | time: {:5.2f}s | '
          'valid accuracy {:8.3f} '.format(epoch,
                                           time.time() - epoch_start_time,
                                           accu_val))
    print('-' * 59)

print('Checking the results of test dataset.')
accu_test = evaluate(test_dataloader)
print('test accuracy {:8.3f}'.format(accu_test))  # about 0.916 on the test set
ag_news_label = {1: "World",
                 2: "Sports",
                 3: "Business",
                 4: "Sci/Tec"}

def predict(text, text_pipeline):
    with torch.no_grad():
        text = torch.tensor(text_pipeline(text))
        text = text.to(device)  # move the token ids to the same device as the model
        # A single text has a single offset, 0; it must live on the same device too
        output = model(text, torch.tensor([0]).to(device))
        output = output.unsqueeze(0)  # restore the batch dimension squeezed out in forward()
        return output.argmax(1).item() + 1  # shift 0-3 back to the 1-4 label range

ex_text_str = "MEMPHIS, Tenn. – Four days ago, Jon Rahm was \
enduring the season's worst weather conditions on Sunday at The \
Open on his way to a closing 75 at Royal Portrush, which \
considering the wind and the rain was a respectable showing. \
Thursday's first round at the WGC-FedEx St. Jude Invitational \
was another story. With temperatures in the mid-80s and hardly any \
wind, the Spaniard was 13 strokes better in a flawless round. \
Thanks to his best putting performance on the PGA Tour, Rahm \
finished with an 8-under 62 for a three-stroke lead, which \
was even more impressive considering he'd never played the \
front nine at TPC Southwind."

model = model.to(device)  # the model must be on the same device as the inputs
print("This is a %s news" % ag_news_label[predict(ex_text_str, text_pipeline)])