当前位置:   article > 正文

详解带RLHF的类ChatGPT:从TRL、ChatLLaMA到ColossalChat、DSC_类chatgpt逐行代码解读(1/2):从零实现transformer、chatglm-6b

类chatgpt逐行代码解读(1/2):从零实现transformer、chatglm-6b

本文为《类ChatGPT逐行代码解读》系列的第二篇,上一篇是:从零实现Transformer、ChatGLM-6B:从位置编码/缩放点积注意力/多头注意力开始

本文模型的特点是都加了RLHF,对于这4个模型而言:TRL、ChatLLaMA、ColossalChat、DeepSpeed Chat

  • 如果只关注两个 则可以更多关注下ColossalChat、DeepSpeed Chat,原因在于ColossalChat给的图示特别好,而DeepSpeed Chat的实现很清晰
  • 如果有读者说 就只想看一个,则推荐DeepSpeed Chat(简称DSC),特别是DSC会给你一个完整而通透的“PPO算法/RLHF”的代码实现全流程,好的资料可以让你事半功倍

总之,微软这个DeepSpeed Chat实现的不错,抠完它的关键代码后,你会发现和之前本博客内另一篇写的原理部分都一一对应起来了(如果你还没看过原理,建议先看此文:ChatGPT技术原理解析,只有懂原理才能更好的理解实现或实际实现,特别是该文的第三部分 ),而把论文、原理/算法、公式、代码一一对应,可以让你的理解有个质变

本文最早的标题是:从零实现带RLHF的类ChatGPT:从TRL/ChatLLaMA/ColossalChat到DeepSpeed Chat,后来因为要不断扩展DSC的内容,为避免本文越写越长,故最终分出了两篇文章

  1. 本文侧重:TRL、ChatLLaMA、ColossalChat
  2. 新文侧重:从零实现带RLHF的类ChatGPT:逐行解析微软DeepSpeed Chat

第一部分 PPO算法微调LM的TRL包

1.1 TRL包:类似ChatGPT训练阶段三的PPO方式微调语言模型

通过《ChatGPT技术原理解析》一文,我们已经知道了ChatGPT的三阶段训练过程,其中,阶段三的本质其实就是通过PPO的方式去微调LM

GitHub上有个TRL(Transformer Reinforcement Learning,基于『Hugging Face开发的Transformer库』),便是通过PPO的方式去微调LM,需要的数据便是三元组「query, response, reward」,具体如下图所示

  1. Rollout:语言模型根据query生成response
  2. 评估:怎么评估模型针对特定query生成response的质量呢,我们可以使用a function、model、human feedback或它们的某种组合进行评估,然后为每个query/response对产生一个标量值,说白了 就是奖励模型有了,那就直接打分
  3. 优化:在优化步骤中,「query/response pairs」用于计算序列中标记的对数概率,且比较下面这两个模型输出之间的 KL 散度用作额外的奖励信号
    \rightarrow  经过训练的模型(即上图中的Active model)
    \rightarrow  基线模型(即上图中的Reference model),通常是PPO微调之前的模型(比如这里的GPT2,或者instructGPT里的SFT)
    最终,使得Active model生成的响应不会偏离基线模型Reference model太远

1.2 通过TRL包实现基于AC架构的PPO算法

PPO算法是一种具体的Actor-Critic算法实现,比如在对话机器人中,输入的prompt是state,输出的response是action,想要得到的策略就是怎么从prompt生成action能够得到最大的reward,也就是拟合人类的偏好。具体实现时,可以按如下两大步骤实现

  1. 首先定义4个模型:Actor(action_logits)、SFT(sft_logits)、Critic(value)、RM「r(x, y)」,和kl_div、reward、优势函数adv
    从prompt库中采样出来的prompt在经过SFT(微调过GPT3/GPT3.5的模型称之为SFT)做generate得到一个response,这个『prompt + response』定义为sequence(这个采样的过程是批量采样进行generate,得到一个sequence buffer),然后这个sequence buffer的内容做batched之后输入给4个模型做inference

    这4个模型分别为Actor、SFT、Critic、RM,其中:
    Actor和SFT都是175B的模型,且Actor参数由SFT初始化(SFT是baseline),Actor输出action_logits,SFT输出sft_logits
    sft_logits和action_logits做kl_div,为了约束actor模型的更新step不要偏离原始模型SFT太远

    Critic和RM是6B的模型,Critic参数由RM初始化
    Critic输出标量value,RM输出标量r(x, y),由r(x, y)和kl_div计算得到reward,reward和value计算得到adv
  2. 其次,通过pg_loss和value_loss优化迭代
    Actor的流程是取出sequence,然后inference生成新的logits,再和sequence对应的之前的logits计算ratio,和adv计算出pg_loss,也就是actor的loss,然后反向传播,优化器迭代
    Critic的流程是取出sequence,然后inference得到新的value,和old_value做clip_value,再和reward计算value loss,然后反向传播,优化器迭代

以下是计算策略损失和价值损失的关键代码(来自trl/ppo_trainer.py at main · lvwerra/trl · GitHub的第971-1032行),且为方便大家阅读时一目了然,我特意给每一行的代码都加上了注释

  1. def loss(
  2. self,
  3. old_logprobs: torch.FloatTensor, # 旧的对数概率,是前一步的策略输出
  4. values: torch.FloatTensor, # 价值函数的输出
  5. rewards: torch.FloatTensor, # 从环境中得到的奖励
  6. logits: torch.FloatTensor, # 策略网络的原始输出(未经softmax)
  7. vpreds: torch.FloatTensor, # 价值函数的预测值
  8. logprobs: torch.FloatTensor, # 当前策略输出的对数概率
  9. mask: torch.LongTensor, # 用于忽略某些元素(如填充元素)的掩码
  10. ):
  11. lastgaelam = 0 # 初始化lastgaelam,用于计算广义优势估计(GAE)
  12. advantages_reversed = [] # 初始化一个空列表,用于存储计算出的逆序优势值
  13. gen_len = rewards.shape[-1] # 获取奖励的长度,即序列的长度
  14. values = values * mask # 使用掩码对值进行过滤
  15. rewards = rewards * mask # 使用掩码对奖励进行过滤
  16. # 反向遍历时间步,计算每一步的优势值
  17. for t in reversed(range(gen_len)):
  18. # 获取下一个状态的值
  19. nextvalues = values[:, t + 1] if t < gen_len - 1 else 0.0
  20. # 计算TD误差
  21. delta = rewards[:, t] + self.config.gamma * nextvalues - values[:, t]
  22. # 更新lastgaelam
  23. lastgaelam = delta + self.config.gamma * self.config.lam * lastgaelam
  24. # 将计算出的优势值添加到列表中
  25. advantages_reversed.append(lastgaelam)
  26. # 对逆序的优势值进行逆序,使其按照原始顺序,然后将其堆叠起来并进行转置,得到最终的优势值
  27. advantages = torch.stack(advantages_reversed[::-1]).transpose(0, 1)
  28. # 计算回报,即优势值加上对应的value
  29. returns = advantages + values
  30. # 对优势值进行掩码白化,即只对掩码部分进行白化处理
  31. advantages = masked_whiten(advantages, mask)
  32. # 从计算图中分离优势值,防止反向传播
  33. advantages = advantages.detach()
  34. # 对预测值进行剪裁,防止预测值偏离真实值过远
  35. vpredclipped =
  36. clip_by_value(vpreds, values - self.config.cliprange_value, values + self.config.cliprange_value)
  37. # 计算预测值与回报之间的平方损失
  38. vf_losses1 = (vpreds - returns) ** 2
  39. # 计算剪裁后的预测值与回报之间的平方损失
  40. vf_losses2 = (vpredclipped - returns) ** 2
  41. # 计算价值函数的损失,选择两种损失中的较大者,然后计算其平均值,并乘以0.5
  42. vf_loss = 0.5 * masked_mean(torch.max(vf_losses1, vf_losses2), mask)
  43. # 计算剪裁损失比原始损失大的部分的平均值
  44. vf_clipfrac = masked_mean(torch.gt(vf_losses2, vf_losses1).double(), mask)
  45. # 计算新旧策略的比例
  46. ratio = torch.exp(logprobs - old_logprobs)
  47. # 计算策略梯度损失
  48. pg_losses = -advantages * ratio
  49. # 计算剪裁后的策略梯度损失
  50. pg_losses2 =
  51. -advantages * torch.clamp(ratio, 1.0 - self.config.cliprange, 1.0 + self.config.cliprange)
  52. # 计算策略损失,选择两种损失中的较大者,然后计算其平均值
  53. pg_loss = masked_mean(torch.max(pg_losses, pg_losses2), mask)
  54. # 计算剪裁损失比原始损失大的部分的平均值
  55. pg_clipfrac = masked_mean(torch.gt(pg_losses2, pg_losses).double(), mask)
  56. # 计算总损失,等于策略损失加上价值损失
  57. loss = pg_loss + self.config.vf_coef * vf_loss

