Get familiar with PyTorch and use it to re-implement Task 1, building CNN- and RNN-based text classifiers.
Reference
Word-embedding initialization (both options are sketched in the code block below):
random initialization of the embeddings
initialization with pretrained GloVe embeddings: https://nlp.stanford.edu/projects/glove/
Knowledge points:
Time: two weeks
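To make the two initialization strategies concrete, here is a minimal sketch (my illustration, not part of the assignment code; the vocabulary size, embedding dimension, and the stand-in GloVe matrix are made up, and in the actual script the pretrained matrix comes from TEXT.vocab.vectors):

```python
import torch
import torch.nn as nn

vocab_size, embed_size = 20000, 200   # assumed sizes, for illustration only

# (1) random initialization: nn.Embedding fills its weight matrix from N(0, 1)
random_embed = nn.Embedding(vocab_size, embed_size)

# (2) pretrained initialization: copy an existing matrix (e.g. GloVe vectors
#     aligned to the vocabulary) into the layer; a random tensor stands in
#     for the real GloVe matrix here
glove_matrix = torch.randn(vocab_size, embed_size)
pretrained_embed = nn.Embedding.from_pretrained(glove_matrix, freeze=False)
```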
Dataset loading and preprocessing are all handled inside the main function:
```python
import torch
import torch.nn as nn
from tqdm import tqdm, trange  # tqdm provides the progress bars
from torch.optim import Adam
from tensorboardX import SummaryWriter
import pandas as pd
import os
from torchtext.legacy import data
from torchtext.legacy.data import Iterator, BucketIterator
from torchtext.vocab import Vectors
import matplotlib.pyplot as plt
import numpy as np
from Model import RNN, CNN, LSTM

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# hyperparameters
train_epochs = 5
batch_size = 512
learning_rate = 0.001
max_seq_length = 48
num_classes = 5
dropout_rate = 0.1
data_path = "data"
clip = 5
embed_size = 200
vectors = Vectors('glove.6B.200d.txt',
                  'C:/Users/Mechrevo/Desktop/AI/nlp-beginner/code-for-nlp-beginner-master/Task2-Text Classification (RNN&CNN)/embedding')
freeze = False
use_rnn = True
hidden_size = 256
num_layers = 1
bidirectional = True
use_lstm = False
num_filters = 200
kernel_sizes = [2, 3, 4]


def load_iters(batch_size=32, device="cpu", data_path='data', vectors=None):
    TEXT = data.Field(lower=True, batch_first=True, include_lengths=True)
    LABEL = data.LabelField(batch_first=True)
    # the first two TSV columns are skipped with (None, None)
    train_fields = [(None, None), (None, None), ('text', TEXT), ('label', LABEL)]
    test_fields = [(None, None), (None, None), ('text', TEXT)]

    train_data = data.TabularDataset.splits(
        path=data_path, train='train.tsv', format='tsv',
        fields=train_fields, skip_header=True)[0]
    test_data = data.TabularDataset.splits(
        path=data_path, train='test.tsv', format='tsv',
        fields=test_fields, skip_header=True)[0]

    TEXT.build_vocab(train_data.text, vectors=vectors)
    LABEL.build_vocab(train_data.label)

    train_data, dev_data = train_data.split([0.8, 0.2])
    train_iter, dev_iter = BucketIterator.splits(
        (train_data, dev_data),
        batch_sizes=(batch_size, batch_size),
        device=device,
        sort_key=lambda x: len(x.text),
        sort_within_batch=True,
        repeat=False,
        shuffle=True)
    test_iter = Iterator(
        test_data,
        batch_size=batch_size,
        device=device,
        sort=False,
        sort_within_batch=False,
        repeat=False,
        shuffle=False)
    return train_iter, dev_iter, test_iter, TEXT, LABEL


if __name__ == "__main__":
    train_iter, dev_iter, test_iter, TEXT, LABEL = load_iters(batch_size, device, data_path, vectors)
    vocab_size = len(TEXT.vocab.itos)

    # build model
    if use_lstm:
        model = LSTM(vocab_size, embed_size, hidden_size, num_layers, num_classes, bidirectional, dropout_rate)
    elif use_rnn:
        model = RNN(vocab_size, embed_size, hidden_size, num_layers, num_classes, bidirectional, dropout_rate)
    else:
        model = CNN(vocab_size, embed_size, num_classes, num_filters, kernel_sizes, dropout_rate)
    if vectors is not None:
        # nn.Embedding.from_pretrained is a classmethod that returns a new layer,
        # so the result has to be assigned back to model.embed
        model.embed = nn.Embedding.from_pretrained(TEXT.vocab.vectors, freeze=freeze)
    model.to(device)

    optimizer = Adam(model.parameters(), lr=learning_rate)
    loss_func = nn.CrossEntropyLoss()
    writer = SummaryWriter('logs', comment="rnn")
    loss_history = []

    for epoch in trange(train_epochs, desc="Epoch"):
        model.train()
        ep_loss = 0
        for step, batch in enumerate(tqdm(train_iter, desc="Iteration")):
            (inputs, lens), labels = batch.text, batch.label
            outputs = model(inputs, lens)
            loss = loss_func(outputs, labels)
            ep_loss += loss.item()

            model.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip)  # gradient clipping
            optimizer.step()

            if step % 10 == 0:
                loss_history.append(loss.item())
                writer.add_scalar('Train_Loss', loss.item(), epoch * len(train_iter) + step)
                tqdm.write('Epoch {}, Step {}, Loss {}'.format(epoch, step, loss.item()))

        # evaluating on the dev split
        model.eval()
        with torch.no_grad():
            corr_num = 0
            err_num = 0
            for batch in dev_iter:
                (inputs, lens), labels = batch.text, batch.label
                outputs = model(inputs, lens)
                corr_num += (outputs.argmax(1) == labels).sum().item()
                err_num += (outputs.argmax(1) != labels).sum().item()
            tqdm.write('Epoch {}, Accuracy {}'.format(epoch, corr_num / (corr_num + err_num)))

    if use_lstm:
        plt.title('LSTM Model')
    elif use_rnn:
        plt.title('RNN Model')
    else:
        plt.title('CNN Model')
    plt.plot(np.arange(len(loss_history)), np.array(loss_history))
    plt.xlabel('Iterations')
    plt.ylabel('Training Loss')
    plt.show()

    # predicting on the test set and writing the submission file
    model.eval()
    with torch.no_grad():
        predicts = []
        for batch in test_iter:
            inputs, lens = batch.text
            outputs = model(inputs, lens)
            # map class indices back to the original label strings via LABEL's vocab
            predicts.extend(LABEL.vocab.itos[i] for i in outputs.argmax(1).cpu().numpy())
        test_data = pd.read_csv(os.path.join(data_path, 'test.tsv'), sep='\t')
        test_data["Sentiment"] = predicts
        test_data[['PhraseId', 'Sentiment']].set_index('PhraseId').to_csv('result.csv')
```
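For reference, the two (None, None) entries in train_fields skip the first two TSV columns, so the script appears to assume Kaggle-style movie-review files with columns PhraseId, SentenceId, Phrase, Sentiment. A quick, hypothetical sanity check (my own snippet, not part of the original script):

```python
# Hypothetical sanity check of the assumed TSV layout before training.
import pandas as pd

df = pd.read_csv('data/train.tsv', sep='\t')
print(df.columns.tolist())               # expected: ['PhraseId', 'SentenceId', 'Phrase', 'Sentiment']
print(sorted(df['Sentiment'].unique()))  # expected: the five classes 0..4
```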
Models: LSTM, RNN, CNN (defined in Model.py)
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class RNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes,
                 bidirectional=True, dropout_rate=0.3):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, num_layers,
                          batch_first=True, bidirectional=bidirectional)
        self.bidirectional = bidirectional
        if not bidirectional:
            self.fc = nn.Linear(hidden_size, num_classes)
        else:
            self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout_rate)
        self.init()

    def init(self):
        # uniform initialization in [-1/sqrt(hidden_size), 1/sqrt(hidden_size)]
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, lens):
        embeddings = self.embed(x)
        output, _ = self.rnn(embeddings)
        # pick the hidden state at each sequence's last real (non-padded) timestep
        real_output = output[range(len(lens)), lens - 1]
        out = self.fc(self.dropout(real_output))
        return out


class LSTM(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, num_classes,
                 bidirectional=True, dropout_rate=0.3):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.LSTM(embed_size, hidden_size, num_layers,
                           batch_first=True, bidirectional=bidirectional)
        self.bidirectional = bidirectional
        if not bidirectional:
            self.fc = nn.Linear(hidden_size, num_classes)
        else:
            self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout_rate)
        self.init()

    def init(self):
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, lens):
        embeddings = self.embed(x)
        output, _ = self.rnn(embeddings)
        real_output = output[range(len(lens)), lens - 1]
        out = self.fc(self.dropout(real_output))
        return out


class CNN(nn.Module):
    def __init__(self, vocab_size, embed_size, num_classes, num_filters=100,
                 kernel_sizes=(2, 3, 4), dropout_rate=0.3):
        super(CNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (k, embed_size), padding=(k - 1, 0))
            for k in kernel_sizes
        ])
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)
        self.dropout = nn.Dropout(dropout_rate)

    def conv_and_pool(self, x, conv):
        # x: (batch, 1, seq_len, embed_size) -> conv -> (batch, num_filters, seq_len')
        x = F.relu(conv(x).squeeze(3))
        # max-over-time pooling -> (batch, num_filters)
        x_max = F.max_pool1d(x, x.size(2)).squeeze(2)
        return x_max

    def forward(self, x, lens):
        # lens is unused here; kept so CNN has the same interface as RNN/LSTM
        embed = self.embed(x).unsqueeze(1)  # add a channel dimension
        conv_results = [self.conv_and_pool(embed, conv) for conv in self.convs]
        out = torch.cat(conv_results, 1)
        return self.fc(self.dropout(out))
```
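In the RNN/LSTM forward pass, `output[range(len(lens)), lens - 1]` selects each sequence's hidden state at its last real token rather than at the padded end. A common alternative (not used in the code above, shown only as a sketch with toy shapes) is to pack the padded batch so the recurrent layer skips padding entirely:

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

# toy batch: 3 sequences of 200-d embeddings, true lengths 5, 3, 2, padded to length 5
embeddings = torch.randn(3, 5, 200)
lens = torch.tensor([5, 3, 2])

rnn = nn.LSTM(200, 256, batch_first=True, bidirectional=True)

# pack so the LSTM never runs over padding positions (lengths must be sorted in
# descending order when enforce_sorted=True, which sort_within_batch already gives)
packed = pack_padded_sequence(embeddings, lens, batch_first=True, enforce_sorted=True)
packed_out, _ = rnn(packed)
output, _ = pad_packed_sequence(packed_out, batch_first=True)

# same selection as output[range(len(lens)), lens - 1] in the models above
last_hidden = output[torch.arange(len(lens)), lens - 1]
print(last_hidden.shape)   # torch.Size([3, 512]) == (batch, 2 * hidden_size)
```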
I also rewrote the task myself with a Hugging Face BERT-style model (DistilBERT):
```python
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import numpy as np
import torch
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from datasets import load_dataset, load_metric
from transformers import (AdamW, AutoTokenizer, AutoModelForSequenceClassification,
                          DataCollatorWithPadding, TrainingArguments, Trainer, get_scheduler)


def tokenize_function(example):
    return tokenizer(example["Phrase"], truncation=True)


def compute_metrics(eval_preds):
    metric = load_metric("accuracy")
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


if __name__ == "__main__":
    # train/validation/test splits prepared beforehand; all are assumed to contain a Sentiment column
    data_files = {"train": "data/train.tsv",
                  "validation": "data/validation.tsv",
                  "test": "data/test.tsv"}
    data = load_dataset("csv", data_files=data_files, delimiter="\t")

    checkpoint = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=5)

    tokenized_datasets = data.map(tokenize_function, batched=True)
    tokenized_datasets = tokenized_datasets.remove_columns(["PhraseId", "SentenceId"])
    tokenized_datasets = tokenized_datasets.rename_column("Sentiment", "labels")
    tokenized_datasets = tokenized_datasets.remove_columns(["Phrase"])
    tokenized_datasets.set_format("torch")

    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    training_args = TrainingArguments("test-trainer")

    train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True,
                                  batch_size=16, collate_fn=data_collator)
    eval_dataloader = DataLoader(tokenized_datasets["validation"],
                                 batch_size=16, collate_fn=data_collator)

    optimizer = AdamW(model.parameters(), lr=0.0001)

    num_epochs = 1
    num_training_steps = num_epochs * len(train_dataloader)
    lr_scheduler = get_scheduler("linear", optimizer=optimizer,
                                 num_warmup_steps=0, num_training_steps=num_training_steps)
    print(num_training_steps)

    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model.to(device)

    # a Trainer is constructed here but never called; the manual loop below does the training
    trainer = Trainer(
        model,
        training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        data_collator=data_collator,
        tokenizer=tokenizer,
    )

    progress_bar = tqdm(range(num_training_steps))
    model.train()  # from_pretrained leaves the model in eval mode, so switch on dropout explicitly
    loss_list = []
    for epoch in range(num_epochs):
        for idx, batch in enumerate(train_dataloader):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss
            if idx % 100 == 0:
                loss_list.append(loss.item())
                tqdm.write('step: {}, loss: {}'.format(idx, loss.item()))
            loss.backward()
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
            progress_bar.update(1)

    plt.plot(np.arange(len(loss_list)), np.array(loss_list))
    plt.xlabel('Iterations')
    plt.ylabel('Training Loss')
    plt.title('distilbert-base-uncased')
    plt.show()

    # evaluation on the validation split
    metric = load_metric("accuracy")
    model.eval()
    for batch in eval_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions=predictions, references=batch["labels"])
    print(metric.compute())
```
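One loose end: the script builds a `Trainer` and defines `compute_metrics` but never uses them; training and evaluation are done by hand. If the Trainer API were used instead, the manual loop could be replaced with roughly the following (a sketch under the same setup, placed inside the same `__main__` block):

```python
# Hypothetical Trainer-based alternative to the manual loop above.
trainer = Trainer(
    model,
    training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,   # reuse the accuracy function defined above
)
trainer.train()
print(trainer.evaluate())              # reports eval_loss and eval_accuracy
```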
LSTM model training results:
RNN model training results:
BERT results: as you can see, it is quite poor and fails to converge.
The LSTM and RNN models do not differ much on this dataset, but the RNN loss shows some outliers, probably because of dirty data (the dataset is very noisy and of poor quality).
I planned to try Transformer/BERT-style models, which are more widely used nowadays, hoping for some improvement (update: I have since tried BERT and saw no improvement; after inspecting the data, the failure to converge seems to come from the heavy noise in the dataset).
The dataset is small anyway; this was just for fun.
I got familiar with the workflow of reading data with torchtext and building a simple (trivial) neural network.
PyTorch is the GOAT!
Hugging Face is the GOAT!