赞
踩
小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。
本次我将介绍如何用亚马逊云科技的AI模型训练服务Amazon SageMaker和PEFT、RLHF框架高效微调AI大模型FLAN-T5-BASE,减少大模型回复过程中的潜在有害内容。我将带领大家手把手通过一行一行的代码学会AI模型的微调,0基础学会AI核心技能。本架构设计还包括了与用户交互的前后端应用,全部采用了云原生Serverless架构,提供可扩展和安全的AI应用解决方案。本方案架构图如下
参数高效微调(PEFT)和基于人类反馈的强化学习(RLHF)都是我们在 AI 大模型微调中使用的常见方法。PEFT 通过选择性地调整模型的一部分参数,提高了微调过程的效率和资源利用率,而 RLHF 则通过引入人类反馈,优化模型的表现,减少偏见和有害暗示。这两种方法各有优势,可以互补使用,以实现更高效、更可靠的机器学习模型微调,满足不同应用场景的需求。
参数高效微调(Parameter-Efficient Fine-Tuning, PEFT)是一种在微调预训练模型时只调整部分参数的方法。与传统的全参数微调相比,PEFT 专注于模型中的一小部分参数,这不仅可以减少计算资源的消耗,还能提高微调过程的效率。PEFT 方法通过选择性地更新模型参数,保持了模型的整体结构和大部分预训练信息,从而在保持高性能的同时,显著减少训练时间和资源消耗。这种方法特别适用于资源有限的开发环境,同时也能在大规模模型上取得优异的效果。
基于人类反馈的强化学习(Reinforcement Learning from Human Feedback, RLHF)是一种结合人类反馈来优化机器学习模型的算法。RLHF 通过引入人类在训练过程中的评价和反馈,指导模型进行自我调整和优化。具体而言,RLHF 利用人类标注的奖励信号来更新模型的策略,使其更符合预期的行为和输出。这种方法可以显著提升模型的表现,尤其在处理涉及伦理和偏见的问题时,RLHF 能够通过人类的反馈来减少有害暗示和不良行为,提高模型的安全性和可靠性。RLHF 在对话系统、推荐系统等应用中表现出色,能够有效提高用户体验和模型的实际应用价值。
1. 首先我们打开亚马逊云科技控制台,打开SageMaker服务主页,进入到我们的SageMaker Studio中。
2. 接下来我们创建一个新的Jupyter Notebook,开始我们的模型微调。首先我们安装必要的依赖
- %%capture
- %pip install torch==2.0.1 torchdata
- %pip install transformers==4.28.1
- %pip install datasets==2.17.0
- %pip install accelerate==0.16.0
- %pip install evaluate==0.4.0
- %pip install trl==0.7.1
- %pip install rouge_score==0.1.2
- %pip install loralib==0.1.1
- %pip install peft==0.3.0
- %pip install -q awswrangler
3. 在Notebook中导入必要的依赖
- from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, GenerationConfig
- from peft import PeftModel, PeftConfig, LoraConfig, TaskType
-
- # trl: Transformer Reinforcement Learning library
- from trl import PPOTrainer, PPOConfig, AutoModelForSeq2SeqLMWithValueHead
- from trl import create_reference_model
- from trl.core import LengthSampler
-
- import torch
- import evaluate
- import numpy as np
- import pandas as pd
- import peft
-
- # tqdm library makes the loops show a smart progress meter.
- from tqdm import tqdm
- tqdm.pandas()

