当前位置:   article > 正文

熬了一晚上,我从零实现了Transformer模型,把代码讲给你听

熬了一晚上,我从零实现了Transformer模型,把代码讲给你听

点击上方“视学算法”,选择加"星标"或“置顶

重磅干货,第一时间送达d25581ee4153745c898bf290003fb9b0.png

作者丨伟大是熬出来的@知乎(已授权)

来源丨https://zhuanlan.zhihu.com/p/411311520

编辑丨极市平台

导读

 

本文详细介绍了每个模块的实现思路以及笔者在Coding过程中的感悟。

自从彻底搞懂Self_Attention机制之后,笔者对Transformer模型的理解直接从地下一层上升到大气层,任督二脉呼之欲出。夜夜入睡之前,那句柔情百转的"Attention is all you need"时常在耳畔环绕,情到深处不禁拍床叫好。于是在肾上腺素的驱使下,笔者熬了一个晚上,终于实现了Transformer模型。

1. 模型总览

代码讲解之前,首先放出这张经典的模型架构图。下面的内容中,我会将每个模块的实现思路以及笔者在Coding过程中的感悟知无不答。没有代码基础的读者不要慌张,笔者也是最近才入门的,所写Pytorch代码没有花里胡哨,所用变量名词尽量保持与论文一致,对新手十分友好。

40329fa49221052fcef2f60aa820e731.png

我们观察模型的结构图,Transformer模型包含哪些模块?笔者将其分为以下几个部分:

2e00a7b2737066c6f2cc65476af90a33.png

接下来我们首先逐个讲解,最后将其拼接完成模型的复现。

2. config

下面是这个Demo所用的库文件以及一些超参的信息。单独实现一个Config类保存的原因是,方便日后复用。直接将模型部分复制,所用超参保存在新项目的Config类中即可。这里不过多赘述。

  1. import torch
  2. import torch.nn as nn
  3. import numpy as np
  4. import math
  5. class Config(object):
  6.     def __init__(self):
  7.         self.vocab_size = 6
  8.         self.d_model = 20
  9.         self.n_heads = 2
  10.         assert self.d_model % self.n_heads == 0
  11.         dim_k  = d_model % n_heads
  12.         dim_v = d_model % n_heads
  13.         self.padding_size = 30
  14.         self.UNK = 5
  15.         self.PAD = 4
  16.         self.N = 6
  17.         self.p = 0.1
  18. config = Config()

3. Embedding

Embedding部分接受原始的文本输入(batch_size*seq_len,例:[[1,3,10,5],[3,4,5],[5,3,1,1]]),叠加一个普通的Embedding层以及一个Positional Embedding层,输出最后结果。

0b0d798fa223733600c379dab3607332.png

在这一层中,输入的是一个list: [batch_size * seq_len],输出的是一个tensor:[batch_size * seq_len * d_model]

普通的 Embedding 层想说两点:

  • 采用torch.nn.Embedding实现embedding操作。需要关注的一点是论文中提到的Mask机制,包括padding_mask以及sequence_mask(具体请见文章开头给出的理论讲解那篇文章)。在文本输入之前,我们需要进行padding统一长度,padding_mask的实现可以借助torch.nn.Embedding中的padding_idx参数。

  • 在padding过程中,短补长截

  1. class Embedding(nn.Module):
  2.     def __init__(self,vocab_size):
  3.         super(Embedding, self).__init__()
  4.         # 一个普通的 embedding层,我们可以通过设置padding_idx=config.PAD 来实现论文中的 padding_mask
  5.         self.embedding = nn.Embedding(vocab_size,config.d_model,padding_idx=config.PAD)
  6.     def forward(self,x):
  7.         # 根据每个句子的长度,进行padding,短补长截
  8.         for i in range(len(x)):
  9.             if len(x[i]) < config.padding_size:
  10.                 x[i].extend([config.UNK] * (config.padding_size - len(x[i]))) # 注意 UNK是你词表中用来表示oov的token索引,这里进行了简化,直接假设为6
  11.             else:
  12.                 x[i] = x[i][:config.padding_size]
  13.         x = self.embedding(torch.tensor(x)) # batch_size * seq_len * d_model
  14.         return x

关于Positional Embedding,我们需要参考论文给出的公式。说一句题外话,在作者的实验中对比了Positional Embedding与单独采用一个Embedding训练模型对位置的感知两种方式,模型效果相差无几。

1f9dd8475a1a647d5643b85a8c4d575c.png

