赞
踩
看 PyTorch 的中文教程,里面有一个英语聊天机器人教程的例子,其中运用了 Encoder-Decoder 结构加 Global Attention 进行生成。
自己试着根据这个例子写了一个中文的,添加了 Dataset、DataLoader,处理读取数据更方便,也删减了一些处理数据的代码,更突出其中的网络结构,以此让自己更详细地理解其中的网络结构及注意力机制。同时把代码分为两部分:一部分训练,一部分聊天推理。
代码如下
训练代码
import itertools
import json
import os
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Default word tokens
PAD_token = 0  # Used for padding short sentences
SOS_token = 1  # Start-of-sentence token
EOS_token = 2  # End-of-sentence token
UNK_token = 3  # Returned by Voc.get_word_index for unseen characters


class Voc:
    """Character-level vocabulary mapping characters to integer ids.

    Ids 0-3 are reserved for the PAD, SOS, EOS and UNK tokens; every new
    character receives the next free id in order of first appearance.
    """

    def __init__(self):
        self.word2index = {}
        self.word2count = {}
        self.index2word = {
            PAD_token: "PAD",
            SOS_token: "SOS",
            EOS_token: "EOS",
            UNK_token: "UNK",
        }
        self.num_words = 4  # Count PAD, SOS, EOS, UNK

    def addSentence(self, sentence):
        """Register every character of ``sentence`` in the vocabulary."""
        for word in sentence:
            self.addWord(word)

    def addWord(self, word):
        """Add ``word`` to the vocabulary, or bump its count if known."""
        if word not in self.word2index:
            self.word2index[word] = self.num_words
            self.word2count[word] = 1
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def get_word_index(self, word):
        """Return the id of ``word``, or ``UNK_token`` when it is unknown."""
        return self.word2index.get(word, UNK_token)
读取文件的类型为json
内容格式应为
[
[
"speaker1*****************",
"speaker2*****************"
],
[
"speaker1*****************",
"speaker2*****************"
]
]
def loadPrepareData(datafile):
    """Load sentence pairs from a JSON file and build the vocabulary.

    Args:
        datafile: Path to a UTF-8 JSON file containing a list of
            ``[speaker1_sentence, speaker2_sentence]`` pairs.

    Returns:
        Tuple ``(voc, pairs)``: the populated ``Voc`` and the loaded pairs.
    """
    print("准备读取文件并建立字典")
    with open(datafile, 'r', encoding='utf8') as f:
        pairs = json.load(f)
    voc = Voc()
    print("共读取{!s}对句子".format(len(pairs)))
    print("开始建立字典...")
    for pair in pairs:
        voc.addSentence(pair[0])
        voc.addSentence(pair[1])
    print("字典大小为{}字".format(voc.num_words))
    return voc, pairs


class PairsDataset(Dataset):
    """Dataset that serves the raw sentence pairs unchanged."""

    def __init__(self, data, vocab):
        self.data = data
        self.vocab = vocab
        self.seq_len = len(data)  # number of pairs, despite the name

    def __len__(self):
        return self.seq_len

    def __getitem__(self, index):
        return self.data[index]


def indexesFromSentence(voc, sentence):
    """Convert a space-separated sentence into token ids followed by EOS."""
    return [voc.get_word_index(word) for word in sentence.split(' ')] + [EOS_token]


def zeroPadding(l, fillvalue=PAD_token):
    """Transpose a batch of id lists, padding short ones with ``fillvalue``.

    ``zip_longest`` turns ``batch x time`` into ``time x batch``.
    """
    return list(itertools.zip_longest(*l, fillvalue=fillvalue))


def binaryMatrix(l, value=PAD_token):
    """Return a 0/1 matrix: 0 where a token equals ``value``, 1 elsewhere."""
    return [[0 if token == value else 1 for token in seq] for seq in l]


def inputVar(l, voc):
    """Build the padded input tensor and the pre-padding lengths.

    Returns:
        Tuple ``(padVar, lengths)``: the padded ``LongTensor`` produced by
        ``zeroPadding`` and a tensor of sequence lengths (EOS included).
    """
    indexes_batch = [indexesFromSentence(voc, sentence) for sentence in l]
    lengths = torch.tensor([len(indexes) for indexes in indexes_batch])
    padVar = torch.LongTensor(zeroPadding(indexes_batch))
    return padVar, lengths


def outputVar(l, voc):
    """Build the padded target tensor, its padding mask and the max length.

    Returns:
        Tuple ``(padVar, mask, max_target_len)``: padded ``LongTensor``,
        boolean mask that is False at PAD positions, and the longest
        target length (EOS included).
    """
    indexes_batch = [indexesFromSentence(voc, sentence) for sentence in l]
    max_target_len = max(len(indexes) for indexes in indexes_batch)
    padList = zeroPadding(indexes_batch)
    mask = torch.tensor(binaryMatrix(padList), dtype=torch.bool)
    padVar = torch.LongTensor(padList)
    return padVar, mask, max_target_len


def _space_separated_chars(sentence):
    """Drop spaces from ``sentence`` and join its characters with spaces."""
    return " ".join(sentence.replace(" ", ""))


def batch2TrainData(voc, pair_batch):
    """Turn a batch of (input, output) sentence pairs into training tensors.

    Args:
        voc: Vocabulary used to look up token ids.
        pair_batch: Sequence of ``(input_sentence, output_sentence)`` pairs.
            It is not modified; a sorted copy is used internally.

    Returns:
        inp: input ids, shape ``max_input_length x batch_size``.
        lengths: input lengths, one entry per batch element.
        output: target ids, shape ``max_output_length x batch_size``.
        mask: bool tensor, False at PAD positions, same shape as ``output``.
        max_target_len: longest target length in the batch.
    """
    # pack_padded_sequence expects inputs ordered by decreasing length.
    ordered = sorted(
        pair_batch,
        key=lambda pair: len(_space_separated_chars(pair[0]).split(" ")),
        reverse=True,
    )
    input_batch = [_space_separated_chars(pair[0]) for pair in ordered]
    output_batch = [_space_separated_chars(pair[1]) for pair in ordered]
    inp, lengths = inputVar(input_batch, voc)
    output, mask, max_target_len = outputVar(output_batch, voc)
    return inp, lengths, output, mask, max_target_len


class EncoderRNN(nn.Module):
    """Bidirectional GRU encoder whose two directions are summed."""

    def __init__(self, hidden_size, embedding, n_layers=1, dropout=0):
        super(EncoderRNN, self).__init__()
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.embedding = embedding
        # Input size and hidden size are both 'hidden_size' because the
        # embedding is expected to produce 'hidden_size' features.
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers,
                          dropout=(0 if n_layers == 1 else dropout),
                          bidirectional=True)

    def forward(self, input_seq, input_lengths, hidden=None):
        """Encode a padded batch.

        Args:
            input_seq: Padded token ids, time-major.
            input_lengths: Sequence lengths in decreasing order.
            hidden: Optional initial GRU hidden state.

        Returns:
            Tuple ``(outputs, hidden)``: summed bidirectional outputs and
            the final GRU hidden state.
        """
        embedded = self.embedding(input_seq)
        # pack_padded_sequence requires tensor lengths to live on the CPU.
        if torch.is_tensor(input_lengths):
            input_lengths = input_lengths.cpu()
        packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)
        outputs, hidden = self.gru(packed, hidden)
        outputs, _ = torch.nn.utils.rnn.pad_packed_sequence(outputs)
        # Sum the forward and backward halves of the GRU output.
        outputs = outputs[:, :, :self.hidden_size] + outputs[:, :, self.hidden_size:]
        return outputs, hidden


# Luong attention layer
class Attn(torch.nn.Module):
    """Dot-product attention over the encoder outputs."""

    def __init__(self, hidden_size):
        super(Attn, self).__init__()
        self.hidden_size = hidden_size

    def forward(self, hidden, encoder_outputs):
        """Return softmax-normalised attention weights.

        The dot-product energies are transposed so the softmax runs over
        the encoder time steps, then a middle dimension is added for bmm.
        """
        attn_energies = torch.sum(hidden * encoder_outputs, dim=2)
        attn_energies = attn_energies.t()
        return F.softmax(attn_energies, dim=1).unsqueeze(1)


