Analysis of Loading a Finetuned Model with the PEFT Library
import torch
from peft import PeftModel
from transformers import (
    LlamaTokenizer,
    GenerationConfig,
    LlamaForCausalLM,
)

BASE_MODEL = ""   # path to the pretrained base model
PE_WEIGHTS = ""   # path to the directory holding the PEFT adapter weights
LOAD_8BIT = False
device = "cuda"

# Load the base model first, then wrap it with the finetuned adapter.
model = LlamaForCausalLM.from_pretrained(
    BASE_MODEL,
    load_in_8bit=LOAD_8BIT,
    device_map={"": device},
    torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
    model,
    PE_WEIGHTS,
    device_map={"": device},
    torch_dtype=torch.float16,
)

def generate_prompt(instruction, input):
    # Build the prompt from instruction and input; it must end with
    # "### Response:", since the response is parsed out below by
    # splitting on that marker.
    ...

tokenizer = LlamaTokenizer.from_pretrained(BASE_MODEL)

instruction = "your instruction"
input = "your input"
# Example sampling settings (left as empty placeholders in the original).
temperature = 0.1
top_p = 0.75
top_k = 40
num_beams = 4
max_new_tokens = 128

prompt = generate_prompt(instruction, input)
inputs = tokenizer(prompt, return_tensors="pt")
input_ids = inputs["input_ids"].to(device)
generation_config = GenerationConfig(
    temperature=temperature,
    top_p=top_p,
    top_k=top_k,
    num_beams=num_beams,
)
with torch.no_grad():
    generation_output = model.generate(
        input_ids=input_ids,
        generation_config=generation_config,
        return_dict_in_generate=True,
        output_scores=True,
        max_new_tokens=max_new_tokens,
    )
s = generation_output.sequences[0]
output = tokenizer.decode(s)
print(output.split("### Response:")[1].strip())
# class PeftModel(PushToHubMixin, torch.nn.Module):
@classmethod
def from_pretrained(
    cls,
    model: PreTrainedModel,
    model_id: Union[str, os.PathLike],
    adapter_name: str = "default",
    is_trainable: bool = False,
    config: Optional[PeftConfig] = None,
    **kwargs: Any,
):
    from .mapping import MODEL_TYPE_TO_PEFT_MODEL_MAPPING, PEFT_TYPE_TO_CONFIG_MAPPING

    # load the config
    if config is None:
        config = PEFT_TYPE_TO_CONFIG_MAPPING[
            PeftConfig._get_peft_type(
                model_id,
                subfolder=kwargs.get("subfolder", None),
                revision=kwargs.get("revision", None),
                cache_dir=kwargs.get("cache_dir", None),
                use_auth_token=kwargs.get("use_auth_token", None),
            )
        ].from_pretrained(model_id, **kwargs)
    elif isinstance(config, PeftConfig):
        config.inference_mode = not is_trainable
    else:
        raise ValueError(f"The input config must be a PeftConfig, got {config.__class__}")

    if (getattr(model, "hf_device_map", None) is not None) and len(
        set(model.hf_device_map.values()).intersection({"cpu", "disk"})
    ) > 0:
        remove_hook_from_submodules(model)

    if config.is_prompt_learning and is_trainable:
        raise ValueError("Cannot set a prompt learning adapter to trainable when loading pretrained adapter.")
    else:
        config.inference_mode = not is_trainable

    if config.task_type not in MODEL_TYPE_TO_PEFT_MODEL_MAPPING.keys():
        model = cls(model, config, adapter_name)
    else:
        model = MODEL_TYPE_TO_PEFT_MODEL_MAPPING[config.task_type](model, config, adapter_name)
    model.load_adapter(model_id, adapter_name, is_trainable=is_trainable, **kwargs)
    return model
def from_pretrained(
cls,
model: PreTrainedModel,
model_id: Union[str, os.PathLike],
adapter_name: str = "default",
is_trainable: bool = False,
config: Optional[PeftConfig] = None,
**kwargs: Any,
):
model — the pretrained base model that has already been loaded (a PreTrainedModel instance, not a path); the adapter is attached on top of it
model_id — the path (or hub id) containing adapter_config.json
adapter_name — useful when loading multiple adapters [how are multiple adapters loaded, and how is that implemented? see the sketch after this list]
is_trainable — whether the adapter will be used for training
config — pass a config object directly instead of reading it from the adapter path; if the config was already loaded earlier, it can simply be passed in here, in which case the config under model_id is ignored
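To address the question above: a minimal sketch of loading two adapters onto one base model via load_adapter and set_adapter (the adapter paths and names here are hypothetical; base_model stands for the LlamaForCausalLM loaded in the first listing, and both adapters must target that same base model):

from peft import PeftModel

# Load the first adapter under the name "adapter_a".
model = PeftModel.from_pretrained(base_model, "path/to/adapter_a", adapter_name="adapter_a")

# Load a second adapter into the same wrapper under a different name.
model.load_adapter("path/to/adapter_b", adapter_name="adapter_b")

# Switch which adapter is active for forward/generate.
model.set_adapter("adapter_b")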
from .mapping import MODEL_TYPE_TO_PEFT_MODEL_MAPPING, PEFT_TYPE_TO_CONFIG_MAPPING

# load the config
if config is None:
    config = PEFT_TYPE_TO_CONFIG_MAPPING[
        PeftConfig._get_peft_type(
            model_id,
            subfolder=kwargs.get("subfolder", None),
            revision=kwargs.get("revision", None),
            cache_dir=kwargs.get("cache_dir", None),
            use_auth_token=kwargs.get("use_auth_token", None),
        )
    ].from_pretrained(model_id, **kwargs)
elif isinstance(config, PeftConfig):
    config.inference_mode = not is_trainable
else:
    raise ValueError(f"The input config must be a PeftConfig, got {config.__class__}")
When the config argument is not passed in, the following runs:
config = PEFT_TYPE_TO_CONFIG_MAPPING[
PeftConfig._get_peft_type(
model_id,
subfolder=kwargs.get("subfolder", None),
revision=kwargs.get("revision", None),
cache_dir=kwargs.get("cache_dir", None),
use_auth_token=kwargs.get("use_auth_token", None),
)
].from_pretrained(model_id, **kwargs)
PEFT_TYPE_TO_CONFIG_MAPPING
PEFT_TYPE_TO_CONFIG_MAPPING = {
"ADAPTION_PROMPT": AdaptionPromptConfig,
"PROMPT_TUNING": PromptTuningConfig,
"PREFIX_TUNING": PrefixTuningConfig,
"P_TUNING": PromptEncoderConfig,
"LORA": LoraConfig,
"ADALORA": AdaLoraConfig,
"IA3": IA3Config,
}
PeftConfig._get_peft_type
@classmethod
def _get_peft_type(
    cls,
    model_id,
    **hf_hub_download_kwargs,
):
    subfolder = hf_hub_download_kwargs.get("subfolder", None)

    path = os.path.join(model_id, subfolder) if subfolder is not None else model_id

    if os.path.isfile(os.path.join(path, CONFIG_NAME)):
        config_file = os.path.join(path, CONFIG_NAME)
    else:
        try:
            config_file = hf_hub_download(
                model_id,
                CONFIG_NAME,
                **hf_hub_download_kwargs,
            )
        except Exception:
            raise ValueError(f"Can't find '{CONFIG_NAME}' at '{model_id}'")

    loaded_attributes = cls.from_json_file(config_file)
    return loaded_attributes["peft_type"]
CONFIG_NAME = "adapter_config.json"
So it looks for this JSON file under the model_id folder (falling back to hf_hub_download if it is not local), and finally returns the peft_type field from the JSON. In my case adapter_config.json looks like this:
{
  "auto_mapping": null,
  "base_model_name_or_path": "llama2/model/7b_chat",
  "encoder_hidden_size": 4096,
  "inference_mode": true,
  "num_attention_heads": 32,
  "num_layers": 32,
  "num_transformer_submodules": 1,
  "num_virtual_tokens": 32,
  "peft_type": "PREFIX_TUNING",
  "prefix_projection": false,
  "revision": null,
  "task_type": "CAUSAL_LM",
  "token_dim": 4096
}
Here it returns "PREFIX_TUNING".
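This lookup boils down to reading one field from the JSON. A minimal local-file equivalent (a sketch, assuming the adapter directory PE_WEIGHTS from the first listing):

import json
import os

# Read adapter_config.json directly and pull out the peft_type field.
with open(os.path.join(PE_WEIGHTS, "adapter_config.json")) as f:
    peft_type = json.load(f)["peft_type"]
print(peft_type)  # -> "PREFIX_TUNING" for the config shown above

Back in from_pretrained, the mapping then resolves the config class: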
config = PEFT_TYPE_TO_CONFIG_MAPPING["PREFIX_TUNING"].from_pretrained(model_id, **kwargs)
i.e. config = PrefixTuningConfig.from_pretrained(model_id, **kwargs), whose implementation is:
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, subfolder=None, **kwargs):
    from peft.mapping import PEFT_TYPE_TO_CONFIG_MAPPING

    path = (
        os.path.join(pretrained_model_name_or_path, subfolder)
        if subfolder is not None
        else pretrained_model_name_or_path
    )

    hf_hub_download_kwargs, class_kwargs, _ = cls._split_kwargs(kwargs)

    if os.path.isfile(os.path.join(path, CONFIG_NAME)):
        config_file = os.path.join(path, CONFIG_NAME)
    else:
        try:
            config_file = hf_hub_download(
                pretrained_model_name_or_path, CONFIG_NAME, subfolder=subfolder, **hf_hub_download_kwargs
            )
        except Exception:
            raise ValueError(f"Can't find '{CONFIG_NAME}' at '{pretrained_model_name_or_path}'")

    loaded_attributes = cls.from_json_file(config_file)

    if "peft_type" in loaded_attributes:
        peft_type = loaded_attributes["peft_type"]
        config_cls = PEFT_TYPE_TO_CONFIG_MAPPING[peft_type]
    else:
        config_cls = cls

    config = config_cls(**class_kwargs)

    for key, value in loaded_attributes.items():
        if hasattr(config, key):
            setattr(config, key, value)

    return config
from_pretrained splits the incoming kwargs, instantiates the config class, and then copies over only the keys the config object actually has, so the resulting prefix config holds exactly the key/value pairs from adapter_config.json.
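Equivalently, the same config could be built by hand from the JSON fields shown above (a sketch; the field names below follow PrefixTuningConfig). A config built this way would be passed in through the config argument and would hit the elif branch below:

from peft import PrefixTuningConfig

# Hand-built equivalent of the adapter_config.json shown earlier.
config = PrefixTuningConfig(
    task_type="CAUSAL_LM",
    inference_mode=True,
    num_virtual_tokens=32,
    token_dim=4096,
    num_attention_heads=32,
    num_layers=32,
    encoder_hidden_size=4096,
    prefix_projection=False,
)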
elif isinstance(config, PeftConfig):
config.inference_mode = not is_trainable
If a config object was passed in directly, inference_mode is set to not is_trainable, i.e. with is_trainable=False the adapter's parameters are frozen and the adapter is used for inference.
if (getattr(model, "hf_device_map", None) is not None) and len(
set(model.hf_device_map.values()).intersection({"cpu", "disk"})
) > 0:
remove_hook_from_submodules(model)
Check whether the pretrained model has an hf_device_map attribute that is not None.
If so, check whether the values of hf_device_map include "cpu" or "disk"; if at least one of them appears,
call remove_hook_from_submodules(model).
remove_hook_from_submodules: this function removes the hooks that accelerate previously attached to the model's submodules for offloading. As the name suggests, it cleans up the device-alignment hooks so they do not interfere once the adapter is attached.
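For reference, remove_hook_from_submodules lives in accelerate; the same offload check could be written in user code as follows (a sketch):

from accelerate.hooks import remove_hook_from_submodules

# Strip accelerate's offload hooks when any weights live on CPU or disk,
# so attaching the adapter does not fight the device-alignment hooks.
if getattr(model, "hf_device_map", None) is not None:
    if set(model.hf_device_map.values()) & {"cpu", "disk"}:
        remove_hook_from_submodules(model)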
if config.is_prompt_learning and is_trainable:
raise ValueError("Cannot set a prompt learning adapter to trainable when loading pretrained adapter.")
else:
config.inference_mode = not is_trainable
A prompt-learning adapter cannot be loaded as trainable; in every other case inference_mode is set to not is_trainable, which for our call means the adapter is loaded in inference mode.
if config.task_type not in MODEL_TYPE_TO_PEFT_MODEL_MAPPING.keys():
model = cls(model, config, adapter_name)
else:
model = MODEL_TYPE_TO_PEFT_MODEL_MAPPING[config.task_type](model, config, adapter_name)
Next it checks whether the task_type saved in the training config is one of the keys of MODEL_TYPE_TO_PEFT_MODEL_MAPPING, which contains the following:
MODEL_TYPE_TO_PEFT_MODEL_MAPPING = {
"SEQ_CLS": PeftModelForSequenceClassification,
"SEQ_2_SEQ_LM": PeftModelForSeq2SeqLM,
"CAUSAL_LM": PeftModelForCausalLM,
"TOKEN_CLS": PeftModelForTokenClassification,
"QUESTION_ANS": PeftModelForQuestionAnswering,
"FEATURE_EXTRACTION": PeftModelForFeatureExtraction,
}
In my case the task type is CAUSAL_LM,
so model = PeftModelForCausalLM(model, config, adapter_name).
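In other words, for this adapter the dispatch expands to roughly the following (a sketch; base_model and PE_WEIGHTS are from the first listing, and PeftModelForCausalLM is importable from the peft top level):

from peft import PeftModelForCausalLM

# What PeftModel.from_pretrained amounts to for task_type == "CAUSAL_LM".
model = PeftModelForCausalLM(base_model, config, adapter_name="default")
model.load_adapter(PE_WEIGHTS, "default", is_trainable=False)

The source of PeftModelForCausalLM: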
class PeftModelForCausalLM(PeftModel):
    def __init__(self, model, peft_config: PeftConfig, adapter_name="default"):
        super().__init__(model, peft_config, adapter_name)
        self.base_model_prepare_inputs_for_generation = self.base_model.prepare_inputs_for_generation

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        inputs_embeds=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        **kwargs,
    ):
        peft_config = self.active_peft_config
        if not peft_config.is_prompt_learning:
            if self.base_model.config.model_type == "mpt":
                if inputs_embeds is not None:
                    raise AssertionError("forward in MPTForCausalLM does not support inputs_embeds")
                return self.base_model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=labels,
                    output_attentions=output_attentions,
                    output_hidden_states=output_hidden_states,
                    return_dict=return_dict,
                    **kwargs,
                )

            return self.base_model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                inputs_embeds=inputs_embeds,
                labels=labels,
                output_attentions=output_attentions,
                output_hidden_states=output_hidden_states,
                return_dict=return_dict,
                **kwargs,
            )

        batch_size = _get_batch_size(input_ids, inputs_embeds)
        if attention_mask is not None:
            # concat prompt attention mask
            prefix_attention_mask = torch.ones(batch_size, peft_config.num_virtual_tokens).to(attention_mask.device)
            attention_mask = torch.cat((prefix_attention_mask, attention_mask), dim=1)

        if kwargs.get("position_ids", None) is not None:
            warnings.warn("Position ids are not supported for parameter efficient tuning. Ignoring position ids.")
            kwargs["position_ids"] = None
        if kwargs.get("token_type_ids", None) is not None:
            warnings.warn("Token type ids are not supported for parameter efficient tuning. Ignoring token type ids")
            kwargs["token_type_ids"] = None
        kwargs.update(
            {
                "attention_mask": attention_mask,
                "labels": labels,
                "output_attentions": output_attentions,
                "output_hidden_states": output_hidden_states,
                "return_dict": return_dict,
            }
        )

        if peft_config.peft_type == PeftType.PREFIX_TUNING:
            past_key_values = self.get_prompt(batch_size)
            return self.base_model(
                input_ids=input_ids, inputs_embeds=inputs_embeds, past_key_values=past_key_values, **kwargs
            )
        else:
            if inputs_embeds is None:
                inputs_embeds = self.word_embeddings(input_ids)
            # concat prompt labels
            if labels is not None:
                prefix_labels = torch.full((batch_size, peft_config.num_virtual_tokens), -100).to(labels.device)
                kwargs["labels"] = torch.cat((prefix_labels, labels), dim=1)
            prompts = self.get_prompt(batch_size=batch_size)
            prompts = prompts.to(inputs_embeds.dtype)
            inputs_embeds = torch.cat((prompts, inputs_embeds), dim=1)
            return self.base_model(inputs_embeds=inputs_embeds, **kwargs)

    def generate(self, **kwargs):
        self.base_model.prepare_inputs_for_generation = self.prepare_inputs_for_generation
        if hasattr(self.base_model, "model"):
            self.base_model.model.generation_config = self.generation_config
        else:
            self.base_model.generation_config = self.generation_config
        try:
            outputs = self.base_model.generate(**kwargs)
        except:
            self.base_model.prepare_inputs_for_generation = self.base_model_prepare_inputs_for_generation
            raise
        else:
            self.base_model.prepare_inputs_for_generation = self.base_model_prepare_inputs_for_generation
            return outputs

    def prepare_inputs_for_generation(self, *args, **kwargs):
        peft_config = self.active_peft_config
        model_kwargs = self.base_model_prepare_inputs_for_generation(*args, **kwargs)
        if peft_config.is_prompt_learning:
            if model_kwargs.get("attention_mask", None) is not None:
                prefix_attention_mask = torch.ones(
                    model_kwargs["input_ids"].shape[0], peft_config.num_virtual_tokens
                ).to(model_kwargs["input_ids"].device)
                model_kwargs["attention_mask"] = torch.cat(
                    (prefix_attention_mask, model_kwargs["attention_mask"]), dim=1
                )

            if model_kwargs.get("position_ids", None) is not None:
                warnings.warn("Position ids are not supported for parameter efficient tuning. Ignoring position ids.")
                model_kwargs["position_ids"] = None

            if kwargs.get("token_type_ids", None) is not None:
                warnings.warn(
                    "Token type ids are not supported for parameter efficient tuning. Ignoring token type ids"
                )
                kwargs["token_type_ids"] = None

            if model_kwargs["past_key_values"] is None and peft_config.peft_type == PeftType.PREFIX_TUNING:
                past_key_values = self.get_prompt(batch_size=model_kwargs["input_ids"].shape[0])
                model_kwargs["past_key_values"] = past_key_values
            else:
                if model_kwargs["past_key_values"] is None:
                    inputs_embeds = self.word_embeddings(model_kwargs["input_ids"])
                    prompts = self.get_prompt(batch_size=model_kwargs["input_ids"].shape[0])
                    prompts = prompts.to(inputs_embeds.dtype)
                    model_kwargs["inputs_embeds"] = torch.cat((prompts, inputs_embeds), dim=1)
                    model_kwargs["input_ids"] = None

        return model_kwargs
adapter_name is not passed here, so it stays "default".
base_model is the pretrained model.
model is the pretrained model, and peft_config is the finetuning config that was just read.
At this point the methods that are overridden or newly added are:
.base_model_prepare_inputs_for_generation
.forward()
.generate()
.prepare_inputs_for_generation()
.save_pretrained()
._setup_prompt_encoder()
._prepare_model_for_gradient_checkpointing()
.get_prompt_embedding_to_save()
.get_prompt()
.get_nb_trainable_parameters()
.print_trainable_parameters()
.__getattr__()
._get_base_model_class()
.disable_adapter()
.get_base_model()
.add_adapter()
.set_additional_trainable_modules()
._split_kwargs()
.load_adapter()
.set_adapter()
.base_model_torch_dtype
.active_peft_config
.create_or_update_model_card()
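For instance, two of these helpers give a quick sanity check on what is actually trainable after wrapping (a sketch):

# Count trainable vs. total parameters of the wrapped model.
trainable, total = model.get_nb_trainable_parameters()
print(f"trainable: {trainable} / {total}")

# Or the built-in printer, which reports
# "trainable params: ... || all params: ... || trainable%: ...".
model.print_trainable_parameters()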
peft_config = self.active_peft_config

Now reason through inference: prefix-tuning is a prompt-learning method, so peft_config.is_prompt_learning is True, and inside forward the following is the code path that actually executes:
batch_size = _get_batch_size(input_ids, inputs_embeds)
if attention_mask is not None:
# concat prompt attention mask
prefix_attention_mask = torch.ones(batch_size, peft_config.num_virtual_tokens).to(attention_mask.device)
attention_mask = torch.cat((prefix_attention_mask, attention_mask), dim=1)
if kwargs.get("position_ids", None) is not None:
    warnings.warn("Position ids are not supported for parameter efficient tuning. Ignoring position ids.")
    kwargs["position_ids"] = None
if kwargs.get("token_type_ids", None) is not None:
    warnings.warn("Token type ids are not supported for parameter efficient tuning. Ignoring token type ids")
    kwargs["token_type_ids"] = None
kwargs.update(
    {
        "attention_mask": attention_mask,
        "labels": labels,
        "output_attentions": output_attentions,
        "output_hidden_states": output_hidden_states,
        "return_dict": return_dict,
    }
)

past_key_values = self.get_prompt(batch_size)
return self.base_model(
    input_ids=input_ids, inputs_embeds=inputs_embeds, past_key_values=past_key_values, **kwargs
)
Clear position_ids and token_type_ids, which are not supported with parameter-efficient tuning.
Pack attention_mask, labels, output_attentions, output_hidden_states, and return_dict into kwargs.
Add past_key_values from get_prompt as the prefix key/value cache: the learned prefix is injected as k/v pairs prepended to every attention layer, which is also why the attention mask was extended by num_virtual_tokens above.
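A minimal sketch of the attention-mask concatenation (shapes only; batch_size=2 and seq_len=10 are illustrative values, and num_virtual_tokens=32 matches the adapter config above):

import torch

batch_size, seq_len, num_virtual_tokens = 2, 10, 32
attention_mask = torch.ones(batch_size, seq_len)

# Prepend a block of ones so the mask also covers the virtual-token
# prefix positions injected via past_key_values.
prefix_attention_mask = torch.ones(batch_size, num_virtual_tokens)
attention_mask = torch.cat((prefix_attention_mask, attention_mask), dim=1)
print(attention_mask.shape)  # torch.Size([2, 42])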
Now let's look at self.base_model.
In the PeftModel class, initialization proceeds as follows:
def __init__(self, model: PreTrainedModel, peft_config: PeftConfig, adapter_name: str = "default"):
    super().__init__()
    self.base_model = model
    self.config = getattr(self.base_model, "config", {"model_type": "custom"})
    self.modules_to_save = None
    self.peft_config = {}
    self.active_adapter = adapter_name
    self.peft_type = peft_config.peft_type
    if not peft_config.is_prompt_learning:
        self.peft_config[adapter_name] = peft_config
        self.base_model = PEFT_TYPE_TO_MODEL_MAPPING[peft_config.peft_type](
            self.base_model, self.peft_config, adapter_name
        )
        self.set_additional_trainable_modules(peft_config, adapter_name)
    else:
        self.add_adapter(adapter_name, peft_config)

    if getattr(model, "is_gradient_checkpointing", True):
        model = self._prepare_model_for_gradient_checkpointing(model)

    # the `pretraining_tp` is set for some models to simulate Tensor Parallelism during inference to avoid
    # numerical differences, https://github.com/pytorch/pytorch/issues/76232 - to avoid any unexpected
    # behavior we disable that in this line.
    if hasattr(self.base_model, "config") and hasattr(self.base_model.config, "pretraining_tp"):
        self.base_model.config.pretraining_tp = 1
self.add_adapter(adapter_name, peft_config)
Since prefix tuning is a prompt-learning method, the else branch runs this call: it registers the config and runs _setup_prompt_encoder, which builds the prompt encoder holding the trainable virtual-token parameters. Note that self.base_model itself stays the unmodified pretrained model here, with the prefix injected at forward/generate time via past_key_values; only for non-prompt-learning methods such as LoRA is self.base_model rewrapped through PEFT_TYPE_TO_MODEL_MAPPING into a model with adapter layers inserted.
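A quick way to verify this (a sketch; base_model and PE_WEIGHTS are from the first listing, and prompt_encoder/get_base_model are the attributes from the peft source quoted above):

from peft import PeftModel

peft_model = PeftModel.from_pretrained(base_model, PE_WEIGHTS)

# For prefix tuning the wrapped model is still the raw LlamaForCausalLM...
print(type(peft_model.get_base_model()))

# ...while the learned prefix lives in the prompt encoder (a ModuleDict
# keyed by adapter name, "default" here).
print(peft_model.prompt_encoder)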