"])">
赞
踩
实例代码网址:https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html
注意:代码适用于jupyter notebook分块运行
- import torch
- from torchtext.datasets import AG_NEWS # 导入数据集
-
- train_iter = iter(AG_NEWS(split="train")) # 构建训练数据集的迭代器,节约内存
- next(train_iter) # 打印第一个元素(即第一个数据对)查看
- from torchtext.data.utils import get_tokenizer # 用于对句子进行分词,返回一个列表
- from torchtext.vocab import build_vocab_from_iterator # 用于生成文本文件(这里是训练集)对应的字典,需要输入一个生成器对象帮助返回文本文件中的所有单词
-
- tokenizer = get_tokenizer("basic_english") # 基于英语的分词器
- train_iter = AG_NEWS(split="train") # 训练数据集
-
- def yield_tokens(data_iter):
- """对每个训练数据集中的句子进行分词并返回分词对应的列表"""
- for _, text in data_iter:
- yield tokenizer(text)
-
- vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=['<unk>']) # 建立训练数据集的词表,并添加一个特殊字符"<unk>"
- vocab.set_default_index(vocab['<unk>']) # 给特殊字符单独设置索引
- vocab(['here', 'is', 'an', 'example']) # 查看这4个单词在字典中对应的索引
- # 准备文本和标签的处理器,即给文本(根据词表)和标签设置索引
- text_pipeline = lambda x: vocab(tokenizer(x))
- label_pipeline = lambda x: int(x) - 1
- from torch.utils.data import DataLoader # 数据加载类
-
- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 根据电脑情况设置使用gpu还是cpu进行训练
-
- def collate_batch(batch):
- """数据加载器需要的加载batch的方式函数,即如何把一个原始的batch数据转换成一个可以训练的batch"""
- label_list, text_list, offsets = [], [], [0]
- for _label, _text in batch:
- label_list.append(label_pipeline(_label)) # 把标签转换成标签索引并且加入标签列表中
- processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64) # 先把文本转换成字典中对应的索引,再把索引换成张量,得到处理过后的句子向量
- text_list.append(processed_text) # 把文本(一句话)向量加入文本列表中
- offsets.append(processed_text.size(0)) # 把文本向量的大小(即这个向量有多少个元素,对应句子中有多少个)作为偏移值
- label_list = torch.tensor(label_list, dtype=torch.int64) # 把列表转换成张量
- offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) # 把偏移值转换成张量并在第一个维度求和
- text_list = torch.cat(text_list) # 对文本的张量进行拼接,默认在第一个维度进行
- return label_list.to(device), text_list.to(device), offsets.to(device) # 把标签、文本和偏移值全部转换成适合对应设备的
-
- train_iter = AG_NEWS(split="train") # 获取原始的训练数据集
- # 定义训练数据集的数据加载类
- dataloader = DataLoader(
- train_iter, batch_size=8, shuffle=False, collate_fn=collate_batch
- )
- from torch import nn
-
- class TextClassificationModel(nn.Module):
- def __init__(self, vocab_size, embed_dim, num_class):
- super(TextClassificationModel, self).__init__() # 继承nn.Module
- self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False) # 设置词嵌入层,其中第一个参数是指字典的大小;第二个参数是设置每个单词转换成的向量的维度
- self.fc = nn.Linear(embed_dim, num_class) # 线性层,进行维度的转换(把文本向量从最初嵌入时的维度大小转换成类别的维度大小)
- self.init_weights() # 设置初始权重
-
- def init_weights(self):
- initrange = 0.5 # 设置初始权重的范围是[-0.5, 0.5]
- self.embedding.weight.data.uniform_(-initrange, initrange) # 初始化词嵌入层的权重
- self.fc.weight.data.uniform_(-initrange, initrange) # 初始化线性层的权重
- self.fc.bias.data.zero_() # 初始化线性层的偏置
-
- def forward(self, text, offsets):
- embedded = self.embedding(text, offsets) # 得到文本的嵌入表示,即向量
- return self.fc(embedded) # 让文本向量经过线性层,即得到文本分类的结果
-
- train_iter = AG_NEWS(split="train") # 获得原始数据集
- num_class = len(set([label for (label, text) in train_iter])) # 获得数据集中标签类别的个数
- vocab_size = len(vocab) # 获得字典的大小
- emsize = 64 # 设置每个单词转换成向量的维度
- model = TextClassificationModel(vocab_size, emsize, num_class).to(device) # 创建模型
- import time
-
- def train(dataloader):
- model.train() # 模型进入训练模式
- total_acc, total_count = 0, 0 # 总共预测对几个标签和一共预测了多少个标签,用于计算准确率
- log_interval = 500 # 每次预测500个时就打印一次
- start_time = time.time() # 模型开始训练的时间
-
- for idx, (label, text, offsets) in enumerate(dataloader):
- optimizer.zero_grad() # 清除上次的梯度
- predicted_label = model(text, offsets) # 预测标签
- loss = criterion(predicted_label, label) # 计算损失
- loss.backward() # 反向传播
- torch.nn.utils.clip_grad_norm(model.parameters(), 0.1) # 计算梯度
- optimizer.step() # 更新模型参数
- total_acc += (predicted_label.argmax(1) == label).sum().item() # 指定预测的标签值第二个维度中的最大值所在的索引,与真实标签的索引进行对比是否一样(结果只有0和1),然后求和得到一个张量值,最后将其转换成标量作为这次训练的准确率
- total_count += label.size(0) # 这里就是8,一个batch中有8个label
- if idx % log_interval == 0 and idx > 0:
- elapsed = time.time() - start_time # 计算每500次花费的时间
- print(
- "| epoch {:3d} | {:5d}/{:5d} batches"
- "| accuracy {:8.3f}".format(
- epoch, idx, len(dataloader), total_acc / total_count
- )
- )
- total_acc, total_count = 0, 0 # 满500次重新计算这2个指标
- start_time = time.time() # 满500次重新计算运行时间
-
- def evaluate(dataloader):
- model.eval()
- total_acc, total_count = 0, 0
-
- with torch.no_grad(): # 不计算梯度,因为只用于预测
- for idx, (label, text, offsets) in enumerate(dataloader):
- predicted_label = model(text, offsets) # 预测标签
- loss = criterion(predicted_label, label) # 计算损失
- total_acc += (predicted_label.argmax(1) == label).sum().item() # 计算正确预测数量
- total_count += label.size(0) # 计算总的预测数量
-
- return total_acc / total_count # 计算验证时的准确率
- from torch.utils.data.dataset import random_split # 将数据集切分成不重复的部分
- from torchtext.data.functional import to_map_style_dataset # 将可迭代的数据集转换成映射格式的数据集
-
- # 超参数
- EPOCHS = 10 # 所有batch重复迭代的次数
- LR = 5 # 学习率
- BATCH_SIZE = 64 # 每个batch的大小
-
- criterion = torch.nn.CrossEntropyLoss() # 交叉熵损失函数
- optimizer = torch.optim.SGD(model.parameters(), lr = LR) # 采用随机梯度下降优化算法
- scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1) # 对随机梯度下降采用学习率动态调整策略,gamma是学习率指数衰减的底数(还有其他调整策略如步长衰减等)
- total_accu = None
- train_iter, test_iter = AG_NEWS() # 加载训练数据集合测试数据集
- train_dataset = to_map_style_dataset(train_iter) # 将训练数据集转换成映射格式
- test_dataset = to_map_style_dataset(test_iter) # 将测试数据集转化成映射格式
- num_train = int(len(train_dataset) * 0.95) # 将95%的训练数据集用于训练
- split_train, split_valid = random_split(
- train_dataset, [num_train, len(train_dataset) - num_train]
- ) # 随机切分,95%的训练数据集用于训练,剩余5%用于验证
-
- train_dataloader = DataLoader(
- split_train, batch_size = BATCH_SIZE, shuffle = True, collate_fn = collate_batch
- ) # 训练数据集
- valid_dataloader = DataLoader(
- split_valid, batch_size=BATCH_SIZE, shuffle = True, collate_fn = collate_batch
- ) # 验证训练集(用于反映真实的训练效果)
- test_dataloader = DataLoader(
- test_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch
- ) # 测试数据集(直接用于预测,不计算准确率)
-
- for epoch in range(1, EPOCHS + 1):
- epoch_start_time = time.time()
- train(train_dataloader) # 训练模型
- accu_val = evaluate(valid_dataloader) # 验证模型
- if total_accu is not None and total_accu > accu_val:
- scheduler.step() # 当新一次的验证准确率没有上一次验证时的学习率高,则让学习率动态衰减以提升下次训练的效果
- else:
- total_accu = accu_val
- print("-" * 59)
- print(
- "| end of epoch {:3d} | time: {:5.2f}s |"
- "| valid accuracy {:8.3f}".format(
- epoch, time.time() - epoch_start_time, accu_val
- )
- )
- print("-" * 59)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。