Seq2seq模型中的encoder接收一个长度为M的序列,得到1个context vector,之后decoder把这个context vector转化为长度为N的序列作为输出,从而构成一个M to N的模型。
下载地址:GitHub - codemayq/chinese-chatbot-corpus: 中文公开聊天语料库
- from tqdm import tqdm
- from jieba import lcut as jieba_cut
- import re
def format_xiaohuangji_corpus(word=False):
    """Split the xiaohuangji conversation corpus into input/output files.

    The corpus is scanned line by line. A line consisting of ``E`` separates
    records and each ``M``-prefixed line contributes one utterance to the
    current record. The first utterance of a record is appended to the input
    file and the second one to the output file.

    Args:
        word: When true, utterances are written as space-separated characters
            to ``corpus/input_word.txt`` / ``corpus/output_word.txt``.
            Otherwise they are segmented with jieba and written to
            ``corpus/input.txt`` / ``corpus/output.txt``.

    Raises:
        AssertionError: If a non-empty record does not hold exactly two
            utterances.
    """
    corpus_path = "corpus_pre/xiaohuangji50w_nofenci.conv"
    if word:
        input_path = "corpus/input_word.txt"
        # Same directory as the input file (the original pointed at
        # "corpus/corpus/...", which no other code reads).
        output_path = "corpus/output_word.txt"
    else:
        input_path = "corpus/input.txt"
        output_path = "corpus/output.txt"

    # Explicit UTF-8 everywhere so the result does not depend on the platform
    # default encoding; the context manager closes all three files.
    with open(corpus_path, encoding='utf-8') as corpus, \
            open(input_path, "a", encoding='utf-8') as f_input, \
            open(output_path, "a", encoding='utf-8') as f_output:

        def flush(pair):
            """Write one finished record if both utterances are non-blank."""
            if not pair:
                return
            assert len(pair) == 2, "长度必须是2"
            if pair[0].strip() and pair[1].strip():
                f_input.write(pair[0] + "\n")
                f_output.write(pair[1] + "\n")

        pair = []
        for line in tqdm(corpus, ascii=True):
            if line.strip() == "E":
                flush(pair)
                pair = []
            elif line.startswith("M"):
                text = line[1:].strip()
                if word:
                    # Character level: one token per character.
                    pair.append(" ".join(text))
                else:
                    pair.append(" ".join(jieba_cut(text)))
        # The separator precedes each record, so the final record is only
        # complete once the file ends.
        flush(pair)
def format_weibo(word=False):
    """Clean the Weibo post/response corpus and append it to the corpus files.

    Posts and responses are read in lockstep from
    ``corpus_pre/stc_weibo_train_post`` and
    ``corpus_pre/stc_weibo_train_response``. Only the post is cleaned with
    regular expressions; the response is merely stripped. Pairs in which
    either side ends up empty are skipped.

    Args:
        word: When true, all whitespace is removed and each side is written
            as space-separated characters to ``corpus/input_word.txt`` /
            ``corpus/output_word.txt``. Otherwise the cleaned text is written
            as is to ``corpus/input.txt`` / ``corpus/output.txt``.
    """
    origin_input = "corpus_pre/stc_weibo_train_post"
    origin_output = "corpus_pre/stc_weibo_train_response"
    if word:
        input_path = "corpus/input_word.txt"
        output_path = "corpus/output_word.txt"
    else:
        input_path = "corpus/input.txt"
        output_path = "corpus/output.txt"

    # One context manager for all four files so they are closed even when
    # processing fails part-way through.
    with open(origin_input, encoding='utf-8') as in_o, \
            open(origin_output, encoding='utf-8') as out_o, \
            open(input_path, "a", encoding='utf-8') as f_input, \
            open(output_path, "a", encoding='utf-8') as f_output:
        for _in, _out in tqdm(zip(in_o, out_o), ascii=True):
            _in = _in.strip()
            _out = _out.strip()
            # Drop trailing bracketed asides. The first alternative uses
            # full-width parentheses: written with ASCII parentheses it is a
            # capture group around ".*" and erases the whole post.
            if _in.endswith((")", "」", ")")):
                _in = re.sub(r"(.*?)|「.*?」|\(.*?\)", " ", _in)
            # Weibo posts carry many link markers, emoticons and symbols.
            # NOTE(review): "(.*?\d+x\d+.*?)" is an ASCII capture group and
            # removes everything up to a "<digits>x<digits>" token; it may
            # have been meant as full-width parentheses — confirm.
            _in = re.sub(r"我在.*?alink|alink|(.*?\d+x\d+.*?)|#|】|【|-+|_+|via.*?:*.*", " ", _in)
            # Strip after collapsing whitespace so a post reduced to a single
            # blank is treated as empty instead of being written as "\n".
            _in = re.sub(r"\s+", " ", _in).strip()
            if not _in or not _out:
                continue
            if word:
                # Character level: remove every space, then split per char.
                _in = re.sub(r"\s+", "", _in)
                _out = re.sub(r"\s+", "", _out)
                if _in and _out:
                    f_input.write(" ".join(_in) + "\n")
                    f_output.write(" ".join(_out) + "\n")
            else:
                f_input.write(_in + "\n")
                f_output.write(_out + "\n")