上面代码中 有两点值得解释下

  1. 计算回报时,为何是优势值加上对应的value值
    别忘了,根据本博客中另一篇文章《RL极简入门》可知
    优势函数A(s,a)定义为Q(s,a) - V(s),其中Q(s,a)是动作价值函数,表示在状态s采取动作a所能获得的预期回报
    而V(s)则是状态价值函数,表示在状态s下依据当前策略所能获得的预期回报

    因此,当我们计算returns时,实际上是在计算Q(s,a)的估计值,即预期的动作价值
    所以,returns = advantages + values = (Q(s,a) - V(s)) + V(s) = Q(s,a)
    这样,returns就代表了我们预期能在状态s采取动作a获得的回报

  2. 为何计算策略损失和价值损失时,要先裁剪,然后对比裁剪前后的损失,最后取两者之中更大的值呢?
    首先关于计算策略损失时,为何对比截断前后的值
    这点可以回顾下本博客内的《ChatGPT技术原理解析一文》或RL极简入门
    \begin{aligned} J_{\mathrm{PPO2}}^{\theta'}(\theta) \approx \sum_{\left(s_{t}, a_{t}\right)} \min &\left(\frac{p_{\theta}\left(a_{t} | s_{t}\right)}{p_{\theta'}\left(a_{t} | s_{t}\right)} A^{\theta'}\left(s_{t}, a_{t}\right),{clip}\left(\frac{p_{\theta}\left(a_{t} | s_{t}\right)}{p_{\theta'}\left(a_{t} | s_{t}\right)}, 1-\varepsilon, 1+\varepsilon\right) A^{\theta'}\left(s_{t}, a_{t}\right)\right) \end{aligned}
    至于为何取更大
    原因在于策略损失带了个负号:pg_losses = -advantages * ratio ,原本应该是无负号取min的,但是带了负号就取max了

    那价值损失呢,为何也要对比裁剪前后的损失 然后取两者之中的更大值呢?首先,做对价值的估计也做裁剪 好理解(也是控制估计范围,防止预测值偏离真实值过远),但为何也要对比裁剪前后然后取更大值呢(毕竟价值损失是平方损失:vf_losses1 = (vpreds - returns) ** 2,无所谓负号)?
    原因在于:1 有时裁剪容易把真实的价值损失偏低化,2 更大的损失利于做更多轮的迭代,当然 在迭代过程中控制估计范围防止跑偏

第二部分 LLaMA的RLHF版:ChatLLaMA(英文版)

由于LLaMA没有使用RLHF方法,初创公司 Nebuly AI开源了RLHF版的LLaMA,即ChatLLaMA

2.1 三套数据集:分别训练actor、reward、rlhf

其训练过程类似 ChatGPT,而通过本博客内的《ChatGPT技术原理解析》3.1节,可知训练三个模型(SFT、RM、RL/PPO)得先准备三套数据集

2.1.1 actor_training_data,即用于微调GPT3所用的数据

actor_training_data,即用于微调GPT3所用的数据,比如
[
  {
      "user_input": "here the input of the user",
      "completion": "here the model completion"
  }
]

actor_training_data如何而来呢,有4项途径

  1. 使用 100% 合成数据,可以通过运行以下命令综合生成数据集:
    python artifacts/generate_actor_dataset.py,注:此命令需要订阅OpenAI,生成完整数据集的davinci-003成本约为 200 美元(当然 也有免费的途径)
  2. 使用具有辅助交互的开源数据集之一,目前支持:

    Anthropic HH RLHF:这个数据集由结构化的 {question/answer pairs} 组成,包括机器人选择和拒绝的答案;
    Stanford Human Preferences Dataset (SHP):这个数据集是从选定的“提问”subreddits 中挑选出来的,并且包括基于最受支持的回答的范围广泛的 {question/answer pairs} 的问题
    可以运行以下命令下载数据集:

    python artifacts/download_dataset.py <dataset_name> --path <path_to_folder_for_download> --number_of_samples <N>

    其中:
    <dataset_name>对于 StanfordNLP/SHP 数据集,可以是“SHP”或“ARLHF”,对于 Anthropic/hh-rlhf 数据集,可以分别是“SHP”或“ARLHF”;
    <path_to_folder_for_download>是要创建数据集的文件夹路径;
    <N>是组成 reward_dataset.json 的样本数

  3. 使用 100% 个性化数据集
    用户提供自己的个性化完整数据集,数据集必须是具有以下格式的 JSON 文件:
    [
        {
            "user_input": "here the input of the user",
            "completion": "here the model completion"
        }
    ]

    其中列表包含多个dictionaries,每个dictionary 对应一个数据样本,建议使用超过 1000 个数据样本来进行对actor的训练

  4. 创建完整的数据集,增加一些自定义数据样本,数据集可以从用户提供的一些提示+响应示例中综合生成(少数 => 10)

2.1.2 reward_training_data,用于训练一个奖励模型的数据

reward_training_data,用于训练一个奖励模型的数据,包含三部分的数据: 
i) prompts,
ii) completion
iii) score of the completion assigned accordingly to the user feedback (the Human Feedback in RLHF,即对各个回答的评分score)

示例如下
[{
    "user_input": "...",
    "completion": "...",
    "score": 1
},
    ...
]