9087df9c9b1dcdedf11461847a8c1da8.png

  1. class Positional_Encoding(nn.Module):
  2.     def __init__(self,d_model):
  3.         super(Positional_Encoding,self).__init__()
  4.         self.d_model = d_model
  5.     def forward(self,seq_len,embedding_dim):
  6.         positional_encoding = np.zeros((seq_len,embedding_dim))
  7.         for pos in range(positional_encoding.shape[0]):
  8.             for i in range(positional_encoding.shape[1]):
  9.                 positional_encoding[pos][i] = math.sin(pos/(10000**(2*i/self.d_model))) if i % 2 == 0 else math.cos(pos/(10000**(2*i/self.d_model)))
  10.         return torch.from_numpy(positional_encoding)

4. Encoder

8b812b1d3c97e5ea7201b9e2f3956c23.png

Muti_head_Attention

这一部分是模型的核心内容,理论部分就不过多讲解了,读者可以参考文章开头的第一个传送门,文中有基础的代码实现。

Encoder 中的 Muti_head_Attention 不需要Mask,因此与我们上一篇文章中的实现方式相同。

为了避免模型信息泄露的问题,Decoder 中的 Muti_head_Attention 需要Mask。这一节中我们重点讲解Muti_head_Attention中Mask机制的实现。

