赞
踩
1.seq_example代表问题,seq_answer代表答案,数据内容如下所示:
# Toy question/answer corpus: seq_example holds the questions and
# seq_answer holds the answer at the same position.
seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]
2.将数据进行jieba分词并加入索引index,其中SOS代表句子开头,EOS代表句子结尾,PAD用于补全,数据如下:
{'你': 3, '认识': 4, '我': 5, '吗': 6, '住': 7, '在': 8, '哪里': 9, '知道': 10, '的': 11, '名字': 12, '是': 13, '谁': 14, '会': 15, '唱歌': 16, '有': 17, '父母': 18, '当然': 19, '成都': 20, '不': 21, '机器人': 22, '不会': 23, '没有': 24, 'PAD': 0, 'SOS': 1, 'EOS': 2}
3. 最后将seq_example与seq_answer分词后使用索引表示
encoder采用双向LSTM处理输入向量,代码如下:
class lstm_encoder(nn.Module):
    """Single-layer bidirectional LSTM encoder.

    Args:
        input_size: Size of each input embedding vector. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of each LSTM direction. Defaults to the
            module-level ``n_hidden``.
    """

    def __init__(self, input_size=None, hidden_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so the original
        # zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        # Bidirectional LSTM with one layer.
        self.encoder = nn.LSTM(input_size, hidden_size, 1, bidirectional=True)

    def forward(self, embedding_input):
        """Encode a batch of embedded sequences.

        Args:
            embedding_input: Batched tensor laid out as
                (seq_len, batch, input_size); the LSTM is created with the
                default ``batch_first=False``.

        Returns:
            A tuple ``(encoder_output, h_n, c_n)``. ``encoder_output`` is the
            per-step LSTM output, (seq_len, batch, 2 * hidden_size).
            ``h_n`` and ``c_n`` are the final forward and backward states
            concatenated along the feature axis and given a leading axis of
            size 1, i.e. (1, batch, 2 * hidden_size), so they can seed a
            one-layer unidirectional LSTM of width 2 * hidden_size.
        """
        encoder_output, (h_n, c_n) = self.encoder(embedding_input)
        # Index 0 is the forward direction, index 1 the backward direction;
        # join their final states feature-wise.
        merged_h = torch.cat((h_n[0], h_n[1]), dim=1)
        merged_c = torch.cat((c_n[0], c_n[1]), dim=1)
        return encoder_output, merged_h.unsqueeze(0), merged_c.unsqueeze(0)
decoder采用单向LSTM并加入Attention机制,即将decoder输出与encoder输出通过Attention拼接后进入全连接层做预测,Attention机制采用General方式,具体过程如下所示:
代码如下:
class lstm_decoder(nn.Module):
    """Unidirectional LSTM decoder with "general"-style attention.

    The decoder state width is ``2 * hidden_size`` so it matches the
    concatenated forward/backward states of a bidirectional encoder.

    Args:
        input_size: Size of each input embedding vector. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Encoder hidden size per direction. Defaults to the
            module-level ``n_hidden``.
        output_size: Number of output classes. Defaults to the module-level
            ``num_classes``.
    """

    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so the original
        # zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        if output_size is None:
            output_size = num_classes
        state_size = hidden_size * 2
        # Unidirectional LSTM.
        self.decoder = nn.LSTM(input_size, state_size, 1)
        # Attention projection applied to the encoder outputs.
        self.att_weight = nn.Linear(state_size, state_size)
        # Merges [context ; decoder state] back down to state_size.
        self.att_joint = nn.Linear(state_size * 2, state_size)
        # Final classification layer.
        self.fc = nn.Linear(state_size, output_size)

    def forward(self, input_x, encoder_output, hn, cn):
        """Run the decoder and attend over the encoder outputs.

        Args:
            input_x: Embedded decoder inputs, (tgt_len, batch, input_size).
            encoder_output: Encoder outputs,
                (src_len, batch, 2 * hidden_size).
            hn: Initial hidden state, (1, batch, 2 * hidden_size).
            cn: Initial cell state, (1, batch, 2 * hidden_size).

        Returns:
            A tuple ``(fc_out, decoder_h_n, decoder_c_n)`` where ``fc_out``
            holds the logits as (batch, tgt_len, output_size) and the other
            two are the LSTM's final hidden and cell states.
        """
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        # Switch both sequences to batch-first so bmm can be used.
        dec_states = decoder_output.permute(1, 0, 2)
        enc_states = encoder_output.permute(1, 0, 2)
        # Project encoder states and transpose to (batch, features, src_len).
        projected = self.att_weight(enc_states).permute(0, 2, 1)
        # Scores: one row per decoder step, one column per encoder step.
        scores = dec_states.bmm(projected)
        # Attention weights, normalised over the encoder steps.
        weights = nn.functional.softmax(scores, dim=2)
        # Context vector: weighted sum of encoder states.
        context = weights.bmm(enc_states)
        # Concatenate context and decoder state, then squash and classify.
        joint = torch.cat((context, dec_states), dim=2)
        attended = torch.tanh(self.att_joint(joint))
        fc_out = self.fc(attended)
        return fc_out, decoder_h_n, decoder_c_n
- import torch
- import torch.nn as nn
- import torch.optim as optim
- import jieba
- import os
-
-
# Toy question/answer corpus: seq_example holds the questions and
# seq_answer holds the answer at the same position.
seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]