4. 接下来我们导入我们用到的测试数据集“knkarthick/dialogsum”和AI大模型"google/flan-t5-base"
- from datasets import load_dataset
-
- model_name="google/flan-t5-base"
- huggingface_dataset_name = "knkarthick/dialogsum"
-
- dataset_original = load_dataset(huggingface_dataset_name)
-
- dataset_original
5. 下面我们对数据集预处理,定义函数build_dataset选择合适长度(200-1000字符)的数据,初始化Tokenizer, 将数据集数据封装到提示词并解码成PPO库的标准格式,按2/8比例分割数据集为测试集和训练集。
- from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM, GenerationConfig
-
- def build_dataset(model_name,
- dataset_name,
- input_min_text_length,
- input_max_text_length):
-
- # Load dataset (the "train" part only is enough for this lab).
- dataset = load_dataset(dataset_name, split="train")
-
- # Filter the dialogues of length between input_min_text_length and input_max_text_length characters.
- dataset = dataset.filter(lambda x: len(x["dialogue"]) > input_min_text_length and len(x["dialogue"]) <= input_max_text_length, batched=False)
-
- # Prepare the tokenizer. Setting device_map="auto" allows to switch between GPU and CPU automatically.
- tokenizer = AutoTokenizer.from_pretrained(model_name, device_map="auto",force_download=True)
-
- def tokenize(sample):
-
- # Wrap each dialogue with the instruction.
- prompt = f"""
- Summarize the following conversation.
- {sample["dialogue"]}
- Summary:
- """
- sample["input_ids"] = tokenizer.encode(prompt)
-
- # This must be called "query", which is a requirement of our PPO library.
- sample["query"] = tokenizer.decode(sample["input_ids"])
- return sample
-
- # Tokenize each dialogue.
- dataset = dataset.map(tokenize, batched=False)
- dataset.set_format(type="torch")
-
- # Split the dataset into train and test parts.
- dataset_splits = dataset.train_test_split(test_size=0.2, shuffle=False, seed=42)
-
- return dataset_splits
-
- dataset = build_dataset(model_name=model_name,
- dataset_name=huggingface_dataset_name,
- input_min_text_length=200,
- input_max_text_length=1000)
-
- print(dataset)

6. 我们定义一个函数来查看模型参数数量
- def print_number_of_trainable_model_parameters(model):
- trainable_model_params = 0
- all_model_params = 0
- for _, param in model.named_parameters():
- all_model_params += param.numel()
- if param.requires_grad:
- trainable_model_params += param.numel()
- return f"\ntrainable model parameters: {trainable_model_params}\nall model parameters: {all_model_params}\npercentage of trainable model parameters: {100 * trainable_model_params / all_model_params:.2f}%"
7. 下面我们为大语言模型添加适配器LoRA,并结合 PEFT 方法,可以在保持模型性能的同时,大幅减少微调所需的参数数量和计算资源。
- lora_config = LoraConfig(
- r=32, # Rank
- lora_alpha=32,
- target_modules=["q", "v"],
- lora_dropout=0.05,
- bias="none",
- task_type=TaskType.SEQ_2_SEQ_LM # FLAN-T5
- )
-
- model = AutoModelForSeq2SeqLM.from_pretrained(model_name,
- torch_dtype=torch.bfloat16)
-
- peft_model = PeftModel.from_pretrained(model,
- './peft-dialogue-summary-checkpoint-from-s3/',
- lora_config=lora_config,
- torch_dtype=torch.bfloat16,
- device_map="auto",
- is_trainable=True)
-
- print(f'PEFT model parameters to be updated:\n{print_number_of_trainable_model_parameters(peft_model)}\n')

