1. Code example:
import torch
import math
from torch import nn
from d2l import torch as d2l
import matplotlib.pyplot as plt
def transpose_qkv(X, num_heads):
    """Reshape X for the parallel computation of multiple attention heads."""
    # Shape of input X: (batch_size, no. of queries or key-value pairs, num_hiddens)
    # Shape after reshape: (batch_size, no. of queries or key-value pairs, num_heads, num_hiddens/num_heads)
    print('transpose_qkv:')
    print(X.shape)
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)  # -1 infers num_hiddens/num_heads (20 in the example below)
    print(X.shape)
    # Shape after permute: (batch_size, num_heads, no. of queries or key-value pairs, num_hiddens/num_heads)
    X = X.permute(0, 2, 1, 3)
    print(X.shape)
    # Shape of the final output: (batch_size*num_heads, no. of queries or key-value pairs, num_hiddens/num_heads)
    return X.reshape(-1, X.shape[2], X.shape[3])

def transpose_output(X, num_heads):
    """Reverse the operation of transpose_qkv."""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    X = X.permute(0, 2, 1, 3)
    return X.reshape(X.shape[0], X.shape[1], -1)

def masked_softmax(X, valid_lens):
    """Perform a softmax over the last axis while masking out invalid elements."""
    # X: 3D tensor, valid_lens: 1D or 2D tensor
    print('masked_softmax:', file=log)
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
            print(valid_lens.shape, file=log)
            print(valid_lens, file=log)
        else:
            valid_lens = valid_lens.reshape(-1)
        # Masked elements on the last axis are replaced with a very large negative
        # value, so their softmax output becomes 0.
        X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)
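A minimal sketch of how the two reshaping helpers behave (assuming the functions above are in scope, and using the same sizes as the test at the end of the post: batch_size=2, 4 queries, num_hiddens=100, num_heads=5); transpose_output exactly undoes transpose_qkv:

X = torch.ones((2, 4, 100))    # (batch_size, no. of queries, num_hiddens)
H = transpose_qkv(X, 5)        # fold the 5 heads into the batch dimension
print(H.shape)                 # torch.Size([10, 4, 20])
back = transpose_output(H, 5)  # unfold the heads again
print(back.shape)              # torch.Size([2, 4, 100]): the original shape is restored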
2. The DotProductAttention (scaled dot-product attention) class deserves a closer look. Attention pooling needs a scoring function; this could be, say, a Gaussian kernel, but here the dot product of queries and keys, scaled by 1/sqrt(d), is used. The scores are then passed through a softmax to obtain a probability distribution, which serves as the attention weights, and finally the weights are used to form a weighted sum of the values; a small numeric sketch of this computation follows the class definition below.
class DotProductAttention(nn.Module):
    """Scaled dot-product attention."""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        print('DotProductAttention:', file=log)
        print('queries:', file=log)
        print(queries.shape, file=log)
        print(queries, file=log)
        print('keys:', file=log)
        print(keys.shape, file=log)
        print(keys, file=log)
        print('values:', file=log)
        print(values.shape, file=log)
        print(values, file=log)
        d = queries.shape[-1]
        print('d:', file=log)
        print(d, file=log)
        # Swap the last two dimensions of keys so the batched matmul
        # computes the query-key dot products.
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        print('scores:', file=log)
        print(scores.shape, file=log)
        print(scores, file=log)
        self.attention_weights = masked_softmax(scores, valid_lens)
        print('attention_weights:', file=log)
        print(self.attention_weights.shape, file=log)
        print(self.attention_weights, file=log)
        return torch.bmm(self.dropout(self.attention_weights), values)
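A minimal numeric sketch of what this class computes, using toy tensors made up for illustration (the class itself additionally applies masking via masked_softmax and dropout to the weights):

import math
import torch
from torch import nn

# One example in the batch, 1 query and 2 key-value pairs, d = 2.
queries = torch.tensor([[[1.0, 0.0]]])           # (1, 1, 2)
keys = torch.tensor([[[1.0, 0.0], [0.0, 1.0]]])  # (1, 2, 2)
values = torch.tensor([[[10.0], [20.0]]])        # (1, 2, 1)

# Scaled dot-product scores, shape (1, 1, 2).
scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(queries.shape[-1])
# Softmax over the key axis turns the scores into attention weights.
weights = nn.functional.softmax(scores, dim=-1)
# Weighted sum of the values, shape (1, 1, 1).
output = torch.bmm(weights, values)
print(weights)  # more weight on the first key, which matches the query
print(output)   # a value between 10 and 20, closer to 10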
3. The MultiHeadAttention class also deserves a closer look. The queries, keys and values are first passed through learned linear projections (weight_query, weight_key, weight_value); transpose_qkv then splits the num_hiddens projection dimension into num_heads heads and folds the head dimension into the batch dimension, so that scaled dot-product attention runs on all heads in parallel with a single batched call. transpose_output reverses this step, concatenating the per-head outputs, which are finally mixed by weight_output.
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on top of DotProductAttention."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        self.weight_query = nn.Linear(query_size, num_hiddens, bias=bias)
        self.weight_key = nn.Linear(key_size, num_hiddens, bias=bias)
        self.weight_value = nn.Linear(value_size, num_hiddens, bias=bias)
        self.weight_output = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        # Shape of queries, keys, values:
        # (batch_size, no. of queries or key-value pairs, num_hiddens)
        # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
        # After transpose_qkv, the shapes of queries, keys, values become
        # (batch_size*num_heads, no. of queries or key-value pairs, num_hiddens/num_heads)
        queries = transpose_qkv(self.weight_query(queries), self.num_heads)
        print('queries:', file=log)
        print(queries.shape, file=log)
        print(queries, file=log)
        keys = transpose_qkv(self.weight_key(keys), self.num_heads)
        print('keys:', file=log)
        print(keys.shape, file=log)
        print(keys, file=log)
        values = transpose_qkv(self.weight_value(values), self.num_heads)
        print('values:', file=log)
        print(values.shape, file=log)
        print(values, file=log)
        if valid_lens is not None:
            # Along axis 0, copy the first item (scalar or vector) num_heads times,
            # then the second item, and so on.
            valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0)
            print('valid_lens:', file=log)
            print(valid_lens.shape, file=log)
            print(valid_lens, file=log)
        # Shape of output: (batch_size*num_heads, no. of queries, num_hiddens/num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        print('output:', file=log)
        print(output.shape, file=log)
        print(output, file=log)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = transpose_output(output, self.num_heads)
        print('output_concat:', file=log)
        print(output_concat.shape, file=log)
        print(output_concat, file=log)
        return self.weight_output(output_concat)

# Open the log file (created next to the .py file if it does not exist).
# mode='a' appends to the file instead of overwriting it;
# encoding='utf-8' keeps non-ASCII text readable.
log = open('MultiHeadAttenlog2.txt', mode='a', encoding='utf-8')

num_hiddens, num_heads = 100, 5
MultiHeadAttentionNet = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                                           num_hiddens, num_heads, 0.5)
MultiHeadAttentionNet.eval()

batch_size, num_queries = 2, 4
num_kvpairs, valid_lens = 6, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
print('X:', file=log)
print(X, file=log)
print(X.shape, file=log)
Y = torch.ones((batch_size, num_kvpairs, num_hiddens))
print('Y:', file=log)
print(Y, file=log)
print(Y.shape, file=log)
print(MultiHeadAttentionNet(X, Y, Y, valid_lens).shape, file=log)  # torch.Size([2, 4, 100])
log.close()
Note that the final X.reshape(X.shape[0], X.shape[1], -1) in transpose_output(X, num_heads) merges the num_heads axis and the num_hiddens/num_heads axis back into a single num_hiddens axis, i.e. it concatenates the per-head outputs, so the result has shape (batch_size, no. of queries, num_hiddens).
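One step in forward that is easy to overlook is the torch.repeat_interleave call: because the heads are folded into the batch dimension, each example's valid length has to be copied once per head. A minimal sketch of just that call, with the same values as the test above (valid_lens = [3, 2], num_heads = 5):

import torch

valid_lens = torch.tensor([3, 2])
# Each entry is repeated num_heads = 5 times along dim 0, so all 5 heads of
# example 0 mask with length 3 and all 5 heads of example 1 with length 2.
print(torch.repeat_interleave(valid_lens, repeats=5, dim=0))
# tensor([3, 3, 3, 3, 3, 2, 2, 2, 2, 2])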
4. Finally, a hand-drawn diagram of the MultiHeadAttention network architecture: