
A Detailed Look at the P-tuning v2 Paper and Its Code Implementation


A while ago, after deploying and trying out Tsinghua's open-source ChatGLM-6B locally, I fine-tuned it with P-tuning v2. That made me curious about P-tuning v2 itself. After reading the original paper, "P-Tuning v2: Prompt Tuning Can Be Comparable to Fine-tuning Universally Across Scales and Tasks", I still had questions about how it is actually implemented, so I went through the source code in detail and wrote up the following notes.

A Brief Introduction to the Paper

  • Paper title: P-Tuning v2: Prompt Tuning Can Be Comparable to Fine-tuning Universally Across Scales and Tasks

  • Paper: https://arxiv.org/pdf/2110.07602.pdf

  • Source code: https://github.com/THUDM/P-tuning-v2

There are already plenty of write-ups about the theory behind the paper, so I will only touch on it briefly here.

P-tuning v2 is not something brand new: it is an optimized and adapted implementation of Deep Prompt Tuning (Li and Liang, 2021; Qin and Eisner, 2021). Like deep prompt tuning, which was designed for generation and knowledge probing, P-tuning v2 uses continuous prompts; its most important change is that the continuous prompts are applied to every layer of the pretrained model rather than only to the input layer.

By increasing the capacity of the continuous prompts and targeting a wide range of settings (especially small models and hard tasks), P-tuning v2 reaches performance comparable to fine-tuning. The authors also describe a series of key optimizations and implementation details that are needed to actually match fine-tuning.


    • Only 0.1%-3% of the parameters need to be tuned to be on par with fine-tuning (see the sketch after this list)
    • It is the first time prompt tuning is applied to hard NLU tasks such as sequence tagging
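
To make the 0.1%-3% figure concrete, here is a minimal back-of-the-envelope sketch. The numbers are assumptions chosen for illustration (a roberta-large-sized backbone of roughly 355M parameters, a prefix of length 128); the real ratio depends on the model and on pre_seq_len.

# Back-of-the-envelope count of the prefix parameters (Embedding reparameterization, no MLP):
# the PrefixEncoder embedding stores pre_seq_len * num_layers * 2 * hidden_size weights.
pre_seq_len = 128                 # assumed prefix length
num_layers = 24                   # roberta-large
hidden_size = 1024                # roberta-large
backbone_params = 355_000_000     # roberta-large, approximate

prefix_params = pre_seq_len * num_layers * 2 * hidden_size
print(prefix_params)                                                     # 6291456 (~6.3M)
print(f"{100 * prefix_params / backbone_params:.2f}% of the backbone")   # ~1.77%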

Its structure is shown in the figure below:

[Figure: P-tuning v2 architecture]

Improvements in P-tuning v2

  • Whereas prefix tuning (Li and Liang, 2021) uses an MLP as the reparameterization encoder, P-tuning v2 uses a plain Embedding layer. The two are compared in the figure below.

[Figure: reparameterization encoder, MLP (prefix tuning) vs. Embedding layer (P-tuning v2)]

  • The prompts can first be pre-trained on multi-task datasets and then adapted to the downstream task.

  • No verbalizer is used; the model's original linear head is kept instead.


    • The results are about the same, but a linear head is more general and makes it easier to handle complex NLU tasks such as sequence tagging (see the sketch just below this section).

After reading all of this, it is not hard to see that P-tuning v2 is very close to prefix tuning; the main difference is that P-tuning v2 adapts the idea to NLU tasks.
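
As a quick illustration of the "no verbalizer" point, the sketch below shows the kind of plain linear head that replaces a verbalizer for token classification. The sizes here are assumptions for illustration; in the repository this is simply the classifier layer of RobertaPrefixForTokenClassification.

import torch
from torch import nn

# Assumed sizes for illustration: hidden_size=768, 9 sequence-tagging labels.
hidden_size, num_labels = 768, 9
classifier = nn.Linear(hidden_size, num_labels)

sequence_output = torch.randn(4, 128, hidden_size)  # (batch, seq_len, hidden) from the encoder
logits = classifier(sequence_output)                # (batch, seq_len, num_labels): one prediction per token
print(logits.shape)                                 # torch.Size([4, 128, 9])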

Code Implementation of P-tuning v2's Continuous Prompts

Some details are skipped here; we focus only on the essence of the implementation.

Let's take the **RobertaPrefixForTokenClassification** class in the code as an example.

Initialization

class RobertaPrefixForTokenClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        ...
        self.prefix_tokens = torch.arange(self.pre_seq_len).long()
        self.prefix_encoder = PrefixEncoder(config)
		...
  • prefix_tokens: the prompt token indices that are prepended in front of the input

  • PrefixEncoder: the module designed to produce the continuous prompts

    class PrefixEncoder(torch.nn.Module):
        r'''
        The torch.nn model to encode the prefix
        Input shape: (batch-size, prefix-length)
        Output shape: (batch-size, prefix-length, 2*layers*hidden)
        '''
        def __init__(self, config):
            super().__init__()
            self.prefix_projection = config.prefix_projection
            if self.prefix_projection:
                # Use a two-layer MLP to encode the prefix
    			...
            else:
                self.embedding = torch.nn.Embedding(
                    config.pre_seq_len,
                    config.num_hidden_layers * 2 * config.hidden_size)
    
        def forward(self, prefix: torch.Tensor):
            if self.prefix_projection:
               	...
            else:
                past_key_values = self.embedding(prefix)
            return past_key_values
    
    • self.embedding here is exactly the Embedding described in Appendix B of the paper; a hedged sketch of the elided MLP branch (prefix_projection=True) follows below for reference.
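
For reference, here is a rough sketch of what the elided prefix_projection branch (the two-layer MLP reparameterization, i.e. the prefix-tuning-style encoder) looks like. prefix_hidden_size and the exact layer names are assumptions for illustration and may differ slightly from the repository.

import torch
from torch import nn

class MLPPrefixEncoder(nn.Module):
    """Sketch of the prefix_projection=True branch: embed the prefix ids, then project
    them with a two-layer MLP up to num_layers * 2 * hidden_size per prefix token."""
    def __init__(self, pre_seq_len, hidden_size, num_hidden_layers, prefix_hidden_size=512):
        super().__init__()
        self.embedding = nn.Embedding(pre_seq_len, hidden_size)
        self.trans = nn.Sequential(
            nn.Linear(hidden_size, prefix_hidden_size),
            nn.Tanh(),
            nn.Linear(prefix_hidden_size, num_hidden_layers * 2 * hidden_size),
        )

    def forward(self, prefix: torch.Tensor) -> torch.Tensor:
        # prefix: (batch, pre_seq_len) -> (batch, pre_seq_len, 2 * layers * hidden)
        return self.trans(self.embedding(prefix))

# quick shape check
enc = MLPPrefixEncoder(pre_seq_len=16, hidden_size=768, num_hidden_layers=12)
prefix = torch.arange(16).unsqueeze(0)   # (1, 16)
print(enc(prefix).shape)                 # torch.Size([1, 16, 18432])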

Forward Pass

In the forward function of **RobertaPrefixForTokenClassification**:

class RobertaPrefixForTokenClassification(RobertaPreTrainedModel):
    def __init__(self, config):
		...
    def get_prompt(self, batch_size):
        prefix_tokens = self.prefix_tokens.unsqueeze(0).expand(batch_size, -1).to(self.roberta.device)
        # obtain the continuous prompt
        past_key_values = self.prefix_encoder(prefix_tokens)
        # reshape
        past_key_values = past_key_values.view(
            batch_size,
            self.pre_seq_len,
            self.n_layer * 2, 
            self.n_head,
            self.n_embd
        )
        past_key_values = self.dropout(past_key_values)
        # permute and split into a tuple; each element has shape (2, batch_size, n_head, pre_seq_len, head_dim)
        past_key_values = past_key_values.permute([2, 0, 3, 1, 4]).split(2)
        return past_key_values

    def forward(...):
        ...
        past_key_values = self.get_prompt(batch_size=batch_size)
        prefix_attention_mask = torch.ones(batch_size, self.pre_seq_len).to(self.roberta.device)
        attention_mask = torch.cat((prefix_attention_mask, attention_mask), dim=1)
        
        outputs = self.roberta(
            input_ids,
           	...
            past_key_values=past_key_values,
        )
		...
  • In a single forward pass, P-tuning v2 obtains the continuous prompt it needs via self.get_prompt(batch_size=batch_size).
  • It then makes clever use of the past_key_values argument: each element of the past_key_values tuple is concatenated onto the Key and Value inside RobertaSelfAttention. How exactly is that done? The code that follows is the original code from the Transformers library. (A short sketch of which parameters are actually trained is also given right below.)
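
Before diving into the Transformers internals, one detail that the elided __init__ hides but that is central to the method: the backbone is frozen, and only the prefix encoder (plus the classification head) is trained. Below is a minimal, self-contained sketch of that idea with toy stand-in modules; the reference implementation freezes self.roberta.parameters() in essentially the same way.

import torch
from torch import nn

# Toy stand-ins for illustration only; the real class is RobertaPrefixForTokenClassification.
class ToyPrefixModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.roberta = nn.Linear(768, 768)                      # stand-in for the (huge) backbone
        self.prefix_encoder = nn.Embedding(128, 12 * 2 * 768)   # the trainable continuous prompt
        self.classifier = nn.Linear(768, 9)                     # task head

model = ToyPrefixModel()

# Freeze the backbone so only the prefix encoder and the classifier receive gradients.
for param in model.roberta.parameters():
    param.requires_grad = False

trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f"trainable parameters: {trainable} / {total}")
# With a real backbone of hundreds of millions of parameters, the trainable share is tiny.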

past_key_values

Let's trace the path along which past_key_values is passed:

RobertaModel -> RobertaEncoder

# Inside RobertaEncoder.forward there is this piece of code
for i, layer_module in enumerate(self.layer):
	# iterate over the RoBERTa layers; each iteration takes one element of past_key_values,
    # whose shape is the (2, batch_size, n_head, pre_seq_len, head_dim) described above
    past_key_value = past_key_values[i] if past_key_values is not None else None
    ...
    layer_outputs = layer_module(
        hidden_states,
        attention_mask,
        layer_head_mask,
        encoder_hidden_states,
        encoder_attention_mask,
        past_key_value,
        output_attentions,
        )
    


self.layer is nn.ModuleList([RobertaLayer(config) for _ in range(config.num_hidden_layers)]).

So let's look at the **RobertaLayer** class.

class RobertaLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
       	...
        self.attention = RobertaAttention(config)
		...

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        # decoder uni-directional self-attention cached key/values tuple is at positions 1,2
        self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
        self_attention_outputs = self.attention(
            hidden_states,
            attention_mask,
            head_mask,
            output_attentions=output_attentions,
            past_key_value=self_attn_past_key_value,
        )
        attention_output = self_attention_outputs[0]

		...
  • self.attention is a RobertaAttention module, so we keep following the trail:
class RobertaAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        self.self = RobertaSelfAttention(config, position_embedding_type=position_embedding_type)
        ...

    def forward(...) -> Tuple[torch.Tensor]:
        self_outputs = self.self(
            hidden_states,
            attention_mask,
            head_mask,
            encoder_hidden_states,
            encoder_attention_mask,
            past_key_value,
            output_attentions,
        )
        attention_output = self.output(self_outputs[0], hidden_states)
        outputs = (attention_output,) + self_outputs[1:]  # add attentions if we output them
        return outputs

Continuing the trace into RobertaSelfAttention:

class RobertaSelfAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        ...
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        ...

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
  		...
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        
        # query
        mixed_query_layer = self.query(hidden_states)
    	...
        if is_cross_attention and past_key_value is not None:
            ...
        elif is_cross_attention:
            ...
        elif past_key_value is not None:
            # the prefix produced by P-tuning v2 arrives here; it is concatenated with the keys/values of the current input
            key_layer = self.transpose_for_scores(self.key(hidden_states))
            value_layer = self.transpose_for_scores(self.value(hidden_states))
            key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
            value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
        else:
            ...
        query_layer = self.transpose_for_scores(mixed_query_layer)
    	...
        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        ...
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
       	...
        # Normalize the attention scores to probabilities.
        attention_probs = nn.functional.softmax(attention_scores, dim=-1)
        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)
        # Mask heads if we want to
        ...
        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(new_context_layer_shape)
        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)
        if self.is_decoder:
            outputs = outputs + (past_key_value,)
        return outputs
  • At this point everything becomes clear: since past_key_value is not None holds, the following code is executed:

    • key_layer = self.transpose_for_scores(self.key(hidden_states))
      value_layer = self.transpose_for_scores(self.value(hidden_states))
      key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
      value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
      
    • The transpose_for_scores function reshapes the tensor and permutes its dimensions. The concatenation here happens along the seq_length dimension; all other dimensions must match.

  • Then mixed_query_layer is passed through transpose_for_scores to obtain query_layer, so that it can be multiplied with key_layer and value_layer, i.e. attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)).

  • Next come the details of the attention score computation; once the attention probabilities attention_probs are obtained, attention pooling is performed: context_layer = torch.matmul(attention_probs, value_layer).

  • Finally, context_layer is reshaped so that it matches the shape of the input hidden states.

Summary of the Code Implementation

  1. The get_prompt function of RobertaPrefixForTokenClassification calls PrefixEncoder, reshapes and permutes the result, and splits it into a tuple so that every RobertaLayer receives the past_key_value it needs.
  2. Following past_key_value all the way down to RobertaSelfAttention, we see that it is concatenated with key_layer and value_layer along the seq_length dimension.
  3. After that come the usual attention computations, and a final reshape brings the attention output back to the shape of hidden_states.

P-tuning v2 cleverly uses the past_key_values argument to inject the continuous prompt parameters, as the formula below summarizes.
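
Written as a formula (a simplified single-head view, added here as an illustration rather than a quote from the paper): let $X \in \mathbb{R}^{L \times d}$ be the layer input, and let $K^{(p)}, V^{(p)} \in \mathbb{R}^{P \times d_{\text{head}}}$ be the prefix keys and values of length $P$ produced by the PrefixEncoder for this layer. Each self-attention layer then computes

$$
\text{Attn}(X) = \text{softmax}\!\left(\frac{(XW_Q)\,[K^{(p)};\,XW_K]^{\top}}{\sqrt{d_{\text{head}}}}\right)[V^{(p)};\,XW_V],
$$

so the keys and values have length $P + L$ while the queries keep length $L$, which is exactly the shape change reproduced by the simulation below.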

Simulation Code for P-tuning v2's Continuous Prompt Implementation

To give readers an intuitive picture of the whole process, I wrote a small piece of simulation code, shown below:

import torch
from torch import nn

def transpose_for_scores(x: torch.Tensor) -> torch.Tensor:
    # (batch, seq_len, hidden) -> (batch, n_head, seq_len, head_dim)
    new_x_shape = x.size()[:-1] + (12, 64)
    x = x.view(new_x_shape)
    return x.permute(0, 2, 1, 3)

prompt = torch.rand(32, 128, 48, 12, 64)  # batch_size, pre_seq_len, num_layer*2, num_head, head_size
prompt = prompt.permute([2, 0, 3, 1, 4])
print(f"Shape of the trainable continuous embeddings built by P-tuning v2: {prompt.shape}")
past_key_values = prompt.split(2)
num_layers = 24
hidden_dim = 768
n_head = 12
head_dim = hidden_dim // n_head
all_head_size = n_head * head_dim
hidden_states = torch.randn(32, 128, 768)  # batch_size, seq_len, hidden_size
print(f"Shape of the input hidden_states: {hidden_states.shape}")
for i in range(num_layers):
    past_key_value = past_key_values[i]
    print(f"Shape of the prompt injected into each BertLayer: {past_key_value.shape}")
    # mirrors RobertaLayer: past_key_value already holds exactly (key, value)
    self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
    # BertSelfAttention
    query = nn.Linear(hidden_dim, all_head_size)
    key = nn.Linear(hidden_dim, all_head_size)
    value = nn.Linear(hidden_dim, all_head_size)

    key_layer = transpose_for_scores(key(hidden_states))
    print(f"Shape of key after transpose_for_scores: {key_layer.shape}")
    value_layer = transpose_for_scores(value(hidden_states))
    print(f"Shape of value after transpose_for_scores: {value_layer.shape}")
    key_shape_before_cat = key_layer.shape
    key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
    print(f"past_key_value[0]: {past_key_value[0].shape}, key_layer before cat: {key_shape_before_cat}, after cat: {key_layer.shape}")
    value_shape_before_cat = value_layer.shape
    value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
    print(f"past_key_value[1]: {past_key_value[1].shape}, value_layer before cat: {value_shape_before_cat}, after cat: {value_layer.shape}")

    mixed_query_layer = query(hidden_states)
    print(f"Shape of hidden_states after the query projection: {mixed_query_layer.size()}")  # batch, seq_len, hidden
    query_layer = transpose_for_scores(mixed_query_layer)
    print(f"Shape of query after transpose_for_scores: {query_layer.size()}")  # batch, n_head, seq_len, head_dim

    print("Computing attention scores")
    attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
    print(f"Shape of attention_scores: {attention_scores.size()}")  # batch, n_head, seq_len, pre_seq_len + seq_len
    print("Computing attention pooling")
    # scaling by sqrt(head_dim) and the softmax are omitted here; they do not change any shapes
    context_layer = torch.matmul(attention_scores, value_layer)
    print(f"Shape of the attention output context_layer: {context_layer.size()}")  # batch, n_head, seq_len, head_dim
    print("Finally, restore context_layer to the shape of the input hidden_states")
    context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
    new_context_layer_shape = context_layer.size()[:-2] + (768,)
    context_layer = context_layer.view(new_context_layer_shape)
    print(f"context_layer restored, shape: {context_layer.size()}")
    print("One simulated P-tuning v2 BertLayer computation finished")
    break

Its output is:

Shape of the trainable continuous embeddings built by P-tuning v2: torch.Size([48, 32, 12, 128, 64])
Shape of the input hidden_states: torch.Size([32, 128, 768])
Shape of the prompt injected into each BertLayer: torch.Size([2, 32, 12, 128, 64])
Shape of key after transpose_for_scores: torch.Size([32, 12, 128, 64])
Shape of value after transpose_for_scores: torch.Size([32, 12, 128, 64])
past_key_value[0]: torch.Size([32, 12, 128, 64]), key_layer before cat: torch.Size([32, 12, 128, 64]), after cat: torch.Size([32, 12, 256, 64])
past_key_value[1]: torch.Size([32, 12, 128, 64]), value_layer before cat: torch.Size([32, 12, 128, 64]), after cat: torch.Size([32, 12, 256, 64])
Shape of hidden_states after the query projection: torch.Size([32, 128, 768])
Shape of query after transpose_for_scores: torch.Size([32, 12, 128, 64])
Computing attention scores
Shape of attention_scores: torch.Size([32, 12, 128, 256])
Computing attention pooling
Shape of the attention output context_layer: torch.Size([32, 12, 128, 64])
Finally, restore context_layer to the shape of the input hidden_states
context_layer restored, shape: torch.Size([32, 128, 768])
One simulated P-tuning v2 BertLayer computation finished