class LuongAttnDecoderRNN(nn.Module):
    """Single-step GRU decoder with attention over the encoder outputs."""

    def __init__(self, embedding, hidden_size, output_size, n_layers=1, dropout=0.1):
        super(LuongAttnDecoderRNN, self).__init__()
        # Keep for reference
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.dropout = dropout

        # Define layers
        self.embedding = embedding
        self.embedding_dropout = nn.Dropout(dropout)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers,
                          dropout=(0 if n_layers == 1 else dropout))
        self.concat = nn.Linear(hidden_size * 2, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.attn = Attn(hidden_size)

    def forward(self, input_step, last_hidden, encoder_outputs):
        """Decode one time step.

        Args:
            input_step: Token ids for the current step.
            last_hidden: Previous GRU hidden state.
            encoder_outputs: Outputs returned by the encoder.

        Returns:
            Tuple ``(output, hidden)``: softmax distribution over the
            vocabulary and the new GRU hidden state.
        """
        embedded = self.embedding(input_step)
        embedded = self.embedding_dropout(embedded)
        rnn_output, hidden = self.gru(embedded, last_hidden)
        attn_weights = self.attn(rnn_output, encoder_outputs)
        # Weighted sum of encoder outputs gives the context vector.
        context = attn_weights.bmm(encoder_outputs.transpose(0, 1))
        # Concatenate context and GRU output (Luong eq. 5).
        rnn_output = rnn_output.squeeze(0)
        context = context.squeeze(1)
        concat_input = torch.cat((rnn_output, context), 1)
        concat_output = torch.tanh(self.concat(concat_input))
        # Predict the next token (Luong eq. 6).
        output = self.out(concat_output)
        output = F.softmax(output, dim=1)
        return output, hidden


def maskNLLLoss(inp, target, mask):
    """Mean negative log-likelihood over the non-padded positions.

    Args:
        inp: Probabilities per vocabulary entry for each batch element.
        target: Target token ids for this step.
        mask: Bool tensor, True where the target is a real token.

    Returns:
        Tuple ``(loss, nTotal)``: the mean loss tensor and the number of
        unmasked positions.
    """
    nTotal = mask.sum()
    crossEntropy = -torch.log(torch.gather(inp, 1, target.view(-1, 1)).squeeze(1))
    # The result already lives on the same device as ``inp``.
    loss = crossEntropy.masked_select(mask).mean()
    return loss, nTotal.item()


def train():
    """Run one epoch over ``train_loader`` and return the mean token loss.

    Relies on the module-level objects created in the ``__main__`` block:
    ``train_loader``, ``voc``, ``encoder``, ``decoder``, both optimizers,
    ``device``, ``clip`` and ``epoch``.
    """
    print_losses = []
    n_totals = 0
    pbar = tqdm(train_loader)
    for pairs in pbar:
        # The default collate groups by field; regroup into per-sample pairs.
        pairs = list(zip(*pairs))
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        input_variable, lengths, target_variable, mask, max_target_len = batch2TrainData(voc, pairs)

        # Lengths stay on the CPU, as required by pack_padded_sequence.
        input_variable = input_variable.to(device)
        target_variable = target_variable.to(device)
        mask = mask.to(device)

        encoder_outputs, encoder_hidden = encoder(input_variable, lengths)

        # Size the SOS row from the actual batch: the last batch of an
        # epoch may hold fewer than ``batch_size`` samples.
        current_batch_size = input_variable.size(1)
        decoder_input = torch.full((1, current_batch_size), SOS_token,
                                   dtype=torch.long, device=device)

        # Start the decoder from the encoder's final hidden state.
        decoder_hidden = encoder_hidden[:decoder.n_layers]

        loss = 0
        for t in range(max_target_len):
            decoder_output, decoder_hidden = decoder(
                decoder_input, decoder_hidden, encoder_outputs
            )
            # Teacher forcing: next input is the current target.
            decoder_input = target_variable[t].view(1, -1)
            # mask_loss is the mean loss per token, nTotal the token count.
            mask_loss, nTotal = maskNLLLoss(decoder_output, target_variable[t], mask[t])
            loss += mask_loss
            print_losses.append(mask_loss.item() * nTotal)
            n_totals += nTotal

        loss.backward()

        # Clip gradients in place.
        _ = torch.nn.utils.clip_grad_norm_(encoder.parameters(), clip)
        _ = torch.nn.utils.clip_grad_norm_(decoder.parameters(), clip)

        encoder_optimizer.step()
        decoder_optimizer.step()
        pbar.set_description(f'epoch:{epoch} loss:{sum(print_losses) / n_totals:.3f}')
    return sum(print_losses) / n_totals


if __name__ == '__main__':
    USE_CUDA = False
    device = torch.device("cuda" if USE_CUDA else "cpu")

    datafile = "./content.json"
    voc, pairs = loadPrepareData(datafile)

    hidden_size = 500
    encoder_n_layers = 1
    decoder_n_layers = 1
    dropout = 0.1
    batch_size = 2

    # Encoder and decoder share one embedding table.
    embedding = nn.Embedding(voc.num_words, hidden_size)
    encoder = EncoderRNN(hidden_size, embedding, encoder_n_layers, dropout)
    decoder = LuongAttnDecoderRNN(embedding, hidden_size, voc.num_words, decoder_n_layers, dropout)
    encoder = encoder.to(device)
    decoder = decoder.to(device)

    # Configure training/optimization
    nums_epoch = 30
    clip = 50.0
    learning_rate = 0.0001
    decoder_learning_ratio = 5.0

    # Ensure dropout layers are in train mode
    encoder.train()
    decoder.train()

    # Initialize optimizers
    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate * decoder_learning_ratio)

    train_dataset = PairsDataset(pairs, voc)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

    # Run training iterations
    print("开始训练!")
    for epoch in range(nums_epoch):
        loss = train()
        # Save a checkpoint after every epoch.
        torch.save({
            'epoch': epoch,
            'en': encoder.state_dict(),
            'de': decoder.state_dict(),
            'en_opt': encoder_optimizer.state_dict(),
            'de_opt': decoder_optimizer.state_dict(),
            'loss': loss,
            'voc_dict': voc.__dict__,
            'embedding': embedding.state_dict()
        }, os.path.join('./', '{}_loss{:.3f}_{}.tar'.format(epoch, loss, 'checkpoint')))
