from io import open
import unicodedata
import string
import re
import random
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
- datapre.py实现了数据的预处理和数据集的准备
- evaluate.py定义了一些评估方法,用于测试
- seq2seq_model.py搭建了encoder和attention decoder,均采用GRU实现
- test.py加载训练好的模型,实现翻译
- train.py顾名思义就是训练脚本
- util.py包含了一些用到的工具函数,如绘图等
I see. Je comprends.
单词token化,主要是建立word → index的映射。
SOS_token = 0
EOS_token = 1


class Lang:
    """Vocabulary helper for a single language.

    Maintains three tables:

    * ``word2index`` -- word -> integer index
    * ``index2word`` -- integer index -> word
    * ``word2count`` -- word -> number of times it has been added

    Indices 0 and 1 are pre-assigned to the "SOS" and "EOS" markers, so the
    first real word receives index 2.
    """

    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}
        self.n_words = 2  # SOS and EOS are already counted

    def addSentence(self, sentence):
        """Add every single-space-separated token of ``sentence``."""
        for token in sentence.split(' '):
            self.addWord(token)

    def addWord(self, word):
        """Count ``word``; on first sight give it the next free index."""
        if word in self.word2index:
            self.word2count[word] += 1
            return

        new_index = self.n_words
        self.word2index[word] = new_index
        self.word2count[word] = 1
        self.index2word[new_index] = word
        self.n_words = new_index + 1
# Strip accents so that text is (mostly) plain ASCII letters.
def unicodeToAscii(s):
    """Return ``s`` with all combining marks removed.

    The string is decomposed with Unicode NFD normalization, which splits an
    accented character into its base character plus combining marks, and
    every character in category ``Mn`` (nonspacing mark) is then dropped.
    Characters that have no such decomposition are returned unchanged.
    """
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )
# Lowercase, trim, strip accents and remove non-letter characters.
def normalizeString(s):
    """Normalize a raw sentence for tokenization on single spaces.

    Steps:

    1. lowercase, trim surrounding whitespace and strip accents;
    2. put a space in front of each ``.``, ``!`` or ``?`` so punctuation
       becomes its own token;
    3. replace every run of characters other than ASCII letters and
       ``.!?`` with a single space;
    4. trim again, because step 3 can leave a leading or trailing space
       (e.g. for input that starts or ends with a digit or bracket), which
       would otherwise produce empty-string tokens when the sentence is
       later split on ``' '``.
    """
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s.strip()
def readLangs(language1, language2, reverse=False):
    """Read the parallel corpus ``data/<language1>-<language2>.txt``.

    The file is split into lines and every line is split on tab characters
    into a sentence pair; each sentence is passed through
    ``normalizeString``.

    Args:
        language1: Name of the language in the first column of the file.
        language2: Name of the language in the second column of the file.
        reverse: When true, each pair is reversed and the two languages
            swap roles, so translation goes from ``language2`` to
            ``language1``.

    Returns:
        ``(input_lang, output_lang, pairs)``. The two ``Lang`` objects are
        freshly created here; this function does not add any words to
        them.
    """
    print("Reading lines...")

    # Read the whole file and split it into lines. The context manager
    # makes sure the file handle is closed again.
    with open('data/%s-%s.txt' % (language1, language2), encoding='utf-8') as corpus:
        lines = corpus.read().strip().split('\n')

    # Split every line into a pair of sentences and normalize them.
    pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]

    # Optionally reverse the translation direction.
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(language2)
        output_lang = Lang(language1)
    else:
        input_lang = Lang(language1)
        output_lang = Lang(language2)

    return input_lang, output_lang, pairs
代码通过常量 MAX_LENGTH 把句子的最大长度设定为 10 个单词(包括结尾标点符号),超过该长度的句子对会被过滤掉。
def filterPair(p):
    """Tell whether both sentences of pair ``p`` are short enough.

    A sentence qualifies when splitting it on single spaces yields fewer
    than ``MAX_LENGTH`` tokens. The second sentence is only inspected when
    the first one qualifies.
    """
    if len(p[0].split(' ')) >= MAX_LENGTH:
        return False
    return len(p[1].split(' ')) < MAX_LENGTH