同样的,奖励数据怎么来呢?有以下三种方式

  1. be synthetically scored using a LLM as Human Feedback
    LLM 模型用于为每个entry计算分数
    为此,LLM 需要一个提示模板,其中包含评估生成的文本的所有说明(比如奖励规则,什么情况下该奖 什么情况下不奖都得十分明确)。为此,您应该将key reward添加到文件中templates.json,比如:

    {

        "reward": "Here is the template for the reward model. The rules are:\n\n1.Rule 1\n\n2. Rule 2"
    }

    如果未提供模板,则使用默认模板artifacts/generate_rewards.py,注:所有模板都必须保存在一个名为 .json 的 JSON 文件中templates.json

    获得unlabelled dataset后,您可以通过运行以下命令生成分数:

    python artifacts/generate_rewards.py <dataset_path> --model <model_to_use> --temperature <t> --max_tokens <n> --reward_template <path_to_file.json>

    其中,<dataset_path>要评分的reward dataset的路径;
    <model_to_use>用于奖励的模型,默认建议使用text-davinci-003
    <temperature>用于对模型进行评分的temperature,temperature =0.1;
    <max_tokens>
    <reward_template>,这是包含用于生成奖励的模板的文件的路径,如果未提供路径,将使用默认模版

    这里值得注意的是,与instructGPT中的「人类通过对模型的输出进行排序,然后利用这些排序数据去训练一个RM」不同,ChatLLaMA直接训练一个RM对模型的输出进行打分 比如0-5分,且与人类的打分做MSE损失(减少RM打分与人类打分之间的差距)

    1. REWARD_TEMPLATE = dict(
    2. template=(
    3. "You have to evaluate the following chat with a score"
    4. "between 0 and 5"

    最后,可能你会问,从哪里看出来的用的MSE损失,答案是从另外一个文件里看出来的(具体是chatllama/rlhf/reward.py 文件的第282行) 

    1. class RewardTrainer:
    2. """Class to train the reward model
    3. def __init__(self, config: ConfigReward) -> None:
    4. # save the config
    5. self.config = config
    6. # load the model
    7. self.reward = RewardModel(config)
    8. # optimizer
    9. self.optimizer = torch.optim.AdamW(
    10. self.reward.parameters(), lr=config.lr
    11. )
    12. # loss function
    13. self.loss_function = torch.nn.MSELoss()
    14. // ...
  2. 用户提供他们个性化的完整数据集(至少需要 100 个数据样本),但数据集必须是以下格式的 JSON 文件,取名为:reward_training_data.json

    1. [
    2. {
    3. "user_input": "here type the user input",
    4. "completion": "here type the completion",
    5. "score": 4.0
    6. },
    7. {
    8. "user_input": "here type the user input",
    9. "completion": "random garbage",
    10. "score": 0.0
    11. }
    12. ]
  3. 用户提供的少量示例和使用 LLM 综合扩展的数据集(通过self-instruct的方式提示LLM产生更多所需要的指令数据)

2.1.3 rlhf_training_data,通过RL方法不断优化迭代最优策略的数据

It can be provided in 2 different ways:

  • Few examples provided by the user and dataset synthetically expanded using LLM(依然可以

    继续通过self-instruct的方式提示LLM产生更多所需要的指令数据)
    需要将key rlhf添加到templates.json文件中,其中包含有关要执行的任务的信息以及 LLM 生成所需的额外上下文,这是模板的示例(所有模板必须保存在一个名为templates.json):

    {
      "rlhf": "Here is the template for the generating RLHF prompts. The task we want to perform is ..."
    }

  • The user provides the full dataset with possible interactions with the model

    数据集需要包含超过 1000 个提示示例(文件命名为rlhf_training_data.json):

    [
        {
            "user_input": "here the example of user input"
        }
    ]

2.2 训练流程:SFT、RM、RL/PPO训练三步骤

2.2.1 RewardTrainer 类的 train 方法训练一个奖励函数

chatllama/rlhf/reward.py中

首先定义了一个名为 Reward Model 的类,作为奖励模型或批评者模型(Critic Model)。Reward Model 是一个基于语言模型的模型,附加了一个头部head,用于预测给定的 token 序列的奖励(一个标量值),最后将CriticModel类设置为RewardModel类,以保持命名一致性

之后,定义类:RewardDatase用于训练奖励模型的数据集
RewardDataset 类是一个继承自 Dataset 的自定义数据集类,它的作用是从给定的 JSON 文件中读取数据,并将数据整理成适当的格式。JSON 文件应包含以下格式的数据:

  1. class RewardDataset(Dataset):
  2. """Dataset class for the reward model
  3. read a json file with the following format:
  4. [
  5. {
  6. "user_input": "...",
  7. "completion": "...",
  8. "score": ...
  9. },
  10. ...
  11. ]
  12. Where:
  13. user_input: the initial input of the user
  14. completion: the completion generated by the model
  15. score: the score given by the user to the completion (or by the LLM)
  16. """

其中 user_input 是用户的初始输入,completion 是模型生成的补全,而 score 是用户或LLM给予补全的分数

再定义一个RewardTrainer 类用于训练奖励模型,它初始化奖励模型、优化器、损失函数(具体如上文所说,或如282行所述的MSE损失函数)、数据集和数据加载器等。此外,它还支持使用 DeepSpeed 或 Accelerate(两种高性能深度学习训练框架)进行训练

RewardTrainer 类的主要方法有:
train:训练奖励模型。它执行训练循环,包括前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,它还可以对模型进行验证(如果提供了验证数据集的话)

  • 首先是初始化
    1. # 定义构造函数
    2. def __init__(self, config: ConfigReward) -> None:
    3. # 保存配置对象
    4. self.config = config
    5. # 加载模型
    6. self.reward = RewardModel(config)
    7. # 创建优化器
    8. self.optimizer = torch.optim.AdamW(
    9. self.reward.parameters(), lr=config.lr
    10. )
    11. # 定义损失函数,用的交叉熵损失
    12. self.loss_function = torch.nn.MSELoss()
    13. # 检查验证数据集是否存在
    14. self.validation_flag = False
    15. if config.validation_dataset_path is not None:
    16. self.validation_flag = True
    17. # 创建数据集和数据加载器
    18. self.train_dataset = RewardDataset(config.train_dataset_path)
    19. self.train_dataloader = DataLoader(
    20. self.train_dataset, batch_size=config.batch_size
    21. )
    22. # 如果有验证数据集,则创建验证数据集和数据加载器
    23. if self.validation_flag:
    24. self.eval_dataset = RewardDataset(config.validation_dataset_path)
    25. self.validation_dataloader = DataLoader(
    26. self.eval_dataset, batch_size=config.batch_size
    27. )
    28. # 初始化学习率调度器 - 学习率将下降到初始值的10%
    29. self.scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    30. self.optimizer,
    31. T_0=len(self.train_dataset) // config.batch_size,
    32. T_mult=1,
    33. eta_min=config.lr * 0.1,
    34. last_epoch=-1,
    35. )
  • save_checkpoint:保存模型的检查点。在训练过程中,可以定期将模型的当前状态(包括模型参数、优化器状态和训练统计信息)保存为检查点,以便在需要时恢复训练
  • load_checkpoint:从检查点恢复模型。如果在训练过程中找到检查点文件,则该方法将从检查点恢复模型状态,并返回从何处恢复训练的周期和步骤
  • 接下来,是具体的训练过程
    1. def train(
    2. self,
    3. ) -> None:
    4. # 训练奖励模型
    5. # # 打印开始训练奖励模型的消息
    6. print("Start Training the Reward Model")
    7. # 如果启用了 DeepSpeed,从训练数据加载器获取批次大小
    8. if self.config.deepspeed_enable:
    9. batch_size = self.train_dataloader.batch_size
    10. else:
    11. batch_size = self.config.batch_size # 从配置获取批次大小
    12. epochs = self.config.epochs # 从配置获取训练轮次
    13. device = self.config.device # 从配置获取设备
    14. # 从配置获取每次打印的迭代次数
    15. iteration_per_print = self.config.iteration_per_print
    16. # 从配置获取检查点步骤
    17. checkpoint_steps = self.config.checkpoint_steps
    18. # 计算训练数据集的迭代次数
    19. n_iter = int(len(self.train_dataset) / batch_size)
    20. # 加载检查点并获取开始轮次和开始步骤
    21. start_epoch, start_step = self.load_checkpoint()
    22. # 初始化检查点计数器
    23. cnt_checkpoints = 1
    24. # 训练循环
    25. for epoch in range(start_epoch, epochs): # 对于每个轮次
    26. self.reward.train() # 将奖励模型设置为训练模式
    27. # 遍历训练数据加载器中的每个输入
    28. for i,inputs in enumerate(self.train_dataloader):
    29. # 如果从检查点恢复,则跳过步骤
    30. if i < start_step:
    31. continue
    32. # 获取输入
    33. input_text = inputs[0] # 获取输入文本
    34. score = inputs[1] # 获取分数
    35. # 对输入进行分词
    36. with torch.no_grad(): # 禁用梯度计算
    37. input_tokens = self.reward.tokenizer(
    38. input_text,
    39. return_tensors="pt",
    40. truncation=True,
    41. padding=True,
    42. )
    43. output = torch.as_tensor(
    44. score, dtype=torch.float32, device=device
    45. ) # 将分数转换为张量
    46. # 前向传播
    47. if self.config.deepspeed_enable: # 如果启用了 DeepSpeed
    48. est_output = self.model_engine(
    49. input_tokens["input_ids"].to(device),
    50. input_tokens["attention_mask"].to(device),
    51. )[:, -1] # 使用模型引擎进行前向传播
    52. else:
    53. est_output = self.reward.get_reward(
    54. input_tokens["input_ids"].to(device),
    55. input_tokens["attention_mask"].to(device),
    56. ) # 使用奖励模型进行前向传播
    57. # 计算损失函数
    58. loss = self.loss_function(est_output, output)
    59. # 将损失添加到训练统计数据中
    60. self.training_stats.training_loss.append(loss.item())
    61. # 反向传播
    62. if self.config.deepspeed_enable: # 如果启用了 DeepSpeed
    63. self.model_engine.backward(loss) # 使用模型引擎进行反向传播
    64. self.model_engine.step() # 更新模型参数
    65. elif self.config.accelerate_enable: # 如果启用了加速
    66. self.optimizer.zero_grad() # 将优化器的梯度归零
    67. self.accelerator.backward(loss) # 使用加速器进行反向传播
    68. self.optimizer.step() # 更新模型参数
    69. self.scheduler.step() # 更新学习率调度器
    70. else:
    71. self.optimizer.zero_grad() # 将优化器的梯度归零
    72. loss.backward() # 进行反向传播
    73. self.optimizer.step() # 更新模型参数
    74. self.scheduler.step() # 更新学习率调度器
    75. # 打印进度,如果当前迭代次数是打印间隔的整数倍
    76. if i % iteration_per_print == 0:
    77. print(
    78. f"Epoch: {epoch+1}/{epochs}, "
    79. f"Iteration: {i+1}/{n_iter}, "
    80. f"Training Loss: {loss.item()}"
    81. )
    82. printed_est_output = [
    83. round(float(x), 1) for x in est_output.cpu().tolist()
    84. ] # 对估计输出进行四舍五入
    85. print(
    86. "prediction",
    87. printed_est_output,
    88. "target",
    89. score.cpu().tolist(),
    90. ) # 打印预测值和目标值
    91. # 保存检查点,如果检查点计数器是检查点步骤的整数倍
    92. if cnt_checkpoints % checkpoint_steps == 0:
    93. self.save_checkpoint(epoch, i, epochs, n_iter) # 保存检查点
    94. cnt_checkpoints = 1 # 重置检查点计数器
    95. else:
    96. cnt_checkpoints += 1 # 检查点计数器加一
    97. # 验证
    98. if self.validation_flag: # 如果启用了验证
    99. self.reward.eval() # 将奖励模型设置为评估模式
    100. with torch.no_grad(): # 禁用梯度计算
    101. for i, (text, score) in enumerate(
    102. self.validation_dataloader
    103. ): # 遍历验证数据加载器中的每个输入
    104. # 对输入进行分词
    105. input_tokens = self.reward.tokenizer(
    106. text, return_tensors="pt", padding=True
    107. ) # 对输入文本进行分词
    108. input_tokens = input_tokens.to(device) # 将输入令牌移动到设备上
    109. # TODO: 检查输入令牌的长度,如果过长可能会导致问题
    110. output = torch.tensor(score, dtype=torch.float32).to(
    111. device
    112. ) # 将分数转换为张量并移动到设备上
    113. # 前向传播
    114. est_output = self.reward.get_reward(
    115. input_tokens["input_ids"],
    116. input_tokens["attention_mask"],
    117. ) # 使用奖励模型进行前向传播
    118. # 计算损失函数
    119. loss = self.loss_function(est_output, output)
    120. # 将损失添加到训练统计数据中
    121. self.training_stats.validation_loss.append(loss.item())
    122. # 打印进度,如果当前迭代次数是打印间隔的整数倍
    123. if i % iteration_per_print == 0:
    124. print(
    125. f"Epoch: {epoch+1}/{epochs}, "
    126. f"Iteration: {i+1}/{n_iter}, "
    127. f"Validation Loss: {loss.item()}"
    128. ) # 打印验证进度
    129. # 在恢复训练后重置 start_step
    130. start_step = 0
    131. # 训练结束后保存模型
    132. self.reward.save() # 保存奖励模型

总之,在 RewardTrainer 类的 train 方法中
首先会尝试从检查点恢复模型(如果有的话);
然后,它会遍历数据加载器中的所有输入,对每个输入执行前向传播、计算损失、反向传播和优化器更新;在每个周期结束时,如果提供了验证数据集,还会对模型进行验证;
最后,在训练完成后,将保存模型

2.2.2 通过chatllama/rlhf/actor.py再训练一个actor

此外,项目通过chatllama/rlhf/actor.py再训练一个actor,比如通过train方法训练一个基于transformer的模型,它包括了数据处理、模型训练、验证和模型保存等操作

  1. 定义train方法,它没有返回值。
  2. 打印训练开始信息。
  3. 获取配置参数,包括批量大小、训练轮数、设备和检查点步数。
  4. 计算迭代次数。
  5. 加载模型检查点并获取开始的轮数和步数。
  6. 如果从头开始训练,清空训练统计。
  7. 初始化检查点计数器。
  8. 定义训练循环,其中包括:
    • 设置模型为训练模式。
    • 遍历训练数据加载器。
    • 如果恢复训练,跳过已经完成的步数。
    • 对输入文本进行标记化处理。
    • 将输入文本分割成tokens和mask。
    • 添加结束符(EOS)。
    • 将输入文本分割成输入和输出。
    • 将输入文本移至设备。
    • 执行前向传播。
    • 计算损失。
    • 执行反向传播和优化。
    • 打印训练进度。
    • 定期保存检查点和训练统计。
  9. 进行验证(如果启用验证的话):
  10. 设置模型为评估模式。
  11. 使用torch.no_grad()禁用梯度计算。
  12. 遍历验证数据加载器。
  13. 对输入文本进行标记化处理。
  14. 将输入文本分割成验证输入和输出。
  15. 执行前向传播。
  16. 计算损失。
  17. 更新验证损失统计。
  18. 打印验证进度。
  19. 在恢复训练后,将start_step重置为0。
  20. 训练完成后,保存模型。
  21. 打印训练结束信息

2.2.3 通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络

有了奖励函数和actor,便可以通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络,具体如下图,设置内外两个循环

  • 外层循环迭代训练轮次(epochs)
  • 内层循环遍历数据加载器(dataloader)中的批次(batches),在每次迭代中,它会处理一批数据,包括状态、动作、价值等,这些数据用于训练智能体-评论家模型

在内层循环中依次做如下处理(以下代码来源于:chatllama/chatllama/rlhf/trainer.py ):

首先是导入必须的库和模块,当然,主要是ActorCritic类

  1. 导入所需的库和模块。
  2. change_tokenization函数:用于在两个不同的分词器之间转换给定的tokens。
  3. check_model_family函数:检查两个配置是否属于相同的模型家族。
  4. ActorCritic类:包含了actor和critic模型,并用于在训练actor过程中为给定的序列生成动作和值。它包括以下方法:
    • __init__:初始化actor和critic模型
      1. def __init__(self, config: Config) -> None:
      2. super().__init__()
      3. self.config = config
      4. self.actor = ActorModel(config.actor)
      5. # check if critic must be initialized from reward model
      6. ModelLoader.init_critic_from_reward(config.critic)
      7. self.critic = CriticModel(config.critic)
      8. # if the actor and critic use the same tokenizer is set to True
      9. self.use_same_tokenizer = False
      10. # debug flag
      11. self.debug = config.actor.
    • load:加载模型,但未实现。
    • save:将模型保存到路径。
    • forward:基于给定的整个序列,使用actor的forward方法获取序列中每个token的logits,并使用critic的forward方法获取每个生成步骤的值。

这个代码主要用于强化学习训练自然语言生成模型。ActorCritic类是其中的核心部分,它包含了actor和critic模型。这两个模型在训练过程中相互协作,用于生成动作和值。

其次,主要是关于一个用于生成动作、动作逻辑、价值和序列的生成函数,以及用于存储训练数据和生成训练示例的类

  1. 首先定义了一个名为generate的函数,它使用了@torch.no_grad()和@beartype修饰器。这个函数接收四个参数,分别是states_actor、states_mask_actor和states_critic,并返回一个元组
    这个函数的主要目的是从输入状态生成动作、动作逻辑、价值和序列。它首先从actor模型生成动作序列,然后创建一个用于actor序列的mask。接下来,它检查是否需要为critic使用不同的编码。如果需要,它将使用change_tokenization函数来更改序列的编码。接着,它生成动作逻辑和价值。如果处于调试模式,将打印一些调试信息。
  2. 接下来,代码定义了一个名为Memory的namedtuple,用于存储每个经验的数据。Memory包含了11个字段,如states_actor、actions、values等。
  3. 然后定义了一个名为ExperienceDataset的类,它继承自torch.utils.data.Dataset。这个类用于训练actor-critic模型。它接收一个memories参数和一个device参数。memories参数是一个包含Memory实例的双端队列。device参数表示要在哪个设备上进行计算。这个类实现了__len__和__getitem__方法,使其可以像普通的PyTorch数据集一样使用。
  4. 最后,定义了一个名为ExamplesSampler的类,用于从JSON文件中读取示例并在需要时抽样。这个类接收一个表示文件路径的path参数。在初始化时,它从文件中读取数据,并将其存储在self.data中。它还实现了一个名为sample的方法,用于从数据中抽取指定数量的示例。

再之后,定义了一个名为 RLTrainer 的类,用于使用强化学习训练一个Actor-Critic模型。该类具有多个属性和方法,用于训练过程中的各种操作。

  • __init__ 方法中,初始化了训练器的各个组件,包括Actor-Critic模型、actor和critic优化器、reward模型、用于存储训练统计数据和对话记录的类、以及示例采样器
  • save_checkpoint 方法保存了当前状态的Actor-Critic模型的检查点,包括当前的训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。
  • load_checkpoint 方法加载了Actor-Critic模型的检查点,包括训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。如果没有找到检查点,则返回轮数0。如果actor和critic的检查点存在差异,则从两者中最小的轮数开始训练。

再之后,调用 learn 方法更新actor和critic模型,并保存训练统计数据和对话记录

  1. 使用智能体-评论家模型计算新的动作概率和价值
    1. # get actor critic new probabilities and values
    2. actions_logits, values = self.actorcritic.forward(
    3. sequences_actor,
    4. sequences_mask_actor,
    5. sequences_critic,
    6. sequences_mask_critic,
    7. action_len_actor.item(),
    8. action_len_critic.item(),
    9. )
  2. 计算动作的对数概率、熵和KL散度损失
    1. # get action log prob
    2. actions_prob = (
    3. torch.softmax(actions_logits, dim=-1).max(dim=-1).values
    4. )
    5. actions_log_prob = torch.log(actions_prob + self.eps)
    6. # compute entropy,一般表示为-sum「p(x)logp(x)」
    7. entropies = (actions_prob * actions_log_prob).sum(dim=-1)
    8. # compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」
    9. kl_div_loss = (
    10. (actions_prob * (old_actions_log_probs - actions_log_prob))
    11. .sum(dim=-1)
    12. .mean()
    13. )
  3. 计算重要性权重比率(ratios),即新旧策略的概率比
    1. # compute ratios
    2. ratios = (actions_log_prob - old_actions_log_probs).exp()
  4. 计算PPO损失,包括优势函数的计算和PPO-clip算法的应用
    首先我们回顾下强化学习极简入门一文里对『近端策略优化裁剪PPO-clip』的阐述

    \begin{aligned} J_{\mathrm{PPO2}}^{\theta'}(\theta) \approx \sum_{\left(s_{t}, a_{t}\right)} \min &\left(\frac{p_{\theta}\left(a_{t} | s_{t}\right)}{p_{\theta'}\left(a_{t} | s_{t}\right)} A^{\theta'}\left(s_{t}, a_{t}\right),{clip}\left(\frac{p_{\theta}\left(a_{t} | s_{t}\right)}{p_{\theta'}\left(a_{t} | s_{t}\right)}, 1-\varepsilon, 1+\varepsilon\right) A^{\theta'}\left(s_{t}, a_{t}\right)\right) \end{aligned}

    简单实现的话,即是
    1. # ratios即为重要性权重,exp代表求期望,括号里的environment_log_probs代表用于与环境交互的策略
    2. ratios = torch.exp(log_probs - environment_log_probs)
    3. # 分别用sur_1、sur_2来计算公式的两部分
    4. # 第一部分是重要性权重乘以优势函数
    5. sur_1 = ratios * advs
    6. # 第二部分是具体的裁剪过程
    7. sur_2 = torch.clamp(ratios, 1 - clip_eps, 1 + clip_eps) * advs
    8. # 最终看谁更小则取谁
    9. clip_loss = -torch.min(sur_1,sur_2).mean()
    更具体的实现,则可以如下所示
    1. # 计算PPO总损失
    2. if check_model_family(self.config.actor, self.config.critic):
    3. # 计算 TRL 中的折扣回报
    4. gamma = self.config.trainer.gamma_discounted
    5. # 初始化折扣回报矩阵
    6. discounted_rewards = torch.zeros_like(old_values)
    7. # 遍历每个时间步
    8. for i in range(discounted_rewards.shape[1]):
    9. for j in range(i, discounted_rewards.shape[1]):
    10. # 计算折扣回报
    11. discounted_rewards[:, i] += (
    12. gamma ** (j - i) * rewards[:, j]
    13. )
    14. # 计算优势值,与TRL 中旧值的符号相反
    15. advantages = (
    16. discounted_rewards - old_values
    17. )
    18. # normalize advantages
    19. advantages = (advantages - advantages.mean(dim=-1)) / (
    20. advantages.std() + self.eps
    21. )
    22. surr1 = advantages * ratios
    23. else:
    24. advantages = rewards - old_values[:, -1]
    25. surr1 = advantages * ratios
    26. surr2 = (
    27. torch.clamp(ratios, 1 - actor_eps_clip, 1 + actor_eps_clip)
    28. * advantages
    29. )
  5. 计算策略损失和总损失
    1. policy_loss = -torch.min(surr1, surr2) - beta_s * entropies
    2. policy_loss = policy_loss.mean()
    3. loss = policy_loss + kl_div_loss
    可能有读者看到这里 看迷糊了,即咋出来两个损失函数了,看起来是一个策略损失,一个KL散度损失,与我们在本博客里的另一篇文章《ChatGPT技术原理解析》中「3.1.3 InstructGPT训练阶段3:如何通过PPO算法进一步优化模型的策略」探讨的结果咋不太一样呢 ?

    \begin{aligned} objective(\phi ) &= E_{(x,y)\sim D_{\pi _{\phi }^{RL}}} [r_\theta (x,y) - \beta log(\pi _{\phi }^{RL}(y|x) / \pi ^{SFT}(y|x) )] + \gamma E_{x\sim D_{pretrain}} [log(\pi _{\phi }^{RL})] \\&= E_{(x,y)\sim D_{\pi _{ }^{RL'}}} \left [ \frac{\pi _{\phi }^{RL}(y|x)}{\pi ^{RL'}(y|x)}r_{\theta'}(x,y) - \beta log(\pi^{RL'}(y|x) / \pi ^{SFT}(y|x) ) \right ] + \gamma E_{x\sim D_{pretrain}} [log(\pi _{\phi }^{RL})] \\&= E_{(x,y)\sim D_{\pi _{ }^{RL'}}} \left [ \min \left(\frac{\pi_{\phi }^{RL}(y|x)}{\pi ^{RL'}(y|x)} r_{\theta'}(x,y),{clip}\left(\frac{\pi_{\phi }^{RL}(y|x)}{\pi ^{RL'}(y|x)}, 1-\varepsilon, 1+\varepsilon\right) r_{\theta'}(x,y)\right) - \beta log(\pi^{RL'}(y|x) / \pi ^{SFT}(y|x) ) \right ]+ \gamma E_{x\sim D_{pretrain}} [log(\pi _{\phi }^{RL})] \end{aligned}

    不急,我们先来分析下这两个损失函数
    \rightarrow  一个 policy loss,本质是一个目标函数(具体用的近端策略优化裁剪PPO-clip与熵H(\pi) = -\sum p_{\theta} \log p_{\theta}的差值)

    L^{PPO}(\theta ) = \hat{E_t} [min \left ( r_t(\theta )\hat{A_t}, clip(r_t(\theta ),1-\epsilon ,1+\epsilon)\hat{A_t} \right )] - \beta_s[- \sum p_{\theta} logp_{\theta} ]

    其中,\hat{\mathbb{E}}t表示在当前策略下采样得到的经验的无偏估计,r_t(\theta) = \frac{p_{\theta}(a_t|s_t)}{p_{\theta_{\text{old}}}(a_t|s_t)} 是策略比率,\hat{A}_t是优势函数,\epsilon 是超参数,用于控制策略更新的幅度,\beta是熵的系数

    当然 在instructGPT的原理中并没有这个熵
    1. # compute entropy,一般表示为-sum「p(x)logp(x)」
    2. entropies = (actions_prob * actions_log_prob).sum(dim=-1)
    \rightarrow  另一个 KL散度损失(kl_div_loss),还是用于限制新策略与旧策略之间的差异,以免更新太快,导致学习不稳定
    1. # compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」
    2. kl_div_loss = (
    3. (actions_prob * (old_actions_log_probs - actions_log_prob))
    4. .sum(dim=-1)
    5. .mean()
    6. )

    对应的公式为

    L^{KL}(\theta ) = \hat{E_t} \left [ KL(p_{\theta_{old}}(a_t|s_t),p_{\theta}(a_t|s_t) ) \right ]

    值得一提的是,这里确实容易引发疑惑,毕竟上面的policy loss已经对新旧策略的比值 ratios = (actions_log_prob - old_actions_log_probs).exp() 做了截断处理,而这里又加一个对新旧策略差值的KL散度约束,未免有多此一举之嫌,比如在instructGPT的原理中便只有两者其一:关于策略梯度的损失就一个policy loss

    最终,总的损失函数为:

    L(\theta ) = L^{PPO}(\theta ) + \alpha L^{KL}(\theta )

    其中,\alpha 是超参数,用于控制 KL 散度损失的权重

  6. 如果损失值为NaN,抛出异常
    1. # check if loss item is NaN
    2. if torch.isnan(loss):
    3. raise ValueError("Loss is nan")
  7. 更新策略,包括使用DeepSpeed或Accelerate库进行优化
    1. # 按照损失更新 actor 模型参数
    2. # 使用 DeepSpeed 的 engine 对 loss 进行反向传播
    3. if self.config.actor.deepspeed_enable:
    4. actor_model_engine.backward(loss)
    5. actor_model_engine.step()
    6. # 如果启用了 PyTorch 的 Accelerate
    7. elif self.config.actor.accelerate_enable:
    8. # 将 actor 模型参数的梯度清零
    9. self.actor_optimizer.zero_grad()
    10. # 使用 Accelerate 对 loss 进行反向传播
    11. actor_accelerator.backward(loss)
    12. # 使用 PyTorch 的优化器更新 actor 模型参数
    13. self.actor_optimizer.step()
    14. # 使用 PyTorch 的学习率调度器更新学习率
    15. self.actor_scheduler.step()
    16. else:
    17. self.actor_optimizer.zero_grad()
    18. # 对 loss 进行反向传播
    19. loss.backward()
    20. self.actor_optimizer.step()
    21. self.actor_scheduler.step()
  8. 计算价值损失
    1. # compute value loss
    2. # 裁剪限制了值损失剪辑的变化速率
    3. # 使得value相比old_value的更新范围限制在[-critic_eps_clip, critic_eps_clip]之内,否则就截断
    4. # 说白了,截断的作用就是:最大不能大过critic_eps_clip,最小不能小过-critic_eps_clip
    5. value_loss_clipped = old_values + (values - old_values).clamp(
    6. -critic_eps_clip, critic_eps_clip
    7. )
    8. # 计算第一种值损失,即裁剪后的值与奖励之间的平方差
    9. value_loss1 = (value_loss_clipped - rewards) ** 2
    10. # 计算第二种值损失,即未裁剪的值与奖励之间的平方差
    11. value_loss2 = (values - rewards) ** 2
    12. # 选择两种值损失中较大的那个,并计算其均值
    13. value_loss = torch.max(value_loss1, value_loss2).mean()
    本文发布后,有读者留言对这块表达疑惑,即怎么是先计算裁剪的损失,然后对比未裁剪的损失,然后两种损失中取更大呢,原因和上文第六部分最后解释的一样,便不再重复了
  9. 如果价值损失为NaN,抛出异常
    1. if torch.isnan(value_loss):
    2. raise ValueError("Value loss is nan")
  10. 更新评论家,包括使用DeepSpeed或Accelerate库进行优化
    1. # upate critic
    2. if self.config.critic.deepspeed_enable:
    3. critic_model_engine.backward(value_loss)
    4. critic_model_engine.step()
    5. elif self.config.critic.accelerate_enable:
    6. self.critic_optimizer.zero_grad()
    7. critic_accelerator.backward(loss)
    8. self.critic_optimizer.step()
    9. self.critic_scheduler.step()
    10. else:
    11. self.critic_optimizer.zero_grad()
    12. value_loss.backward()
    13. self.critic_optimizer.step()
    14. self.critic_scheduler.step()
  11. 将损失值追加到训练统计信息中
    1. # 将训练损失值添加到训练统计信息中
    2. self.training_stats.training_loss.append(
    3. # 将损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型
    4. loss.detach().cpu().item()
    5. )
    6. # 将价值损失值添加到训练统计信息中
    7. self.training_stats.value_loss.append(
    8. # 将价值损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型
    9. value_loss.detach().cpu().item()
    10. )
  12. 输出迭代信息
    1. # print iteration info
    2. print(
    3. f"Epoch {epoch+1}/{epochs}",
    4. f"Step {k+1}/{int(len(dataloader) / batch_size)}",
    5. f"Loss {loss.detach().cpu().item():.4f}",
    6. f"Value Loss {value_loss.detach().cpu().item():.4f}",
    7. )
  13. 训练循环结束后,将智能体-评论家模型设为评估模式并输出训练结束信息
    1. self.actorcritic.eval()
    2. print("End Learning")

最后的最后,定义了一个 train() 方法,使用 actor-critic 算法训练强化学习模型。方法首先初始化各种设置,如训练的总 episode 数量、每个 episode 的最大步数、批次大小和训练设备等。然后检查要用于学习的记忆数量是否是批次大小的倍数,以及总步数是否是更新步数的倍数。

该方法初始化记忆,加载检查点(如果有的话),如果是从头开始的新训练,则清除会话记录。然后循环遍历 episode 和 timestep,从示例数据集中抽取样本,为 actor 和 critic 进行分词,生成动作和值的序列,计算动作日志概率,计算奖励。存储每个 episode/timestep 的记忆,并将完成(解码后的动作)记录在会话日志中。

在一定数量的 timestep 后,使用记忆进行学习,并计算平均奖励。该过程重复进行,直到训练完成。该方法在训练结束时保存模型和会话日志。


第三部分 ColossalChat:通过self-instruct技术指令微调LLaMA且加上RLHF

3.1 技术架构:通过self-instruct生成的中英双语数据集 + 三阶段训练方式

据介绍(介绍页面,该页面的翻译之一代码地址),Colossal-AI 开源了基于 LLaMA-7B 模型的包含完整 RLHF 流程的类 Chat 模型复现方案 ColossalChat

3.1.1 针对社交平台的种子数据且利用self-instruct 技术生成中英双语数据集

ColossalChat 收集并清洗了社交平台上人们的真实提问场景作为种子数据集,然后利用 self-instruct 技术扩充数据(通过prompt OpenAI API,花费约 900 美元进行标注),最终生成了10.4万条问答的中、英双语数据集(这是数据的开源地址)
他们的说法是,对比其他 self-instruct 方法生成的数据集,该数据集的种子数据更加真实、丰富,生成的数据集涵盖的话题更多,该数据可以同时用于微调和 RLHF 训练,通过高质量的数据,ColossalChat 能进行更好地对话交互,同时支持中文

3.1.2​ ColossalChat训练方式:类似instructGPT/ChatGPT的训练三步骤

关于训练方式:类似instructGPT/ChatGPT的训练三步骤(如果忘了,务必复习下此文的3.1节)

  • Stage1 是supervised-fintuning,即使用上文提到的数据集进行监督微调
  • Stage2 训练一个奖励模型(初始化为阶段1的SFT模型),它通过模型对于同一个 prompt 的不同输出进行人工排序,根据排序结果监督训练出一个奖励模型
  • Stage3 是通过阶段2训练出来的奖励函数微调出一个RL模型,微调过程中通过PPO算法限制RL模型的参数更新范围(以阶段1的SFT模型的策略为参考基准,PPO算法避免与基线模型SFT的策略偏离过远)

具体而言,为两个阶段进行:

  • 如上图底部,首先是 Make Experience 部分,利用 SFT 、Actor、RM、Critic模型计算生成 Experience 存入 buffer 中;

    之后是参数更新部分,利用 Experience 计算价值损失(value loss)​和策略损失(policy loss),具体说明在此文的4.4.3节有介绍

  • 如上图顶部即是PTX 部分(上面的目标函数objective(\phi)​中加在最后的偏置项)
    ColossalChat 计算 Actor 的现有输出response 和预训练语料的回答部分的交叉熵损失函数(calculates the cross-entropy loss between the Actor’s output response and the response part of the input corpus)
    用来在 PPO 梯度中加入预训练梯度(add pre-training gradients to the PPO gradient)
    以保持语言模型比如GPT2原有的核心性能(maintain the language model’s original performance and prevent forgetting),防止忘了最早从哪里出发的(GPT2 \rightarrow​ SFT \rightarrow​ RM \rightarrow​ RLHF)
  • 最后将策略损失、价值损失和 PTX 损失加和(the policy loss, value loss, and PTX loss are summed up),进行反向传播和参数更新 

3.2 代码实现:SFT模型 + 奖励模型 + PPO training

先看下整体的代码架构图


接下来,我们看下一些关键实现

3.2.1 首先,训练一个SFT模型

首先通过ColossalAI/applications/Chat/coati/trainer/sft.py,训练一个SFT模型

  1. import math # 导入Python的数学库
  2. import time # 导入Python的时间库
  3. from abc import ABC # 从Python的抽象基类库中导入ABC基类
  4. from typing import Optional # 导入Python类型注解库中的Optional, 表示某个类型值可能为空
  5. import loralib as lora # 导入一个名为loralib的库并重命名为lora
  6. import torch # 导入PyTorch库
  7. import torch.distributed as dist # 导入PyTorch分布式计算库
  8. import wandb # 导入Weights & Biases库,一般用于实验跟踪和版本控制
  9. from coati.models.loss import GPTLMLoss # 导入coati库中的GPTLMLoss模型
  10. from torch import nn # 导入PyTorch的神经网络库
  11. from torch.optim import Adam, Optimizer # 导入PyTorch优化器库中的Adam和Optimizer类
  12. from torch.optim.lr_scheduler import LambdaLR # 导入PyTorch优化器库中的LambdaLR类,一般用于动态调整学习率
  13. from torch.utils.data import DataLoader # 导入PyTorch数据处理库中的DataLoader类,用于加载数据
  14. from torch.utils.data.distributed import DistributedSampler # 导入PyTorch分布式计算库中的DistributedSampler类,用于在分布式训练中采样数据
  15. from tqdm import tqdm # 导入进度条库tqdm
  16. from transformers.tokenization_utils_base import PreTrainedTokenizerBase # 导入transformers库中的PreTrainedTokenizerBase类,用于处理预训练模型的令牌化
  17. from transformers.trainer import get_scheduler # 导入transformers库中的get_scheduler函数,用于获取学习率调整策略
  18. from colossalai.logging import get_dist_logger # 导入colossalai库中的分布式日志记录函数get_dist_logger
  19. from .strategies import Strategy # 导入当前目录下strategies文件中的Strategy类
  20. from .utils import is_rank_0 # 导入当前目录下utils文件中的is_rank_0函数,用于检查当前进程是否为主进程
  21. # 下面是定义一个名为SFTTrainer的类,该类继承自abc库的ABC抽象基类
  22. class SFTTrainer(ABC):
  23. """
  24. Trainer to use while training reward model.
  25. Args:
  26. model (torch.nn.Module): the model to train
  27. strategy (Strategy): the strategy to use for training
  28. optim(Optimizer): the optimizer to use for training
  29. train_dataloader: the dataloader to use for training
  30. eval_dataloader: the dataloader to use for evaluation
  31. batch_size (int, defaults to 1): the batch size while training
  32. max_epochs (int, defaults to 2): the number of epochs to train
  33. optim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer
  34. """
  35. # 下面是初始化函数,初始化SFTTrainer类的实例
  36. def __init__(
  37. self,
  38. model, # 输入参数model,即将训练的模型
  39. strategy: Strategy, # 输入参数strategy,即训练的策略
  40. optim: Optimizer, # 输入参数optim,即训练的优化器
  41. train_dataloader: DataLoader, # 输入参数train_dataloader,即训练的数据加载器
  42. eval_dataloader: DataLoader = None, # 输入参数eval_dataloader,即评估的数据加载器,默认为None
  43. batch_size: int = 1, # 输入参数batch_size,即每批训练的样本数量,默认为1
  44. max_epochs: int = 2, # 输入参数max_epochs,即训练的最大轮数,默认为2
  45. accimulation_steps: int = 8, # 输入参数accimulation_steps,即梯度积累的步数,默认为8
  46. ) -> None: # 初始化函数的返回值类型为None
  47. super().__init__() # 调用父类的初始化函数
  48. self.strategy = strategy # 将输入参数strategy赋值给实例变量self.strategy
  49. self.epochs = max_epochs # 将输入参数max_epochs赋值给实例变量self.epochs
  50. self.train_dataloader = train_dataloader # 将输入参数train_dataloader赋值给实例变量self.train_dataloader
  51. self.eval_dataloader = eval_dataloader # 将输入参数eval_dataloader赋值给实例变量self.eval_dataloader
  52. # 调用策略的setup_model方法对模型进行设置,并将返回的模型赋值给实例变量self.model
  53. self.model = strategy.setup_model(model)
  54. if "DDP" in str(self.strategy): # 如果策略的字符串表示中包含"DDP"
  55. self.model = self.model.module # 将模型的module属性赋值给实例变量self.model
  56. # 调用策略的setup_optimizer方法对优化器进行设置,并将返回的优化器赋值给实例变量self.optimizer
  57. self.optimizer = strategy.setup_optimizer(optim, self.model)
  58. self.accimulation_steps = accimulation_steps # 将输入参数accimulation_steps赋值给实例变量self.accimulation_steps
  59. num_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps # 计算每个训练轮次的更新步数
  60. max_steps = math.ceil(self.epochs * num_update_steps_per_epoch) # 计算最大更新步数
  61. # 获取学习率调度器,并赋值给实例变量self.scheduler
  62. self.scheduler = get_scheduler("cosine", # 学习率调度策略为"cosine"
  63. self.optimizer, # 优化器为实例变量self.optimizer
  64. num_warmup_steps=math.ceil(max_steps * 0.03), # 预热步数为最大更新步数的3%
  65. num_training_steps=max_steps) # 训练步数为最大更新步数
  66. # 下面是SFTTrainer类的fit方法,用于训练模型
  67. def fit(self, logger, log_interval=10): # 输入参数为logger,即日志记录器,以及log_interval,即日志记录间隔,默认为10
  68. # 初始化Weights & Biases的实验,并设置项目名为"Coati",实验名为当前时间
  69. wandb.init(project="Coati", name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
  70. wandb.watch(self.model) # 开始监视实例变量self.model
  71. total_loss = 0 # 定义变量total_loss并初始化为0,用于记录总损失
  72. # 定义一个进度条,长度为训练轮次数,描述信息为'Epochs',如果当前进程不是主进程则禁用进度条
  73. # epoch_bar = tqdm(range(self.epochs), desc='Epochs', disable=not is_rank_0())
  74. # 定义一个进度条,长度为训练步数,描述信息为'steps',如果当前进程不是主进程则禁用进度条
  75. step_bar = tqdm(range(len(self.train_dataloader) // self.accimulation_steps * self.epochs),
  76. desc=f'steps',
  77. disable=not is_rank_0())
  78. for epoch in range(self.epochs): # 遍历每一个训练轮次
  79. # 定义一个进度条,长度为训练样本数,描述信息为'Train process for{epoch}',如果当前进程不是主进程则禁用进度条
  80. # process_bar = tqdm(range(len(self.train_dataloader)), desc=f'Train process for{epoch}', disable=not is_rank_0())
  81. # 训练模式
  82. self.model.train()
  83. for batch_id, batch in enumerate(self.train_dataloader): # 遍历每一个训练样本
  84. # 将样本中的"input_ids"字段送到当前设备上,并赋值给变量prompt_ids
  85. prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
  86. # 将样本中的"attention_mask"字段送到当前设备上,并赋值给变量p_mask
  87. p_mask = batch["attention_mask"].to(torch.cuda.current_device())
  88. # 将样本中的"labels"字段送到当前设备上,并赋值给变量labels
  89. labels = batch["labels"].to(torch.cuda.current_device())
  90. # prompt_ids = prompt_ids.squeeze(1).cuda()
  91. # p_mask = p_mask.squeeze(1).cuda()
  92. # prompt_logits = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
  93. # 对模型进行前向传播,并将返回的结果赋值给变量outputs
  94. outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
  95. loss = outputs.loss # 获取outputs中的loss属性,并赋值给变量loss
  96. prompt_logits = outputs.logits # 获取outputs中的logits属性,并赋值给变量prompt_logits
  97. if loss >= 2.5: # 如果loss大于或等于2.5
  98. logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") # 在日志中记录警告信息
  99. loss = loss / self.accimulation_steps # 将loss除以梯度积累步数
  100. total_loss += loss.item() # 将loss加入到total_loss中
  101. loss.backward() # 对loss进行反向传播
  102. if (batch_id + 1) % self.accimulation_steps == 0: # 如果当前批次是梯度积累步数的整数倍
  103. self.optimizer.step() # 对优化器执行一步优化
  104. self.optimizer.zero_grad() # 清空优化器的梯度
  105. self.scheduler.step() # 对学习率调度器执行一步调度
  106. if is_rank_0(): # 如果当前进程是主进程
  107. wandb.log({"train_loss": total_loss / self.accimulation_steps}) # 在Weights & Biases中记录训练损失
  108. total_loss = 0 # 将total_loss重置为0
  109. step_bar.update(1) # 更新步骤进度条
  110. # process_bar.update(1) # 更新训练进度条
  111. if (batch_id + 1) % (self.accimulation_steps * log_interval) == 0: # 如果当前批次是日志记录间隔的整数倍
  112. self.evaluate(epoch, logger, log_interval) # 进行一次评估
  113. if self.eval_dataloader: # 如果存在评估数据加载器
  114. self.evaluate(epoch, logger) # 进行一次评估
  115. if is_rank_0(): # 如果当前进程是主进程
  116. torch.save(self.model.state_dict(), f"coati_checkpoints/epoch_{epoch}.pth") # 保存模型的状态字典
  117. wandb.save(f"coati_checkpoints/epoch_{epoch}.pth") # 在Weights & Biases中保存模型的状态字典
  118. # epoch_bar.update(1) # 更新轮次进度条
  119. wandb.finish() # 结束Weights & Biases的实验
  120. # 下面是SFTTrainer类的evaluate方法,用于评估模型
  121. def evaluate(self, epoch, logger, log_interval=10): # 输入参数为epoch,即轮次,logger,即日志记录器,以及log_interval,即日志记录间隔,默认为10
  122. # 打印一条日志信息,内容为"Start evaluation process..."
  123. logger.info("Start evaluation process...")
  124. self.model.eval() # 将模型切换到评估模式
  125. total_loss = 0 # 定义变量total_loss并初始化为0,用于记录总损失
  126. # 定义一个进度条,长度为评估样本数,描述信息为'Eval process',如果当前进程不是主进程则禁用进度条
  127. # process_bar = tqdm(range(len(self.eval_dataloader)), desc='Eval process', disable=not is_rank_0())
  128. with torch.no_grad(): # 禁止计算梯度
  129. for batch_id, batch in enumerate(self.eval_dataloader): # 遍历每一个评估样本
  130. # 将样本中的"input_ids"字段送到当前设备上,并赋值给变量prompt_ids
  131. prompt_ids = batch["input_ids"].to(torch.cuda.current_device())
  132. # 将样本中的"attention_mask"字段送到当前设备上,并赋值给变量p_mask
  133. p_mask = batch["attention_mask"].to(torch.cuda.current_device())
  134. # 将样本中的"labels"字段送到当前设备上,并赋值给变量labels
  135. labels = batch["labels"].to(torch.cuda.current_device())
  136. # 对模型进行前向传播,并将返回的结果赋值给变量outputs
  137. outputs = self.model(prompt_ids, attention_mask=p_mask, labels=labels)
  138. loss = outputs.loss # 获取outputs中的loss属性,并赋值给变量loss
  139. if loss >= 2.5: # 如果loss大于或等于2.5
  140. logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") # 在日志中记录警告信息
  141. total_loss += loss.item() # 将loss加入到total_loss中
  142. if (batch_id + 1) % log_interval == 0: # 如果当前批次是日志记录间隔的整数倍
  143. if is_rank_0(): # 如果当前进程是主进程
  144. wandb.log({"eval_loss": total_loss / log_interval}) # 在Weights & Biases中记录评估损失
  145. total_loss = 0 # 将total_loss重置为0
  146. # process_bar.update(1) # 更新评估进度条
  147. logger.info(f"Finish evaluation process for epoch {epoch}") # 打印一条日志信息,内容为"Finish evaluation process for epoch {epoch}"

3.2.2 训练一个奖励模型

其次,通过ColossalAI/applications/Chat/coati/trainer/rm.py 训练一个奖励模型

  1. from abc import ABC # 导入 abc 模块(抽象基类模块)
  2. from datetime import datetime # 导入 datetime 模块,用于处理日期和时间
  3. from typing import Optional # 导入 typing 模块中的 Optional 类型,它表示一个类型可能是 None
  4. import pandas as pd # 导入 pandas 库,用于数据分析和操作
  5. import torch # 导入 PyTorch 库,一个用于深度学习的开源库
  6. import torch.distributed as dist # 导入 PyTorch 分布式计算模块
  7. from torch.optim import Optimizer, lr_scheduler # 导入 PyTorch 中的优化器和学习率调度器
  8. # 导入 PyTorch 的数据加载器和数据集模块
  9. from torch.utils.data import DataLoader, Dataset, DistributedSampler
  10. from tqdm import tqdm # 导入 tqdm,一个用于打印进度条的库
  11. # 导入 transformers 库的 tokenization_utils_base 模块中的 PreTrainedTokenizerBase 类,它用于处理预训练的 tokenizer
  12. from transformers.tokenization_utils_base import PreTrainedTokenizerBase
  13. from .strategies import Strategy # 从当前包的 strategies 模块中导入 Strategy 类
  14. from .utils import is_rank_0 # 从当前包的 utils 模块中导入 is_rank_0 函数
  15. # 定义 RewardModelTrainer 类,它继承自 ABC(抽象基类)
  16. class RewardModelTrainer(ABC):
  17. """
  18. Trainer to use while training reward model.
  19. Args:
  20. 这个类继承了 ABC 抽象基类。它接受以下参数:
  21. model:待训练的模型
  22. strategy:训练策略
  23. optim:优化器
  24. loss_fn:损失函数
  25. train_dataset:训练数据集
  26. valid_dataset:验证数据集
  27. eval_dataset:评估数据集
  28. batch_size:批次大小(默认为1)
  29. max_epochs:最大训练轮数(默认为2)
  30. """
  31. # 初始化 RewardModelTrainer 类的实例
  32. def __init__(
  33. self,
  34. model,
  35. strategy: Strategy,
  36. optim: Optimizer,
  37. loss_fn,
  38. train_dataset: Dataset,
  39. valid_dataset: Dataset,
  40. eval_dataset: Dataset,
  41. batch_size: int = 1,
  42. max_epochs: int = 1,
  43. ) -> None:
  44. # 调用父类(ABC)的初始化方法
  45. super().__init__()
  46. # 将传入的训练策略保存到实例变量中
  47. self.strategy = strategy
  48. # 将传入的最大训练轮数保存到实例变量中
  49. self.epochs = max_epochs
  50. # 初始化训练采样器为 None
  51. train_sampler = None
  52. # 如果当前运行环境已经初始化了分布式计算,并且世界尺寸(即参与分布式计算的进程数)大于 1
  53. if dist.is_initialized() and dist.get_world_size() > 1:
  54. # 创建一个分布式采样器
  55. train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)
  56. # 创建一个用于训练的数据加载器,如果 train_sampler 为 None,则将 shuffle 设为 True
  57. self.train_dataloader = DataLoader(train_dataset,
  58. shuffle=(train_sampler is None),
  59. sampler=train_sampler,
  60. batch_size=batch_size)
  61. # 创建一个用于验证的数据加载器
  62. self.valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
  63. # 创建一个用于评估的数据加载器
  64. self.eval_dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=True)
  65. # 调用策略的 setup_model 方法设置模型,并将结果保存到实例变量中
  66. self.model = strategy.setup_model(model)
  67. # 将传入的损失函数保存到实例变量中
  68. self.loss_fn = loss_fn
  69. # 调用策略的 setup_optimizer 方法设置优化器,并将结果保存到实例变量中
  70. self.optimizer = strategy.setup_optimizer(optim, self.model)
  71. # 创建一个余弦退火学习率调度器,并将结果保存到实例变量中
  72. self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, self.train_dataloader.__len__() // 100)
  73. # 定义一个方法,用于评估给定数据加载器上的准确率,并计算选定奖励和拒绝奖励之间的平均距离
  74. def eval_acc(self, dataloader):
  75. # 初始化距离、计数器和准确率
  76. dist = 0
  77. on = 0
  78. cnt = 0
  79. # 将模型设置为评估模式
  80. self.model.eval()
  81. # 禁用梯度计算,以节省计算资源
  82. with torch.no_grad():
  83. # 遍历数据加载器中的每一批数据
  84. for chosen_ids, c_mask, reject_ids, r_mask in dataloader:
  85. # 将 chosen_ids,c_mask,reject_ids 和 r_mask 移动到当前设备上,并去掉第一个维度
  86. chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
  87. c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
  88. reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
  89. r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
  90. # 将 chosen_ids 和 c_mask 传入模型,计算 chosen_reward
  91. chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
  92. # 将 reject_ids 和 r_mask 传入模型,计算 reject_reward
  93. reject_reward = self.model(reject_ids, attention_mask=r_mask)
  94. # 遍历 chosen_reward 的每个元素
  95. for i in range(len(chosen_reward)):
  96. # 计数器加一
  97. cnt += 1
  98. # 如果 chosen_reward 大于 reject_reward,那么准确率加一
  99. if chosen_reward[i] > reject_reward[i]:
  100. on += 1
  101. # 更新距离
  102. dist += (chosen_reward - reject_reward).mean().item()
  103. # 计算距离的平均值
  104. dist_mean = dist / len(dataloader)
  105. # 计算准确率
  106. acc = on / cnt
  107. # 将模型设置为训练模式
  108. self.model.train()
  109. # 返回平均距离和准确率
  110. return dist_mean, acc
  111. # 定义一个方法,用于训练模型
  112. def fit(self):
  113. # 获取当前的日期和时间
  114. time = datetime.now()
  115. # 创建一个进度条,表示训练轮数
  116. epoch_bar = tqdm(range(self.epochs), desc='Train epoch', disable=not is_rank_0())
  117. # 遍历每一个训练轮
  118. for epoch in range(self.epochs):
  119. # 创建一个进度条,表示当前训练轮的训练步数
  120. step_bar = tqdm(range(self.train_dataloader.__len__()),
  121. desc='Train step of epoch %d' % epoch,
  122. disable=not is_rank_0())
  123. # 将模型设置为训练模式
  124. self.model.train()
  125. # 初始化计数器、准确率和距离
  126. cnt = 0
  127. acc = 0
  128. dist = 0
  129. # 遍历训练数据加载器中的每一批数据
  130. for chosen_ids, c_mask, reject_ids, r_mask in self.train_dataloader:
  131. # 将 chosen_ids,c_mask,reject_ids 和 r_mask 移动到当前设备上,并去掉第一个维度
  132. chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())
  133. c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())
  134. reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())
  135. r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())
  136. # 将 chosen_ids 和 c_mask 传入模型,计算 chosen_reward
  137. chosen_reward = self.model(chosen_ids, attention_mask=c_mask)
  138. # 将 reject_ids 和 r_mask 传入模型,计算 reject_reward
  139. reject_reward = self.model(reject_ids, attention_mask=r_mask)
  140. # 调用损失函数,计算损失
  141. loss = self.loss_fn(chosen_reward, reject_reward)
  142. # 调用策略的 backward 方法,计算梯度
  143. self.strategy.backward(loss, self.model, self.optimizer)
  144. # 调用策略的 optimizer_step 方法,更新模型参数
  145. self.strategy.optimizer_step(self.optimizer)
  146. # 将优化器的梯度缓存清零
  147. self.optimizer.zero_grad()
  148. # 计数器加一
  149. cnt += 1
  150. # 如果计数器达到 100
  151. if cnt == 100:
  152. # 调用学习率调度器的 step 方法,更新学习率
  153. self.scheduler.step()
  154. # 计算验证数据加载器上的平均距离和准确率
  155. dist, acc = self.eval_acc(self.valid_dataloader)
  156. # 重置计数器
  157. cnt = 0
  158. # 如果当前进程是 rank 0
  159. if is_rank_0():
  160. # 创建一个 DataFrame 来存储步数、损失、距离和准确率
  161. log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]],
  162. columns=['Step', 'Loss', 'Distance', 'Accuracy'])
  163. # 将 DataFrame 保存到 CSV 文件中
  164. log.to_csv(str(time) + '.csv', mode='a', header=False)
  165. # 更新进度条
  166. step_bar.update(100)
  167. # 更新进度条
  168. epoch_bar.update(1)
  169. # 如果当前进程是 rank 0
  170. if is_rank_0():
  171. # 打印训练结束的消息
  172. print('Training finished!')
  173. # 计算评估数据加载器上的平均距离和准确率
  174. dist, acc = self.eval_acc(self.eval_dataloader)
  175. # 创建一个 DataFrame 来存储损失、距离和准确率
  176. log = pd.DataFrame([[self.train_dataloader.__len__(), 'N/A', dist, acc]],
  177. columns=['Step', 'Loss', 'Distance', 'Accuracy'])
  178. # 将 DataFrame 保存到 CSV 文件中
  179. log.to_csv(str(time) + '.csv', mode='a', header=False)

3.2.3 通过trainer/ppo.py to start PPO training

最后,通过ColossalAI/applications/Chat/coati/trainer/ppo.py to start PPO training

  1. from typing import Any, Callable, Dict, List, Optional # 导入一些类型别名,用于类型注解
  2. import torch # 导入PyTorch库,这是一个机器学习库,广泛用于深度学习模型的建立和训练
  3. import torch.nn as nn # 导入PyTorch的神经网络模块
  4. from coati.experience_maker import Experience, NaiveExperienceMaker # 导入Experience和NaiveExperienceMaker,前者用于保存Agent的经验,后者用于创建Experience对象
  5. from coati.models.base import Actor, Critic # 导入Actor和Critic,他们是PPO算法中的关键组成部分
  6. from coati.models.generation_utils import update_model_kwargs_fn # 导入函数update_model_kwargs_fn,用于更新模型的参数
  7. from coati.models.loss import PolicyLoss, ValueLoss # 导入PolicyLoss和ValueLoss,分别计算策略损失和价值损失
  8. from coati.replay_buffer import NaiveReplayBuffer # 导入NaiveReplayBuffer,用于保存和回放经验
  9. from torch.optim import Optimizer # 导入Optimizer,这是优化算法的基类
  10. from transformers.tokenization_utils_base import PreTrainedTokenizerBase # 导入预训练的tokenizer基类,用于处理文本数据
  11. from .base import Trainer # 导入Trainer类,这是训练循环的基类
  12. from .callbacks import Callback # 导入Callback类,用于在训练过程中的某些阶段执行特定的函数
  13. from .strategies import Strategy # 导入Strategy类,它定义了模型参数更新的策略
  14. # PPOTrainer类的定义,继承自Trainer
  15. class PPOTrainer(Trainer):
  16. # 类的初始化函数,接受许多参数
  17. def __init__(self,
  18. strategy: Strategy, # 用于更新模型参数的策略
  19. actor: Actor, # 用于选择动作的模型
  20. critic: Critic, # 用于评估动作的模型
  21. reward_model: nn.Module, # 用于计算奖励的模型
  22. initial_model: Actor, # 用于生成初始策略的模型
  23. actor_optim: Optimizer, # 用于优化actor的优化器
  24. critic_optim: Optimizer, # 用于优化critic的优化器
  25. kl_coef: float = 0.1, # kl散度系数
  26. ptx_coef: float = 0.9, # ptx系数
  27. train_batch_size: int = 8, # 训练批大小
  28. buffer_limit: int = 0, # 缓冲区大小限制
  29. buffer_cpu_offload: bool = True, # 是否在cpu上处理缓冲区
  30. eps_clip: float = 0.2, # epsilon剪裁值
  31. value_clip: float = 0.4, # 价值剪裁值
  32. experience_batch_size: int = 8, # 经验批大小
  33. max_epochs: int = 1, # 最大训练周期数
  34. tokenizer: Optional[Callable[[Any], dict]] = None, # 用于文本处理的tokenizer
  35. sample_replay_buffer: bool = False, # 是否从回放缓冲区中抽样
  36. dataloader_pin_memory: bool = True, # 数据加载器是否针对内存
  37. callbacks: List[Callback] = [], # 在训练过程中的某些阶段执行的函数列表
  38. **generate_kwargs) -> None: # 其他生成参数
  39. # 创造经验生成器
  40. experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef)
  41. # 创造经验回放缓冲区
  42. replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload)
  43. # 根据策略和actor设置默认的生成参数
  44. generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor)
  45. # 调用父类的初始化函数
  46. super().__init__(strategy, experience_maker, replay_buffer, experience_batch_size, max_epochs, tokenizer,
  47. sample_replay_buffer, dataloader_pin_memory, callbacks, **generate_kwargs)
  48. # 初始化actor和critic
  49. self.actor = actor
  50. self.critic = critic
  51. # 初始化损失函数
  52. self.actor_loss_fn = PolicyLoss(eps_clip)
  53. self.critic_loss_fn = ValueLoss(value_clip)
  54. self.ptx_loss_fn = nn.CrossEntropyLoss(ignore_index=-100)
  55. self.ptx_coef = ptx_coef
  56. # 初始化优化器
  57. self.actor_optim = actor_optim
  58. self.critic_optim = critic_optim
  59. # 训练步骤函数,根据经验对象计算actor和critic的损失,并使用策略进行反向传播和优化器更新
  60. def training_step(self, experience: Experience) -> Dict[str, float]:
  61. self.actor.train() # 将actor设置为训练模式
  62. self.critic.train() # 将critic设置为训练模式
  63. # 计算策略损失
  64. num_actions = experience.action_mask.size(1)
  65. action_log_probs = self.actor(experience.sequences, num_actions, attention_mask=experience.attention_mask)
  66. actor_loss = self.actor_loss_fn(action_log_probs,
  67. experience.action_log_probs,
  68. experience.advantages,
  69. action_mask=experience.action_mask)
  70. # 计算ptx损失
  71. if self.ptx_coef != 0:
  72. ptx = next(iter(self.pretrain_dataloader))['input_ids'].to(torch.cuda.current_device())
  73. label = next(iter(self.pretrain_dataloader))['labels'].to(torch.cuda.current_device())[:, 1:]
  74. attention_mask = next(iter(self.pretrain_dataloader))['attention_mask'].to(torch.cuda.current_device())
  75. ptx_log_probs = self.actor.get_base_model()(ptx, attention_mask=attention_mask)['logits'][..., :-1, :]
  76. ptx_loss = self.ptx_loss_fn(ptx_log_probs.view(-1, ptx_log_probs.size(-1)), label.view(-1))
  77. actor_loss = ptx_loss * self.ptx_coef + actor_loss * (1 - self.ptx_coef)
  78. self.strategy.backward(actor_loss, self.actor, self.actor_optim) # 使用策略进行反向传播
  79. self.strategy.optimizer_step(self.actor_optim) # 使用策略进行优化器步进
  80. self.actor_optim.zero_grad() # 清零优化器的梯度
  81. # 计算价值损失
  82. values = self.critic(experience.sequences,
  83. action_mask=experience.action_mask,
  84. attention_mask=experience.attention_mask)
  85. critic_loss = self.critic_loss_fn(values,
  86. experience.values,
  87. experience.reward,
  88. action_mask=experience.action_mask)
  89. self.strategy.backward(critic_loss, self.critic, self.critic_optim) # 使用策略进行反向传播
  90. self.strategy.optimizer_step(self.critic_optim) # 使用策略进行优化器步进
  91. self.critic_optim.zero_grad() # 清零优化器的梯度
  92. return {'reward': experience.reward.mean().item()} # 返回平均奖励
  93. # 根据策略和actor设置默认的生成参数
  94. def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None:
  95. origin_model = strategy._unwrap_actor(actor)
  96. new_kwargs = {**generate_kwargs}
  97. # 使用huggingface模型的方法直接生成输入
  98. if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'):
  99. new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generation
  100. if 'update_model_kwargs_fn' not in generate_kwargs:
  101. new_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fn
  102. return new_kwargs
  103. # 保存模型的函数
  104. def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:
  105. self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer) # 使用策略保存模型

在获得最终模型权重后,还可通过量化降低推理硬件成本,并启动在线推理服务,仅需单张约 4GB 显存的 GPU 即可完成 70 亿参数模型推理服务部署


更多请参见另一篇文章:从零实现带RLHF的类ChatGPT:逐行解析微软DeepSpeed Chat

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/81403
推荐阅读
相关标签
  

闽ICP备14008679号