赞
踩
首先设计数据下载模块,实现可视化下载流程,并保存至指定路径。数据下载模块使用requests库进行http请求,并通过tqdm库对下载百分比进行可视化。此外针对下载安全性,使用IO的方式下载临时文件,而后保存至指定的路径并返回。
import os import shutil import requests import tempfile from tqdm import tqdm from typing import IO from pathlib import Path # 指定保存路径为 `home_path/.mindspore_examples` cache_dir = Path.home() / '.mindspore_examples' def http_get(url: str, temp_file: IO): """使用requests库下载数据,并使用tqdm库进行流程可视化""" req = requests.get(url, stream=True) content_length = req.headers.get('Content-Length') total = int(content_length) if content_length is not None else None progress = tqdm(unit='B', total=total) for chunk in req.iter_content(chunk_size=1024): if chunk: progress.update(len(chunk)) temp_file.write(chunk) progress.close() def download(file_name: str, url: str): """下载数据并存为指定名称""" if not os.path.exists(cache_dir): os.makedirs(cache_dir) cache_path = os.path.join(cache_dir, file_name) cache_exist = os.path.exists(cache_path) if not cache_exist: with tempfile.NamedTemporaryFile() as temp_file: http_get(url, temp_file) temp_file.flush() temp_file.seek(0) with open(cache_path, 'wb') as cache_file: shutil.copyfileobj(temp_file, cache_file) return cache_path
数据集已分割为train和test两部分,且每部分包含neg和pos两个分类的文件夹,因此需分别train和test进行读取并处理数据和标签。
import re import six import string import tarfile class IMDBData(): """IMDB数据集加载器 加载IMDB数据集并处理为一个Python迭代对象。 """ label_map = { "pos": 1, "neg": 0 } def __init__(self, path, mode="train"): self.mode = mode self.path = path self.docs, self.labels = [], [] self._load("pos") self._load("neg") def _load(self, label): pattern = re.compile(r"aclImdb/{}/{}/.*\.txt$".format(self.mode, label)) # 将数据加载至内存 with tarfile.open(self.path) as tarf: tf = tarf.next() while tf is not None: if bool(pattern.match(tf.name)): # 对文本进行分词、去除标点和特殊字符、小写处理 self.docs.append(str(tarf.extractfile(tf).read().rstrip(six.b("\n\r")) .translate(None, six.b(string.punctuation)).lower()).split()) self.labels.append([self.label_map[label]]) tf = tarf.next() def __getitem__(self, idx): return self.docs[idx], self.labels[idx] def __len__(self): return len(self.docs)
预训练词向量是对输入单词的数值化表示,通过nn.Embedding层,采用查表的方式,输入单词对应词表中的index,获得对应的表达向量。 因此进行模型构造前,需要将Embedding层所需的词向量和词表进行构造。这里我们使用Glove(Global Vectors for Word Representation)这种经典的预训练词向量 直接使用第一列的单词作为词表,使用dataset.text.Vocab将其按顺序加载;同时读取每一行的Vector并转为numpy.array,用于nn.Embedding加载权重使用。具体实现如下:
import zipfile import numpy as np def load_glove(glove_path): glove_100d_path = os.path.join(cache_dir, 'glove.6B.100d.txt') if not os.path.exists(glove_100d_path): glove_zip = zipfile.ZipFile(glove_path) glove_zip.extractall(cache_dir) embeddings = [] tokens = [] with open(glove_100d_path, encoding='utf-8') as gf: for glove in gf: word, embedding = glove.split(maxsplit=1) tokens.append(word) embeddings.append(np.fromstring(embedding, dtype=np.float32, sep=' ')) # 添加 <unk>, <pad> 两个特殊占位符对应的embedding embeddings.append(np.random.rand(100)) embeddings.append(np.zeros((100,), np.float32)) vocab = ds.text.Vocab.from_list(tokens, special_tokens=["<unk>", "<pad>"], special_first=False) embeddings = np.array(embeddings).astype(np.float32) return vocab, embeddings
由于数据集中可能存在词表没有覆盖的单词,因此需要加入标记符;同时由于输入长度的不一致,在打包为一个batch时需要将短的文本进行填充,因此需要加入标记符。完成后的词表长度为原词表长度+2。
通过加载器加载的IMDB数据集进行了分词处理,但不满足构造训练数据的需要,因此要对其进行额外的预处理。其中包含的预处理如下:
这里我们使用mindspore.dataset中提供的接口进行预处理操作。这里使用到的接口均为MindSpore的高性能数据引擎设计,每个接口对应操作视作数据流水线的一部分,详情请参考MindSpore数据引擎。 首先针对token到index id的查表操作,使用text.Lookup接口,将前文构造的词表加载,并指定unknown_token。其次为文本序列统一长度操作,使用PadEnd接口,此接口定义最大长度和补齐值(pad_value),这里我们取最大长度为500,填充值对应词表中的index id。
除了对数据集中text进行预处理外,由于后续模型训练的需要,要将label数据转为float32格式。
import mindspore as ms
lookup_op = ds.text.Lookup(vocab, unknown_token='<unk>')
pad_op = ds.transforms.PadEnd([500], pad_value=vocab.tokens_to_ids('<pad>'))
type_cast_op = ds.transforms.TypeCast(ms.float32)
完成预处理操作后,需将其加入到数据集处理流水线中,使用map接口对指定的column添加操作。
imdb_train = imdb_train.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_train = imdb_train.map(operations=[type_cast_op], input_columns=['label'])
imdb_test = imdb_test.map(operations=[lookup_op, pad_op], input_columns=['text'])
imdb_test = imdb_test.map(operations=[type_cast_op], input_columns=['label'])
手动将IMDB数据集分割为训练和验证两部分,比例取0.7, 0.3。
imdb_train, imdb_valid = imdb_train.split([0.7, 0.3])
完成数据集的处理后,我们设计用于情感分类的模型结构。首先需要将输入文本(即序列化后的index id列表)通过查表转为向量化表示,此时需要使用nn.Embedding层加载Glove词向量;然后使用RNN循环神经网络做特征提取;最后将RNN连接至一个全连接层,即nn.Dense,将特征转化为与分类数量相同的size,用于后续进行模型优化训练。整体模型结构如下:nn.Embedding -> nn.RNN -> nn.Dense,这里我们使用能够一定程度规避RNN梯度消失问题的变种LSTM(Long short-term memory)做特征提取层。
完成模型主体构建后,首先根据指定的参数实例化网络;然后选择损失函数和优化器。针对本节情感分类问题的特性,即预测Positive或Negative的二分类问题,我们选nn.BCEWithLogitsLoss(二分类交叉熵损失函数)。
在完成模型构建,进行训练逻辑的设计。一般训练逻辑分为一下步骤:
下面按照此逻辑,使用tqdm库,设计训练一个epoch的函数,用于训练过程和loss的可视化。
def forward_fn(data, label): logits = model(data) loss = loss_fn(logits, label) return loss grad_fn = ms.value_and_grad(forward_fn, None, optimizer.parameters) def train_step(data, label): loss, grads = grad_fn(data, label) optimizer(grads) return loss def train_one_epoch(model, train_dataset, epoch=0): model.set_train() total = train_dataset.get_dataset_size() loss_total = 0 step_total = 0 with tqdm(total=total) as t: t.set_description('Epoch %i' % epoch) for i in train_dataset.create_tuple_iterator(): loss = train_step(*i) loss_total += loss.asnumpy() step_total += 1 t.set_postfix(loss=loss_total/step_total) t.update(1)
训练逻辑完成后,需要对模型进行评估。即使用模型的预测结果和测试集的正确标签进行对比,求出预测的准确率。由于IMDB的情感分类为二分类问题,对预测值直接进行四舍五入即可获得分类标签(0或1),然后判断是否与正确标签相等即可。下面为二分类准确率计算函数实现:
def binary_accuracy(preds, y):
"""
计算每个batch的准确率
"""
# 对预测值进行四舍五入
rounded_preds = np.around(ops.sigmoid(preds).asnumpy())
correct = (rounded_preds == y).astype(np.float32)
acc = correct.sum() / len(correct)
return acc
有了准确率计算函数后,类似于训练逻辑,对评估逻辑进行设计, 分别为以下步骤:
同训练逻辑一样,使用tqdm进行loss和过程的可视化。此外返回评估loss至供保存模型时作为模型优劣的判断依据。
在进行evaluate时,使用的模型是不包含损失函数和优化器的网络主体; 在进行evaluate前,需要通过model.set_train(False)将模型置为评估状态,此时Dropout不生效。
def evaluate(model, test_dataset, criterion, epoch=0): total = test_dataset.get_dataset_size() epoch_loss = 0 epoch_acc = 0 step_total = 0 model.set_train(False) with tqdm(total=total) as t: t.set_description('Epoch %i' % epoch) for i in test_dataset.create_tuple_iterator(): predictions = model(i[0]) loss = criterion(predictions, i[1]) epoch_loss += loss.asnumpy() acc = binary_accuracy(predictions, i[1]) epoch_acc += acc step_total += 1 t.set_postfix(loss=epoch_loss/step_total, acc=epoch_acc/step_total) t.update(1) return epoch_loss / total
前序完成了模型构建和训练、评估逻辑的设计,下面进行模型训练。这里我们设置训练轮数为5轮。同时维护一个用于保存最优模型的变量best_valid_loss,根据每一轮评估的loss值,取loss值最小的轮次,将模型进行保存。为节省用例运行时长,此处num_epochs设置为2,可根据需要自行修改。
num_epochs = 2
best_valid_loss = float('inf')
ckpt_file_name = os.path.join(cache_dir, 'sentiment-analysis.ckpt')
for epoch in range(num_epochs):
train_one_epoch(model, imdb_train, epoch)
valid_loss = evaluate(model, imdb_valid, loss_fn, epoch)
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
ms.save_checkpoint(model, ckpt_file_name)
模型训练完成后,一般需要对模型进行测试或部署上线,此时需要加载已保存的最优模型(即checkpoint),供后续测试使用。这里我们直接使用MindSpore提供的Checkpoint加载和网络权重加载接口:1.将保存的模型Checkpoint加载到内存中,2.将Checkpoint加载至模型。
load_param_into_net接口会返回模型中没有和Checkpoint匹配的权重名,正确匹配时返回空列表。
param_dict = ms.load_checkpoint(ckpt_file_name)
ms.load_param_into_net(model, param_dict)
对测试集打batch,然后使用evaluate方法进行评估,得到模型在测试集上的效果。
imdb_test = imdb_test.batch(64)
evaluate(model, imdb_test, loss_fn)
最后我们设计一个预测函数,实现开头描述的效果,输入一句评价,获得评价的情感分类。具体包含以下步骤:
score_map = {
1: "Positive",
0: "Negative"
}
def predict_sentiment(model, vocab, sentence):
model.set_train(False)
tokenized = sentence.lower().split()
indexed = vocab.tokens_to_ids(tokenized)
tensor = ms.Tensor(indexed, ms.int32)
tensor = tensor.expand_dims(0)
prediction = model(tensor)
return score_map[int(np.round(ops.sigmoid(prediction).asnumpy()))]
predict_sentiment(model, vocab, "This film is terrible")
此章节学习到此结束,感谢昇思平台。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。