赞
踩
import numpy as np
import torch
import torch.nn as nn
import math
import torch.nn.functional as F
# 001
def get_attn_pad_mask(seq_q, seq_k):
# 针对句子长短不一,为方便进行并行运算,引入padding,为减少padding对相关性分数的影响,要将padding部分mask起来
"""
# 得到句子中padding的位置信息,以便于在计算自注意力和交互注意力的时候去掉padding符号的影响
# 在自注意力机制部分中,计算出Q*K的转置除以根号d_k之后,softmax之前,会得到一个张量
# 其形状为[batch_size,len_q,len_k],代表单词间(Q、K)的影响力分数
# 而get_attn_pad_mask的结果提供了一个与之尺度相同的张量,助力定位padding位置
# 计算softmax之前会将它们置为无穷大,以达到消除padding影响的作用
"""
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
"""
K、Q不一定一致,比如交叉注意力中,Q来自于解码端,K来自编码端
实际上,本方法只对K中的padding符号进行标识,并没有对Q进行处理
或许,Q、K的角色不同,导致了这样的结果
"""
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# pad的编号是 0,因此找到为零的位置即可,然后再给张量升维(为了符合注意力分数的尺寸)
# [batch_size , 1 , len_k] 张量中数值为1的是被mask掉的位置
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k)
# 将张量膨胀到与注意力分数同样尺寸
# [batch_size , len_q , len_k]
return pad_attn_mask
# 002
class ScaledDotProductAttention(nn.Module):
# 引入缩放因子的点积自注意力
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q:[batch_size, n_heads, len_q, d_k]
K:[batch_size, n_heads, len_k, d_k]
V:[batch_size, n_heads, len_k, d_v]
KQ的编码尺寸d_k相同,KV的长度相同,这都是潜在的信息
"""
d_k = K.size(-1)
# 根据公式求得注意力分数scores,其形状为:[batch_size, n_heads, len_q, len_k]
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)
# 关键点attn_mask,把被mask的地方置为无限小,softmax之后基本就是0,这样padding就对Q的单词不起作用了
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores)
context = torch.matmul(attn, V)
return context
# 003
class MultiHeadAttention(nn.Module):
# 多头自注意力机制
def __init__(self,config):
super(MultiHeadAttention, self).__init__()
self.d_model = config.d_model
self.d_k = config.d_k
self.d_v = config.d_v
self.n_heads = config.n_heads
# 通过线性变换获取QKV,同时完成多头可用的分头数据量准备
self.W_Q = nn.Linear(self.d_model, self.d_k * self.n_heads)
self.W_K = nn.Linear(self.d_model, self.d_k * self.n_heads)
self.W_V = nn.Linear(self.d_model, self.d_v * self.n_heads)
# 隐变量维度转换
self.linear = nn.Linear(self.n_heads * self.d_v, self.d_model)
# 层归一化
self.layer_norm = nn.LayerNorm(self.d_model)
def forward(self, Q, K, V, attn_mask):
# 这个多头分为这几个步骤,首先映射分头,然后计算atten_scores,然后计算atten_value
"""
数据形状:
Q: [batch_size, len_q, d_model]
K: [batch_size, len_k, d_model]
V: [batch_size, len_k, d_model]
"""
# 准备残差项
residual, batch_size = Q, Q.size(0)
# 分头
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
# 下面这个就是先映射,后分头;一定要注意的是q和k分头之后维度是一致额,所以一看这里都是dk
q_s = self.W_Q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
# q_s: [batch_size, n_heads, len_q, d_k]
k_s = self.W_K(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
# k_s: [batch_size, n_heads, len_k, d_k]
v_s = self.W_V(V).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)
# v_s: [batch_size, n_heads, len_k, d_v]
# 输入进行的attn_mask形状是 [batch_size, len_q, len_k]
# 然后经过下面这个代码得到新的attn_mask : [batch_size, n_heads, len_q, len_k],就是把pad信息重复了n个头上
attn_mask = attn_mask.unsqueeze(1).repeat(1, self.n_heads, 1, 1)
# 然后计算 ScaledDotProductAttention
# 得到的结果:context: [batch_size, n_heads, len_q, d_v]
context = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
# 拼接
context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_v)
# context: [batch_size, len_q, n_heads * d_v]
"""
contiguous()的作用要从pytorch多维数组的低层存储开始说起,
它一般是配合torch.permute()、torch.transpose()、torch.view()一起使用
以上的方法对张量改变“形状”其实并没有改变张量在内存中真正的形状,只是改变了访问策略罢了
而torch.contiguous()方法首先拷贝了一份张量在内存中的地址,然后将地址按照形状改变后的张量的语义进行排列
也就是说它改变了内存中的存储方式。
"""
# 维度映射
output = self.linear(context)
# output: [batch_size, len_q, d_model]
# 残差链接,层归一化
output = context + residual
output = self.layer_norm(output)
return output
# 004
class PoswiseFeedForwardNet(nn.Module):
# 前馈神经网络(用卷积实现)
def __init__(self,config):
super(PoswiseFeedForwardNet, self).__init__()
self.d_model = config.d_model
self.d_ff = config.d_ff
self.conv1 = nn.Conv1d(in_channels=self.d_model, out_channels=self.d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=self.d_ff, out_channels=self.d_model, kernel_size=1)
self.layer_norm = nn.LayerNorm(self.d_model)
def forward(self, inputs):
residual = inputs # inputs : [batch_size, len_q, d_model]
output = nn.ReLU()(self.conv1(inputs.transpose(1, 2))) # 把一个样本“竖”起来,当成一个[1,d_model]的数据过卷积
output = self.conv2(output).transpose(1, 2) # 过完卷积,再把样本在“躺”下来
output = self.layer_norm(output + residual)
return output
# 005
class PositionalEncoding(nn.Module):
# 输入数据为经过embedding编码的数据,它先实现位置编码,然后将位置编码与embedding编码进行相加,并返回
def __init__(self,config):
super(PositionalEncoding, self).__init__()
self.dropout = config.dropout
self.d_model = config.d_model
self.max_len = config.max_len
self.dropout = nn.Dropout(p=self.dropout)
pe = torch.zeros(self.max_len, self.d_model)
# 位置编码的初始值,默认都为0
position = torch.arange(0, self.max_len, dtype=torch.float).unsqueeze(1)
# 转化成[max_len ,1]的形状,即绝对位置矩阵。
# position代表的是单词在句子中的索引
# 比如max_len(句子的长度)是128个,那么索引就是从0,1,2,...,127
# i的取值范围是0~d_model/2,则2i/2i+1即为词向量维度的编号
# 因此,假设d_model是512,2i那个符号中i从0取到了255,那么2i对应取值就是0,2,4...510,2i+1的取值1,3...511。
div_term = torch.exp(-1 * math.log(10000.0) * (torch.arange(0, self.d_model, 2).float() / self.d_model))
# 公式中的公共部分
# torch.arange(0, d_model, 2).float()就是公式中的2i
pe[:, 0::2] = torch.sin(position * div_term)
# 这里需要注意的是pe[:, 0::2]这个用法,就是从0开始到最后面,步长为2,其实代表的就是偶数位置
pe[:, 1::2] = torch.cos(position * div_term)
# 这里需要注意的是pe[:, 1::2]这个用法,就是从1开始到最后面,步长为2,其实代表的就是奇数位置
# 上面代码获取之后得到的pe:[max_len,d_model]
# 为了能和embedding相加还需要扩展一个维度,因此执行下面的代码
# 下面这个代码之后,我们得到的pe形状是:[1, max_len, d_model]
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
# 为什么要把pe注册成buffer?什又是buffer?
# pe是对模型效果有帮助的,但又不是超参数、参数,无需随着优化步骤而变化
# 这就可以通过注册成buffer来保持它不变,并且在保存模型时保存,加载时与模型结构与参数一同加载。
def forward(self, x):
# x: [batch_size,seq_len,d_model],x是经过embedding编码的
# 相加前对pe做一下适配工作,显然pe第二维是max_len=5000太长了,不妨将它切片成输入x的第二维相同大小,即x.size(1)
# 相加是基于广播机制完成的
x = x + self.pe[:, :x.size(1), :]
# 此时的x是 输入数据的embedding编码 + positional编码
return self.dropout(x)
# 006
class EncoderLayer(nn.Module):
def __init__(self,config):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention(config)
self.pos_ffn = PoswiseFeedForwardNet(config)
def forward(self, enc_inputs, enc_self_attn_mask):
# 下面这个就是做自注意力层,输入是enc_inputs
# 形状是[batch_size, seq_len_q, d_model]
# 需要注意的是,因为此处采用的是最基础的自注意力机制,因此原始QKV矩阵是相同的,就是enc_inputs
# enc_self_attn_mask是掩码策略
enc_outputs = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)
enc_outputs = self.pos_ffn(enc_outputs)
# enc_outputs: [batch_size, len_q, d_model]
return enc_outputs
# 007
class Encoder(nn.Module):
def __init__(self,config):
super(Encoder, self).__init__()
self.d_model = config.d_model
self.src_vocab_size = config.src_vocab_size
self.enc_n_layers = config.enc_n_layers
self.src_emb = nn.Embedding(self.src_vocab_size, self.d_model)
# src_vocab_size 是原始语言的词典长度,d_model是词向量的维度,在初始阶段就实现了,词典中不同编号的词向量生成。
self.pos_emb = PositionalEncoding(config)
# 位置编码情况,这里是固定的正余弦函数,也可以使用类似词向量的nn.Embedding获得一个可以更新学习的位置编码
self.layers = nn.ModuleList([EncoderLayer(config) for _ in range(self.enc_n_layers)])
# 使用ModuleList对多个encoderlayer进行堆叠
def forward(self, enc_inputs):
# enc_inputs形状为[batch_size , src_len]
# 此时每个单词还都是用一个编号在代替
# 下面这个代码通过src_emb,进行索引(用编号找词对应的向量)定位,enc_outputs输出形状是[batch_size, src_len, d_model]
enc_outputs = self.src_emb(enc_inputs)
# 获取位置编码,并将上一步的结果与之相加。
enc_outputs = self.pos_emb(enc_outputs)
# get_attn_pad_mask是为了得到句子中(qk中)padding的位置信息,给到模型后面,
# 在计算自注意力和交互注意力的时候去掉padding符号的影响
# 此处qk的来源相同,都是enc_inputs
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
# 嵌套encoderlayer,得到encoder最终的输出值
for layer in self.layers:
enc_outputs = layer(enc_outputs, enc_self_attn_mask)
return enc_outputs
# 至此,与encoder相关的部分全部完成。
# 008
def get_attn_subsequent_mask(tgt_len):
# 生成掩码张量,为了防止后续位置的信息被添加到当前位置中,这样就能保障对第i个位置的预测只依赖于i前面的信息。
# 实际上就是生成一个上三角阵
# tgt_len就是翻译完的句子的长度,在训练中是decoder的输入dec_inputs:[batch_size,tgt_len]的第二个维度
# 在训练时是直接作弊给模型看标准答案的。
attn_shape = [1, tgt_len, tgt_len]
# 就像把句子复制多次,组成方阵,然后用上三角阵一盖,就间接的实现了一次揭秘一个
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 用np生成一个上三角矩阵,后续利用该上三角阵实现掩码操作
subsequence_mask = torch.from_numpy(subsequence_mask).byte() # 把np形式的上三角阵转化成tensor
return subsequence_mask # [1, tgt_len, tgt_len]
# 009
class DecoderLayer(nn.Module):
def __init__(self,config):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention(config)
self.dec_enc_attn = MultiHeadAttention(config)
self.pos_ffn = PoswiseFeedForwardNet(config)
def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
dec_outputs = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
dec_outputs = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
dec_outputs = self.pos_ffn(dec_outputs)
return dec_outputs
# 010
class Decoder(nn.Module):
def __init__(self,config):
super(Decoder, self).__init__()
self.d_model = config.d_model
self.tgt_vocab_size = config.tgt_vocab_size
self.dec_n_layers = config.dec_n_layers
self.tgt_emb = nn.Embedding(self.tgt_vocab_size, self.d_model)
self.pos_emb = PositionalEncoding(config)
self.layers = nn.ModuleList([DecoderLayer(config) for _ in range(self.dec_n_layers)])
def forward(self, dec_inputs, enc_inputs, enc_outputs):
# enc_inputs是为了给交叉注意力机制的掩码提供信息。
# dec_inputs : [batch_size , target_len]训练时相当于直接给看答案(经过mask的)
dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
dec_outputs = self.pos_emb(dec_outputs) # [batch_size, tgt_len, d_model]
# 以下三句话是为了生成decoderlayer的第一个自注意力的掩码,两部分组成:既包括padding掩码,也包括防止看到后续信息的掩码张量
# get_attn_pad_mask 自注意力层的时候的pad 部分
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)
# get_attn_subsequent_mask 这个做的是自注意层的mask部分,就是当前单词之后看不到,使用一个上三角为1的矩阵,即防止看到后续信息的掩码张量
tgt_len = dec_inputs.size(1)
dec_self_attn_subsequent_mask = get_attn_subsequent_mask(tgt_len)
# 两个矩阵相加,大于0的为1,不大于0的为0,为1的在之后就会被fill到无限小,然后在做softmax,就可以同时避开padding和后续信息的影响
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)
# 这个做的是交叉注意力机制中的mask矩阵,enc的输入是k,我去看这个k里面哪些是pad符号,给到后面的模型;
# 注意哦,q肯定也是有pad符号,但是这里不在意它的,具体什么道理不是很明白。
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)
for layer in self.layers:
dec_outputs = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
return dec_outputs
# 011
class Transformer(nn.Module):
# 返回的是预测的句子中每个词的概率
def __init__(self,config):
super(Transformer, self).__init__()
self.d_model = config.d_model
self.tgt_vocab_size = config.tgt_vocab_size
self.encoder = Encoder(config) # 编码层
self.decoder = Decoder(config) # 解码层
self.projection = nn.Linear(self.d_model, self.tgt_vocab_size, bias=False) # 输出层
# 通过线性映射,将输出的d_model维的隐变量 转化成 与目标词典长度(tgt_vocab_size)相同的向量
# 然后通过softmax就可以确定预测的词 或者计算损失了,这点看起来与分类问题很像。
self.softmax = F.softmax
def forward(self, enc_inputs, dec_inputs):
# 这里有两个数据进行输入
# 一个enc_inputs,形状为[batch_size, src_len](src_len表示一句话中最多允许有多少个单词),其作为encoder的输入;
# 一个dec_inputs,形状为[batch_size, tgt_len],主要是作为decoder的输入(以训练的视角,所以此处是标签或者说是正确答案)。
# 每个词使用一个编号记录的,比如apple是001,banana是002,在encoder/decoder中再对它们编码
enc_outputs = self.encoder(enc_inputs)
# enc_outputs是主要输出
dec_outputs = self.decoder(dec_inputs, enc_inputs, enc_outputs)
# 之所以用到编码器的输入,是为了在求交叉注意力机制的mask张量时,给pad_mask函数提供padding信息
# dec_outputs是主要输出,用于后续的linear映射;
dec_logits = self.projection(dec_outputs)
# dec_outputs做映射到词表大小
# dec_logits : [batch_size , new_seq_len , tgt_vocab_size]
# new_seq_len生成的句子的长度。
dec_logits = self.softmax(dec_logits, dim=-1)
return dec_logits # [batch_size , new_seq_len , tgt_vocab_size],每个元素代表着对应标号词的置信度
# model_config.py
import json
# 构建参数管理类ModelConfig
class ModelConfig:
def __init__(self, # 把要用到的参数在这声明
d_k = None, # 做多头时,K、Q的维度,但实际上一般 d_k=d_v=d_model/n_heads
d_v = None, # 做多头时,V的维度,实际上 d_v * n_heads = d_model
d_model = None, # 隐层向量的维度
src_vocab_size = None, # 原始语言字典的长度
tgt_vocab_size = None, # 目标语言字典的长度
enc_n_layers = 6, # encoder的堆叠层数,默认为6
dec_n_layers = 6, # decoder的堆叠层数,默认为6
dropout = 0.1, # dropout的丢弃概率,默认为0.1
max_len = 5000, # 位置编码默认的句子中含词量的最大长度,默认为5000
n_heads = 8, # 做多头时,采用的多头数,默认为8
d_ff = 2048, # 前馈神经网络模块的中间层,默认为2048
):
self.d_k = d_k
self.d_v = d_v
self.d_model = d_model
self.src_vocab_size = src_vocab_size
self.tgt_vocab_size = tgt_vocab_size
self.enc_n_layers = enc_n_layers
self.dec_n_layers = dec_n_layers
self.dropout = dropout
self.max_len = max_len
self.n_heads = n_heads
self.d_ff = d_ff
def save(self, save_path): # save_path参数文件的保存路径,保存成json文件
f = open(save_path, "w")
d = {
"d_k": self.d_k,
"d_v": self.d_v,
"d_model": self.d_model,
"src_vocab_size": self.src_vocab_size,
"tgt_vocab_size": self.tgt_vocab_size,
"enc_n_layers": self.enc_n_layers,
"dec_n_layers": self.dec_n_layers,
"dropout": self.dropout,
"max_len": self.max_len,
"n_heads": self.n_heads,
"d_ff": self.d_ff
}
d = json.dumps(d)
f.write(d)
f.close()
def load(self, load_path):
# load_path是参数导入的路径,加载的是json文件,直接构建一个类不做赋值,然后执行加载文件
d = open(load_path).read()
d = json.loads(d)
self.d_k = d["d_k"]
self.d_v = d["d_v"]
self.d_model = d["d_model"]
self.src_vocab_size = d["src_vocab_size"]
self.tgt_vocab_size = d["tgt_vocab_size"]
self.enc_n_layers = d["enc_n_layers"]
self.dec_n_layers = d["dec_n_layers"]
self.dropout = d["dropout"]
self.max_len = d["max_len"]
self.n_heads = d["n_heads"]
self.d_ff = d["d_ff"]
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import math
def make_batch(sentences):
input_batch = [[src_vocab[n] for n in sentences[0].split()]]
output_batch = [[tgt_vocab[n] for n in sentences[1].split()]]
target_batch = [[tgt_vocab[n] for n in sentences[2].split()]]
return torch.LongTensor(input_batch), torch.LongTensor(output_batch), torch.LongTensor(target_batch)
if __name__ == '__main__':
## 句子的输入部分,
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']
# Transformer Parameters
# Padding Should be Zero
## 构建词表
# 编码端的词表
src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4}
src_vocab_size = len(src_vocab) # src_vocab_size:实际情况下,它的长度应该是所有德语单词的个数
# 解码端的词表
tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'S': 5, 'E': 6}
tgt_vocab_size = len(tgt_vocab) # 实际情况下,它应该是所有英语单词个数
src_len = 5 # length of source 编码端的输入长度
tgt_len = 5 # length of target 解码端的输入长度
config = ModelConfig(d_k=64, d_v=64, d_model=512, src_vocab_size=src_vocab_size, tgt_vocab_size=tgt_vocab_size)
model = Transformer(config)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
enc_inputs, dec_inputs, target_batch = make_batch(sentences)
for epoch in range(50):
optimizer.zero_grad()
outputs = model(enc_inputs, dec_inputs)
outputs = outputs.squeeze(0)
loss = criterion(outputs, target_batch.contiguous().view(-1))
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
get_attn_pad_mask
函数。get_attn_subsequent_mask
函数。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。