赞
踩
import math
import torch
from torch import nn
from d2l import torch as d2l
class MultiHeadAttention(nn.Module): """多头注意力""" def __init__(self, key_size, query_size, value_size, num_hiddens, num_heads, dropout, bias=False, **kwargs): super(MultiHeadAttention, self).__init__(**kwargs) self.num_heads = num_heads self.attention = d2l.DotProductAttention(dropout) self.W_q = nn.Linear(query_size, num_hiddens, bias=bias) self.W_k = nn.Linear(key_size, num_hiddens, bias=bias) self.W_v = nn.Linear(value_size, num_hiddens, bias=bias) self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias) def forward(self, queries, keys, values, valid_lens): # queries,keys,values的形状: # (batch_size,查询或者“键-值”对的个数,num_hiddens) # valid_lens 的形状: # (batch_size,)或(batch_size,查询的个数) # 经过变换后,输出的queries,keys,values 的形状: # (batch_size*num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) queries = transpose_qkv(self.W_q(queries), self.num_heads) keys = transpose_qkv(self.W_k(keys), self.num_heads) values = transpose_qkv(self.W_v(values), self.num_heads) if valid_lens is not None: # 在轴0,将第一项(标量或者矢量)复制num_heads次, # 然后如此复制第二项,然后诸如此类。 valid_lens = torch.repeat_interleave( valid_lens, repeats=self.num_heads, dim=0) # output的形状:(batch_size*num_heads,查询的个数, # num_hiddens/num_heads) output = self.attention(queries, keys, values, valid_lens) # output_concat的形状:(batch_size,查询的个数,num_hiddens) output_concat = transpose_output(output, self.num_heads) return self.W_o(output_concat)
定义了两个函数 transpose_qkv 和 transpose_output,用于在多头注意力计算中进行张量形状的变换,以适应并行计算的需要。
def transpose_qkv(X, num_heads): """为了多注意力头的并行计算而变换形状""" # 输入X的形状:(batch_size,查询或者“键-值”对的个数,num_hiddens) # 输出X的形状:(batch_size,查询或者“键-值”对的个数,num_heads, # num_hiddens/num_heads) X = X.reshape(X.shape[0], X.shape[1], num_heads, -1) # 输出X的形状:(batch_size,num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) X = X.permute(0, 2, 1, 3) # 最终输出的形状:(batch_size*num_heads,查询或者“键-值”对的个数, # num_hiddens/num_heads) return X.reshape(-1, X.shape[2], X.shape[3]) def transpose_output(X, num_heads): """逆转transpose_qkv函数的操作""" X = X.reshape(-1, num_heads, X.shape[1], X.shape[2]) X = X.permute(0, 2, 1, 3) return X.reshape(X.shape[0], X.shape[1], -1)
class DotProductAttention(nn.Module): """缩放点积注意力""" def __init__(self, dropout, **kwargs): super(DotProductAttention, self).__init__(**kwargs) self.dropout = nn.Dropout(dropout) # queries的形状:(batch_size,查询的个数,d) # keys的形状:(batch_size,“键-值”对的个数,d) # values的形状:(batch_size,“键-值”对的个数,值的维度) # valid_lens的形状:(batch_size,)或者(batch_size,查询的个数) def forward(self, queries, keys, values, valid_lens=None): d = queries.shape[-1] # 设置transpose_b=True为了交换keys的最后两个维度 scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d) self.attention_weights = masked_softmax(scores, valid_lens) return torch.bmm(self.dropout(self.attention_weights), values)
masked_softmax函数用于在最后一个轴上掩蔽元素来执行softmax操作:
def masked_softmax(X, valid_lens):
# X:3D张量,valid_lens:1D或2D张量
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape
if valid_lens.dim() == 1:
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
valid_lens = valid_lens.reshape(-1)
# 最后一轴上被掩蔽的元素使用一个非常大的负值替换,从而其softmax输出为0
X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens,
value=-1e6)
return nn.functional.softmax(X.reshape(shape), dim=-1)
import math
from typing import Optional, List# 从 typing 模块中导入 Optional 和 List 类型,用于类型提示
import torch
from torch import nn
from labml import tracker #跟踪实验中的指标和损失值等信息
在多头自注意力机制中,每个注意力头都需要一个查询(Query)、一个键(Key)和一个值(Value)向量。这些向量通过线性变换从输入特征中提取而来,然后用于计算注意力权重和加权求和。具体来说,给定输入特征张量
X
X
X,我们首先通过三个线性变换来计算查询
Q
Q
Q、键
K
K
K和值向量
V
V
V:
Q
=
X
⋅
W
Q
K
=
X
⋅
W
K
V
=
X
⋅
W
V
Q = X \cdot W_Q\\ K = X \cdot W_K\\ V = X \cdot W_V
Q=X⋅WQK=X⋅WKV=X⋅WV
其中, W Q W_Q WQ、 W K W_K WK 和 W V W_V WV 是学习到的权重矩阵,用于将输入特征 X X X 映射到查询、键和值向量的空间中。这些线性变换可以通过 PyTorch 中的 nn.Linear 层来实现。
该模块用于准备多头自注意力机制中的查询、键和值向量:
class PrepareForMultiHeadAttention(nn.module): ''' d_model:模型输入的特征维度; heads:注意力机制中的头数; d_k:每个头部中以向量表示的维度数; bias:是否使用偏置项 ''' def __init__(self,d_model:int,heads:int,d_k:int,bias:bool): super().__init__() #线性变换的线性层,输入为d_model,输出为heads*d_k self.linear=nn.Linear(d_model,heads*d_k,bias=bias) self.heads=heads self.d_k=d_k ''' x的形状为:seq_len,batch_size,d_model或batch_size,d_model 输出形状:seq_len,batch_size,heads,d_k或batch_size,heads,d_k ''' def forward(self,x:torch.Tensor): #获取输入张量 x 的形状,去掉最后一个维度,得到一个形状为 (seq_len, batch_size) 或 (batch_size,) 的元组 head_shape=x.shape[:-1] x=self.linear(x) #将线性变换后的张量进行重塑操作,将最后一个维度拆分为heads个头部,每个头部的维度为d_k x=x.view(*head_shape,self.heads,self.d_k) return x
pytorch中的view方法:用于对张量进行重塑(reshape)。其作用是将张量的形状变换为指定的形状,但是要求变换后的形状与原始形状的元素数量保持一致。
* 表示解包(unpacking)操作符。在函数调用或函数定义中,*args 表示将传入的参数打包成一个元组,而在函数调用时,*args 则表示将元组解包为独立的参数。
通过计算查询
Q
Q
Q和键
K
K
K之间的点积,并应用缩放因子
1
d
k
\frac{1}{\sqrt{d_k}}
dk
1,得到注意力权重:
α
=
s
o
f
t
m
a
x
(
Q
K
T
d
k
)
\alpha=softmax(\frac{QK^T}{\sqrt{d_k}})
α=softmax(dk
QKT)
利用注意力权重对值向量
V
V
V进行加权求和,得到最终的输出向量
A
t
t
e
n
t
i
o
n
(
Q
,
K
,
V
)
=
α
V
=
s
o
f
t
m
a
x
(
Q
K
T
d
k
)
V
Attention(Q,K,V)=\alpha V=softmax(\frac{QK^T}{\sqrt{d_k}})V
Attention(Q,K,V)=αV=softmax(dk
QKT)V
这样,对于每个注意力头,都可以得到一个输出向量,在多头自注意力机制中,会并行地进行多个这样的注意力头的计算,最后将他们的输出向量连接起来,形成最终的输出。
注意力遮罩(Attention Mask)用于指示哪些位置的信息是有效的,哪些位置是无效的。在自注意力机制中,有时需要对注意力权重进行调整,以便在计算注意力时忽略某些位置的信息,或者对某些位置的信息赋予特定的权重。
注意力遮罩主要应用在查询和键之间的相似度计算过程中,用于调整或者限制查询和键之间的关系。
class MultiHeadAttention(nn.Module): ''' heads:头的数量。 d_model:query 、key 和value 向量中的要素数 d_k:每头特征数 ''' def __init__(self,heads:int,d_model:int,dropout_prob:float=0.1,bias=True): super().__init__() self.d_k=d_model // heads #计算每个头部的查询、键和值的维度 self.heads=heads self.query=PrepareForMultiHeadAttention(d_model,heads,self.d_k,bias=bias) self.key=PrepareForeMultiHeadAttention(d_model,heads,self.d_k,bias=bias) self.value=PrepareForMultiHeadAttention(d_model,heads,self.d_k,bias=True) #创建一个 Softmax 层,用于计算注意力权重,dim=1 表示在时间维度上进行 Softmax 计算。 self.softmax=nn.Softmax(dim=1) #创建一个 Dropout 层,用于在训练过程中进行随机失活 self.dropout=nn.Dropout(dropout_prob) #计算缩放因子,这里使用了倒数的平方根进行缩放 self.scale=1/math.sqrt(self.d_k) #初始化一个属性 attn,用于存储注意力权重 self.attn=None def get_scores(self,query:torch.Tensor,key:torch.Tensor): return torch.einsum('ibhd,jbhd -> ijbh',query,key) ''' mask: 输入的注意力遮罩,形状为(seq_len_q, seq_len_k, batch_size)。 query_shape: 查询张量的形状,其中包含序列长度和批次大小。 key_shape: 键张量的形状,其中包含序列长度和批次大小。 ''' def prepare_mask(self,mask:torch.Tensor,query_shape:List[int],key_shape:List[int]): #确保输入的注意力遮罩的第一个维度的大小与查询张量的序列长度维度大小匹配或者为1 assert mask.shape[0] == 1 or mask.shape[0] == query_shape[0] #确保输入的注意力遮罩的第二个维度大小与键张量的序列长度维度大小匹配 assert mask.shape[1] == key_shape[0] #确保输入的注意力遮罩的第三个维度的大小与查询张量的批次大小维度大小匹配或者为1 assert mask.shape[2] == 1 or mask.shape[2] == query_shape[1] #将注意力遮罩的最后一个维度扩展一个维度 #形状从(seq_len_q, seq_len_k, batch_size)扩展为(seq_len_q, seq_len_k, batch_size, heads) mask = mask.unsqueeze(-1) return mask ''' query、key、value的形状:[seq_len, batch_size, d_model] mask的形状:[seq_len, seq_len, batch_size] ''' def forward(self,*, query:torch.Tensor, key:torch.Tensor, value:torch.Tensor, mask:Optional[torch.Tensor] = None): seq_len,batch_size,_ = query.shape if mask is not None: mask=self.prepare_mask(mask,query.shape,key.shape) #query、key、value的形状经过处理后变为:[seq_len,batch_size,heads,d_k] query=self.query(query) key=self.key(key) value=self.value(value) #计算注意力分数 scores=self.get_scores(query,key) #应用缩放因子 scores *= self.scale #应用mask if mask is not None: #将遮罩中值为 0 的位置替换为负无穷,这样在计算 Softmax 时对应位置的注意力权重将为 0 scores=scores.masked_fill(mask==0,float('-inf')) #计算注意力权重 attn=self.softmax(scores) #在调试过程中输出注意力权重 attn 的信息 tracker.debug('attn',attn) #应用dropout attn=self.dropout(attn) #加权求和 x=torch.einsum("ijbh,jbhd->ibhd",attn,value) #将注意力权重从计算图中分离出来,以避免在反向传播过程中对注意力权重的梯度进行更新 self.attn=attn.detach() #连接多个头:将经过加权求和得到的向量按照特定形状重新排列 x=x.reshape(seq_len,batch_size,-1) return self.output(x)
参考:https://github.com/labmlai/annotated_deep_learning_paper_implementations?tab=readme-ov-file
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。