A while ago, after deploying and trying out Tsinghua's open-source ChatGLM-6B locally, I fine-tuned it with P-tuning v2. That got me interested in P-tuning v2 itself. After reading the original paper, "P-Tuning v2: Prompt Tuning Can Be Comparable to Fine-tuning Universally Across Scales and Tasks", I still had questions about how it is actually implemented, so I went through the source code in detail and wrote up the following notes.
Paper: P-Tuning v2: Prompt Tuning Can Be Comparable to Fine-tuning Universally Across Scales and Tasks
Source code: https://github.com/THUDM/P-tuni
There are already plenty of write-ups on the theory behind the paper, so I will only touch on it briefly here.
P-tuning v2 is not a brand-new idea; it is an optimized and adapted implementation of Deep Prompt Tuning (Li and Liang, 2021; Qin and Eisner, 2021). Like deep prompt tuning, it was designed for generation and knowledge probing, and one of its most important changes is that the continuous prompts are applied to every layer of the pre-trained model, not just the input layer.
By increasing the capacity of the continuous prompts and tailoring the method to various settings (especially small models and hard tasks), P-tuning v2 achieves performance comparable to fine-tuning. The authors also describe a series of key optimizations and implementation details needed to actually reach fine-tuning-level performance.
Its overall structure is illustrated in the figure in the paper.
Improvements in P-tuning v2
- Pre-train on multi-task datasets before adapting to the downstream task.
- Drop the verbalizer and use the model's original linear head instead. The results are about the same, but a linear head is more general and makes it easier to adapt to more complex NLU tasks such as sequence labeling (a toy sketch contrasting the two heads follows this list).
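To make the second point concrete, here is a toy sketch, not code from the paper or the repo: every dimension, label-word id, and mask position below is a made-up placeholder. It contrasts a verbalizer-style prediction, which reuses the LM head and reads off the logits of label words, with the plain linear head that P-tuning v2 uses.

```python
import torch
from torch import nn

# Toy dimensions (illustrative placeholders only)
hidden_size, vocab_size, num_labels, seq_len, batch = 768, 50265, 2, 128, 4
hidden_states = torch.randn(batch, seq_len, hidden_size)   # stands in for the backbone output

# (a) Verbalizer-style head: reuse the LM head and read off logits of label words at a mask position
mlm_head = nn.Linear(hidden_size, vocab_size)               # stands in for the pre-trained LM head
label_word_ids = torch.tensor([1021, 2034])                 # hypothetical ids of label words, e.g. "great"/"terrible"
mask_pos = 5                                                 # hypothetical [MASK] position
cls_logits_via_verbalizer = mlm_head(hidden_states[:, mask_pos])[:, label_word_ids]  # (batch, num_labels)

# (b) Linear head (what P-tuning v2 uses): a plain classification layer on the hidden states
classifier = nn.Linear(hidden_size, num_labels)
token_logits = classifier(hidden_states)                     # (batch, seq_len, num_labels) -> sequence labeling
cls_logits = classifier(hidden_states[:, 0])                 # (batch, num_labels)          -> sentence classification
```

Because the linear head simply maps every hidden state to label logits, the same module works for sentence classification and sequence labeling without having to design label words.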
After reading all this, it is not hard to see that P-tuning v2 is largely the same idea as prefix tuning, except that P-tuning v2 adapts it to NLU tasks.
Below I will skip over some of the finer details and look only at the essence of the implementation.
Take the **RobertaPrefixForTokenClassification** class in the code as an example:
```python
class RobertaPrefixForTokenClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        ...
        self.prefix_tokens = torch.arange(self.pre_seq_len).long()
        self.prefix_encoder = PrefixEncoder(config)
        ...
```
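One detail hidden behind the `...` above: in the P-tuning v2 setup the RoBERTa backbone is frozen, so only the prefix encoder and the small task head are trained. Below is a standalone sketch of that idea under assumed numbers (a roberta-base-sized config, `pre_seq_len = 20`, 2 labels); it illustrates the training setup rather than reproducing the repo's exact code.

```python
import torch
from transformers import RobertaConfig, RobertaModel

config = RobertaConfig()                      # defaults are roughly roberta-base sized
backbone = RobertaModel(config)
for param in backbone.parameters():
    param.requires_grad = False               # keep the pre-trained weights frozen

pre_seq_len = 20                              # assumed prefix length
prefix_encoder = torch.nn.Embedding(pre_seq_len, config.num_hidden_layers * 2 * config.hidden_size)
classifier = torch.nn.Linear(config.hidden_size, 2)

trainable = sum(p.numel() for m in (prefix_encoder, classifier) for p in m.parameters())
total = trainable + sum(p.numel() for p in backbone.parameters())
print(f"trainable: {trainable:,} / total: {total:,} ({trainable / total:.2%})")
```

Running this shows that the trainable part is only a few tenths of a percent of the full model, which is the whole point of prompt tuning.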
prefix_tokens“加在输入前面的prompt"
PrefixEncoder:为了获得连续prompt,设计的模块
```python
class PrefixEncoder(torch.nn.Module):
    r'''
    The torch.nn model to encode the prefix
    Input shape: (batch-size, prefix-length)
    Output shape: (batch-size, prefix-length, 2*layers*hidden)
    '''
    def __init__(self, config):
        super().__init__()
        self.prefix_projection = config.prefix_projection
        if self.prefix_projection:
            # Use a two-layer MLP to encode the prefix
            ...
        else:
            self.embedding = torch.nn.Embedding(
                config.pre_seq_len,
                config.num_hidden_layers * 2 * config.hidden_size)

    def forward(self, prefix: torch.Tensor):
        if self.prefix_projection:
            ...
        else:
            past_key_values = self.embedding(prefix)
        return past_key_values
```
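For completeness, the two-layer MLP branch that is elided above (`prefix_projection=True`) re-parameterizes the prefix through a bottleneck before expanding it to per-layer key/value vectors. The sketch below is my guess at that branch (the field name `prefix_hidden_size` is an assumption), not verbatim repo code:

```python
import torch

class PrefixEncoderWithProjection(torch.nn.Module):
    """Sketch of the re-parameterized (MLP) variant; an assumption, not verbatim repo code."""
    def __init__(self, config):
        super().__init__()
        # First embed each prefix position into the model's hidden size ...
        self.embedding = torch.nn.Embedding(config.pre_seq_len, config.hidden_size)
        # ... then expand it through a two-layer MLP into per-layer key/value vectors.
        self.trans = torch.nn.Sequential(
            torch.nn.Linear(config.hidden_size, config.prefix_hidden_size),   # prefix_hidden_size: assumed config field
            torch.nn.Tanh(),
            torch.nn.Linear(config.prefix_hidden_size, config.num_hidden_layers * 2 * config.hidden_size),
        )

    def forward(self, prefix: torch.Tensor):
        prefix_embeds = self.embedding(prefix)          # (batch, pre_seq_len, hidden)
        past_key_values = self.trans(prefix_embeds)     # (batch, pre_seq_len, 2 * layers * hidden)
        return past_key_values
```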
`self.embedding` here is exactly the embedding described in Appendix B of the paper. Now look at the `get_prompt` and `forward` functions of **RobertaPrefixForTokenClassification**:
```python
class RobertaPrefixForTokenClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        ...

    def get_prompt(self, batch_size):
        prefix_tokens = self.prefix_tokens.unsqueeze(0).expand(batch_size, -1).to(self.roberta.device)
        # Obtain the continuous prompt
        past_key_values = self.prefix_encoder(prefix_tokens)
        # Reshape
        past_key_values = past_key_values.view(
            batch_size,
            self.pre_seq_len,
            self.n_layer * 2,
            self.n_head,
            self.n_embd
        )
        past_key_values = self.dropout(past_key_values)
        # Permute and split into a tuple; each element has shape
        # (2, batch_size, n_head, pre_seq_len, head_dim)
        past_key_values = past_key_values.permute([2, 0, 3, 1, 4]).split(2)
        return past_key_values

    def forward(...):
        ...
        past_key_values = self.get_prompt(batch_size=batch_size)
        prefix_attention_mask = torch.ones(batch_size, self.pre_seq_len).to(self.roberta.device)
        attention_mask = torch.cat((prefix_attention_mask, attention_mask), dim=1)
        outputs = self.roberta(
            input_ids,
            ...
            past_key_values=past_key_values,
        )
        ...
```
`self.get_prompt(batch_size=batch_size)` produces the continuous prompt. Now trace the path along which `past_key_values` is passed:
RobertaModel -> RobertaEncoder
```python
# Inside RobertaEncoder.forward there is the following loop
for i, layer_module in enumerate(self.layer):
    # Iterate over the RoBERTa layers; each iteration takes one element of past_key_values.
    # past_key_value has the shape mentioned above: (2, batch_size, n_head, pre_seq_len, head_dim)
    past_key_value = past_key_values[i] if past_key_values is not None else None
    ...
    layer_outputs = layer_module(
        hidden_states,
        attention_mask,
        layer_head_mask,
        encoder_hidden_states,
        encoder_attention_mask,
        past_key_value,
        output_attentions,
    )
```
`self.layer` is `nn.ModuleList([RobertaLayer(config) for _ in range(config.num_hidden_layers)])`.
So next, look at the **RobertaLayer** layer:
```python
class RobertaLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        ...
        self.attention = RobertaAttention(config)
        ...

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        # decoder uni-directional self-attention cached key/values tuple is at positions 1,2
        self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
        self_attention_outputs = self.attention(
            hidden_states,
            attention_mask,
            head_mask,
            output_attentions=output_attentions,
            past_key_value=self_attn_past_key_value,
        )
        attention_output = self_attention_outputs[0]
        ...
```
Continue tracing into **RobertaAttention**:

```python
class RobertaAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        self.self = RobertaSelfAttention(config, position_embedding_type=position_embedding_type)
        ...

    def forward(...) -> Tuple[torch.Tensor]:
        self_outputs = self.self(
            hidden_states,
            attention_mask,
            head_mask,
            encoder_hidden_states,
            encoder_attention_mask,
            past_key_value,
            output_attentions,
        )
        attention_output = self.output(self_outputs[0], hidden_states)
        outputs = (attention_output,) + self_outputs[1:]  # add attentions if we output them
        return outputs
```
Keep following the call into **RobertaSelfAttention**:
```python
class RobertaSelfAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        ...
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        ...

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        ...
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        # query
        mixed_query_layer = self.query(hidden_states)
        ...
        if is_cross_attention and past_key_value is not None:
            ...
        elif is_cross_attention:
            ...
        elif past_key_value is not None:
            key_layer = self.transpose_for_scores(self.key(hidden_states))
            value_layer = self.transpose_for_scores(self.value(hidden_states))
            key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
            value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
        else:
            ...

        query_layer = self.transpose_for_scores(mixed_query_layer)
        ...
        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        ...
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        ...
        # Normalize the attention scores to probabilities.
        attention_probs = nn.functional.softmax(attention_scores, dim=-1)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)

        # Mask heads if we want to
        ...
        context_layer = torch.matmul(attention_probs, value_layer)

        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(new_context_layer_shape)

        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)

        if self.is_decoder:
            outputs = outputs + (past_key_value,)
        return outputs
```
At this point the picture is clear. Since `past_key_value is not None` holds, the following branch is executed:
```python
key_layer = self.transpose_for_scores(self.key(hidden_states))
value_layer = self.transpose_for_scores(self.value(hidden_states))
key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
```
Here `transpose_for_scores` reshapes the tensor and permutes its dimensions, splitting the hidden dimension into attention heads. The `torch.cat` calls then concatenate the prefix along the seq_length dimension (dim=2); all other dimensions must already match.
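For a quick sanity check of those two steps, here is a tiny standalone snippet; the numbers (12 heads, head size 64, prefix length 16, sequence length 128, batch 2) are illustrative only:

```python
import torch

n_head, head_dim, pre_seq_len, seq_len, batch = 12, 64, 16, 128, 2
hidden = n_head * head_dim

def transpose_for_scores(x: torch.Tensor) -> torch.Tensor:
    # (batch, seq, hidden) -> (batch, n_head, seq, head_dim)
    return x.view(x.size(0), x.size(1), n_head, head_dim).permute(0, 2, 1, 3)

key_layer = transpose_for_scores(torch.randn(batch, seq_len, hidden))
prefix_key = torch.randn(batch, n_head, pre_seq_len, head_dim)   # one half of past_key_value

key_layer = torch.cat([prefix_key, key_layer], dim=2)            # concatenate along the sequence dimension
print(key_layer.shape)   # torch.Size([2, 12, 144, 64]) -> pre_seq_len + seq_len = 144
```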
Then `mixed_query_layer` is also passed through `transpose_for_scores` to obtain `query_layer`, so that it can be multiplied with `key_layer` and `value_layer`, i.e. `attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))`.
The rest is the usual attention computation: after the attention probabilities `attention_probs` are obtained, attention aggregation is performed via `context_layer = torch.matmul(attention_probs, value_layer)`, and finally `context_layer` is reshaped so that it matches the shape of the input `hidden_states`.
To summarize: the `get_prompt` function of `RobertaPrefixForTokenClassification` calls `PrefixEncoder`, reshapes and permutes its output, and splits it into a tuple, producing the `past_key_value` needed by each `RobertaLayer`. This `past_key_value` is then passed down to `RobertaSelfAttention`, where it is concatenated with `key_layer` and `value_layer` along the seq_length dimension. P-tuning v2 cleverly reuses the `past_key_values` argument to inject the continuous prompt parameters.
To give readers an intuitive picture of the whole process, I wrote a small simulation, shown below:
```python
import torch
from torch import nn


def transpose_for_scores(x: torch.Tensor) -> torch.Tensor:
    new_x_shape = x.size()[:-1] + (12, 64)
    x = x.view(new_x_shape)
    return x.permute(0, 2, 1, 3)


prompt = torch.rand(32, 128, 48, 12, 64)  # batch_size, seq_len, num_layer*2, num_head, head_size
prompt = prompt.permute([2, 0, 3, 1, 4])
print(f"Shape of the trainable continuous prompt built by P-tuning v2: {prompt.shape}")
past_key_values = prompt.split(2)

num_layers = 24
hidden_dim = 768
n_head = 12
head_dim = hidden_dim // n_head
all_head_size = n_head * head_dim

hidden_states = torch.randn(32, 128, 768)  # batch_size, seq_len, hidden_size
print(f"Shape of the input hidden_states: {hidden_states.shape}")

for i in range(num_layers):
    past_key_value = past_key_values[i]
    print(f"Shape of the prompt added to each BertLayer: {past_key_value.shape}")
    self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None

    # BertSelfAttention
    query = nn.Linear(hidden_dim, all_head_size)
    key = nn.Linear(hidden_dim, all_head_size)
    value = nn.Linear(hidden_dim, all_head_size)

    key_layer = transpose_for_scores(key(hidden_states))
    print(f"Shape of key after transpose_for_scores: {key_layer.shape}")
    value_layer = transpose_for_scores(value(hidden_states))
    print(f"Shape of value after transpose_for_scores: {value_layer.shape}")

    key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
    print(f"Shape of past_key_value[0]: {past_key_value[0].shape}, shape of key_layer after cat: {key_layer.shape}")
    value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
    print(f"Shape of past_key_value[1]: {past_key_value[1].shape}, shape of value_layer after cat: {value_layer.shape}")

    mixed_query_layer = query(hidden_states)
    print(f"Shape of hidden_states after the query layer: {mixed_query_layer.size()}")  # batch, seq_len, hidden
    query_layer = transpose_for_scores(mixed_query_layer)
    print(f"Shape of query after transpose_for_scores: {query_layer.size()}")  # batch, head, seq_len, head_dim

    print("Computing attention scores")
    attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
    print(f"Shape of attention_scores: {attention_scores.size()}")  # batch, head, seq_len, pre_seq_len + seq_len

    print("Computing attention aggregation")
    # NOTE: scaling and softmax are skipped here because this simulation only tracks shapes
    context_layer = torch.matmul(attention_scores, value_layer)
    print(f"Shape of context_layer after attention aggregation: {context_layer.size()}")  # batch, head, seq_len, head_dim

    print("Finally, reshape context_layer back to the shape of the input hidden_states")
    context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
    new_context_layer_shape = context_layer.size()[:-2] + (768,)
    context_layer = context_layer.view(new_context_layer_shape)
    print(f"context_layer restored, its shape is: {context_layer.size()}")
    print("One simulated P-tuning v2 BertLayer computation finished")
    break
```
Its output is:
```
Shape of the trainable continuous prompt built by P-tuning v2: torch.Size([48, 32, 12, 128, 64])
Shape of the input hidden_states: torch.Size([32, 128, 768])
Shape of the prompt added to each BertLayer: torch.Size([2, 32, 12, 128, 64])
Shape of key after transpose_for_scores: torch.Size([32, 12, 128, 64])
Shape of value after transpose_for_scores: torch.Size([32, 12, 128, 64])
Shape of past_key_value[0]: torch.Size([32, 12, 128, 64]), shape of key_layer after cat: torch.Size([32, 12, 256, 64])
Shape of past_key_value[1]: torch.Size([32, 12, 128, 64]), shape of value_layer after cat: torch.Size([32, 12, 256, 64])
Shape of hidden_states after the query layer: torch.Size([32, 128, 768])
Shape of query after transpose_for_scores: torch.Size([32, 12, 128, 64])
Computing attention scores
Shape of attention_scores: torch.Size([32, 12, 128, 256])
Computing attention aggregation
Shape of context_layer after attention aggregation: torch.Size([32, 12, 128, 64])
Finally, reshape context_layer back to the shape of the input hidden_states
context_layer restored, its shape is: torch.Size([32, 128, 768])
One simulated P-tuning v2 BertLayer computation finished
```