聊天推理代码
import torch
import torch.nn as nn
import torch.nn.functional as F

# Default word tokens
PAD_token = 0  # Used for padding short sentences
SOS_token = 1  # Start-of-sentence token
EOS_token = 2  # End-of-sentence token
UNK_token = 3  # Returned by Voc.get_word_index for unseen characters


class EncoderRNN(nn.Module):
    """Bidirectional GRU encoder whose two directions are summed."""

    def __init__(self, hidden_size, embedding, n_layers=1, dropout=0):
        super(EncoderRNN, self).__init__()
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.embedding = embedding
        # Input size and hidden size are both 'hidden_size' because the
        # embedding is expected to produce 'hidden_size' features.
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers,
                          dropout=(0 if n_layers == 1 else dropout),
                          bidirectional=True)

    def forward(self, input_seq, input_lengths, hidden=None):
        """Encode a padded batch.

        Args:
            input_seq: Padded token ids, time-major.
            input_lengths: Sequence lengths in decreasing order.
            hidden: Optional initial GRU hidden state.

        Returns:
            Tuple ``(outputs, hidden)``: summed bidirectional outputs and
            the final GRU hidden state.
        """
        embedded = self.embedding(input_seq)
        # pack_padded_sequence requires tensor lengths to live on the CPU.
        if torch.is_tensor(input_lengths):
            input_lengths = input_lengths.cpu()
        packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)
        outputs, hidden = self.gru(packed, hidden)
        outputs, _ = torch.nn.utils.rnn.pad_packed_sequence(outputs)
        # Sum the forward and backward halves of the GRU output.
        outputs = outputs[:, :, :self.hidden_size] + outputs[:, :, self.hidden_size:]
        return outputs, hidden


# Luong attention layer
class Attn(torch.nn.Module):
    """Dot-product attention over the encoder outputs."""

    def __init__(self, hidden_size):
        super(Attn, self).__init__()
        self.hidden_size = hidden_size

    def forward(self, hidden, encoder_outputs):
        """Return softmax-normalised attention weights.

        The dot-product energies are transposed so the softmax runs over
        the encoder time steps, then a middle dimension is added for bmm.
        """
        attn_energies = torch.sum(hidden * encoder_outputs, dim=2)
        attn_energies = attn_energies.t()
        return F.softmax(attn_energies, dim=1).unsqueeze(1)