# Tokenise every sentence with jieba.
example_cut = [list(jieba.cut(sentence)) for sentence in seq_example]
answer_cut = [list(jieba.cut(sentence)) for sentence in seq_answer]

# Unique words in first-seen order. dict.fromkeys de-duplicates in a single
# pass while preserving insertion order, replacing the list-membership scan.
word_all = list(dict.fromkeys(
    word for sentence in example_cut + answer_cut for word in sentence
))

# Word -> index table; real words start at 3 because 0-2 are reserved.
word2index = {w: i + 3 for i, w in enumerate(word_all)}
# Padding
word2index['PAD'] = 0
# Start of sentence
word2index['SOS'] = 1
# End of sentence
word2index['EOS'] = 2
index2word = {value: key for key, value in word2index.items()}

# Hyper-parameters
vocab_size = len(word2index)
# Longest tokenised sentence plus one slot for the SOS/EOS marker.
seq_length = max(len(sentence) for sentence in example_cut + answer_cut) + 1
embedding_size = 5
num_classes = vocab_size
n_hidden = 10
-
# Represent tokenised sentences as padded lists of word indices
def make_data(seq_list, *, vocab=None, length=None):
    """Convert tokenised sentences into right-padded index lists.

    Args:
        seq_list: Iterable of sentences, each a sequence of word strings.
        vocab: Mapping from word to index; it must contain ``'PAD'``.
            Defaults to the module-level ``word2index``.
        length: Minimum length of each returned list. Defaults to the
            module-level ``seq_length``.

    Returns:
        A list with one list of indices per sentence. Sentences shorter than
        ``length`` are padded on the right with the PAD index; longer
        sentences are returned unpadded and are not truncated.

    Raises:
        KeyError: If a word is not present in ``vocab``.
    """
    if vocab is None:
        vocab = word2index
    if length is None:
        length = seq_length
    # Look the pad index up instead of hard-coding 0.
    pad_index = vocab['PAD']
    result = []
    for words in seq_list:
        indices = [vocab[w] for w in words]
        # A non-positive repeat count yields an empty list, so sentences that
        # already reach ``length`` are left untouched.
        indices += [pad_index] * (length - len(indices))
        result.append(indices)
    return result