if __name__ == '__main__':
    # Run each formatter for both granularities, in the same order as before:
    # xiaohuangji first, then Weibo, each with word=False followed by True.
    for formatter in (format_xiaohuangji_corpus, format_weibo):
        for char_level in (False, True):
            formatter(char_level)
解决:在 open() 中添加 encoding='utf-8' 即可。
- import config
- import pickle
class Word2Sequence():
    """Vocabulary that maps tokens to integer ids and back.

    Usage: call :meth:`fit` once per tokenised sentence to accumulate counts,
    then :meth:`build_vocab` once to assign ids, then :meth:`transform` /
    :meth:`inverse_transform` to convert between tokens and ids.
    """

    # Reserved tokens. The original class referenced these names without
    # defining them, so constructing an instance raised AttributeError.
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    SOS_TAG = "SOS"
    EOS_TAG = "EOS"

    # Ids of the reserved tokens.
    UNK = 0
    PAD = 1
    SOS = 2
    EOS = 3

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD,
            self.SOS_TAG: self.SOS,
            self.EOS_TAG: self.EOS
        }
        self.count = {}  # token -> number of occurrences seen by fit()
        # Available from the start so to_word() works before build_vocab().
        self.inversed_dict = {index: tag for tag, index in self.dict.items()}
        self.fited = False

    def to_index(self, word):
        """Return the id of ``word``, or ``UNK`` when it is not in the vocabulary."""
        assert self.fited == True, "必须先进行fit操作"
        return self.dict.get(word, self.UNK)

    def to_word(self, index):
        """Return the token for ``index``, or ``UNK_TAG`` when the id is unknown."""
        assert self.fited, "必须先进行fit操作"
        return self.inversed_dict.get(index, self.UNK_TAG)

    def __len__(self):
        return len(self.dict)

    def fit(self, sentence):
        """Add the tokens of one sentence to the frequency table.

        :param sentence: iterable of tokens, e.g. ``[word1, word2, word3]``
        """
        for token in sentence:
            self.count[token] = self.count.get(token, 0) + 1
        self.fited = True

    def build_vocab(self, min_count=1, max_count=None, max_feature=None):
        """Assign ids to the counted tokens.

        :param min_count: drop tokens seen fewer than this many times
        :param max_count: drop tokens seen more than this many times
        :param max_feature: if an int, keep only that many most frequent
            tokens; otherwise keep every remaining token in sorted order
        """
        # The frequency table itself is filtered, as in the original code.
        if min_count is not None:
            self.count = {k: v for k, v in self.count.items() if v >= min_count}
        if max_count is not None:
            self.count = {k: v for k, v in self.count.items() if v <= max_count}

        if isinstance(max_feature, int):
            # Ascending by frequency; the tail holds the most frequent tokens.
            ranked = sorted(self.count.items(), key=lambda item: item[1])
            if len(ranked) > max_feature:
                # A plain [-0:] slice would keep everything, so zero (or a
                # negative limit) is handled explicitly.
                ranked = ranked[-max_feature:] if max_feature > 0 else []
            words = [w for w, _ in ranked]
        else:
            words = sorted(self.count.keys())

        for w in words:
            # Never re-number a token that already has an id: this protects
            # the reserved tags from a corpus token spelled the same way and
            # makes a repeated build_vocab() call harmless.
            if w not in self.dict:
                self.dict[w] = len(self.dict)

        # Reverse lookup table: id -> token.
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None, add_eos=False):
        """Convert a tokenised sentence into a list of ids.

        :param sentence: sequence of tokens
        :param max_len: if given, the result is truncated or padded with
            ``PAD`` to exactly this length
        :param add_eos: append ``EOS``; with ``max_len`` the ``EOS`` id counts
            towards the length and is always kept
        :return: list of ids
        """
        assert self.fited, "必须先进行fit操作"
        ids = [self.to_index(token) for token in sentence]
        if add_eos:
            if max_len is not None:
                # Leave room for the EOS id.
                ids = ids[:max(max_len - 1, 0)]
            ids.append(self.EOS)
        elif max_len is not None:
            ids = ids[:max_len]
        if max_len is not None:
            ids += [self.PAD] * (max_len - len(ids))
        return ids

    def inverse_transform(self, indices):
        """Convert a sequence of ids back into tokens.

        :param indices: e.g. ``[1, 2, 3, ...]``
        :return: ``[word1, word2, ...]``
        """
        return [self.to_word(i) for i in indices]
