赞
踩
一、Seq2Seq的原理
Sequence to sequence (seq2seq)是由encoder(编码器)和decoder(解码器)两个RNN的组成的。其中encoder负责对输入句子的理解,转化为context vector,decoder负责对理解后的句子的向量进行处理,解码,获得输出。上述的过程和我们大脑理解东西的过程很相似,听到一句话,理解之后,尝试组装答案,进行回答。那么此时,就有一个问题,在encoder的过程中得到的context vector作为decoder的输入,那么这样一个输入,怎么能够得到多个输出呢?其实就是当前一步的输出,作为下一个单元的输入,然后得到结果。
- outputs = []
- while True:
- output = decoderd(output)
- outputs.append(output)
在训练数据集中,可以再输出的最后面添加一个结束符<END>
,如果遇到该结束符,则可以终止循环。
- outputs = []
- while output!="<END>":
- output = decoderd(output)
- outputs.append(output)
Seq2seq模型中的encoder接受一个长度为M的序列,得到1个 context vector,之后decoder把这一个context vector转化为长度为N的序列作为输出,从而构成一个M to N的模型,能够处理很多不定长输入输出的问题,比如:文本翻译,问答,文章摘要,关键字写诗等等
二、Seq2Seq模型的实现
2.1.模型需求及实现流程
需求:完成一个模型,实现往模型输入一串数字,输出这串数字+0
例如:
输入123456789,输出1234567890;
输入52555568,输出525555680;
流程:
首先文本转化为序列,使用序列,准备数据集,准备Dataloader。然后完成编码器和解码器。然后完成seq2seq模型。然后完成模型训练的逻辑,进行训练。然后完成模型评估的逻辑,进行模型评估。
2.2.模型的实现
1.创建配置文件(config.py)
- batch_size = 512
- max_len = 10
- dropout = 0
- embedding_dim = 100
- hidden_size = 64
2.文本转化为序列(word_sequence.py)
由于输入的是数字,为了把这写数字和词典中的真实数字进行对应,可以把这些数字理解为字符串。所以需要先把字符串对应为数字,然后把数字转化为字符串。
- class NumSequence:
- UNK_TAG = "UNK"
- PAD_TAG = "PAD"
- EOS_TAG = "EOS"
- SOS_TAG = "SOS"
- UNK = 0
- PAD = 1
- EOS = 2
- SOS = 3
- def __init__(self):
- self.dict = {
- self.UNK_TAG : self.UNK,
- self.PAD_TAG : self.PAD,
- self.EOS_TAG : self.EOS,
- self.SOS_TAG : self.SOS
- }
- for i in range(10):
- self.dict[str(i)] = len(self.dict)
- self.index2word = dict(zip(self.dict.values(),self.dict.keys()))
- def __len__(self):
- return len(self.dict)
- def transform(self,sequence,max_len=None,add_eos=False):
- sequence_list = list(str(sequence))
- seq_len = len(sequence_list)+1 if add_eos else len(sequence_list)
-
- if add_eos and max_len is not None:
- assert max_len>= seq_len, "max_len 需要大于seq+eos的长度"
- _sequence_index = [self.dict.get(i,self.UNK) for i in sequence_list]
- if add_eos:
- _sequence_index += [self.EOS]
- if max_len is not None:
- sequence_index = [self.PAD]*max_len
- sequence_index[:seq_len] = _sequence_index
- return sequence_index
- else:
- return _sequence_index
- def inverse_transform(self,sequence_index):
- result = []
- for i in sequence_index:
- if i==self.EOS:
- break
- result.append(self.index2word.get(int(i),self.UNK_TAG))
- return result
- # 实例化
- num_sequence = NumSequence()
- if __name__ == '__main__':
- num_sequence = NumSequence()
- print(num_sequence.dict)
- print(num_sequence.index2word)
- print(num_sequence.transform("1231230",add_eos=True))
3.数据集(dataset.py)
随机创建[0,100000000]的整型,准备数据集,运行程序可以看到大部分的数字长度为8,在目标值后面添加上0和EOS之后,最大长度为10。所以config配置文件的max_len=10。
- from torch.utils.data import Dataset,DataLoader
- import numpy as np
- from word_sequence import num_sequence
- import torch
- import config
- class RandomDataset(Dataset):
- def __init__(self):
- super(RandomDataset,self).__init__()
- self.total_data_size = 500000
- np.random.seed(10)
- self.total_data = np.random.randint(1,100000000,size=[self.total_data_size])
- def __getitem__(self, idx):
- input = str(self.total_data[idx])
- return input, input+ "0",len(input),len(input)+1
- def __len__(self):
- return self.total_data_size
- def collate_fn(batch):
- #1. 对batch进行排序,按照长度从长到短的顺序排序
- batch = sorted(batch,key=lambda x:x[3],reverse=True)
- input,target,input_length,target_length = zip(*batch)
- #2.进行padding的操作
- input = torch.LongTensor([num_sequence.transform(i,max_len=config.max_len) for i in input])
- target = torch.LongTensor([num_sequence.transform(i,max_len=config.max_len,add_eos=True) for i in target])
- input_length = torch.LongTensor(input_length)
- target_length = torch.LongTensor(target_length)
- return input,target,input_length,target_length
- data_loader = DataLoader(dataset=RandomDataset(),batch_size=config.batch_size,collate_fn=collate_fn,drop_last=True)
- if __name__ == '__main__':
- data_loader = DataLoader(dataset=RandomDataset(),batch_size=config.batch_size,drop_last=True)
- for idx,(input,target,input_lenght,target_length) in enumerate(data_loader):
- print(idx) #输出
- print(input) #输入
- print(target) #输出,后面加0
- print(input_lenght) #输入长度
- print(target_length) #输出长度
- break
4.编码器(encoder.py)
编码器(encoder)的目的就是为了对文本进行编码,把编码后的结果交给后续的程序使用,所以在这里可以使用Embedding+GRU的结构,使用最后一个time step的输出(hidden state)作为句子的编码结果。
- import torch.nn as nn
- from word_sequence import num_sequence
- import config
- class NumEncoder(nn.Module):
- def __init__(self):
- super(NumEncoder,self).__init__()
- self.vocab_size = len(num_sequence)
- self.dropout = config.dropout
- self.embedding_dim = config.embedding_dim
- self.embedding = nn.Embedding(num_embeddings=self.vocab_size,embedding_dim=self.embedding_dim,padding_idx=num_sequence.PAD)
- self.gru = nn.GRU(input_size=self.embedding_dim,
- hidden_size=config.hidden_size,
- num_layers=1,
- batch_first=True,
- dropout=config.dropout)
- def forward(self, input,input_length):
- embeded = self.embedding(input)
- embeded = nn.utils.rnn.pack_padded_sequence(embeded,lengths=input_length,batch_first=True)
- out,hidden = self.gru(embeded)
- out,outputs_length = nn.utils.rnn.pad_packed_sequence(out,batch_first=True,padding_value=num_sequence.PAD)
- return out,hidden
5.解码器(decoder.py)
解码器主要负责实现对编码之后结果的处理,得到预测值,为后续计算损失做准备。解码器也是一个RNN,即也可以使用LSTM or GRU的结构。
- import torch
- import torch.nn as nn
- import config
- import random
- import torch.nn.functional as F
- from word_sequence import num_sequence
-
- class NumDecoder(nn.Module):
- def __init__(self):
- super(NumDecoder,self).__init__()
- self.max_seq_len = config.max_len
- self.vocab_size = len(num_sequence)
- self.embedding_dim = config.embedding_dim
- self.dropout = config.dropout
-
- self.embedding = nn.Embedding(num_embeddings=self.vocab_size,embedding_dim=self.embedding_dim,padding_idx=num_sequence.PAD)
- self.gru = nn.GRU(input_size=self.embedding_dim,
- hidden_size=config.hidden_size,
- num_layers=1,
- batch_first=True,
- dropout=self.dropout)
- self.log_softmax = nn.LogSoftmax()
-
- self.fc = nn.Linear(config.hidden_size,self.vocab_size)
-
- def forward(self, encoder_hidden,target,target_length):
- # encoder_hidden [batch_size,hidden_size]
- # target [batch_size,seq-len]
-
- decoder_input = torch.LongTensor([[num_sequence.SOS]]*config.batch_size)
- # print("decoder_input size:",decoder_input.size())
- decoder_outputs = torch.zeros(config.batch_size,config.max_len,self.vocab_size) #[seq_len,batch_size,14]
-
- decoder_hidden = encoder_hidden #[batch_size,hidden_size]
-
- for t in range(config.max_len):
- decoder_output_t , decoder_hidden = self.forward_step(decoder_input,decoder_hidden)
- # print(decoder_output_t.size(),decoder_hidden.size())
- # print(decoder_outputs.size())
- decoder_outputs[:,t,:] = decoder_output_t
-
- use_teacher_forcing = random.random() > 0.5
- if use_teacher_forcing:
- decoder_input =target[:,t].unsqueeze(1) #[batch_size,1]
- else:
- value, index = torch.topk(decoder_output_t, 1) # index [batch_size,1]
- decoder_input = index
- # print("decoder_input size:",decoder_input.size(),use_teacher_forcing)
- return decoder_outputs,decoder_hidden
-
- def forward_step(self,decoder_input,decoder_hidden):
- """
- :param decoder_input:[batch_size,1]
- :param decoder_hidden: [1,batch_size,hidden_size]
- :return: out:[batch_size,vocab_size],decoder_hidden:[1,batch_size,didden_size]
- """
- embeded = self.embedding(decoder_input) #embeded: [batch_size,1 , embedding_dim]
- # print("forworad step embeded:",embeded.size())
- out,decoder_hidden = self.gru(embeded,decoder_hidden) #out [1, batch_size, hidden_size]
- # print("forward_step out size:",out.size()) #[1, batch_size, hidden_size]
- out = out.squeeze(0)
- out = F.log_softmax(self.fc(out),dim=-1)#[batch_Size, vocab_size]
- out = out.squeeze(1)
- # print("out size:",out.size(),decoder_hidden.size())
- return out,decoder_hidden
-
- def evaluation(self,encoder_hidden): #[1, 20, 14]
- # target = target.transpose(0, 1) # batch_first = False
- batch_size = encoder_hidden.size(1)
-
- decoder_input = torch.LongTensor([[num_sequence.SOS] * batch_size])
- # print("decoder start input size:",decoder_input.size()) #[1, 20]
- decoder_outputs = torch.zeros(batch_size,config.max_len, self.vocab_size) # [seq_len,batch_size,14]
- decoder_hidden = encoder_hidden
-
- for t in range(config.max_len):
- decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
- decoder_outputs[:,t,:] = decoder_output_t
- value, index = torch.topk(decoder_output_t, 1) # index [20,1]
- decoder_input = index.transpose(0, 1)
-
- # print("decoder_outputs size:",decoder_outputs.size())
- # # 获取输出的id
- decoder_indices =[]
- # decoder_outputs = decoder_outputs.transpose(0,1) #[batch_size,seq_len,vocab_size]
- # print("decoder_outputs size",decoder_outputs.size())
- for i in range(decoder_outputs.size(1)):
- value,indices = torch.topk(decoder_outputs[:,i,:],1)
- # print("indices size",indices.size(),indices)
- # indices = indices.transpose(0,1)
- decoder_indices.append(int(indices[0][0].data))
- return decoder_indices
6.完成seq2seq模型(seq2seq.py)
- import torch
- import torch.nn as nn
- class Seq2Seq(nn.Module):
- def __init__(self,encoder,decoder):
- super(Seq2Seq,self).__init__()
- self.encoder = encoder
- self.decoder = decoder
-
- def forward(self, input,target,input_length,target_length):
- encoder_outputs,encoder_hidden = self.encoder(input,input_length)
- decoder_outputs,decoder_hidden = self.decoder(encoder_hidden,target,target_length)
- return decoder_outputs,decoder_hidden
-
- def evaluation(self,inputs,input_length):
- encoder_outputs,encoder_hidden = self.encoder(inputs,input_length)
- decoded_sentence = self.decoder.evaluation(encoder_hidden)
- return decoded_sentence
7.完成训练
- import torch
- import config
- from torch import optim
- import torch.nn as nn
- from encoder import NumEncoder
- from decoder import NumDecoder
- from seq2seq import Seq2Seq
- from dataset import data_loader as train_dataloader
- from word_sequence import num_sequence
- from tqdm import tqdm
- encoder = NumEncoder()
- decoder = NumDecoder()
- model = Seq2Seq(encoder,decoder)
- for name, param in model.named_parameters():
- if 'bias' in name:
- torch.nn.init.constant_(param, 0.0)
- elif 'weight' in name:
- torch.nn.init.xavier_normal_(param)
- optimizer = optim.Adam(model.parameters())
- criterion= nn.NLLLoss(ignore_index=num_sequence.PAD,reduction="mean")
- def get_loss(decoder_outputs,target):
- target = target.view(-1) #[batch_size*max_len]
- decoder_outputs = decoder_outputs.view(config.batch_size*config.max_len,-1)
- return criterion(decoder_outputs,target)
- def train(epoch):
- total_loss = 0
- correct = 0
- total = 0
- progress_bar = tqdm(total=len(train_dataloader), desc='Train Epoch {}'.format(epoch), unit='batch')
- for idx, (input, target, input_length, target_len) in enumerate(train_dataloader):
- optimizer.zero_grad()
- ##[seq_len,batch_size,vocab_size] [batch_size,seq_len]
- decoder_outputs, decoder_hidden = model(input, target, input_length, target_len)
- loss = get_loss(decoder_outputs, target)
- total_loss += loss.item()
- loss.backward()
- optimizer.step()
- _, predicted = torch.max(decoder_outputs.data, 2)
- correct += (predicted == target).sum().item()
- total += target.size(0) * target.size(1)
- acc = 100 * correct / total
- avg_loss = total_loss / (idx + 1)
- progress_bar.set_postfix({'loss': avg_loss, 'acc': '{:.2f}%'.format(acc)})
- progress_bar.update()
- progress_bar.close()
- torch.save(model.state_dict(), "models/seq2seq_model.pkl")
- torch.save(optimizer.state_dict(), 'models/seq2seq_optimizer.pkl')
- if __name__ == '__main__':
- for i in range(10):
- train(i)
8.进行评估
随机生成10000个测试集进行模型的验证,然后输入一串数字观察输出结果
- import torch
- from encoder import NumEncoder
- from decoder import NumDecoder
- from seq2seq import Seq2Seq
- from word_sequence import num_sequence
- import random
- encoder = NumEncoder()
- decoder = NumDecoder()
- model = Seq2Seq(encoder,decoder)
- model.load_state_dict(torch.load("models/seq2seq_model.pkl"))
- def evaluate():
- correct = 0
- total = 0
- for i in range(10000):
- test_words = random.randint(1,100000000)
- test_word_len = [len(str(test_words))]
- _test_words = torch.LongTensor([num_sequence.transform(test_words)])
- decoded_indices = model.evaluation(_test_words,test_word_len)
- result = num_sequence.inverse_transform(decoded_indices)
- if str(test_words)+"0" == "".join(result):
- correct += 1
- total += 1
- accuracy = correct/total
- print("10000个测试集的Acc: ", accuracy)
- def predict():
- test_word = input("Enter a number to predict: ")
- test_word_len = [len(test_word)]
- _test_word = torch.LongTensor([num_sequence.transform(int(test_word))])
- decoded_indices = model.evaluation(_test_word,test_word_len)
- result = num_sequence.inverse_transform(decoded_indices)
- print("Prediction: ", "".join(result))
- if __name__ == '__main__':
- evaluate()
- predict()
按照上面步骤一步步进行操作,成功运行是没有问题的,如果想直接获取源代码进行研究,可以关注下面公众号联系~会不定期发布相关设计内容包括但不限于如下内容:信号处理、通信仿真、算法设计、matlab appdesigner,gui设计、simulink仿真......希望能帮到你!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。