def filterPairs(pairs):
    """Return a new list holding only the pairs accepted by ``filterPair``."""
    return list(filter(filterPair, pairs))
def prepareData(lang1, lang2, reverse=False):
    """Load the corpus, filter it, and build both vocabularies.

    Args:
        lang1: First language name, forwarded to ``readLangs``.
        lang2: Second language name, forwarded to ``readLangs``.
        reverse: Forwarded to ``readLangs``; swaps the translation
            direction when true.

    Returns:
        ``(input_lang, output_lang, pairs)`` where ``pairs`` contains only
        the sentence pairs kept by ``filterPairs`` and both ``Lang``
        objects have been filled with the words of those pairs.
    """
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    print("Read %s sentence pairs" % len(pairs))
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))
    print("Counting words...")
    for pair in pairs:
        # pair[0] is the input-language sentence, pair[1] the output one.
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, pairs
seq2seq 网络的编码器是一个 RNN(这里用 GRU 实现):对于每个输入单词,编码器输出一个输出向量和一个隐藏状态,并把这个隐藏状态作为处理下一个输入单词时 GRU 的隐藏状态输入。
class EncoderRNN(nn.Module):
    """GRU encoder: an embedding layer followed by a single ``nn.GRU``.

    Both the embedding dimension and the GRU hidden dimension equal
    ``hidden_size``.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Run one encoder step.

        The embedding of ``input`` is reshaped to ``(1, 1, -1)`` and fed to
        the GRU together with ``hidden``. Returns the GRU's
        ``(output, hidden)`` pair; the caller collects the outputs for the
        attention computation.
        """
        step_input = self.embedding(input).view(1, 1, -1)
        return self.gru(step_input, hidden)

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``."""
        shape = (1, 1, self.hidden_size)
        return torch.zeros(shape, device=device)
(2)attention decoder
(embedding): Embedding(10, 256)
(attn): Linear(in_features=512, out_features=10, bias=True)
(attn_combine): Linear(in_features=512, out_features=256, bias=True)
(dropout): Dropout(p=0.1, inplace=False)
(gru): GRU(256, 256)
(out): Linear(in_features=256, out_features=10, bias=True)
class AttnDecoderRNN(nn.Module):
    """GRU decoder with attention over the encoder outputs.

    The attention layer maps the concatenated (embedding, hidden) vector to
    ``max_length`` scores, one per row of ``encoder_outputs``.
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        # Fully connected layer that produces the attention scores.
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Run one decoder step.

        Returns ``(output, hidden, attn_weights)`` where ``output`` holds
        log-probabilities (``log_softmax`` over dim 1) and ``attn_weights``
        is the softmax-normalized attention distribution.
        """
        # Embed the input token and apply dropout against overfitting.
        embedded = self.dropout(self.embedding(input).view(1, 1, -1))

        # Attention weights from the embedding and the current hidden state.
        attn_input = torch.cat((embedded[0], hidden[0]), 1)
        attn_weights = F.softmax(self.attn(attn_input), dim=1)

        # Weighted sum of the encoder outputs (batched matrix product).
        context = torch.bmm(attn_weights.unsqueeze(0),
                            encoder_outputs.unsqueeze(0))

        # Concatenate embedding and attention context along dim 1, pass the
        # result through a linear layer, then add a leading dimension
        # (unsqueeze) before the ReLU and the GRU.
        combined = torch.cat((embedded[0], context[0]), 1)
        gru_input = F.relu(self.attn_combine(combined).unsqueeze(0))

        gru_output, hidden = self.gru(gru_input, hidden)

        log_probs = F.log_softmax(self.out(gru_output[0]), dim=1)
        return log_probs, hidden, attn_weights

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``."""
        shape = (1, 1, self.hidden_size)
        return torch.zeros(shape, device=device)
# Look up the vocabulary index of every word in a sentence.
def indexesFromSentence(lang, sentence):
    """Return the list of ``lang.word2index`` entries for ``sentence``.

    The sentence is split on single spaces. A word that is missing from
    ``lang.word2index`` raises ``KeyError``.
    """
    vocabulary = lang.word2index
    return [vocabulary[token] for token in sentence.split(' ')]