- # 之后导入该word_sequence使用
- # word_sequence = pickle.load(open("./ws", "rb")) if not config.use_word else pickle.load(
- # open("./ws_word", "rb"))
if __name__ == '__main__':
    from tqdm import tqdm
    import pickle

    # Build the token-level vocabulary from both sides of the corpus and
    # pickle it to ./ws.
    word_sequence = Word2Sequence()
    input_path = "corpus/input.txt"
    target_path = "corpus/output.txt"
    for corpus_path in (input_path, target_path):
        # Context manager so the corpus file is closed after reading.
        with open(corpus_path, encoding='utf-8') as corpus_file:
            # readlines() keeps a known total for the progress bar.
            for line in tqdm(corpus_file.readlines()):
                word_sequence.fit(line.strip().split())
    # Keep at most the 5000 most frequent tokens seen at least 5 times.
    word_sequence.build_vocab(min_count=5, max_count=None, max_feature=5000)
    print(len(word_sequence))
    # Closing the handle guarantees the pickle is fully flushed to disk.
    with open("./ws", "wb") as vocab_file:
        pickle.dump(word_sequence, vocab_file)
- import torch
- import config
- from torch.utils.data import Dataset,DataLoader
- from wordsequence import word_sequence
class ChatDataset(Dataset):
    """Line-aligned (input, target) pairs read from the corpus files.

    ``config.use_word`` selects the ``*_word.txt`` files instead of the
    default ``corpus/input.txt`` / ``corpus/output.txt``.
    """

    def __init__(self):
        super(ChatDataset, self).__init__()
        input_path = "corpus/input.txt"
        target_path = "corpus/output.txt"
        if config.use_word:
            input_path = "corpus/input_word.txt"
            target_path = "corpus/output_word.txt"
        # The corpus is written as UTF-8, so read it back explicitly as UTF-8
        # and close the handles once the lines are in memory.
        with open(input_path, encoding='utf-8') as input_file:
            self.input_lines = input_file.readlines()
        with open(target_path, encoding='utf-8') as target_file:
            self.target_lines = target_file.readlines()
        assert len(self.input_lines) == len(self.target_lines), "input和target文本的数量必须相同"

    def __getitem__(self, index):
        """Return ``(input_tokens, target_tokens, input_len, target_len)``.

        Lengths are capped at ``config.max_len``. If the pair at ``index`` has
        an empty side, the next non-empty pair is returned instead, wrapping
        around at the end of the data.

        Raises:
            IndexError: If ``index`` is outside the dataset.
            ValueError: If no pair in the dataset has two non-empty sides.
        """
        total = len(self.input_lines)
        # Keep the sequence protocol intact: plain iteration over the dataset
        # relies on IndexError to stop.
        if not -total <= index < total:
            raise IndexError("ChatDataset index out of range")
        for offset in range(total):
            position = (index + offset) % total
            input = self.input_lines[position].strip().split()
            target = self.target_lines[position].strip().split()
            if input and target:
                # Sentences longer than max_len are truncated later, so the
                # reported length is capped as well.
                return input, target, min(len(input), config.max_len), min(len(target), config.max_len)
        raise ValueError("ChatDataset contains no non-empty input/target pair")

    def __len__(self):
        return len(self.input_lines)