class LuongAttnDecoderRNN(nn.Module):
    """Single-step GRU decoder with attention over the encoder outputs."""

    def __init__(self, embedding, hidden_size, output_size, n_layers=1, dropout=0.1):
        super(LuongAttnDecoderRNN, self).__init__()
        # Keep for reference
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.dropout = dropout

        # Define layers
        self.embedding = embedding
        self.embedding_dropout = nn.Dropout(dropout)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers,
                          dropout=(0 if n_layers == 1 else dropout))
        self.concat = nn.Linear(hidden_size * 2, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.attn = Attn(hidden_size)

    def forward(self, input_step, last_hidden, encoder_outputs):
        """Decode one time step.

        Args:
            input_step: Token ids for the current step.
            last_hidden: Previous GRU hidden state.
            encoder_outputs: Outputs returned by the encoder.

        Returns:
            Tuple ``(output, hidden)``: softmax distribution over the
            vocabulary and the new GRU hidden state.
        """
        embedded = self.embedding(input_step)
        embedded = self.embedding_dropout(embedded)
        rnn_output, hidden = self.gru(embedded, last_hidden)
        attn_weights = self.attn(rnn_output, encoder_outputs)
        # Weighted sum of encoder outputs gives the context vector.
        context = attn_weights.bmm(encoder_outputs.transpose(0, 1))
        # Concatenate context and GRU output (Luong eq. 5).
        rnn_output = rnn_output.squeeze(0)
        context = context.squeeze(1)
        concat_input = torch.cat((rnn_output, context), 1)
        concat_output = torch.tanh(self.concat(concat_input))
        # Predict the next token (Luong eq. 6).
        output = self.out(concat_output)
        output = F.softmax(output, dim=1)
        return output, hidden


class GreedySearchDecoder(nn.Module):
    """Greedy decoder: picks the most likely token at every step."""

    def __init__(self, encoder, decoder):
        super(GreedySearchDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, input_length, max_length):
        """Decode a single input sequence greedily.

        Args:
            input_seq: Token ids of one sentence, time-major (batch of 1).
            input_length: Tensor holding the length of that sentence.
            max_length: Maximum number of tokens to generate.

        Returns:
            Tuple ``(all_tokens, all_scores)``: generated token ids and
            their softmax scores. Generation stops after the first EOS,
            which is included in the result.
        """
        # Use the input's device instead of depending on a module global.
        device = input_seq.device
        encoder_outputs, encoder_hidden = self.encoder(input_seq, input_length)
        # The encoder's final hidden state seeds the decoder.
        decoder_hidden = encoder_hidden[:self.decoder.n_layers]
        decoder_input = torch.ones(1, 1, device=device, dtype=torch.long) * SOS_token
        all_tokens = torch.zeros([0], device=device, dtype=torch.long)
        all_scores = torch.zeros([0], device=device)
        for _ in range(max_length):
            decoder_output, decoder_hidden = self.decoder(
                decoder_input, decoder_hidden, encoder_outputs
            )
            # Most likely token and its softmax score.
            decoder_scores, decoder_input = torch.max(decoder_output, dim=1)
            all_tokens = torch.cat((all_tokens, decoder_input), dim=0)
            all_scores = torch.cat((all_scores, decoder_scores), dim=0)
            # Anything generated after EOS is not part of the reply.
            if decoder_input.item() == EOS_token:
                break
            # Add a dimension so the token can be fed back as next input.
            decoder_input = torch.unsqueeze(decoder_input, 0)
        return all_tokens, all_scores


def indexesFromSentence(voc, sentence):
    """Convert a sentence into per-character token ids followed by EOS.

    Spaces are dropped first, matching the preprocessing applied to the
    training data.
    """
    chars = sentence.replace(" ", "")
    return [voc.get_word_index(word) for word in chars] + [EOS_token]


def _module_device(module):
    """Return the device of the module's first parameter, or CPU if none."""
    first_param = next(module.parameters(), None)
    return first_param.device if first_param is not None else torch.device("cpu")


def evaluate(searcher, voc, sentence, max_length=300):
    """Generate a reply for ``sentence`` and return it as a list of words.

    Args:
        searcher: Search module called as ``searcher(batch, lengths, max_length)``.
        voc: Vocabulary used for both encoding and decoding.
        sentence: Raw input sentence.
        max_length: Maximum number of tokens to generate.

    Returns:
        List of decoded vocabulary entries (may end with ``'EOS'``).
    """
    device = _module_device(searcher)
    # words -> indexes, formatted as a batch of one
    indexes_batch = [indexesFromSentence(voc, sentence)]
    # Lengths stay on the CPU, as required by pack_padded_sequence.
    lengths = torch.tensor([len(indexes) for indexes in indexes_batch])
    # Transpose to the time-major layout the models expect.
    input_batch = torch.LongTensor(indexes_batch).transpose(0, 1).to(device)
    # No gradients are needed for inference.
    with torch.no_grad():
        tokens, scores = searcher(input_batch, lengths, max_length)
    # indexes -> words
    return [voc.index2word[token.item()] for token in tokens]


def evaluateInput(searcher, voc):
    """Interactive chat loop; type ``q`` or ``quit`` to leave."""
    while True:
        try:
            input_sentence = input('请输入:')
            if input_sentence == 'q' or input_sentence == 'quit':
                break
            output_words = evaluate(searcher, voc, input_sentence)
            # Strip the special tokens before printing the reply.
            output_words[:] = [x for x in output_words if not (x == 'EOS' or x == 'PAD')]
            print('机器人:', ''.join(output_words))
        except KeyError:
            print("Error: Encountered unknown word.")


class Voc:
    """Character-level vocabulary mapping characters to integer ids.

    Ids 0-3 are reserved for the PAD, SOS, EOS and UNK tokens; every new
    character receives the next free id in order of first appearance.
    """

    def __init__(self):
        self.word2index = {}
        self.word2count = {}
        self.index2word = {
            PAD_token: "PAD",
            SOS_token: "SOS",
            EOS_token: "EOS",
            UNK_token: "UNK",
        }
        self.num_words = 4  # Count PAD, SOS, EOS, UNK

    def addSentence(self, sentence):
        """Register every character of ``sentence`` in the vocabulary."""
        for word in sentence:
            self.addWord(word)

    def addWord(self, word):
        """Add ``word`` to the vocabulary, or bump its count if known."""
        if word not in self.word2index:
            self.word2index[word] = self.num_words
            self.word2count[word] = 1
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    def get_word_index(self, word):
        """Return the id of ``word``, or ``UNK_token`` when it is unknown."""
        return self.word2index.get(word, UNK_token)


if __name__ == '__main__':
    USE_CUDA = False
    device = torch.device("cuda" if USE_CUDA else "cpu")

    # NOTE(review): the training script saves checkpoints named
    # '<epoch>_loss<loss>_checkpoint.tar'; point this at an existing file.
    loadFilename = './9checkpoint.tar'
    # map_location lets a checkpoint saved on GPU load on a CPU-only host.
    checkpoint = torch.load(loadFilename, map_location=device)

    hidden_size = 500
    encoder_n_layers = 1
    decoder_n_layers = 1
    dropout = 0.1

    # Restore the vocabulary saved alongside the weights.
    voc = Voc()
    voc.__dict__ = checkpoint['voc_dict']

    embedding = nn.Embedding(voc.num_words, hidden_size)
    embedding.load_state_dict(checkpoint['embedding'])
    encoder = EncoderRNN(hidden_size, embedding, encoder_n_layers, dropout)
    decoder = LuongAttnDecoderRNN(embedding, hidden_size, voc.num_words, decoder_n_layers, dropout)
    encoder.load_state_dict(checkpoint['en'])
    decoder.load_state_dict(checkpoint['de'])
    encoder = encoder.to(device)
    decoder = decoder.to(device)

    # Set dropout layers to eval mode
    encoder.eval()
    decoder.eval()

    # Initialize search module
    searcher = GreedySearchDecoder(encoder, decoder)

    # Begin chatting
    evaluateInput(searcher, voc)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。