8. 基于摸大型加载并创建 PPO 模型,用于之后的Reinforcement Learning
- ppo_model = AutoModelForSeq2SeqLMWithValueHead.from_pretrained(peft_model,
- torch_dtype=torch.bfloat16,
- is_trainable=True)
-
- ref_model = create_reference_model(ppo_model)
9. 接下来我们创建奖励模型Meta RoBERTa,用来评估数据集对话中的有害内容。
- toxicity_model_name = "facebook/roberta-hate-speech-dynabench-r4-target"
- toxicity_tokenizer = AutoTokenizer.from_pretrained(toxicity_model_name, device_map="auto")
- toxicity_model = AutoModelForSequenceClassification.from_pretrained(toxicity_model_name, device_map="auto")
- print(toxicity_model.config.id2label)
10. 我们输入两个实例评估他们是否属于仇恨言论,并输出仇恨分数。仇恨Logit分数越高,则表示这段话越有可能是仇恨言论。
- non_toxic_text = "You are a great person and i like you."
-
- toxicity_input_ids = toxicity_tokenizer(non_toxic_text, return_tensors="pt").input_ids
-
- logits = toxicity_model(input_ids=toxicity_input_ids).logits
- print(f'logits [not hate, hate]: {logits.tolist()[0]}')
-
- # Print the probabilities for [not hate, hate]
- probabilities = logits.softmax(dim=-1).tolist()[0]
- print(f'probabilities [not hate, hate]: {probabilities}')
-
- # get the logits for "not hate" - this is the reward!
- not_hate_index = 0
- nothate_reward = (logits[:, not_hate_index]).tolist()
- print(f'reward (value of "not hate" logit): {nothate_reward}')
-
-
- toxic_text = "You are a terrible person and i hate you."
-
- toxicity_input_ids = toxicity_tokenizer(toxic_text, return_tensors="pt").input_ids
-
- logits = toxicity_model(toxicity_input_ids).logits
- print(f'logits [not hate, hate]: {logits.tolist()[0]}')
-
- # Print the probabilities for [not hate, hate]
- probabilities = logits.softmax(dim=-1).tolist()[0]
- print(f'probabilities [not hate, hate]: {probabilities}')
-
- # Get the logits for "not hate" - this is the reward!
- nothate_reward = (logits[:, not_hate_index]).tolist()
- print(f'reward (value of "not hate" logit): {nothate_reward}')

11. 创建一个大模型生成内容分析管道,从生成内容中获取结果。
- device = 0 if torch.cuda.is_available() else "cpu"
-
- sentiment_pipe = pipeline("sentiment-analysis",
- model=toxicity_model_name,
- device=device)
- reward_logits_kwargs = {
- "top_k": None, # Return all scores.
- "function_to_apply": "none", # Set to "none" to retrieve raw logits.
- "batch_size": 16
- }
-
- reward_probabilities_kwargs = {
- "top_k": None, # Return all scores.
- "function_to_apply": "softmax", # Set to "softmax" to apply softmax and retrieve probabilities.
- "batch_size": 16
- }
-
- print("Reward model output for non-toxic text:")
- print(sentiment_pipe(non_toxic_text, **reward_logits_kwargs))
- print(sentiment_pipe(non_toxic_text, **reward_probabilities_kwargs))
- print("\nReward model output for toxic text:")
- print(sentiment_pipe(toxic_text, **reward_logits_kwargs))
- print(sentiment_pipe(toxic_text, **reward_probabilities_kwargs))

12. 下面我们定一个生成内容的评估器,用于分析生成内容的有害性。定义函数evaluate_toxicity,将模型有害性以0-1之间的分数展示,1表示最高有害性。并且在微调之前得到模型的有害性分数。
- toxicity_evaluator = evaluate.load("toxicity",
- toxicity_model_name,
- module_type="measurement",
- toxic_label="hate")
-
- toxicity_score = toxicity_evaluator.compute(predictions=[
- non_toxic_text
- ])
-
- print("Toxicity score for non-toxic text:")
- print(toxicity_score["toxicity"])
-
- toxicity_score = toxicity_evaluator.compute(predictions=[
- toxic_text
- ])
-
- print("\nToxicity score for toxic text:")
- print(toxicity_score["toxicity"])
-
- def evaluate_toxicity(model,
- toxicity_evaluator,
- tokenizer,
- dataset,
- num_samples):
-
- max_new_tokens=100
-
- toxicities = []
- input_texts = []
- for i, sample in tqdm(enumerate(dataset)):
- input_text = sample["query"]
-
- if i > num_samples:
- break
-
- input_ids = tokenizer(input_text, return_tensors="pt", padding=True).input_ids
-
- generation_config = GenerationConfig(max_new_tokens=max_new_tokens,
- tok_k=0.0,
- top_p=1.0,
- do_sample=True)
-
- response_token_ids = model.generate(input_ids=input_ids,
- generation_config=generation_config)
-
- generated_text = tokenizer.decode(response_token_ids[0], skip_special_tokens=True)
-
- toxicity_score = toxicity_evaluator.compute(predictions=[(input_text + " " + generated_text)])
-
- toxicities.extend(toxicity_score["toxicity"])
-
- # Compute mean and std using np.
- mean = np.mean(toxicities)
- std = np.std(toxicities)
-
- return mean, std
-
- tokenizer = AutoTokenizer.from_pretrained(model_name, device_map="auto")
-
- mean_before_detoxification, std_before_detoxification = evaluate_toxicity(model=ref_model,
- toxicity_evaluator=toxicity_evaluator,
- tokenizer=tokenizer,
- dataset=dataset["test"],
- num_samples=10)
-
- print(f'toxicity [mean, std] before detox: [{mean_before_detoxification}, {std_before_detoxification}]')