def collate_fn(batch):
    """Turn a list of dataset samples into padded id tensors.

    Samples are ordered by input length, longest first, then every sentence is
    converted to ids of length ``config.max_len``; targets additionally get an
    EOS id.

    :param batch: list of ``(input_tokens, target_tokens, input_len, target_len)``
    :return: ``(input_ids, target_ids, input_lengths, target_lengths)`` as LongTensors
    """
    # 1. Longest input first.
    ordered = sorted(batch, key=lambda sample: sample[2], reverse=True)
    sources, targets, source_lens, target_lens = zip(*ordered)

    # 2. Convert to ids and pad/truncate to max_len.
    source_ids = [word_sequence.transform(tokens, max_len=config.max_len) for tokens in sources]
    target_ids = [word_sequence.transform(tokens, max_len=config.max_len, add_eos=True) for tokens in targets]

    return (
        torch.LongTensor(source_ids),
        torch.LongTensor(target_ids),
        torch.LongTensor(source_lens),
        torch.LongTensor(target_lens),
    )
# Shuffled training batches; the last incomplete batch is dropped so every
# batch holds exactly config.batch_size samples.
data_loader = DataLoader(
    ChatDataset(),
    batch_size=config.batch_size,
    shuffle=True,
    drop_last=True,
    collate_fn=collate_fn,
)
- import torch.nn as nn
- from wordsequence import word_sequence
- import config
class Encoder(nn.Module):
    """GRU encoder: embeds the input ids and runs them through one GRU layer."""

    def __init__(self):
        super(Encoder, self).__init__()
        self.vocab_size = len(word_sequence)
        self.dropout = config.dropout
        self.embedding_dim = config.embedding_dim
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size,
                                      embedding_dim=self.embedding_dim,
                                      padding_idx=word_sequence.PAD)
        num_layers = 1
        self.gru = nn.GRU(input_size=self.embedding_dim,
                          hidden_size=config.hidden_size,
                          num_layers=num_layers,
                          batch_first=True,
                          # GRU dropout only acts between stacked layers; on a
                          # single layer it does nothing except emit a warning.
                          dropout=config.dropout if num_layers > 1 else 0)

    def forward(self, input, input_length):
        """Encode a padded batch.

        :param input: id tensor, batch first, sorted by decreasing length
        :param input_length: true length of every sequence in the batch
        :return: ``(out, hidden)`` — the padded GRU outputs and the final
            hidden state ``[1, batch_size, hidden_size]``
        """
        embeded = self.embedding(input)
        # pack_padded_sequence requires the lengths on the CPU, even when the
        # batch itself lives on the GPU.
        if hasattr(input_length, "cpu"):
            input_length = input_length.cpu()
        # Pack so the GRU skips the padded positions.
        embeded = nn.utils.rnn.pack_padded_sequence(embeded, lengths=input_length, batch_first=True)
        out, hidden = self.gru(embeded)
        # NOTE(review): padded output positions are filled with the PAD token
        # id (a float 1.0 in every feature), not with zeros — confirm this is
        # intended before using `out` for attention.
        out, outputs_length = nn.utils.rnn.pad_packed_sequence(out, batch_first=True,
                                                               padding_value=word_sequence.PAD)
        return out, hidden