如果读者阅读了我们的上一篇文章,可以发现下面的代码有一点小小的不同,主要体现在 forward 函数的参数。

  • forward 函数的参数从 x 变为 x,y:请读者观察模型架构,Decoder需要接受Encoder的输入作为公式中的V,即我们参数中的y。在普通的自注意力机制中,我们在调用中设置y=x即可。

  • requires_mask:是否采用Mask机制,在Decoder中设置为True

  1. class Mutihead_Attention(nn.Module):
  2.     def __init__(self,d_model,dim_k,dim_v,n_heads):
  3.         super(Mutihead_Attention, self).__init__()
  4.         self.dim_v = dim_v
  5.         self.dim_k = dim_k
  6.         self.n_heads = n_heads
  7.         self.q = nn.Linear(d_model,dim_k)
  8.         self.k = nn.Linear(d_model,dim_k)
  9.         self.v = nn.Linear(d_model,dim_v)
  10.         self.o = nn.Linear(dim_v,d_model)
  11.         self.norm_fact = 1 / math.sqrt(d_model)
  12.     def generate_mask(self,dim):
  13.         # 此处是 sequence mask ,防止 decoder窥视后面时间步的信息。
  14.         # padding mask 在数据输入模型之前完成。
  15.         matirx = np.ones((dim,dim))
  16.         mask = torch.Tensor(np.tril(matirx))
  17.         return mask==1
  18.     def forward(self,x,y,requires_mask=False):
  19.         assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
  20.         # size of x : [batch_size * seq_len * batch_size]
  21.         # 对 x 进行自注意力
  22.         Q = self.q(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
  23.         K = self.k(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
  24.         V = self.v(y).reshape(-1,y.shape[0],y.shape[1],self.dim_v // self.n_heads) # n_heads * batch_size * seq_len * dim_v
  25.         # print("Attention V shape : {}".format(V.shape))
  26.         attention_score = torch.matmul(Q,K.permute(0,1,3,2)) * self.norm_fact
  27.         if requires_mask:
  28.             mask = self.generate_mask(x.shape[1])
  29.             attention_score.masked_fill(mask,value=float("-inf")) # 注意这里的小Trick,不需要将Q,K,V 分别MASK,只MASKSoftmax之前的结果就好了
  30.         output = torch.matmul(attention_score,V).reshape(y.shape[0],y.shape[1],-1)
  31.         # print("Attention output shape : {}".format(output.shape))
  32.         output = self.o(output)
  33.         return output

Feed Forward

ca6a7cad0bd39752e2b36dbce221b14d.png
img

这一部分实现很简单,两个Linear中连接Relu即可,目的是为模型增添非线性信息,提高模型的拟合能力。

  1. class Feed_Forward(nn.Module):
  2.     def __init__(self,input_dim,hidden_dim=2048):
  3.         super(Feed_Forward, self).__init__()
  4.         self.L1 = nn.Linear(input_dim,hidden_dim)
  5.         self.L2 = nn.Linear(hidden_dim,input_dim)
  6.     def forward(self,x):
  7.         output = nn.ReLU()(self.L1(x))
  8.         output = self.L2(output)
  9.         return output

Add & LayerNorm

这一节我们实现论文中提出的残差连接以及LayerNorm。

论文中关于这部分给出公式:

eb552cd24099bca4ad9233e1eca3c077.png

代码中的dropout,在论文中也有所解释,对输入layer_norm的tensor进行dropout,对模型的性能影响还是蛮大的。

代码中的参数sub_layer ,可以是Feed Forward,也可以是Muti_head_Attention。

  1. class Add_Norm(nn.Module):
  2.     def __init__(self):
  3.         self.dropout = nn.Dropout(config.p)
  4.         super(Add_Norm, self).__init__()
  5.     def forward(self,x,sub_layer,**kwargs):
  6.         sub_output = sub_layer(x,**kwargs)
  7.         # print("{} output : {}".format(sub_layer,sub_output.size()))
  8.         x = self.dropout(x + sub_output)
  9.         layer_norm = nn.LayerNorm(x.size()[1:])
  10.         out = layer_norm(x)
  11.         return out

OK,Encoder中所有模块我们已经讲解完毕,接下来我们将其拼接作为Encoder

  1. class Encoder(nn.Module):
  2.     def __init__(self):
  3.         super(Encoder, self).__init__()
  4.         self.positional_encoding = Positional_Encoding(config.d_model)
  5.         self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
  6.         self.feed_forward = Feed_Forward(config.d_model)
  7.         self.add_norm = Add_Norm()
  8.     def forward(self,x): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
  9.         x += self.positional_encoding(x.shape[1],config.d_model)
  10.         # print("After positional_encoding: {}".format(x.size()))
  11.         output = self.add_norm(x,self.muti_atten,y=x)
  12.         output = self.add_norm(output,self.feed_forward)
  13.         return output

5.Decoder

在 Encoder 部分的讲解中,我们已经实现了大部分Decoder的模块。Decoder的Muti_head_Attention引入了Mask机制,Decoder与Encoder 中模块的拼接方式不同。以上两点读者在Coding的时候需要注意。

  1. class Decoder(nn.Module):
  2.     def __init__(self):
  3.         super(Decoder, self).__init__()
  4.         self.positional_encoding = Positional_Encoding(config.d_model)
  5.         self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
  6.         self.feed_forward = Feed_Forward(config.d_model)
  7.         self.add_norm = Add_Norm()
  8.     def forward(self,x,encoder_output): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
  9.         # print(x.size())
  10.         x += self.positional_encoding(x.shape[1],config.d_model)
  11.         # print(x.size())
  12.         # 第一个 sub_layer
  13.         output = self.add_norm(x,self.muti_atten,y=x,requires_mask=True)
  14.         # 第二个 sub_layer
  15.         output = self.add_norm(output,self.muti_atten,y=encoder_output,requires_mask=True)
  16.         # 第三个 sub_layer
  17.         output = self.add_norm(output,self.feed_forward)
  18.         return output

6.Transformer

至此,所有内容已经铺垫完毕,我们开始组装Transformer模型。论文中提到,Transformer中堆叠了6个我们上文中实现的Encoder 和 Decoder。这里笔者采用nn.Sequential实现了堆叠操作。

Output模块的 Linear 和 Softmax 的实现也包含在下面的代码中

  1. class Transformer_layer(nn.Module):
  2.     def __init__(self):
  3.         super(Transformer_layer, self).__init__()
  4.         self.encoder = Encoder()
  5.         self.decoder = Decoder()
  6.     def forward(self,x):
  7.         x_input,x_output = x
  8.         encoder_output = self.encoder(x_input)
  9.         decoder_output = self.decoder(x_output,encoder_output)
  10.         return (encoder_output,decoder_output)
  11. class Transformer(nn.Module):
  12.     def __init__(self,N,vocab_size,output_dim):
  13.         super(Transformer, self).__init__()
  14.         self.embedding_input = Embedding(vocab_size=vocab_size)
  15.         self.embedding_output = Embedding(vocab_size=vocab_size)
  16.         self.output_dim = output_dim
  17.         self.linear = nn.Linear(config.d_model,output_dim)
  18.         self.softmax = nn.Softmax(dim=-1)
  19.         self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
  20.     def forward(self,x):
  21.         x_input , x_output = x
  22.         x_input = self.embedding_input(x_input)
  23.         x_output = self.embedding_output(x_output)
  24.         _ , output = self.model((x_input,x_output))
  25.         output = self.linear(output)
  26.         output = self.softmax(output)
  27.         return output

完整代码

  1. # @Author:Yifx
  2. # @Contact: Xxuyifan1999@163.com
  3. # @Time:2021/9/16 20:02
  4. # @Software: PyCharm
  5. """
  6. 文件说明:
  7. """
  8. import torch
  9. import torch.nn as nn
  10. import numpy as np
  11. import math
  12. class Config(object):
  13.     def __init__(self):
  14.         self.vocab_size = 6
  15.         self.d_model = 20
  16.         self.n_heads = 2
  17.         assert self.d_model % self.n_heads == 0
  18.         dim_k  = self.d_model // self.n_heads
  19.         dim_v = self.d_model // self.n_heads
  20.         self.padding_size = 30
  21.         self.UNK = 5
  22.         self.PAD = 4
  23.         self.N = 6
  24.         self.p = 0.1
  25. config = Config()
  26. class Embedding(nn.Module):
  27.     def __init__(self,vocab_size):
  28.         super(Embedding, self).__init__()
  29.         # 一个普通的 embedding层,我们可以通过设置padding_idx=config.PAD 来实现论文中的 padding_mask
  30.         self.embedding = nn.Embedding(vocab_size,config.d_model,padding_idx=config.PAD)
  31.     def forward(self,x):
  32.         # 根据每个句子的长度,进行padding,短补长截
  33.         for i in range(len(x)):
  34.             if len(x[i]) < config.padding_size:
  35.                 x[i].extend([config.UNK] * (config.padding_size - len(x[i]))) # 注意 UNK是你词表中用来表示oov的token索引,这里进行了简化,直接假设为6
  36.             else:
  37.                 x[i] = x[i][:config.padding_size]
  38.         x = self.embedding(torch.tensor(x)) # batch_size * seq_len * d_model
  39.         return x
  40. class Positional_Encoding(nn.Module):
  41.     def __init__(self,d_model):
  42.         super(Positional_Encoding,self).__init__()
  43.         self.d_model = d_model
  44.     def forward(self,seq_len,embedding_dim):
  45.         positional_encoding = np.zeros((seq_len,embedding_dim))
  46.         for pos in range(positional_encoding.shape[0]):
  47.             for i in range(positional_encoding.shape[1]):
  48.                 positional_encoding[pos][i] = math.sin(pos/(10000**(2*i/self.d_model))) if i % 2 == 0 else math.cos(pos/(10000**(2*i/self.d_model)))
  49.         return torch.from_numpy(positional_encoding)
  50. class Mutihead_Attention(nn.Module):
  51.     def __init__(self,d_model,dim_k,dim_v,n_heads):
  52.         super(Mutihead_Attention, self).__init__()
  53.         self.dim_v = dim_v
  54.         self.dim_k = dim_k
  55.         self.n_heads = n_heads
  56.         self.q = nn.Linear(d_model,dim_k)
  57.         self.k = nn.Linear(d_model,dim_k)
  58.         self.v = nn.Linear(d_model,dim_v)
  59.         self.o = nn.Linear(dim_v,d_model)
  60.         self.norm_fact = 1 / math.sqrt(d_model)
  61.     def generate_mask(self,dim):
  62.         # 此处是 sequence mask ,防止 decoder窥视后面时间步的信息。
  63.         # padding mask 在数据输入模型之前完成。
  64.         matirx = np.ones((dim,dim))
  65.         mask = torch.Tensor(np.tril(matirx))
  66.         return mask==1
  67.     def forward(self,x,y,requires_mask=False):
  68.         assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
  69.         # size of x : [batch_size * seq_len * batch_size]
  70.         # 对 x 进行自注意力
  71.         Q = self.q(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
  72.         K = self.k(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
  73.         V = self.v(y).reshape(-1,y.shape[0],y.shape[1],self.dim_v // self.n_heads) # n_heads * batch_size * seq_len * dim_v
  74.         # print("Attention V shape : {}".format(V.shape))
  75.         attention_score = torch.matmul(Q,K.permute(0,1,3,2)) * self.norm_fact
  76.         if requires_mask:
  77.             mask = self.generate_mask(x.shape[1])
  78.             # masked_fill 函数中,对Mask位置为True的部分进行Mask
  79.             attention_score.masked_fill(mask,value=float("-inf")) # 注意这里的小Trick,不需要将Q,K,V 分别MASK,只MASKSoftmax之前的结果就好了
  80.         output = torch.matmul(attention_score,V).reshape(y.shape[0],y.shape[1],-1)
  81.         # print("Attention output shape : {}".format(output.shape))
  82.         output = self.o(output)
  83.         return output
  84. class Feed_Forward(nn.Module):
  85.     def __init__(self,input_dim,hidden_dim=2048):
  86.         super(Feed_Forward, self).__init__()
  87.         self.L1 = nn.Linear(input_dim,hidden_dim)
  88.         self.L2 = nn.Linear(hidden_dim,input_dim)
  89.     def forward(self,x):
  90.         output = nn.ReLU()(self.L1(x))
  91.         output = self.L2(output)
  92.         return output
  93. class Add_Norm(nn.Module):
  94.     def __init__(self):
  95.         self.dropout = nn.Dropout(config.p)
  96.         super(Add_Norm, self).__init__()
  97.     def forward(self,x,sub_layer,**kwargs):
  98.         sub_output = sub_layer(x,**kwargs)
  99.         # print("{} output : {}".format(sub_layer,sub_output.size()))
  100.         x = self.dropout(x + sub_output)
  101.         layer_norm = nn.LayerNorm(x.size()[1:])
  102.         out = layer_norm(x)
  103.         return out
  104. class Encoder(nn.Module):
  105.     def __init__(self):
  106.         super(Encoder, self).__init__()
  107.         self.positional_encoding = Positional_Encoding(config.d_model)
  108.         self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
  109.         self.feed_forward = Feed_Forward(config.d_model)
  110.         self.add_norm = Add_Norm()
  111.     def forward(self,x): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
  112.         x += self.positional_encoding(x.shape[1],config.d_model)
  113.         # print("After positional_encoding: {}".format(x.size()))
  114.         output = self.add_norm(x,self.muti_atten,y=x)
  115.         output = self.add_norm(output,self.feed_forward)
  116.         return output
  117. # 在 Decoder 中,Encoder的输出作为Query和KEy输出的那个东西。即 Decoder的Input作为V。此时是可行的
  118. # 因为在输入过程中,我们有一个padding操作,将Inputs和Outputs的seq_len这个维度都拉成一样的了
  119. # 我们知道,QK那个过程得到的结果是 batch_size * seq_len * seq_len .既然 seq_len 一样,那么我们可以这样操作
  120. # 这样操作的意义是,Outputs 中的 token 分别对于 Inputs 中的每个token作注意力
  121. class Decoder(nn.Module):
  122.     def __init__(self):
  123.         super(Decoder, self).__init__()
  124.         self.positional_encoding = Positional_Encoding(config.d_model)
  125.         self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
  126.         self.feed_forward = Feed_Forward(config.d_model)
  127.         self.add_norm = Add_Norm()
  128.     def forward(self,x,encoder_output): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
  129.         # print(x.size())
  130.         x += self.positional_encoding(x.shape[1],config.d_model)
  131.         # print(x.size())
  132.         # 第一个 sub_layer
  133.         output = self.add_norm(x,self.muti_atten,y=x,requires_mask=True)
  134.         # 第二个 sub_layer
  135.         output = self.add_norm(x,self.muti_atten,y=encoder_output,requires_mask=True)
  136.         # 第三个 sub_layer
  137.         output = self.add_norm(output,self.feed_forward)
  138.         return output
  139. class Transformer_layer(nn.Module):
  140.     def __init__(self):
  141.         super(Transformer_layer, self).__init__()
  142.         self.encoder = Encoder()
  143.         self.decoder = Decoder()
  144.     def forward(self,x):
  145.         x_input,x_output = x
  146.         encoder_output = self.encoder(x_input)
  147.         decoder_output = self.decoder(x_output,encoder_output)
  148.         return (encoder_output,decoder_output)
  149. class Transformer(nn.Module):
  150.     def __init__(self,N,vocab_size,output_dim):
  151.         super(Transformer, self).__init__()
  152.         self.embedding_input = Embedding(vocab_size=vocab_size)
  153.         self.embedding_output = Embedding(vocab_size=vocab_size)
  154.         self.output_dim = output_dim
  155.         self.linear = nn.Linear(config.d_model,output_dim)
  156.         self.softmax = nn.Softmax(dim=-1)
  157.         self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
  158.     def forward(self,x):
  159.         x_input , x_output = x
  160.         x_input = self.embedding_input(x_input)
  161.         x_output = self.embedding_output(x_output)
  162.         _ , output = self.model((x_input,x_output))
  163.         output = self.linear(output)
  164.         output = self.softmax(output)
  165.         return output

7978504863becb95d988767ccaccc2f9.png

outside_default.png

点个在看 paper不断!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/670437
推荐阅读
相关标签
  

闽ICP备14008679号