# Build a column tensor of word indexes for a sentence.
def tensorFromSentence(lang, sentence):
    """Convert ``sentence`` to a ``torch.long`` tensor of shape ``(n, 1)``.

    The word indexes come from ``indexesFromSentence`` and are followed by
    ``EOS_token``, so ``n`` is the number of words plus one. The tensor is
    created on the module-level ``device``.
    """
    indexes = indexesFromSentence(lang, sentence)
    # Terminate the sequence explicitly: the training loop stops decoding
    # when the decoder emits EOS_token, which it can only learn to do if
    # the target sequences actually end with it.
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)
# Turn a sentence pair into (input tensor, target tensor).
def tensorsFromPair(pair):
    """Return index tensors for both sentences of ``pair``.

    ``pair[0]`` is encoded with the module-level ``input_lang`` and
    ``pair[1]`` with the module-level ``output_lang``.
    """
    source_sentence, target_sentence = pair[0], pair[1]
    return (
        tensorFromSentence(input_lang, source_sentence),
        tensorFromSentence(output_lang, target_sentence),
    )
teacher_forcing_ratio = 0.5


def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer,
          decoder_optimizer, criterion, max_length=MAX_LENGTH):
    """Run one training step on a single sentence pair.

    The input sentence is fed through the encoder one token at a time while
    every output and the latest hidden state are recorded. The decoder then
    starts from the SOS token with the encoder's final hidden state as its
    initial hidden state.

    With probability ``teacher_forcing_ratio`` the step uses teacher
    forcing (the true target token is the next decoder input); otherwise
    the decoder's own top prediction is fed back and decoding stops early
    once it predicts ``EOS_token``.

    Returns:
        The accumulated loss divided by the target length (also when
        decoding stopped early).
    """
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

    loss = 0

    # Record every encoder output; the decoder attends over them.
    for step in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[step], encoder_hidden)
        encoder_outputs[step] = encoder_output[0, 0]

    decoder_input = torch.tensor([[SOS_token]], device=device)

    # The decoder's first hidden state is the encoder's last hidden state.
    decoder_hidden = encoder_hidden

    # Teacher forcing usually converges faster, but the resulting network
    # may be less stable when it later has to rely on its own predictions.
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    if use_teacher_forcing:
        # Feed the ground-truth token as the next decoder input.
        for step in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            loss += criterion(decoder_output, target_tensor[step])
            decoder_input = target_tensor[step]
    else:
        # Feed the decoder's own prediction as the next decoder input.
        for step in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            _, best_index = decoder_output.topk(1)
            # Detach so the next input carries no gradient history.
            decoder_input = best_index.squeeze().detach()

            loss += criterion(decoder_output, target_tensor[step])
            # Stop decoding once the end-of-sentence token is predicted.
            if decoder_input.item() == EOS_token:
                break

    # Backpropagate and update both networks.
    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100,
               learning_rate=0.01):
    """Train the encoder/decoder for ``n_iters`` single-pair steps.

    Args:
        encoder: Encoder network.
        decoder: Decoder network.
        n_iters: Number of training iterations; one randomly chosen
            sentence pair is used per iteration.
        print_every: Print the average loss every this many iterations.
        plot_every: Record an average-loss point for the plot every this
            many iterations.
        learning_rate: Learning rate for both SGD optimizers.

    The recorded loss curve is passed to ``showPlot`` at the end.
    """
    # Imported locally so the function does not rely on a module-level
    # ``import time`` being present.
    import time

    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    # Plain SGD for both networks.
    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs))
                      for _ in range(n_iters)]

    # The decoder already applies log_softmax, so NLLLoss is used here;
    # together they amount to a cross-entropy loss.
    criterion = nn.NLLLoss()

    # ``iteration`` (rather than ``iter``) avoids shadowing the builtin.
    for iteration in range(1, n_iters + 1):
        training_pair = training_pairs[iteration - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]

        loss = train(input_tensor, target_tensor, encoder, decoder,
                     encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if iteration % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('epoch:%d %s (%d%%) loss:%.4f' % (
                iteration, timeSince(start, iteration / n_iters),
                iteration / n_iters * 100, print_loss_avg))

        if iteration % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0

    showPlot(plot_losses)