- import torch
- import torch.nn as nn
- import config
- import random
- import torch.nn.functional as F
- from wordsequence import word_sequence
class Decoder(nn.Module):
    """GRU decoder that generates ``config.max_len`` steps greedily."""

    def __init__(self):
        super(Decoder, self).__init__()
        self.max_seq_len = config.max_len
        self.vocab_size = len(word_sequence)
        self.embedding_dim = config.embedding_dim
        self.dropout = config.dropout
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size,
                                      embedding_dim=self.embedding_dim,
                                      padding_idx=word_sequence.PAD)
        num_layers = 1
        self.gru = nn.GRU(input_size=self.embedding_dim,
                          hidden_size=config.hidden_size,
                          num_layers=num_layers,
                          batch_first=True,
                          # GRU dropout only acts between stacked layers; on a
                          # single layer it does nothing except emit a warning.
                          dropout=self.dropout if num_layers > 1 else 0)
        # Explicit dim: the implicit-dimension form is deprecated.
        self.log_softmax = nn.LogSoftmax(dim=-1)
        self.fc = nn.Linear(config.hidden_size, self.vocab_size)

    def forward(self, encoder_hidden, target, target_length):
        """Decode ``config.max_len`` steps starting from the encoder state.

        ``target`` and ``target_length`` are accepted but not used: every step
        feeds the previous prediction back in.

        :param encoder_hidden: final encoder state ``[1, batch_size, hidden_size]``
        :param target: target ids ``[batch_size, seq_len]`` (unused)
        :param target_length: target lengths (unused)
        :return: ``(decoder_outputs, decoder_hidden)`` where ``decoder_outputs``
            is ``[batch_size, max_len, vocab_size]`` log-probabilities
        """
        # Take batch size and device from the incoming state rather than from
        # config, so a batch of any size works.
        batch_size = encoder_hidden.size(1)
        device = encoder_hidden.device
        decoder_input = torch.full((batch_size, 1), word_sequence.SOS,
                                   dtype=torch.long, device=device)
        decoder_outputs = torch.zeros(batch_size, config.max_len, self.vocab_size, device=device)
        decoder_hidden = encoder_hidden
        for t in range(config.max_len):
            decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs[:, t, :] = decoder_output_t
            # Greedy choice: the most likely token becomes the next input.
            value, index = torch.topk(decoder_output_t, 1)  # index: [batch_size, 1]
            decoder_input = index
        return decoder_outputs, decoder_hidden

    def forward_step(self, decoder_input, decoder_hidden):
        """Run a single decoding step.

        :param decoder_input: ``[batch_size, 1]`` token ids
        :param decoder_hidden: ``[1, batch_size, hidden_size]``
        :return: ``(out, decoder_hidden)`` with ``out`` being
            ``[batch_size, vocab_size]`` log-probabilities
        """
        embeded = self.embedding(decoder_input)  # [batch_size, 1, embedding_dim]
        out, decoder_hidden = self.gru(embeded, decoder_hidden)  # out: [batch_size, 1, hidden_size]
        # The GRU is batch-first, so the length-1 time axis is dim 1. The
        # original squeeze(0) only took effect when batch_size was 1.
        out = out.squeeze(1)
        out = F.log_softmax(self.fc(out), dim=-1)  # [batch_size, vocab_size]
        return out, decoder_hidden
