References:
Blog: 【手撕Transformer】Transformer输入输出细节以及代码实现(pytorch)
Blog: Transformer 的 PyTorch 实现
Code + comments
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

# Hand-made toy dataset
# S: start symbol  E: end symbol  P: padding / placeholder
#              Encoder_input      Decoder_input          Decoder_output
sentences = [['我 是 学 生 P',     'S I am a student',    'I am a student E'],
             ['我 喜 欢 学 习',    'S I like learning P', 'I like learning P E'],
             ['我 是 男 生 P',     'S I am a boy',        'I am a boy E']]

# Source vocabulary
src_vocab = {'P': 0, '我': 1, '是': 2, '学': 3, '生': 4, '喜': 5, '欢': 6, '习': 7, '男': 8}
src_idx2word = {src_vocab[key]: key for key in src_vocab}   # invert the dict into idx: word form
src_vocab_size = len(src_vocab)

# Target vocabulary
tgt_vocab = {'S': 0, 'E': 1, 'P': 2, 'I': 3, 'am': 4, 'a': 5, 'student': 6, 'like': 7, 'learning': 8, 'boy': 9}
idx2word = {tgt_vocab[key]: key for key in tgt_vocab}
tgt_vocab_size = len(tgt_vocab)

src_len = len(sentences[0][0].split(" "))   # maximum Encoder input length
tgt_len = len(sentences[0][1].split(" "))   # maximum Decoder input length
# print(src_len, tgt_len)  # 5, 5

# Convert the sentences into vocabulary indices
def make_data(sentences):
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for i in range(len(sentences)):
        enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
        dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
        dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
        enc_inputs.extend(enc_input)
        dec_inputs.extend(dec_input)
        dec_outputs.extend(dec_output)
    return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)

enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
# print(enc_inputs)
# print(dec_inputs)
# print(dec_outputs)

# Custom dataset
class MyDataSet(Data.Dataset):
    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(MyDataSet, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __len__(self):
        return self.enc_inputs.shape[0]

    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]

loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)   # batch_size=2, shuffle=True

# Hyper-parameters
d_model = 512    # dimension of the token embedding
d_ff = 2048      # hidden dimension of the feed-forward network
d_k = d_v = 64   # dimension of K (= Q) and V per head
n_layers = 6     # number of Encoder and Decoder layers
n_heads = 8      # number of attention heads

# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pos_table = np.array([
            [pos / np.power(10000, 2 * i / d_model) for i in range(d_model)]
            if pos != 0 else np.zeros(d_model) for pos in range(max_len)])
        pos_table[1:, 0::2] = np.sin(pos_table[1:, 0::2])        # even embedding dimensions use sin
        pos_table[1:, 1::2] = np.cos(pos_table[1:, 1::2])        # odd embedding dimensions use cos
        self.pos_table = torch.FloatTensor(pos_table).cuda()     # [max_len, d_model]

    def forward(self, enc_inputs):
        # enc_inputs: [seq_len, batch_size, d_model]
        # Index the table by sequence position (dim 0) and broadcast it over the batch dimension.
        enc_inputs = enc_inputs + self.pos_table[:enc_inputs.size(0), :].unsqueeze(1)
        return self.dropout(enc_inputs.cuda())

# Mask the padding token: 'P' carries no meaning in the sentence, so it must be masked out.
# seq_k.data.eq(0) returns a tensor the same size as seq_k that contains only True and False:
# if a position in seq_k equals 0 (i.e. it is padding), that position is True, otherwise False.
# Both the Encoder and the Decoder use this: when called in the Encoder, seq_len = src_len;
# when called in the Decoder, seq_len may equal tgt_len or src_len.
def get_attn_pad_mask(seq_q, seq_k):
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # unsqueeze(1) adds a dimension so the mask can be broadcast along the query axis
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)           # [batch_size, 1, len_k]
    return pad_attn_mask.expand(batch_size, len_q, len_k)   # [batch_size, len_q, len_k]
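# --- Optional sanity check (illustrative addition, not required by the model) ---
# For the first toy sentence '我 是 学 生 P' only the last position is padding, so only the
# last column of the mask is True:
# print(get_attn_pad_mask(enc_inputs[:1], enc_inputs[:1]))   # shape [1, 5, 5], last column all True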
# Mask future tokens in the Decoder input.
# The Decoder is fed one more token at each step, so the visibility pattern is a lower triangle.
# For example, with the input "S I am a student":
#   at step T0 only "S" is fed in and the model predicts the first word "I";
#   at step T1 "S" and "I" are fed in together and the model predicts the next word "am";
#   at step T2 "S", "I" and "am" are fed in and the model predicts the next word "a"; and so on.
#   S
#   S I
#   S I am
# np.triu(data, k=1) keeps the strict upper triangle, i.e. the positions that must be hidden.
def get_attn_subsequence_mask(seq):
    # seq: [batch_size, tgt_len]
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)    # upper-triangular matrix, [batch_size, tgt_len, tgt_len]
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    return subsequence_mask

# Scaled dot-product attention (residual connection and LayerNorm are applied in MultiHeadAttention)
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        # Q: [batch_size, n_heads, len_q, d_k]
        # K: [batch_size, n_heads, len_k, d_k]
        # V: [batch_size, n_heads, len_k, d_v]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)   # [batch_size, n_heads, len_q, len_k]
        scores.masked_fill_(attn_mask, -1e9)        # masked positions receive a very large negative score
        attn = nn.Softmax(dim=-1)(scores)
        context = torch.matmul(attn, V)             # [batch_size, n_heads, len_q, d_v]
        return context, attn

class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(d_v * n_heads, d_model, bias=False)

    def forward(self, input_Q, input_K, input_V, attn_mask):
        '''
        input_Q: [batch_size, len_q, d_model]
        input_K: [batch_size, len_k, d_model]
        input_V: [batch_size, len_v(=len_k), d_model]
        attn_mask: [batch_size, seq_len, seq_len]
        '''
        residual, batch_size = input_Q, input_Q.size(0)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)   # [batch_size, n_heads, len_q, d_k]
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)   # [batch_size, n_heads, len_k, d_k]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)   # [batch_size, n_heads, len_k, d_v]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)                # [batch_size, n_heads, seq_len, seq_len]
        context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)   # [batch_size, len_q, n_heads * d_v]
        output = self.fc(context)                                                  # [batch_size, len_q, d_model]
        return nn.LayerNorm(d_model).cuda()(output + residual), attn

# Position-wise feed-forward network
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False))

    def forward(self, inputs):
        residual = inputs
        output = self.fc(inputs)
        return nn.LayerNorm(d_model).cuda()(output + residual)

class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        # enc_inputs is passed three times; it is multiplied with W_Q, W_K, W_V to obtain Q, K, V
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)
        enc_outputs = self.pos_ffn(enc_outputs)
        return enc_outputs, attn

# Encoder
# 1. Embed the Chinese input
# 2. Add positional information to the embedding
# 3. Mask the padding tokens in the sentence
# 4. Pass through N stacked EncoderLayers, N = 6
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        '''
        :param enc_inputs: [batch_size, src_len]
        :return:
        '''
        enc_outputs = self.src_emb(enc_inputs)                                    # [batch_size, src_len, d_model]
        enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1)   # add positional encoding
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
        enc_self_attns = []
        for layer in self.layers:
            # enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns
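# --- Optional sanity check (illustrative addition; the Decoder builds this mask itself) ---
# The future mask for a length-5 decoder input is strictly upper-triangular, so position t can
# only attend to positions <= t:
# print(get_attn_subsequence_mask(torch.zeros(1, tgt_len, dtype=torch.long)))
# tensor([[[0, 1, 1, 1, 1],
#          [0, 0, 1, 1, 1],
#          [0, 0, 0, 1, 1],
#          [0, 0, 0, 0, 1],
#          [0, 0, 0, 0, 0]]], dtype=torch.uint8)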
class DecoderLayer(nn.Module):
    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        # masked self-attention over the decoder input
        dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
        # encoder-decoder attention: Q comes from the decoder, K and V come from the encoder output
        dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
        dec_outputs = self.pos_ffn(dec_outputs)
        return dec_outputs, dec_self_attn, dec_enc_attn

# Decoder
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        '''
        :param dec_inputs: [batch_size, tgt_len]
        :param enc_inputs: [batch_size, src_len]
        :param enc_outputs: [batch_size, src_len, d_model]
        :return:
        '''
        dec_outputs = self.tgt_emb(dec_inputs)                                    # [batch_size, tgt_len, d_model]
        dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1)
        dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).cuda()
        dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).cuda()
        # combine the padding mask and the future mask for decoder self-attention
        dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask), 0).cuda()
        # this mask is simply the padding mask of enc_inputs, used by the encoder-decoder attention
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)
        dec_self_attns, dec_enc_attns = [], []
        for layer in self.layers:
            dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs,
                                                             dec_self_attn_mask, dec_enc_attn_mask)
            dec_self_attns.append(dec_self_attn)
            dec_enc_attns.append(dec_enc_attn)
        return dec_outputs, dec_self_attns, dec_enc_attns

# Transformer
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.Encoder = Encoder().cuda()
        self.Decoder = Decoder().cuda()
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).cuda()

    def forward(self, enc_inputs, dec_inputs):
        enc_outputs, enc_self_attns = self.Encoder(enc_inputs)
        dec_outputs, dec_self_attns, dec_enc_attns = self.Decoder(dec_inputs, enc_inputs, enc_outputs)
        dec_logits = self.projection(dec_outputs)    # [batch_size, tgt_len, tgt_vocab_size]
        return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns

# Build the network
model = Transformer().cuda()
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab['P'])   # do not compute loss on padding positions
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

# Training
# for epoch in range(1000):
#     for enc_inputs, dec_inputs, dec_outputs in loader:
#         enc_inputs, dec_inputs, dec_outputs = enc_inputs.cuda(), dec_inputs.cuda(), dec_outputs.cuda()
#         outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
#         loss = criterion(outputs, dec_outputs.view(-1))
#         print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
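# --- Optional single-step check (illustrative addition; assumes a CUDA device, like the rest of the script) ---
# One forward pass confirms the shapes the loss expects: the model returns flattened logits of shape
# [batch_size * tgt_len, tgt_vocab_size] and the target is flattened to [batch_size * tgt_len].
# enc_b, dec_b, out_b = next(iter(loader))
# logits, _, _, _ = model(enc_b.cuda(), dec_b.cuda())
# print(logits.shape)                              # torch.Size([10, 10]) for this toy batch
# print(criterion(logits, out_b.view(-1).cuda()))  # a scalar loss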
# Testing: greedy decoding, one token at a time
def test(model, enc_input, start_symbol):
    enc_outputs, enc_self_attns = model.Encoder(enc_input)
    dec_input = torch.zeros(1, tgt_len).type_as(enc_input.data)
    next_symbol = start_symbol
    for i in range(0, tgt_len):
        dec_input[0][i] = next_symbol
        dec_outputs, _, _ = model.Decoder(dec_input, enc_input, enc_outputs)
        projected = model.projection(dec_outputs)
        prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
        next_word = prob.data[i]
        next_symbol = next_word.item()
    return dec_input

enc_inputs, _, _ = next(iter(loader))
predict_dec_input = test(model, enc_inputs[1].view(1, -1).cuda(), start_symbol=tgt_vocab["S"])
predict, _, _, _ = model(enc_inputs[1].view(1, -1).cuda(), predict_dec_input)
predict = predict.data.max(1, keepdim=True)[1]
print([src_idx2word[int(i)] for i in enc_inputs[1]], '->', [idx2word[n.item()] for n in predict.squeeze()])
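# --- Optional usage sketch: greedy-decode every sentence in the sampled batch ---
# (the translations are only meaningful once the training loop above has been enabled and run)
# for i in range(len(enc_inputs)):
#     greedy_dec_input = test(model, enc_inputs[i].view(1, -1).cuda(), start_symbol=tgt_vocab["S"])
#     pred, _, _, _ = model(enc_inputs[i].view(1, -1).cuda(), greedy_dec_input)
#     pred = pred.data.max(1, keepdim=True)[1]
#     print([src_idx2word[int(w)] for w in enc_inputs[i]], '->',
#           [idx2word[n.item()] for n in pred.squeeze()])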