18. 接下来我们利用PPO对大语言模型进行强化学习,降低有害回复。我们首先配置PPO模型和模型超参数并创建PPO训练器trainer,用于对大语言模型进行微调强化训练。
- learning_rate=1.41e-5
- max_ppo_epochs=1
- mini_batch_size=4
- batch_size=16
-
- config = PPOConfig(
- model_name=model_name,
- learning_rate=learning_rate,
- ppo_epochs=max_ppo_epochs,
- mini_batch_size=mini_batch_size,
- batch_size=batch_size
- )
-
-
- def collator(data):
- return dict((key, [d[key] for d in data]) for key in data[0])
-
- ppo_trainer = PPOTrainer(config=config,
- model=ppo_model,
- ref_model=ref_model,
- tokenizer=tokenizer,
- dataset=dataset["train"],
- data_collator=collator)

19. 接下来我们配置训练的参数,模型内容生成参数和奖励参数,开始对PPO模型进行强化训练,这个训练器会最大化positive正向回复内容的奖励分数,使微调后的模型回复分数接近于1。
- output_min_length = 100
- output_max_length = 400
- output_length_sampler = LengthSampler(output_min_length, output_max_length)
-
- generation_kwargs = {
- "min_length": 5,
- "top_k": 0.0,
- "top_p": 1.0,
- "do_sample": True
- }
-
- reward_kwargs = {
- "top_k": None, # Return all scores.
- "function_to_apply": "none", # You want the raw logits without softmax.
- "batch_size": 16
- }
-
- max_ppo_steps = 10
-
- for step, batch in tqdm(enumerate(ppo_trainer.dataloader)):
- # Break when you reach max_steps.
- if step >= max_ppo_steps:
- break
-
- prompt_tensors = batch["input_ids"]
-
- # Get response from FLAN-T5/PEFT LLM.
- summary_tensors = []
-
- for prompt_tensor in prompt_tensors:
- max_new_tokens = output_length_sampler()
-
- generation_kwargs["max_new_tokens"] = max_new_tokens
- summary = ppo_trainer.generate(prompt_tensor, **generation_kwargs)
-
- summary_tensors.append(summary.squeeze()[-max_new_tokens:])
-
- # This needs to be called "response".
- batch["response"] = [tokenizer.decode(r.squeeze()) for r in summary_tensors]
-
- # Compute reward outputs.
- query_response_pairs = [q + r for q, r in zip(batch["query"], batch["response"])]
- rewards = sentiment_pipe(query_response_pairs, **reward_kwargs)
-
- # You use the "nothate" item because this is the score for the positive "nothate" class.
- reward_tensors = [torch.tensor(reward[not_hate_index]["score"]) for reward in rewards]
-
- # Run the PPO step.
- stats = ppo_trainer.step(prompt_tensors, summary_tensors, reward_tensors)
- ppo_trainer.log_stats(stats, batch, reward_tensors)
-
- print(f'objective/kl: {stats["objective/kl"]}')
- print(f'ppo/returns/mean: {stats["ppo/returns/mean"]}')
- print(f'ppo/policy/advantages_mean: {stats["ppo/policy/advantages_mean"]}')
- print('-'.join('' for x in range(100)))