- import torch
- import torch.nn as nn
class Seq2Seq(nn.Module):
    """Thin wrapper that chains an encoder and a decoder."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder

    def forward(self, input, target, input_length, target_length):
        """Encode ``input`` and decode from the resulting hidden state.

        :return: the decoder's ``(outputs, hidden)`` pair
        """
        _, context = self.encoder(input, input_length)
        outputs, hidden = self.decoder(context, target, target_length)
        return outputs, hidden

    def evaluation(self, inputs, input_length):
        """Encode ``inputs`` and return ``decoder.evaluation(hidden)``.

        The decoder passed to the constructor must provide an
        ``evaluation(encoder_hidden)`` method.
        """
        _, context = self.encoder(inputs, input_length)
        return self.decoder.evaluation(context)
- import torch
- import config
- from torch import optim
- import torch.nn as nn
- from Encoder import Encoder
- from Decoder import Decoder
- from seq2seq import Seq2Seq
- from Dataset_Dataloader import data_loader as train_dataloader
- from wordsequence import word_sequence
import os

# Build the model and move it to the device configured in config.
encoder = Encoder()
decoder = Decoder()
model = Seq2Seq(encoder, decoder)
model.to(config.device)
optimizer = optim.Adam(model.parameters())

# Resume from the saved checkpoints only when they exist, so the very first
# run can start from scratch. map_location lets a checkpoint saved on one
# device be loaded on another.
if os.path.exists("model/seq2seq_model"):
    model.load_state_dict(torch.load("model/seq2seq_model", map_location=config.device))
if os.path.exists("model/seq2seq_optimizer"):
    optimizer.load_state_dict(torch.load("model/seq2seq_optimizer", map_location=config.device))

# Padding positions do not contribute to the loss.
criterion = nn.NLLLoss(ignore_index=word_sequence.PAD, reduction="mean")
def get_loss(decoder_outputs, target):
    """Return the NLL loss between decoder log-probabilities and target ids.

    :param decoder_outputs: ``[batch_size, max_len, vocab_size]``
    :param target: ``[batch_size, max_len]``
    :return: scalar loss from the module-level ``criterion``
    """
    target = target.reshape(-1)  # [batch_size * max_len]
    # Flatten by the vocabulary axis instead of config.batch_size*config.max_len
    # so a batch of any size is handled.
    decoder_outputs = decoder_outputs.reshape(-1, decoder_outputs.size(-1))
    return criterion(decoder_outputs, target)
def train(epoch):
    """Run one pass over ``train_dataloader`` and update the model.

    After every batch the progress is printed and the model and optimizer
    states are written to ``model/``.

    :param epoch: epoch number, used only in the progress message
    """
    for idx, (input, target, input_length, target_len) in enumerate(train_dataloader):
        input = input.to(config.device)
        target = target.to(config.device)
        # input_length deliberately stays on the CPU: the encoder passes it to
        # pack_padded_sequence, which rejects lengths on a CUDA device.
        target_len = target_len.to(config.device)
        optimizer.zero_grad()
        decoder_outputs, decoder_hidden = model(input, target, input_length, target_len)
        loss = get_loss(decoder_outputs, target)
        loss.backward()
        optimizer.step()
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, idx * len(input), len(train_dataloader.dataset),
            100. * idx / len(train_dataloader), loss.item()))
        # NOTE(review): checkpoints are written after every batch; consider
        # saving every N batches or once per epoch if this is too slow.
        torch.save(model.state_dict(), "model/seq2seq_model")
        torch.save(optimizer.state_dict(), 'model/seq2seq_optimizer')
if __name__ == '__main__':
    # Train for ten epochs, numbered 0..9.
    epoch = 0
    while epoch < 10:
        train(epoch)
        epoch += 1
参考:Hung-yi Lee
Local-attention指的是使用了部分的encoder端的输入的权重(当前时间步上的encoder的hidden state),这样可以减少计算量,特别是当句子的长度比较长的时候。
class Attention(nn.Module):
    """Attention weights over encoder outputs using a dot, general or concat score."""

    def __init__(self, method, batch_size, hidden_size):
        """
        :param method: one of ``"dot"``, ``"general"``, ``"concat"``
        :param batch_size: first dimension of the ``Va`` parameter (concat only)
        :param hidden_size: size of the hidden vectors
        """
        super(Attention, self).__init__()
        self.method = method
        self.hidden_size = hidden_size
        assert self.method in ["dot", "general", "concat"], "method 只能是 dot,general,concat,当前是{}".format(self.method)
        if self.method == "general":
            self.Wa = nn.Linear(hidden_size, hidden_size, bias=False)
        elif self.method == "concat":
            self.Wa = nn.Linear(hidden_size * 2, hidden_size, bias=False)
            # torch.FloatTensor(rows, cols) leaves the memory uninitialised
            # (it may contain NaN or inf), so the parameter is initialised
            # explicitly. The shape is unchanged.
            self.Va = nn.Parameter(torch.empty(batch_size, hidden_size))
            nn.init.xavier_uniform_(self.Va)

    def forward(self, hidden, encoder_outputs):
        """
        :param hidden: [1, batch_size, hidden_size]
        :param encoder_outputs: [batch_size, seq_len, hidden_size]
        :return: attention weights [batch_size, 1, seq_len]
        """
        hidden = hidden.squeeze(0)  # [batch_size, hidden_size]
        if self.method == "dot":
            return self.dot_score(hidden, encoder_outputs)
        elif self.method == "general":
            return self.general_score(hidden, encoder_outputs)
        elif self.method == "concat":
            return self.concat_score(hidden, encoder_outputs)

    def _score(self, batch_size, seq_len, hidden, encoder_outputs):
        """Dot-product score, kept for callers of the old loop-based helper.

        ``batch_size`` and ``seq_len`` are accepted for compatibility; the
        shapes are taken from the tensors. The per-element Python loop was
        replaced by one batched matrix product with the same result.

        :param hidden: [batch_size, hidden_size]
        :param encoder_outputs: [batch_size, seq_len, hidden_size]
        :return: [batch_size, 1, seq_len]
        """
        attn_energies = torch.bmm(encoder_outputs, hidden.unsqueeze(-1)).squeeze(-1)
        return F.softmax(attn_energies, dim=-1).unsqueeze(1)

    def dot_score(self, hidden, encoder_outputs):
        """Dot attention.

        :param hidden: [batch_size, hidden_size]
        :param encoder_outputs: [batch_size, seq_len, hidden_size]
        :return: [batch_size, 1, seq_len]
        """
        hidden = hidden.unsqueeze(-1)  # [batch_size, hidden_size, 1]
        attn_energies = torch.bmm(encoder_outputs, hidden)  # [batch_size, seq_len, 1]
        attn_energies = attn_energies.squeeze(-1)  # [batch_size, seq_len]
        # Normalise over the sequence axis (explicit dim avoids the
        # deprecated implicit choice).
        return F.softmax(attn_energies, dim=-1).unsqueeze(1)

    def general_score(self, hidden, encoder_outputs):
        """General attention: the hidden state is projected by ``Wa`` first.

        :param hidden: [batch_size, hidden_size]
        :param encoder_outputs: [batch_size, seq_len, hidden_size]
        :return: [batch_size, 1, seq_len]
        """
        x = self.Wa(hidden)  # [batch_size, hidden_size]
        x = x.unsqueeze(-1)  # [batch_size, hidden_size, 1]
        attn_energies = torch.bmm(encoder_outputs, x).squeeze(-1)  # [batch_size, seq_len]
        return F.softmax(attn_energies, dim=-1).unsqueeze(1)

    def concat_score(self, hidden, encoder_outputs):
        """Concat attention.

        Requires the batch dimension of the inputs to equal the ``batch_size``
        given to the constructor, because ``Va`` has one row per batch item.

        :param hidden: [batch_size, hidden_size]
        :param encoder_outputs: [batch_size, seq_len, hidden_size]
        :return: [batch_size, 1, seq_len]
        """
        # Give every time step its own copy of the hidden state. The hidden
        # tensor is 2-D here, so it needs a sequence axis first: repeat(1,
        # seq_len, 1) on the 2-D tensor produced [1, batch*seq_len, hidden].
        x = hidden.unsqueeze(1).expand(-1, encoder_outputs.size(1), -1)  # [batch_size, seq_len, hidden_size]
        x = torch.tanh(self.Wa(torch.cat([x, encoder_outputs], dim=-1)))  # [batch_size, seq_len, hidden_size]
        # Va: [batch_size, hidden_size] -> [batch_size, hidden_size, 1]
        attn_energis = torch.bmm(x, self.Va.unsqueeze(2))  # [batch_size, seq_len, 1]
        attn_energis = attn_energis.squeeze(-1)
        return F.softmax(attn_energis, dim=-1).unsqueeze(1)
基本的建模已经完成。在模型评估时,如果每个时间步都直接选择概率最大的token id进行输出,这种做法叫做greedy search,但是这样得到的句子可能并不通顺。为了解决输出句子不通顺的问题,我们可以选择整句累积概率最大的那一个序列,但是这就意味着需要保存的候选序列会特别多,不方便进行保存。
为了解决以上两个问题,我们采用Beam Search。假设Beam width=2,表示每次保存的最大的概率的个数,这里每次保存两个,在下一个时间步骤一样,也是保留两个,这样就可以达到约束搜索空间大小的目的,从而提高算法的效率。
Beam Search的实现使用堆(优先队列)这一数据结构:Python的heapq是小根堆,当堆中的数据超过beam width时弹出概率最小的一条,从而始终保留概率最大的beam width个候选。
class Beam:
    """Fixed-size container keeping the ``beam_width`` most probable candidates.

    Iterating yields each candidate as
    ``[probility, complete, seq, decoder_input, decoder_hidden]``.
    """

    def __init__(self):
        self.heap = list()  # min-heap of (probility, insertion order, candidate)
        self.beam_width = config.beam_width  # maximum number of candidates kept
        self._pushed = 0  # insertion counter used to break probability ties

    def add(self, probility, complete, seq, decoder_input, decoder_hidden):
        """Add a candidate and drop the least probable one when over capacity.

        :param probility: accumulated score of the candidate
        :param complete: whether the last token is EOS
        :param seq: list of all tokens so far
        :param decoder_input: input for the next decoding step
        :param decoder_hidden: hidden state for the next decoding step
        :return: None
        """
        candidate = [probility, complete, seq, decoder_input, decoder_hidden]
        # Order by score, then by insertion order. Without the counter a tie
        # in score made heapq compare the payload (flags, token lists, hidden
        # tensors), which can raise.
        heapq.heappush(self.heap, (probility, self._pushed, candidate))
        self._pushed += 1
        # heapq is a min-heap, so popping discards the lowest score.
        if len(self.heap) > self.beam_width:
            heapq.heappop(self.heap)

    def __iter__(self):
        """Iterate over the stored candidates (five-element lists)."""
        return (candidate for _, _, candidate in self.heap)
- # decoder中的新方法
def evaluatoin_beamsearch_heapq(self, encoder_outputs, encoder_hidden):
    """Decode with beam search, keeping the candidates in a ``Beam``.

    :param encoder_outputs: encoder outputs, passed through to ``forward_step``
    :param encoder_hidden: final encoder state; its dim 1 is taken as the batch size
    :return: list of token ids of the best sequence, without SOS/EOS
    """
    batch_size = encoder_hidden.size(1)
    # 1. Seed the beam with the SOS input and the encoder state.
    decoder_input = torch.LongTensor([[word_sequence.SOS] * batch_size]).to(config.device)
    decoder_hidden = encoder_hidden

    prev_beam = Beam()
    prev_beam.add(1, False, [decoder_input], decoder_input, decoder_hidden)
    while True:
        cur_beam = Beam()
        # 2. Expand every candidate of the previous step.
        for _probility, _complete, _seq, _decoder_input, _decoder_hidden in prev_beam:
            if _complete == True:
                # Already ended with EOS: carry it over unchanged, since it
                # may still be beaten by another candidate.
                cur_beam.add(_probility, _complete, _seq, _decoder_input, _decoder_hidden)
                continue
            decoder_output_t, decoder_hidden, _ = self.forward_step(_decoder_input, _decoder_hidden, encoder_outputs)
            value, index = torch.topk(decoder_output_t, config.beam_width)  # [1, beam_width]
            # 3. Each of the top-k tokens starts a new candidate.
            for m, n in zip(value[0], index[0]):
                decoder_input = torch.LongTensor([[n]]).to(config.device)
                seq = _seq + [n]
                # NOTE(review): multiplying assumes forward_step yields
                # probabilities; if it yields log-probabilities the scores
                # should be added (starting from 0) instead — confirm.
                probility = _probility * m
                complete = n.item() == word_sequence.EOS
                # 4. Store what the next step needs in the new beam.
                cur_beam.add(probility, complete, seq, decoder_input, decoder_hidden)
        # 5. Pick the best candidate by score only. A bare max() would
        #    compare whole candidates and fall through to comparing tensors
        #    when two scores are equal.
        best_prob, best_complete, best_seq, _, _ = max(cur_beam, key=lambda candidate: candidate[0])
        # Stop when the best candidate ended with EOS or reached max_len
        # (the leading SOS is not counted).
        if best_complete == True or len(best_seq) - 1 == config.max_len:
            return self._prepar_seq(best_seq)
        # 6. Otherwise continue from the new beam.
        prev_beam = cur_beam
def _prepar_seq(self, seq):
    """Strip a leading SOS and a trailing EOS and return plain token ids.

    :param seq: list of single-element tensors
    :return: list of Python ints
    """
    # The emptiness checks avoid an IndexError for an empty sequence or one
    # that holds nothing but SOS.
    if seq and seq[0].item() == word_sequence.SOS:
        seq = seq[1:]
    if seq and seq[-1].item() == word_sequence.EOS:
        seq = seq[:-1]
    return [i.item() for i in seq]
# Decide once per call whether to use teacher forcing.
use_teacher_forcing = random.random() > 0.5
for t in range(config.max_len):
    decoder_output_t, decoder_hidden, decoder_attn_t = self.forward_step(
        decoder_input, decoder_hidden, encoder_outputs)
    decoder_outputs[:, t, :] = decoder_output_t
    if use_teacher_forcing:
        # Teacher forcing: feed the ground-truth token as the next input.
        decoder_input = target[:, t].unsqueeze(1)  # [batch_size, 1]
    else:
        # No teacher forcing: feed the model's own prediction back in.
        value, index = torch.topk(decoder_output_t, 1)  # index: [batch_size, 1]
        decoder_input = index
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。