This post fine-tunes the Qwen1.5-7B-Chat model with a focus on how the model constructs its prompt, in particular how single-turn and multi-turn conversations are handled. Only by understanding this mechanism can you adapt it to your own needs. The sections below walk through how the prompt is built during fine-tuning.
The environment used for fine-tuning and inference with Qwen1.5-7B is as follows:
- # python==3.10.13
- torch==2.0.0
- transformers==4.37.0
- deepspeed==0.12.6
- peft==0.7.1
- accelerate==0.28.0
- loguru==0.7.2
- wandb==0.16.5
- scikit-learn==1.4.1.post1
Below, the official open-source code is stepped through in a debugger and the model's prompt-building logic is explained, laying the groundwork for later modification and optimization.
The samples below were chosen mainly to test how the model constructs its prompt; only by understanding the mechanism can we later adjust it to meet our own requirements.
Note: the field contents can be replaced freely; you can also take a public dataset and convert it into this format.
- messages: List[List[Dict]] = [
- [
- {"role": "user", "content": "选走红绿灯最少"},
- {"role": "assistant", "content": "<导航>"}
- ],
- [
- {
- "role": "system",
- "content": "你是一个擅长猜测人类意图的人工智能助手,下面有选项供你选择用户的意图,请选择:\nA. <通用>\nB. <媒体>\nC. <系统控制>\nD. <天气>\nE. <车控>\nF. <导航>\nG. <蓝牙电话>"
- },
- {"role": "user", "content": "选走红绿灯最少"},
- {"role": "assistant", "content": "<导航>"}
- ],
- [
- {
- "role": "system",
- "content": "你是一个擅长猜测人类意图的人工智能助手,下面有选项供你选择用户的意图,请选择:\nA. <通用>\nB. <媒体>\nC. <系统控制>\nD. <天气>\nE. <车控>\nF. <导航>\nG. <蓝牙电话>"
- },
- {"role": "user", "content": "查询下明天天气温度"},
- {"role": "assistant", "content": "<天气>"},
- {"role": "user", "content": "你协助我开电台"},
- {"role": "assistant", "content": "<媒体>"},
- {"role": "user", "content": "开启最大除霜"},
- {"role": "assistant", "content": "<车控>"},
- {"role": "user", "content": "右车窗开开一下"},
- {"role": "assistant", "content": "<车控>"}
- ],
- ]
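Before tokenizing these samples, it can help to render the chat template as plain text first, so the ChatML structure (<|im_start|>role ... <|im_end|>) is easy to see. A minimal sketch, assuming the `messages` list above is in scope:
- from transformers import AutoTokenizer
-
- tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-7B-Chat")
- # tokenize=False returns the rendered prompt string instead of token ids
- text = tokenizer.apply_chat_template(messages[1], tokenize=False, add_generation_prompt=False)
- print(text)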
The code below walks through how the conversations are tokenized.
- # imports
- from dataclasses import dataclass, field
- import json
- import logging
- import copy
- import os
- import pathlib
- from typing import Dict, Optional, List
- import torch
- from torch.utils.data import Dataset
- import transformers
- from transformers import AutoModelForCausalLM, AutoTokenizer
- from transformers import Trainer, BitsAndBytesConfig, deepspeed
- from transformers.trainer_pt_utils import LabelSmoother
-
- # load the tokenizer
- tokenizer = AutoTokenizer.from_pretrained(
- r"D:\Company\Code\LLM\Qwen1.5-7B-Chat",
- cache_dir=None,
- model_max_length=512,
- padding_side="right",
- use_fast=False
- )
-
- # convert the conversations into token ids
- def preprocess(
- messages: List[List[Dict[str,str]]],
- tokenizer: transformers.PreTrainedTokenizer,
- max_len: int,
- ) -> Dict:
- texts = []
- for i, msg in enumerate(messages):
- texts.append(
- tokenizer.apply_chat_template(
- msg,
- tokenize=True,
- add_generation_prompt=False,
- padding="max_length",  # pad every sample to max_len so they can be stacked into a tensor
- max_length=max_len,
- truncation=True,
- )
- )
-
- # token ids after tokenization
- input_ids = torch.tensor(texts, dtype=torch.int)  # padding to max_len makes the samples stackable
- target_ids = input_ids.clone()
- target_ids[target_ids == tokenizer.pad_token_id] = -100  # boolean masking requires a tensor, not a plain list
- print(texts)
- print(target_ids)
-
- # inspect how each sample was truncated
- after_target_ids = [tokenizer.decode(i) for i in texts]
- print(after_target_ids)
-
- # quick test
- if __name__ == "__main__":
- preprocess(messages=messages, tokenizer=tokenizer, max_len=52)
For the labels, the official code only sets pad_token_id (<|endoftext|>) to -100 and leaves every other position untouched, so the loss is also computed on the system and user tokens. Beyond that, the code has the following problem: when a sample is longer than max_len it is simply right-truncated, so the user query and even the assistant answer can be cut off entirely, as the truncated sample below shows:
- # token ids after truncation
- [151644, 8948, 198, 56568, 101909, 107618, 109736, 103971, 111450, 100623, 48692, 100168, 110498, 3837, 100431, 18830, 109487, 83744, 56568, 50404, 107494, 111450, 37945, 50404, 28311, 32, 13, 366, 105600, 397]
-
- # decoded text
- "<|im_start|>system\n你是一个擅长猜测人类意图的人工智能助手,下面有选项供你选择用户的意图,请选择:\nA. <通用>"
Based on the problems raised above, combined with my own understanding of large language models, a solution is given below.
The code for reading and saving data can be reused across other large models, so it is worth encapsulating it as a small module that can simply be copied over later when needed.
- from typing import List, Dict, Union
- import json
- import os
-
-
- def loads(path: str) -> List[Dict]:
- """
- Args:
- path (str): _description_
- Returns:
- List[Dict]: _description_
- """
-
- datas: List[Dict] = []
- with open(path, mode="r", encoding="UTF-8") as fr:
- for line in fr:
- datas.append(json.loads(line))
-
- return datas
-
-
- def load(path: str) -> List[Dict]:
- """
- Args:
- path (str): _description_
- Returns:
- List[Dict]: _description_
- """
- with open(path, mode="r", encoding="UTF-8") as fr:
- data = json.load(fr)
-
- return data
-
-
- def read_datas(paths: Union[str, List[str]] = None) -> List[List[Dict]]:
- """
- Args:
- paths (Union[str, List[str]], optional): _description_. Defaults to None.
- Returns:
- List[Dict]: _description_
- """
- if not paths:
- return []
-
- if isinstance(paths, str):
- paths = [paths]
-
- datas: List[List[Dict]] = []
- for path in paths:
- fold, suffix = os.path.splitext(path)
- if ".json" == suffix:
- datas.append(load(path))
- elif ".jsonl" == suffix:
- datas.append(loads(path))
-
- return datas
-
-
- def dump(path: str, datas: List[Dict]):
- """
- Write `datas` to `path` as a single pretty-printed JSON document.
- Args:
- path (str): output file path
- datas (List[Dict]): objects to save
- """
- folder = os.path.dirname(path)  # os.path.dirname is robust when the path has no separator
- if folder:
- os.makedirs(folder, exist_ok=True)
- with open(path, "w", encoding="UTF-8") as fw:
- json.dump(datas, fw, ensure_ascii=False, indent=4)
-
-
- def dumps(path: str, datas: List[Dict]):
- """
- Write `datas` to `path` in JSON Lines format (one object per line).
- Args:
- path (str): output file path
- datas (List[Dict]): objects to save
- """
- folder = os.path.dirname(path)
- if folder:
- os.makedirs(folder, exist_ok=True)
- with open(path, "w", encoding="UTF-8") as fw:
- for obj in datas:
- fw.write(json.dumps(obj, ensure_ascii=False) + "\n")
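The dataset and inference scripts later in this post also call `com.get_paths`, which is not among the utilities above. A minimal sketch of what it might look like, assuming it simply expands a directory into the .json/.jsonl files it contains (a single file path is returned as a one-element list):
- def get_paths(path: str) -> List[str]:
- """Collect the .json/.jsonl files under `path`; a plain file path is wrapped in a list."""
- if os.path.isfile(path):
- return [path]
- paths: List[str] = []
- for root, _, files in os.walk(path):
- for name in files:
- if os.path.splitext(name)[1] in (".json", ".jsonl"):
- paths.append(os.path.join(root, name))
- return sorted(paths)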
Different models expect slightly different data formats; the simplest approach is to convert your own data into the format the official code expects.
- from loguru import logger
- from typing import Dict, List, Any, Tuple
- import torch
- from torch.utils.data import Dataset
- import transformers
- from transformers.trainer_pt_utils import LabelSmoother
- from transformers import AutoTokenizer
- from transformers import PreTrainedTokenizer
- from argument import DataArguments
-
- from datas import com
-
-
- def dialog(
- tokenizer: PreTrainedTokenizer,
- messages: List[Dict[str, str]],
- model_max_length: int,
- ) -> List[int]:
- def _parse_messages(
- messages: List[Dict[str, str]], split_role: str = "user"
- ) -> Tuple[str, List[List[Dict[str, str]]]]:
- """
- Args:
- messages: List[Dict[str, str]]
- split_role: user
- Return: Tuple[str, List[List[Dict[str, str]]]] -- the system prompt and the dialogue split into rounds
- Example:
- >>> messages = [{'role': 'system', 'content': '你是一个人工智能助理'}, {'role': 'user', 'content':'你好'}, {'role': 'assistant', 'content':'你好啊'}, ...]
- >>> 你是一个人工智能助理, [[{'role': 'user', 'content': '你好'}, {'role': 'assistant', 'content': '你好啊'}], ...]
- """
- system, rounds = "", []
- round = []
- for i, message in enumerate(messages):
- if message["role"] == "system":
- assert i == 0
- system = message["content"]
- continue
- # a round is only appended to rounds once it is complete
- if message["role"] == split_role and round:
- rounds.append(round)
- round = []
- round.append(message)
- if round: # append the final round as well
- rounds.append(round)
-
- return system, rounds
-
- system, rounds = _parse_messages(messages)
- system_ids = tokenizer.encode(f"<|im_start|>system\n{system}<|im_end|>\n")
- input_ids: List[int] = []
-
- # for multi-turn dialogues, keep only the most recent rounds, i.e. [n:]
- for i, round in enumerate(rounds[::-1]): # iterate from the last round backwards
- # one complete round
- text_id = []
- for message in round:
- role, content = message["role"], message["content"]
- if role == "user":
- cont = f"<|im_start|>user\n{content}<|im_end|>\n"
- else:
- # the last assistant turn is handled specially
- if role == "assistant" and i == 0:
- cont = f"<|im_start|>assistant\n{content}"
- else:
- cont = f"<|im_start|>assistant\n{content}<|im_end|>\n"
- # user + assistant
- text_id = text_id + tokenizer.encode(cont)
-
- # keep this round only if adding it does not exceed the model's maximum length
- if len(system_ids + input_ids + text_id) > model_max_length:
- break
- else:
- input_ids = text_id + input_ids
-
- # pad input_ids up to model_max_length
- pad = (model_max_length - len(system_ids + input_ids)) * tokenizer.encode(
- "<|endoftext|>"
- )
-
- return system_ids + input_ids + pad
-
-
- def preprocess(
- tokenizer: PreTrainedTokenizer,
- messages: List[List[Dict[str, str]]],
- model_max_length: int,
- ) -> Dict[str, torch.Tensor]:
-
- texts_ids: List[List[int]] = []
- for msg in messages:
- text_id: List[int] = dialog(tokenizer, msg, model_max_length)
- texts_ids.append(text_id)
-
- input_ids = torch.tensor(texts_ids, dtype=torch.int)
- target_ids = input_ids.clone()
- target_ids[target_ids == tokenizer.pad_token_id] = LabelSmoother.ignore_index
- attention_mask = input_ids.ne(tokenizer.pad_token_id) # True or False
-
- return {
- "input_ids": input_ids,
- "target_ids": target_ids,
- "attention_mask": attention_mask,
- }
-
-
- class SupervisedDataset(Dataset):
- def __init__(
- self,
- raw_data: List[Dict[str, Any]],
- tokenizer: PreTrainedTokenizer,
- model_max_length: int,
- ):
- super().__init__()
- self.tokenizer = tokenizer
- # normalize the raw data into chat messages
- self.messages: List[List[Dict[str, str]]] = self.format(raw_data)
-
- # convert the text into token ids
- data_dict: Dict[str, torch.Tensor] = preprocess(
- tokenizer, self.messages, model_max_length
- )
-
- self.input_ids = data_dict["input_ids"]
- self.target_ids = data_dict["target_ids"]
- self.attention_mask = data_dict["attention_mask"]
-
- def __len__(self):
- return len(self.input_ids)
-
- def __getitem__(self, i) -> Dict[str, torch.Tensor]:
- input_ids = self.input_ids[i]
- target_ids = self.target_ids[i]
- attention_mask = self.attention_mask[i]
-
- if i == 0:
- target = [i for i in target_ids.tolist() if i != -100]
- logger.debug(
- f"text: {self.tokenizer.decode(input_ids.tolist())}\n{input_ids.tolist()}"
- )
- logger.debug(f"label: {self.tokenizer.decode(target)}\n{target}")
-
- return {
- "input_ids": input_ids,
- "labels": target_ids,
- "attention_mask": attention_mask,
- }
-
- def format(self, datas: List[List[Dict[str, Any]]]) -> List[List[Dict[str, str]]]:
- lis = []
- for data in datas:
- for message in data:
- lis.append(
- [
- {"role": "system", "content": message["system"]},
- {"role": "user", "content": message["query"]},
- {"role": "assistant", "content": message["answer"]},
- ]
- )
- return lis
-
-
- def make_supervised_data_module(
- tokenizer: PreTrainedTokenizer,
- data_args: DataArguments,
- model_max_length: int,
- ) -> Dict[str, SupervisedDataset]:
-
- logger.info("Loading data...")
- train_data = com.read_datas(com.get_paths(data_args.data_path))
- eval_data = com.read_datas(com.get_paths(data_args.eval_data_path))
- logger.info("data loading fished")
- train_dataset = SupervisedDataset(train_data, tokenizer, model_max_length)
- eval_dataset = SupervisedDataset(eval_data, tokenizer, model_max_length)
-
- return {"train_dataset": train_dataset, "eval_dataset": eval_dataset}
-
-
- if __name__ == "__main__":
- parser = transformers.HfArgumentParser((DataArguments))
- (data_args,) = parser.parse_args_into_dataclasses()
-
- tokenizer = AutoTokenizer.from_pretrained(
- "/root/.cache/modelscope/hub/qwen/Qwen1___5-7B-Chat",
- cache_dir=None,
- model_max_length=1024,
- padding_side="right",
- use_fast=False,
- )
-
- data_module = make_supervised_data_module(
- tokenizer=tokenizer,
- data_args=data_args,
- model_max_length=1024,
- )
- for data in data_module["train_dataset"]:
- logger.debug(data)
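As a quick sanity check of the prompt that dialog() builds at training time (answer kept, no trailing generation prompt), its output can be decoded directly. A minimal sketch, assuming the code above is saved as dataSets.py and its module-level imports (argument.py, datas/com.py) are available:
- from transformers import AutoTokenizer
- from dataSets import dialog
-
- tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-7B-Chat", use_fast=False)
- msg = [
- {"role": "system", "content": "你是一个擅长猜测人类意图的人工智能助手"},
- {"role": "user", "content": "查询下明天天气温度"},
- {"role": "assistant", "content": "<天气>"},
- ]
- ids = dialog(tokenizer, msg, model_max_length=128)
- print(tokenizer.decode(ids))  # system/user/assistant blocks followed by <|endoftext|> padding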
The arguments are used for fine-tuning: the learning rate, the paths of the training and evaluation sets, the path of the base model, and so on. They are usually placed either in a *.sh launch script or written directly in a Python file.
- from dataclasses import dataclass, field
- from typing import Optional, List
- import transformers
-
-
- @dataclass
- class DataArguments:
- data_path: Optional[str] = field(
- default="/root/autodl-tmp/Qwen-our/datas/handled/devs/battle_death",
- metadata={"help": "Path to the training data."},
- )
-
- eval_data_path: Optional[str] = field(
- default="/root/autodl-tmp/Qwen-our/datas/handled/trains/activity_1",
- metadata={"help": "Path to the evaluation data."},
- )
-
-
- @dataclass
- class ModelArguments:
- model_name_or_path: Optional[str] = field(
- default="/root/.cache/modelscope/hub/qwen/Qwen1___5-7B-Chat"
- )
-
-
- @dataclass
- class TrainingArguments(transformers.TrainingArguments):
- output_dir: Optional[str] = field(
- default="/root/autodl-tmp/Qwen-our/sft",
- metadata={"help": "model output"},
- )
-
- cache_dir: Optional[str] = field(default=None)
- optim: str = field(default="adamw_torch")
- model_max_length: int = field(
- default=1024,
- metadata={
- "help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
- },
- )
- bf16: bool = field(default=True)
- num_train_epochs: int = field(default=1)
- per_device_train_batch_size: int = field(default=2)
- per_device_eval_batch_size: int = field(default=2)
- gradient_accumulation_steps: int = field(default=2)
- evaluation_strategy: str = field(default="epoch")
- save_strategy: str = field(default="epoch")
- save_total_limit: int = field(default=3)
- learning_rate: float = field(default=5e-5)
- weight_decay: float = field(default=0.01)
- adam_beta2: float = field(default=0.95)
- warmup_ratio: float = field(default=0.01)
- lr_scheduler_type: str = field(default="cosine")
- logging_steps: int = field(default=1)
- gradient_checkpointing: bool = field(default=True)
- use_lora: bool = field(default=True)
-
-
- @dataclass
- class LoraArguments:
- lora_r: int = 64
- lora_alpha: int = 16
- lora_dropout: float = 0.05
- lora_target_modules: List[str] = field(
- default_factory=lambda: [
- "q_proj",
- "k_proj",
- "v_proj",
- "o_proj",
- "up_proj",
- "gate_proj",
- "down_proj",
- ]
- )
- lora_weight_path: str = ""
- lora_bias: str = "none"
- q_lora: bool = False
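Every default above can be overridden from the command line (for example from a *.sh launch script), because HfArgumentParser turns each dataclass field into a flag. A minimal sketch of how the training script consumes these arguments, mirroring the train() function shown next:
- import transformers
- from argument import ModelArguments, DataArguments, TrainingArguments, LoraArguments
-
- parser = transformers.HfArgumentParser(
- (ModelArguments, DataArguments, TrainingArguments, LoraArguments)
- )
- # e.g. python train.py --learning_rate 1e-5 --num_train_epochs 2 --lora_r 32
- model_args, data_args, training_args, lora_args = parser.parse_args_into_dataclasses()
- print(training_args.learning_rate, lora_args.lora_r)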
Next comes the fine-tuning stage itself. There is not much to add here, except a word of caution: unless you understand the model architecture well, avoid modifying the official source code casually, as it is easy to break something and then struggle to find the cause.
- import logging
- import os
-
- # os.environ["CUDA_VISIBLE_DEVICES"] = "2" # 指定使用哪一张显卡
- import pathlib
- import torch
- from deepspeed import zero
- from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus
- import transformers
- from transformers import AutoModelForCausalLM, AutoTokenizer
- from transformers import Trainer, BitsAndBytesConfig
- from transformers.integrations import deepspeed
-
- from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
- from accelerate.utils import DistributedType
-
-
- from argument import DataArguments, ModelArguments, TrainingArguments, LoraArguments
- import dataSets
-
-
- def maybe_zero_3(param):
- if hasattr(param, "ds_id"):
- assert param.ds_status == ZeroParamStatus.NOT_AVAILABLE
- with zero.GatheredParameters([param]):
- param = param.data.detach().cpu().clone()
- else:
- param = param.detach().cpu().clone()
- return param
-
-
- # Borrowed from peft.utils.get_peft_model_state_dict
- def get_peft_state_maybe_zero_3(named_params, bias):
- if bias == "none":
- to_return = {k: t for k, t in named_params if "lora_" in k}
- elif bias == "all":
- to_return = {k: t for k, t in named_params if "lora_" in k or "bias" in k}
- elif bias == "lora_only":
- to_return = {}
- maybe_lora_bias = {}
- lora_bias_names = set()
- for k, t in named_params:
- if "lora_" in k:
- to_return[k] = t
- bias_name = k.split("lora_")[0] + "bias"
- lora_bias_names.add(bias_name)
- elif "bias" in k:
- maybe_lora_bias[k] = t
- for k, t in maybe_lora_bias.items():  # iterate over (name, tensor) pairs
- if k in lora_bias_names:
- to_return[k] = t
- else:
- raise NotImplementedError
- to_return = {k: maybe_zero_3(v) for k, v in to_return.items()}
- return to_return
-
-
- def safe_save_model_for_hf_trainer(
- trainer: transformers.Trainer, output_dir: str, bias="none"
- ):
- """Collects the state dict and dump to disk."""
- # check if zero3 mode enabled
- if deepspeed.is_deepspeed_zero3_enabled():
- state_dict = trainer.model_wrapped._zero3_consolidated_16bit_state_dict()
- else:
- if trainer.args.use_lora:
- state_dict = get_peft_state_maybe_zero_3(
- trainer.model.named_parameters(), bias
- )
- else:
- state_dict = trainer.model.state_dict()
- if trainer.args.should_save and trainer.args.local_rank == 0:
- trainer._save(output_dir, state_dict=state_dict)
-
-
- def train():
- parser = transformers.HfArgumentParser(
- (ModelArguments, DataArguments, TrainingArguments, LoraArguments)
- )
- (
- model_args,
- data_args,
- training_args,
- lora_args,
- ) = parser.parse_args_into_dataclasses()
-
- # This serves for single-gpu qlora.
- if (
- getattr(training_args, "deepspeed", None)
- and int(os.environ.get("WORLD_SIZE", 1)) == 1
- ):
- training_args.distributed_state.distributed_type = DistributedType.DEEPSPEED
-
- device_map = None
- world_size = int(os.environ.get("WORLD_SIZE", 1))
- ddp = world_size != 1
- # QLoRA setup
- if lora_args.q_lora:
- device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)} if ddp else "auto"
- if len(training_args.fsdp) > 0 or deepspeed.is_deepspeed_zero3_enabled():
- logging.warning("FSDP or ZeRO3 is incompatible with QLoRA.")
-
- model_load_kwargs = {
- "low_cpu_mem_usage": not deepspeed.is_deepspeed_zero3_enabled(),
- }
-
- compute_dtype = (
- torch.float16
- if training_args.fp16
- else (torch.bfloat16 if training_args.bf16 else torch.float32)
- )
-
- # Load model and tokenizer
- config = transformers.AutoConfig.from_pretrained(
- model_args.model_name_or_path,
- cache_dir=training_args.cache_dir,
- )
- config.use_cache = False
-
- model = AutoModelForCausalLM.from_pretrained(
- model_args.model_name_or_path,
- config=config,
- cache_dir=training_args.cache_dir,
- device_map=device_map,
- # enable 4-bit quantization only when both LoRA and QLoRA are requested
- quantization_config=(
- BitsAndBytesConfig(
- load_in_4bit=True,
- bnb_4bit_use_double_quant=True,
- bnb_4bit_quant_type="nf4",
- bnb_4bit_compute_dtype=compute_dtype,
- )
- if training_args.use_lora and lora_args.q_lora
- else None
- ),
- torch_dtype=torch.bfloat16, # load the weights in bfloat16 (mixed precision)
- **model_load_kwargs,
- )
- tokenizer = AutoTokenizer.from_pretrained(
- model_args.model_name_or_path,
- cache_dir=training_args.cache_dir,
- model_max_length=training_args.model_max_length,
- padding_side="right",
- use_fast=False,
- )
-
- if training_args.use_lora:
- lora_config = LoraConfig(
- r=lora_args.lora_r,
- lora_alpha=lora_args.lora_alpha,
- target_modules=lora_args.lora_target_modules,
- lora_dropout=lora_args.lora_dropout,
- bias=lora_args.lora_bias,
- task_type=TaskType.CAUSAL_LM,
- )
- if lora_args.q_lora:
- model = prepare_model_for_kbit_training(
- model, use_gradient_checkpointing=training_args.gradient_checkpointing
- )
-
- model = get_peft_model(model, lora_config)
-
- # Print peft trainable params
- model.print_trainable_parameters()
-
- if training_args.gradient_checkpointing:
- model.enable_input_require_grads()
-
- # Load data
- data_module = dataSets.make_supervised_data_module(
- tokenizer=tokenizer,
- data_args=data_args,
- model_max_length=training_args.model_max_length,
- )
-
- # Start trainer
- trainer = Trainer(
- model=model,
- tokenizer=tokenizer,
- args=training_args,
- train_dataset=data_module["train_dataset"],
- eval_dataset=data_module["eval_dataset"],
- )
-
- # `not training_args.use_lora` is a temporary workaround for the issue that there are problems with
- # loading the checkpoint when using LoRA with DeepSpeed.
- # Check this issue https://github.com/huggingface/peft/issues/746 for more information.
- # fine-tune
- if (
- list(pathlib.Path(training_args.output_dir).glob("checkpoint-*"))
- and not training_args.use_lora
- ):
- trainer.train(resume_from_checkpoint=True)
- else:
- trainer.train()
-
- trainer.save_state()
-
- safe_save_model_for_hf_trainer(
- trainer=trainer, output_dir=training_args.output_dir, bias=lora_args.lora_bias
- )
-
-
- if __name__ == "__main__":
- train()
Fine-tuning with LoRA produces a LoRA adapter with very few parameters; at inference time both the base model and the LoRA model must be loaded. Two inference options for the fine-tuned model are described below; choose whichever fits your needs.
Once fine-tuning is finished, the base weights and the LoRA weights can be merged into a single new model, which is then loaded exactly like the base model. However, if the application has special requirements, such as role playing, merging is generally not recommended: loading a whole new large model is slow, whereas loading a LoRA adapter is fast.
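A minimal sketch of the merge, using the peft merge_and_unload API and the base/LoRA paths that appear in the inference script below (the output directory is a hypothetical choice):
- import torch
- from transformers import AutoModelForCausalLM, AutoTokenizer
- from peft import PeftModel
-
- base_model_path = "/root/.cache/modelscope/hub/qwen/Qwen1___5-7B-Chat"
- lora_model_path = "/root/autodl-tmp/Qwen-our/sft/checkpoint-4"
- merged_path = "/root/autodl-tmp/Qwen-our/sft/merged"  # hypothetical output directory
-
- base_model = AutoModelForCausalLM.from_pretrained(base_model_path, torch_dtype=torch.bfloat16)
- model = PeftModel.from_pretrained(base_model, lora_model_path)
- model = model.merge_and_unload()  # fold the LoRA deltas into the base weights
- model.save_pretrained(merged_path)
-
- tokenizer = AutoTokenizer.from_pretrained(base_model_path)
- tokenizer.save_pretrained(merged_path)  # ship the tokenizer together with the merged model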
Not merging means loading the base model and the LoRA model separately. This is also straightforward, but note that because two models are loaded, the parameters of both must be placed on the GPU (or CPU) together, otherwise errors occur.
The prompts constructed for fine-tuning and for inference differ slightly; the main difference is shown below:
- message = [
- {
- "role": "system",
- "content": "你是一个擅长猜测人类意图的人工智能助手,下面有选项供你选择用户的意图,请选择:\nA. <通用>\nB. <媒体>\nC. <系统控制>\nD. <天气>\nE. <车控>\nF. <导航>\nG. <蓝牙电话>"
- },
- {"role": "user", "content": "查询下明天天气温度"},
- {"role": "assistant", "content": "<天气>"},
- {"role": "user", "content": "你协助我开电台"},
- {"role": "assistant", "content": "<媒体>"},
- {"role": "user", "content": "开启最大除霜"},
- {"role": "assistant", "content": "<车控>"},
- {"role": "user", "content": "右车窗开开一下"}
- ]
-
- # result
- """
- <|im_start|>system
- 你是一个擅长猜测人类意图的人工智能助手,下面有选项供你选择用户的意图,请选择:
- A. <通用>
- B. <媒体>
- C. <系统控制>
- D. <天气>
- E. <车控>
- F. <导航>
- G. <蓝牙电话><|im_end|>
- <|im_start|>user
- 开启最大除霜<|im_end|>
- <|im_start|>assistant
- <车控><|im_end|>
- <|im_start|>user
- 右车窗开开一下<|im_end|>
- <|im_start|>assistant
- """
As mentioned above, if at the end of fine-tuning you merge the base weights and the LoRA weights into a new model, then loading it works exactly like loading the base model.
So there are two options: merge the base and LoRA weights into one new model, or keep them separate, in which case both models must be loaded at inference time.
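For completeness, a minimal sketch of the merged-weights option (the directory is the hypothetical output of the merge sketch earlier); the unmerged option is what the code below implements:
- import torch
- from transformers import AutoModelForCausalLM, AutoTokenizer
-
- merged_path = "/root/autodl-tmp/Qwen-our/sft/merged"  # hypothetical merged-model directory
- tokenizer = AutoTokenizer.from_pretrained(merged_path)
- model = AutoModelForCausalLM.from_pretrained(merged_path, torch_dtype=torch.bfloat16, device_map="auto")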
- import torch
- from peft import PeftModel
- from typing import List, Union, Dict, Optional, Tuple, Any
- import os
-
- # os.environ["CUDA_VISIBLE_DEVICES"] = "3"
- import transformers
- from loguru import logger
- from transformers import AutoTokenizer, AutoModelForCausalLM
-
- from datas import com
-
- device = torch.device("cuda")
-
- base_model_path = "/root/.cache/modelscope/hub/qwen/Qwen1___5-7B-Chat"
- lora_model_path = "/root/autodl-tmp/Qwen-our/sft/checkpoint-4"
- test_data_path = "/root/autodl-tmp/Qwen-our/datas/handled/devs"
-
-
- def init_model(
- base_model_path: str, lora_model_path: str
- ) -> Tuple[AutoTokenizer, PeftModel]:
- """
- Args:
- base_model_path (str): path to the base Qwen1.5 checkpoint
- lora_model_path (str): path to the fine-tuned LoRA adapter
- Returns:
- Tuple[AutoTokenizer, PeftModel]: the tokenizer and the base model with the adapter attached
- """
- config = transformers.AutoConfig.from_pretrained(
- base_model_path,
- cache_dir=None,
- )
- config.use_cache = False
- base_model = AutoModelForCausalLM.from_pretrained(
- base_model_path,
- config=config,
- cache_dir=None,
- device_map="auto",
- quantization_config=None,
- torch_dtype=torch.bfloat16,
- ).to(device)
-
- tokenizer = AutoTokenizer.from_pretrained(
- base_model_path,
- cache_dir=None,
- model_max_length=1024,
- padding_side="right",
- use_fast=False,
- )
-
- new_model = PeftModel.from_pretrained(
- base_model,
- lora_model_path,
- device_map="auto",
- torch_dtype=torch.bfloat16,
- ).to(device)
-
- return tokenizer, new_model
-
-
- def format(instance: List[Dict[str, Any]]) -> List[Dict[str, str]]:
- message = []
- for i, ins in enumerate(instance):
- system = ins["system"]
- query = ins["query"]
- answer = ins["answer"]
- if i == 0:
- message.append({"role": "system", "content": system})
- message.append({"role": "user", "content": query})
- message.append({"role": "assistant", "content": answer})
-
- return message
-
-
- def processing(
- tokenizer: AutoTokenizer,
- messages: List[Dict[str, str]],
- model_max_length: int,
- ) -> List[int]:
-
- def _parse_messages(
- messages: List[Dict[str, str]], split_role: str = "user"
- ) -> Tuple[str, List[List[Dict[str, str]]]]:
- system, rounds = "", []
- round = []
- for i, message in enumerate(messages):
- if message["role"] == "system":
- assert i == 0
- system = message["content"]
- continue
- # a round is only appended to rounds once it is complete
- if message["role"] == split_role and round:
- rounds.append(round)
- round = []
- round.append(message)
- if round: # append the final round as well
- rounds.append(round)
-
- return system, rounds
-
- system, rounds = _parse_messages(messages)
- system_ids = tokenizer.encode(f"<|im_start|>system\n{system}<|im_end|>\n")
- input_ids: List[int] = []
-
- # for multi-turn dialogues, keep only the most recent rounds, i.e. [n:]
- for i, round in enumerate(rounds[::-1]): # iterate from the last round backwards
- # one complete round
- text_id = []
- for message in round:
- role, content = message["role"], message["content"]
- if role == "user":
- if i == 0: # the final user turn gets the generation prompt appended
- cont = f"<|im_start|>user\n{content}<|im_end|>\n<|im_start|>assistant\n"
- else:
- cont = f"<|im_start|>user\n{content}<|im_end|>\n"
- else:
- if i == 0:
- # skip the gold answer in the final round so it does not leak into the prompt built by format()
- continue
- cont = f"<|im_start|>assistant\n{content}<|im_end|>\n"
-
- # [user + assistant] or [user]
- text_id = text_id + tokenizer.encode(cont)
-
- # keep this round only if adding it does not exceed the model's maximum length
- if len(system_ids + input_ids + text_id) > model_max_length:
- break
- else:
- input_ids = text_id + input_ids
-
- # system + [query + answer, ..., query]
- return system_ids + input_ids
-
-
- def main(model_max_length: int = 1024, debug: bool = False):
- # initialize the model
- tokenizer, model = init_model(base_model_path, lora_model_path)
-
- # collect the test-file paths
- paths: List[str] = com.get_paths(test_data_path)
- for path in paths:
- # load and format the data
- message: List[Dict[str, str]] = format(com.load(path))
- text_ids: List[int] = processing(tokenizer, message, model_max_length)
- # debug
- if debug:
- logger.debug(f"{len(text_ids)} {tokenizer.decode(text_ids)}")
- # inference
- input_ids = torch.tensor([text_ids]).to(device)
- generated_ids = model.generate(input_ids=input_ids, max_new_tokens=512)
- generated_ids = [
- output_ids[len(input_ids) :]
- for input_ids, output_ids in zip(input_ids, generated_ids)
- ]
- response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
- # save the result
- com.dump(
- path.replace(f"{os.sep}handled{os.sep}", f"{os.sep}results{os.sep}"),
- message + [{"role": "predict", "content": response}],
- )
-
-
- if __name__ == "__main__":
- main(model_max_length=1024, debug=False)