20. 接下来我们利用“evaluate_toxicity”函数,对微调后的模型进行评估
- mean_after_detoxification, std_after_detoxification = evaluate_toxicity(model=ppo_model,
- toxicity_evaluator=toxicity_evaluator,
- tokenizer=tokenizer,
- dataset=dataset["test"],
- num_samples=10)
- print(f'toxicity [mean, std] after detox: [{mean_after_detoxification}, {std_after_detoxification}]')
21. 再将模型有害性分数和训练前进行比较。
- mean_improvement = (mean_after_detoxification - mean_before_detoxification) / mean_before_detoxification
- std_improvement = (std_after_detoxification - std_before_detoxification) / std_before_detoxification
-
-
- print(f'Percentage improvement of toxicity score after detoxification:')
- print(f'mean: {mean_improvement*100:.2f}%')
- print(f'std: {std_improvement*100:.2f}%')
22. 获取微调前的响应和微调后的响应的数据/正向分数、正向分数差、用户提问,将其作为多列存到dataframe里。
- batch_size = 20
- compare_results = {}
-
- df_batch = dataset["test"][0:batch_size]
-
- compare_results["query"] = df_batch["query"]
- prompt_tensors = df_batch["input_ids"]
-
- summary_tensors_ref = []
- summary_tensors = []
-
- # Get response from the PPO and base model.
- for i in tqdm(range(batch_size)):
- gen_len = output_length_sampler()
- generation_kwargs["max_new_tokens"] = gen_len
-
- summary = ref_model.generate(
- input_ids=torch.as_tensor(prompt_tensors[i]).unsqueeze(dim=0).to(device),
- **generation_kwargs
- ).squeeze()[-gen_len:]
- summary_tensors_ref.append(summary)
-
- summary = ppo_model.generate(
- input_ids=torch.as_tensor(prompt_tensors[i]).unsqueeze(dim=0).to(device),
- **generation_kwargs
- ).squeeze()[-gen_len:]
- summary_tensors.append(summary)
-
- # Decode responses.
- compare_results["response_before"] = [tokenizer.decode(summary_tensors_ref[i]) for i in range(batch_size)]
- compare_results["response_after"] = [tokenizer.decode(summary_tensors[i]) for i in range(batch_size)]
-
- # Sentiment analysis of query/response pairs before/after.
- texts_before = [d + s for d, s in zip(compare_results["query"], compare_results["response_before"])]
- rewards_before = sentiment_pipe(texts_before, **reward_kwargs)
- compare_results["reward_before"] = [reward[not_hate_index]["score"] for reward in rewards_before]
-
- texts_after = [d + s for d, s in zip(compare_results["query"], compare_results["response_after"])]
- rewards_after = sentiment_pipe(texts_after, **reward_kwargs)
- compare_results["reward_after"] = [reward[not_hate_index]["score"] for reward in rewards_after]
-
- pd.set_option('display.max_colwidth', 500)
- df_compare_results = pd.DataFrame(compare_results)
- df_compare_results["reward_diff"] = df_compare_results['reward_after'] - df_compare_results['reward_before']
- df_compare_results_sorted = df_compare_results.sort_values(by=['reward_diff'], ascending=False).reset_index(drop=True)
- df_compare_results_sorted

23. 最后我们利用awswrangler库进行数据处理,为微调前的响应和微调后的响应的数据、添加索引列,存入DynamoDB中
- import awswrangler as wr
-
- # Add an index column to the data frame to act as the partition key
- df_compare_results['index'] = range(1, len(df_compare_results) + 1)
-
- # Create a results dataframe,reorganized with DynamoDB table attributes
- result = pd.DataFrame({
- "conversation_id": df_compare_results['index'],
- "query": df_compare_results['query'],
- "response_before": df_compare_results['response_before'],
- "response_after": df_compare_results['response_after']
- })
-
- # Upload result to DDB
- wr.dynamodb.put_df(df=result, table_name='llm_with_rlhf')
-
-

24. 通过Cloudfront打开S3内的静态网页,html文件会触发后端API Gateway调用数据库将数据显示到页面上。
以上就是在亚马逊云科技上利用SageMaker微调大模型,减少回复中的有害内容的全部步骤。欢迎大家关注小李哥,未来获取更多国际前沿的生成式AI开发方案!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。