# Training data as LongTensors. Questions feed the encoder; each answer is
# used twice: prefixed with SOS as the decoder input and suffixed with EOS
# as the decoder target.
encoder_input = torch.LongTensor(make_data(example_cut))
decoder_input = torch.LongTensor(
    make_data([['SOS'] + words for words in answer_cut])
)
decoder_target = torch.LongTensor(
    make_data([words + ['EOS'] for words in answer_cut])
)
-
-
# Encoder model
class lstm_encoder(nn.Module):
    """Single-layer bidirectional LSTM encoder.

    Args:
        input_size: Size of each input embedding vector. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of each LSTM direction. Defaults to the
            module-level ``n_hidden``.
    """

    def __init__(self, input_size=None, hidden_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so the original
        # zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        # Bidirectional LSTM with one layer.
        self.encoder = nn.LSTM(input_size, hidden_size, 1, bidirectional=True)

    def forward(self, embedding_input):
        """Encode a batch of embedded sequences.

        Args:
            embedding_input: Batched tensor laid out as
                (seq_len, batch, input_size); the LSTM is created with the
                default ``batch_first=False``.

        Returns:
            A tuple ``(encoder_output, h_n, c_n)``. ``encoder_output`` is the
            per-step LSTM output, (seq_len, batch, 2 * hidden_size).
            ``h_n`` and ``c_n`` are the final forward and backward states
            concatenated along the feature axis and given a leading axis of
            size 1, i.e. (1, batch, 2 * hidden_size), so they can seed a
            one-layer unidirectional LSTM of width 2 * hidden_size.
        """
        encoder_output, (h_n, c_n) = self.encoder(embedding_input)
        # Index 0 is the forward direction, index 1 the backward direction;
        # join their final states feature-wise.
        merged_h = torch.cat((h_n[0], h_n[1]), dim=1)
        merged_c = torch.cat((c_n[0], c_n[1]), dim=1)
        return encoder_output, merged_h.unsqueeze(0), merged_c.unsqueeze(0)
-
-
# Attention decoder model
class lstm_decoder(nn.Module):
    """Unidirectional LSTM decoder with "general"-style attention.

    The decoder state width is ``2 * hidden_size`` so it matches the
    concatenated forward/backward states of a bidirectional encoder.

    Args:
        input_size: Size of each input embedding vector. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Encoder hidden size per direction. Defaults to the
            module-level ``n_hidden``.
        output_size: Number of output classes. Defaults to the module-level
            ``num_classes``.
    """

    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so the original
        # zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        if output_size is None:
            output_size = num_classes
        state_size = hidden_size * 2
        # Unidirectional LSTM.
        self.decoder = nn.LSTM(input_size, state_size, 1)
        # Attention projection applied to the encoder outputs.
        self.att_weight = nn.Linear(state_size, state_size)
        # Merges [context ; decoder state] back down to state_size.
        self.att_joint = nn.Linear(state_size * 2, state_size)
        # Final classification layer.
        self.fc = nn.Linear(state_size, output_size)

    def forward(self, input_x, encoder_output, hn, cn):
        """Run the decoder and attend over the encoder outputs.

        Args:
            input_x: Embedded decoder inputs, (tgt_len, batch, input_size).
            encoder_output: Encoder outputs,
                (src_len, batch, 2 * hidden_size).
            hn: Initial hidden state, (1, batch, 2 * hidden_size).
            cn: Initial cell state, (1, batch, 2 * hidden_size).

        Returns:
            A tuple ``(fc_out, decoder_h_n, decoder_c_n)`` where ``fc_out``
            holds the logits as (batch, tgt_len, output_size) and the other
            two are the LSTM's final hidden and cell states.
        """
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        # Switch both sequences to batch-first so bmm can be used.
        dec_states = decoder_output.permute(1, 0, 2)
        enc_states = encoder_output.permute(1, 0, 2)
        # Project encoder states and transpose to (batch, features, src_len).
        projected = self.att_weight(enc_states).permute(0, 2, 1)
        # Scores: one row per decoder step, one column per encoder step.
        scores = dec_states.bmm(projected)
        # Attention weights, normalised over the encoder steps.
        weights = nn.functional.softmax(scores, dim=2)
        # Context vector: weighted sum of encoder states.
        context = weights.bmm(enc_states)
        # Concatenate context and decoder state, then squash and classify.
        joint = torch.cat((context, dec_states), dim=2)
        attended = torch.tanh(self.att_joint(joint))
        fc_out = self.fc(attended)
        return fc_out, decoder_h_n, decoder_c_n
-
-
class seq2seq(nn.Module):
    """Sequence-to-sequence model: shared embedding, encoder, decoder.

    Built from the module-level ``vocab_size`` / ``embedding_size`` and the
    ``lstm_encoder`` / ``lstm_decoder`` classes defined in this file.
    """

    def __init__(self):
        super().__init__()
        # One embedding table shared by encoder and decoder inputs.
        self.word_vec = nn.Embedding(vocab_size, embedding_size)
        # encoder
        self.seq2seq_encoder = lstm_encoder()
        # decoder
        self.seq2seq_decoder = lstm_decoder()

    def forward(self, encoder_input, decoder_input, inference_threshold=0):
        """Run a teacher-forced training pass or greedy decoding.

        Args:
            encoder_input: Index tensor of source sentences, (batch, src_len).
            decoder_input: Index tensor of decoder inputs, (batch, tgt_len).
                For decoding this is a (1, 1) tensor holding the start token.
            inference_threshold: Despite the name, a truthy value selects the
                training pass; the falsy default selects greedy decoding.

        Returns:
            Training pass: the decoder logits for every position.
            Decoding: a list of predicted word indices (Python ints), ending
            before the first predicted index 0 or 2, or after ``seq_length``
            steps. Decoding calls ``.item()`` on the prediction, so it only
            supports a single sentence at a time.
        """
        # Embed, then swap the first two axes to the (seq, batch, feature)
        # layout the LSTMs expect.
        embedding_encoder_input = self.word_vec(encoder_input).permute(1, 0, 2)
        embedding_decoder_input = self.word_vec(decoder_input).permute(1, 0, 2)
        # Encoder
        encoder_output, h_n, c_n = self.seq2seq_encoder(embedding_encoder_input)

        if inference_threshold:
            # Training: feed the whole target-side input in one call.
            decoder_output, h_n, c_n = self.seq2seq_decoder(
                embedding_decoder_input, encoder_output, h_n, c_n
            )
            return decoder_output

        # Decoding: feed each prediction back in as the next input.
        outputs = []
        for _ in range(seq_length):
            decoder_output, h_n, c_n = self.seq2seq_decoder(
                embedding_decoder_input, encoder_output, h_n, c_n
            )
            # Take the class count from the logits themselves rather than a
            # hard-coded vocabulary size.
            flat_logits = decoder_output.reshape(-1, decoder_output.size(-1))
            decoder_x = flat_logits.argmax(dim=1).item()
            # 0 and 2 are the PAD and EOS indices of the script's vocabulary.
            if decoder_x in (0, 2):
                break
            outputs.append(decoder_x)
            # new_tensor keeps the dtype and device of decoder_input, so the
            # fed-back token lives where the model's inputs live.
            next_token = decoder_input.new_tensor([[decoder_x]])
            embedding_decoder_input = self.word_vec(next_token).permute(1, 0, 2)
        return outputs
-
-
model = seq2seq()
print(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.05)

# Reuse a saved checkpoint when one exists; otherwise train and save one.
if os.path.exists("./seq2seqModel.pkl"):
    # NOTE(review): torch.load unpickles the file, so only load checkpoints
    # from a trusted source. A checkpoint saved with a different vocabulary
    # will not match this model's layer sizes.
    model.load_state_dict(torch.load('./seq2seqModel.pkl'))
else:
    # Training
    model.train()
    for epoch in range(10000):
        pred = model(encoder_input, decoder_input, 1)
        # Flatten the logits to (positions, classes); the class count is read
        # from the tensor instead of being hard-coded.
        loss = criterion(pred.reshape(-1, pred.size(-1)), decoder_target.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 1000 == 0:
            print("Epoch: %d, loss: %.5f " % (epoch + 1, loss.item()))
    # Save the trained weights
    torch.save(model.state_dict(), './seq2seqModel.pkl')

# Test: answer a single question with greedy decoding
model.eval()
question_text = '你住在哪里'
question_cut = list(jieba.cut(question_text))
encoder_x = torch.LongTensor(make_data([question_cut]))
# Decoding starts from the SOS token alone.
decoder_x = torch.LongTensor([[word2index['SOS']]])
# No gradients are needed for inference.
with torch.no_grad():
    out = model(encoder_x, decoder_x)
answer = ''.join(index2word[i] for i in out)
print('问题:', question_text)
print('回答:', answer)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。