当前位置:   article > 正文

LlamaFactory源码解析 PPO

LlamaFactory源码解析 PPO

  1. class CustomPPOTrainer(PPOTrainer, Trainer):
  2. r"""
  3. Inherits PPOTrainer.
  4. """
  5. def __init__(
  6. self,
  7. model_args: "ModelArguments",
  8. training_args: "Seq2SeqTrainingArguments",
  9. finetuning_args: "FinetuningArguments",
  10. generating_args: "GeneratingArguments",
  11. callbacks: List["TrainerCallback"],
  12. model: "AutoModelForCausalLMWithValueHead",
  13. reward_model: Optional["AutoModelForCausalLMWithValueHead"],
  14. ref_model: Optional["AutoModelForCausalLMWithValueHead"],
  15. tokenizer: "PreTrainedTokenizer",
  16. dataset: "Dataset",
  17. data_collator: "DataCollatorWithPadding",
  18. ):
  19. backward_batch_size = training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps
  20. ppo_config = PPOConfig(
  21. model_name=model_args.model_name_or_path,
  22. learning_rate=training_args.learning_rate,
  23. mini_batch_size=training_args.per_device_train_batch_size,
  24. batch_size=backward_batch_size * finetuning_args.ppo_buffer_size,
  25. gradient_accumulation_steps=training_args.gradient_accumulation_steps,
  26. ppo_epochs=finetuning_args.ppo_epochs,
  27. max_grad_norm=training_args.max_grad_norm,
  28. seed=training_args.seed,
  29. optimize_device_cache=True,
  30. target=finetuning_args.ppo_target,
  31. use_score_scaling=finetuning_args.ppo_score_norm,
  32. use_score_norm=finetuning_args.ppo_score_norm,
  33. whiten_rewards=finetuning_args.ppo_whiten_rewards,
  34. accelerator_kwargs={"step_scheduler_with_optimizer": False},
  35. log_with=training_args.report_to[0] if training_args.report_to else None,
  36. project_kwargs={"logging_dir": training_args.logging_dir},
  37. )
  38. # Create optimizer and scheduler
  39. if training_args.max_steps > 0:
  40. num_training_steps = training_args.max_steps
  41. else:
  42. total_train_batch_size = backward_batch_size * finetuning_args.ppo_buffer_size * training_args.world_size
  43. num_training_steps = training_args.num_train_epochs * math.ceil(len(dataset) / total_train_batch_size)
  44. optimizer = self.create_optimizer(model, training_args, finetuning_args)
  45. scheduler = self.create_scheduler(training_args, num_training_steps, optimizer)
  46. PPOTrainer.__init__(
  47. self,
  48. config=ppo_config,
  49. model=model,
  50. ref_model=ref_model,
  51. tokenizer=tokenizer,
  52. dataset=dataset,
  53. data_collator=data_collator,
  54. lr_scheduler=scheduler,
  55. )
  56. self.args = training_args
  57. self.model_args = model_args
  58. self.finetuning_args = finetuning_args
  59. self.reward_model = reward_model
  60. self.current_device = get_current_device() # patch for deepspeed training
  61. self.generation_config = GenerationConfig(
  62. pad_token_id=self.tokenizer.pad_token_id,
  63. eos_token_id=[self.tokenizer.eos_token_id] + self.tokenizer.additional_special_tokens_ids,
  64. **generating_args.to_dict(),
  65. )
  66. self.state = TrainerState()
  67. self.control = TrainerControl()
  68. self.is_deepspeed_enabled = self.accelerator.distributed_type == "DEEPSPEED" and hasattr(
  69. self.accelerator.state, "deepspeed_plugin"
  70. )
  71. self.log_callback, self.save_callback = callbacks[0], callbacks[1]
  72. assert isinstance(self.log_callback, LogCallback) and isinstance(self.save_callback, FixValueHeadModelCallback)
  73. if self.args.max_steps > 0:
  74. logger.info("max_steps is given, it will override any value given in num_train_epochs")
  75. if finetuning_args.reward_model_type == "full":
  76. if self.is_deepspeed_enabled:
  77. if not (
  78. getattr(reward_model.pretrained_model, "is_loaded_in_8bit", False)
  79. or getattr(reward_model.pretrained_model, "is_loaded_in_4bit", False)
  80. ): # quantized models are already set on the correct device
  81. self.reward_model = self._prepare_deepspeed(self.reward_model)
  82. else:
  83. self.reward_model = self.accelerator.prepare_model(self.reward_model, evaluation_mode=True)
  84. if finetuning_args.use_badam:
  85. from badam import clip_grad_norm_for_sparse_tensor
  86. self.accelerator.clip_grad_norm_ = MethodType(clip_grad_norm_for_sparse_tensor, self.accelerator)

下面是对代码中每一行的逐行解释:

python

复制

  1. class CustomPPOTrainer(PPOTrainer, Trainer):

定义一个名为 CustomPPOTrainer 的类,继承自 PPOTrainer 和 Trainer 这两个类。

python

复制

  1. r"""
  2. Inherits PPOTrainer.
  3. """

文档字符串,说明这个类继承自 PPOTrainer

python

复制

  1. def __init__(
  2. self,

定义 CustomPPOTrainer 类的构造函数

python

复制

        model_args: "ModelArguments",

构造函数的参数之一,model_args,代表模型的参数。

python

复制

        training_args: "Seq2SeqTrainingArguments",

构造函数的参数之一,training_args,代表训练相关的参数。

        finetuning_args: "FinetuningArguments",

构造函数的参数之一,finetuning_args,代表微调相关的参数。

python

复制

        generating_args: "GeneratingArguments",

构造函数的参数之一,generating_args,代表生成相关的参数。

python

复制

        callbacks: List["TrainerCallback"],

构造函数的参数之一,callbacks,是一个包含 TrainerCallback 对象的列表,用于处理训练过程中的各种事件。

python

复制

        model: "AutoModelForCausalLMWithValueHead",

构造函数的参数之一,model,代表带有价值头的因果语言模型

python

复制

        reward_model: Optional["AutoModelForCausalLMWithValueHead"],

构造函数的参数之一,reward_model,可选,代表用于计算奖励的模型

python

复制

        ref_model: Optional["AutoModelForCausalLMWithValueHead"],

构造函数的参数之一,ref_model,可选,代表参考模型,可能用于对比或其他目的

python

复制

        tokenizer: "PreTrainedTokenizer",

构造函数的参数之一,tokenizer,代表预训练的分词器

python

复制

        dataset: "Dataset",

构造函数的参数之一,dataset,代表用于训练的数据集。

python

复制

        data_collator: "DataCollatorWithPadding",

构造函数的参数之一,data_collator,用于处理数据批次,通常包括填充等操作

python

复制

        backward_batch_size = training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps

计算每个设备上的反向传播批量大小,乘以梯度累积步数。

python

复制

  1. ppo_config = PPOConfig(

创建一个 PPOConfig 配置对象,用于配置 PPO 训练。

python

复制

  1. model_name=model_args.model_name_or_path,

配置模型名称或路径。

python

复制

  1. learning_rate=training_args.learning_rate,

配置学习率。

python

复制

            mini_batch_size=training_args.per_device_train_batch_size,

配置小批量大小。

python

复制

            batch_size=backward_batch_size * finetuning_args.ppo_buffer_size,

配置整体批量大小,为反向批量大小乘以 PPO 缓冲区大小。

python

复制

  1. gradient_accumulation_steps=training_args.gradient_accumulation_steps,

配置梯度累积步数。

python

复制

  1. ppo_epochs=finetuning_args.ppo_epochs,

配置 PPO 训练的迭代次数

python

复制

  1. max_grad_norm=training_args.max_grad_norm,

配置梯度裁剪的最大范数。

python

复制

  1. seed=training_args.seed,

配置随机种子。

python

复制

            optimize_device_cache=True,

启用设备缓存优化。

python

复制

  1. target=finetuning_args.ppo_target,

配置 PPO 训练的目标。

python

复制

            use_score_scaling=finetuning_args.ppo_score_norm,

配置是否使用分数缩放。

python

复制

  1. use_score_norm=finetuning_args.ppo_score_norm,
  2. ```python
  3. whiten_rewards=finetuning_args.ppo_whiten_rewards,

配置是否对奖励进行白化处理,以减少奖励的方差和偏差。

python

复制

            accelerator_kwargs={"step_scheduler_with_optimizer": False},

配置加速器的参数,此处设置 step_scheduler_with_optimizer 为 False

python

复制

            log_with=training_args.report_to[0] if training_args.report_to else None,

配置日志记录方式,如果 training_args.report_to 存在,则使用列表中的第一个元素,否则不记录。

python

复制

            project_kwargs={"logging_dir": training_args.logging_dir},

设置项目相关的关键参数,此处设置日志目录。

python

复制

  1. )

结束 PPOConfig 的构造。

python

复制

        # Create optimizer and scheduler

注释,说明下面的代码块是用来创建优化器和调度器的。

python

复制

        if training_args.max_steps > 0:

如果 training_args.max_steps 大于0,使用这个值来设置训练步数。

python

复制

  1. num_training_steps = training_args.max_steps

将训练步数设置为 max_steps

python

复制

        else:

如果 max_steps 没有设置,则计算训练步数。

python

复制

            total_train_batch_size = backward_batch_size * finetuning_args.ppo_buffer_size * training_args.world_size

计算总的训练批量大小。

python

复制

            num_training_steps = training_args.num_train_epochs * math.ceil(len(dataset) / total_train_batch_size)

根据数据集的大小、批量大小和训练周期数计算总的训练步数

python

复制

        optimizer = self.create_optimizer(model, training_args, finetuning_args)

创建优化器。

python

复制

        scheduler = self.create_scheduler(training_args, num_training_steps, optimizer)

创建调度器,用来调整优化器的学习率。

python

复制

  1. PPOTrainer.__init__(

调用 PPOTrainer 类的构造器初始化。

python

复制

            self,

传递自身实例。

python

复制

  1. config=ppo_config,

传递 PPO 配置。

python

复制

  1. model=model,

传递模型。

python

复制

            ref_model=ref_model,

传递参考模型。

python

复制

  1. tokenizer=tokenizer,

传递分词器。

python

复制

  1. dataset=dataset,

传递数据集。

python

复制

            data_collator=data_collator,

传递数据整理器。

python

复制

  1. lr_scheduler=scheduler,

传递学习率调度器。

python

复制

  1. )

结束 PPOTrainer 的构造函数调用。

python

复制

        self.args = training_args

将训练参数保存到实例变量 args

python

复制

        self.model_args = model_args

将模型参数保存到实例变量 model_args

python

复制

        self.finetuning_args = finetuning_args

将微调参数保存到实例变量 finetuning_args

python

复制

        self.reward_model = reward_model

将奖励模型保存到实例变量 reward_model

python

复制

        self.current_device = get_current_device()  # patch for deepspeed training

获取当前设备,用于支持 DeepSpeed 训练。

python

复制

        self.generation_config = GenerationConfig(

创建生成配置。

python

复制

            pad_token_id=self.tokenizer.pad_token_id,

设置填充符号的 ID。

python

复制

            eos_token_id=[self.tokenizer.eos_token_id] + self.tokenizer.additional_special_tokens_ids,

设置结束符号的 ID 列表,包括 EOS 和其他特殊符号。

python

复制

            **generating_args.to_dict(),

将生成参数转换为字典,并展开到配置中。

python

复制

  1. )

结束生成配置的创建。

python

复制

        self.state = TrainerState()

创建训练状态对象。

python

复制

        self.control = TrainerControl()

创建训练控制对象。

很高兴你对上述 CustomPPOTrainer 类的详细解释感兴趣。下面,我将提供更多关于这个类的功能、用途以及它在机器学习和自然语言处理领域中的潜在应用的信息。

功能和用途

CustomPPOTrainer 类是为了在自然语言处理(NLP)领域中使用强化学习技术而设计的。通过继承 PPOTrainer,这个类可以利用近端策略优化(Proximal Policy Optimization, PPO)算法来优化模型。PPO 是一种在策略梯度方法中常用的强化学习算法,因为它在样本效率和易用性之间提供了一个很好的平衡。

此类的主要功能包括:

  1. 模型微调:使用PPO算法对特定任务进行微调。这在NLP中非常有用,尤其是在没有大量标注数据的情况下,可以通过奖励信号来引导模型学习。
  2. 奖励模型的集成:支持使用独立的奖励模型来定义和计算训练期间的奖励,这对于自动化文本生成任务中的目标优化至关重要。
  3. 高度可配置:提供了广泛的配置选项,使研究人员能够在不同的实验设置和任务需求下调整模型的行为。
  4. 生成控制:通过 generation_config 支持高级生成控制,如指定特殊符号和调整生成参数
  5. 支持多设备训练:支持使用像DeepSpeed这样的加速库来进行分布式训练,优化训练速度和资源使用。

潜在应用

在NLP领域,CustomPPOTrainer 可以应用于多种场景,例如:

  • 对话系统:通过PPO优化对话代理的回应,以提高对话质量和用户满意度。
  • 内容生成:在内容生成任务中,如新闻文章、故事或诗歌生成,使用PPO训练可以帮助模型学习生成更符合特定风格或质量标准的文本。
  • 游戏与模拟:在互动式模拟或游戏环境中,PPO可以训练模型生成自然语言描述或指令,增强用户体验。
  • 教育和培训:自动化生成教学材料或进行学术研究辅助,如自动生成问题、解决方案或解释性文本。

总结

通过 CustomPPOTrainer,研究人员和开发者可以更有效地在各种NLP任务中实现和测试PPO和其他强化学习技术的应用。这个类提供了一个灵活的框架,不仅支持标准的模型训练和微调,还允许通过强化学习方法来优化生成的文本,使其更具有目标导向和应用适应性。

请仔细解释ppo近段策略优化算法,以及在大模型微调中的具体应用步骤。

Poe

近端策略优化 (PPO) 算法详解

近端策略优化(Proximal Policy Optimization, PPO) 是一种流行的强化学习算法,由 OpenAI 提出。它主要用于解决策略梯度方法中的效率和稳定性问题,特别适用于处理高维和连续动作空间的任务。PPO 的核心思想是在进行策略更新时,限制更新步骤的大小,从而避免过大的策略更新造成的性能下降。

PPO 的关键特点:
  1. 目标函数:PPO 使用一种特殊的目标函数,它通过引入裁剪概率比率来避免太大的策略更新。这个比率是新策略与旧策略之间概率的比值,当这个比值超出预设的阈值时,目标函数会将其裁剪,以维持更新的稳定性。

  2. 两种主要变体

    • PPO-Clip:使用裁剪的概率比例来限制策略更新的幅度。
    • PPO-Penalty:通过增加一个关于概率比率的约束项来调整目标函数。
  3. 多次更新:在每个数据批次上执行多次梯度更新,从而提高数据的利用效率。
    • 在大模型微调中的应用步骤:

      PPO 可以用于大型模型(如 Transformer 模型)的微调,特别是在涉及序列决策任务,如文本生成或游戏中。以下是使用 PPO 微调大模型的具体步骤:

    • 环境与模型的设置

      • 定义或选择一个环境,该环境应能生成与任务相关的状态,并能接受模型输出的动作来返回奖励。
      • 准备或预训练一个大型神经网络模型,如 GPT 或 BERT,作为策略网络。
    • 收集数据

      • 使用当前策略(模型)与环境交互,收集一系列状态、动作、奖励和新状态。
    • 优势估计

      • 计算优势函数,通常使用 Generalized Advantage Estimation (GAE)。优势函数帮助算法评估某一动作相比平均水平好到什么程度。
    • 目标函数的构建与优化

      • 使用收集到的数据,构建 PPO 的目标函数。这包括计算旧策略和新策略的比率,并应用裁剪。
      • 对策略网络进行梯度上升,以最大化目标函数。
    • 多次迭代

      • 重复数据收集和策略更新过程多次,每次迭代都根据新的策略与环境交互,继续优化模型。
    • 评估与调整

      • 定期评估策略的性能,必要时调整算法参数,如裁剪阈值、优势估计的折扣因子等。
    • 集成与部署

      • 微调完成后,集成并部署模型到实际应用中。
    • PPO 在处理需要长期序列决策和高维动作空间的任务时表现出色,因此它在自然语言处理和复杂策略游戏等领域的应用尤为有效。通过适当的环境设计和奖励结构,PPO 可以帮助大型模型学习如何在复杂环境中做出更优的决策,从而提升整体性能和应用的实用性。

  1. def ppo_train(self, resume_from_checkpoint: Optional[str] = None) -> None:
  2. r"""
  3. Implements training loop for the PPO stage, like _inner_training_loop() in Huggingface's Trainer.
  4. """
  5. if resume_from_checkpoint is not None:
  6. raise ValueError("`resume_from_checkpoint` will be supported in the future version.")
  7. total_train_batch_size = (
  8. self.args.per_device_train_batch_size
  9. * self.args.gradient_accumulation_steps
  10. * self.finetuning_args.ppo_buffer_size
  11. * self.args.world_size
  12. )
  13. if self.args.max_steps > 0:
  14. num_examples = total_train_batch_size * self.args.max_steps
  15. num_train_epochs = sys.maxsize
  16. max_steps = self.args.max_steps
  17. steps_in_epoch = self.args.max_steps
  18. else:
  19. len_dataloader = len(self.dataloader)
  20. num_examples = len(self.dataset)
  21. num_train_epochs = self.args.num_train_epochs
  22. max_steps = math.ceil(num_train_epochs * len_dataloader)
  23. steps_in_epoch = len_dataloader
  24. self.state.max_steps = max_steps
  25. self.state.num_train_epochs = num_train_epochs
  26. self.state.is_local_process_zero = self.is_local_process_zero()
  27. self.state.is_world_process_zero = self.is_world_process_zero()
  28. if self.is_world_process_zero():
  29. logger.info("***** Running training *****")
  30. logger.info(" Num examples = {}".format(num_examples))
  31. logger.info(" Num Epochs = {}".format(num_train_epochs))
  32. logger.info(" Instantaneous batch size per device = {}".format(self.args.per_device_train_batch_size))
  33. logger.info(
  34. " Total train batch size (w. parallel, buffer, distributed & accumulation) = {}".format(
  35. total_train_batch_size
  36. )
  37. )
  38. logger.info(" Gradient Accumulation steps = {}".format(self.args.gradient_accumulation_steps))
  39. logger.info(" Num optimization epochs per batch = {}".format(self.finetuning_args.ppo_epochs))
  40. logger.info(" Total training steps = {}".format(max_steps))
  41. logger.info(" Number of trainable parameters = {}".format(count_parameters(self.model)[0]))
  42. unwrapped_model: "AutoModelForCausalLMWithValueHead" = self.accelerator.unwrap_model(self.model)
  43. dataiter = iter(self.dataloader)
  44. loss_meter = AverageMeter()
  45. reward_meter = AverageMeter()
  46. self.log_callback.on_train_begin(self.args, self.state, self.control)
  47. for step in tqdm(range(max_steps), disable=not self.is_local_process_zero()):
  48. try:
  49. batch = next(dataiter)
  50. except StopIteration:
  51. dataiter = iter(self.dataloader)
  52. batch = next(dataiter)
  53. # Cast to inference mode
  54. unwrapped_model.gradient_checkpointing_disable()
  55. unwrapped_model.config.use_cache = True
  56. self.model.eval()
  57. # Get inputs
  58. self.tokenizer.padding_side = "right" # change padding side
  59. queries, responses, rewards = [], [], []
  60. for idx in range(0, self.config.batch_size, self.config.mini_batch_size):
  61. mini_batch_queries, mini_batch_responses = self.get_inputs(
  62. batch[idx : idx + self.config.mini_batch_size]
  63. )
  64. mini_batch_rewards = self.get_rewards(mini_batch_queries, mini_batch_responses, unwrapped_model)
  65. queries.extend(mini_batch_queries)
  66. responses.extend(mini_batch_responses)
  67. rewards.extend(mini_batch_rewards)
  68. # Cast to training mode
  69. unwrapped_model.gradient_checkpointing_enable()
  70. unwrapped_model.config.use_cache = False
  71. self.model.train()
  72. # Run PPO step
  73. stats = self.step(queries, responses, rewards)
  74. self.tokenizer.padding_side = "left" # restore padding side
  75. loss_meter.update(float(stats["ppo/loss/total"]), n=len(rewards))
  76. reward_meter.update(torch.stack(rewards).mean().item(), n=len(rewards))
  77. if self.config.log_with is not None:
  78. try:
  79. batch["query"] = self.tokenizer.batch_decode(queries, skip_special_tokens=True)
  80. batch["response"] = self.tokenizer.batch_decode(responses, skip_special_tokens=True)
  81. self.log_stats(stats, batch, rewards)
  82. except Exception:
  83. logger.warning("Failed to save stats due to unknown errors.")
  84. self.state.global_step += 1
  85. self.log_callback.on_step_end(self.args, self.state, self.control)
  86. if self.is_local_process_zero() and (step + 1) % self.args.logging_steps == 0:
  87. logs = dict(
  88. loss=round(loss_meter.avg, 4),
  89. reward=round(reward_meter.avg, 4),
  90. learning_rate=stats["ppo/learning_rate"],
  91. epoch=round(step / steps_in_epoch, 2),
  92. )
  93. tqdm.write(str(logs))
  94. logs["step"] = step
  95. self.state.log_history.append(logs)
  96. self.log_callback.on_log(self.args, self.state, self.control)
  97. loss_meter.reset()
  98. reward_meter.reset()
  99. if (step + 1) % self.args.save_steps == 0: # save checkpoint
  100. self.save_model(
  101. os.path.join(self.args.output_dir, "{}-{}".format(PREFIX_CHECKPOINT_DIR, self.state.global_step))
  102. )
  103. self.save_callback.on_save(
  104. self.args, self.state, self.control, model=self.accelerator.unwrap_model(self.model)
  105. )
  106. if self.control.should_epoch_stop or self.control.should_training_stop:
  107. break
  108. self.log_callback.on_train_end(self.args, self.state, self.control)
  109. self.save_callback.on_train_end(
  110. self.args, self.state, self.control, model=self.accelerator.unwrap_model(self.model)
  111. )

这段代码是一个较为复杂的 PPO 训练循环实现,涵盖了从模型初始化到循环训练,以及日志记录和模型保存等功能。我们将逐行解释其核心内容:

python

复制

def ppo_train(self, resume_from_checkpoint: Optional[str] = None) -> None:
  • 定义了 ppo_train 方法,该方法接受一个可选的参数 resume_from_checkpoint,用于未来版本支持从检查点恢复训练。

python

复制

  1. if resume_from_checkpoint is not None:
  2. raise ValueError("`resume_from_checkpoint` will be supported in the future version.")
  • 如果传入 resume_from_checkpoint 参数,当前版本会抛出异常,提示该功能将在未来版本中支持。

python

复制

  1. total_train_batch_size = (
  2. self.args.per_device_train_batch_size
  3. * self.args.gradient_accumulation_steps
  4. * self.finetuning_args.ppo_buffer_size
  5. * self.args.world_size
  6. )
  • 计算总训练批次大小,考虑了每设备批次大小、梯度累积步数、PPO缓冲区大小和分布式训练中的世界大小(即处理器数量)。

python

复制

  1. if self.args.max_steps > 0:
  2. num_examples = total_train_batch_size * self.args.max_steps
  3. num_train_epochs = sys.maxsize
  4. max_steps = self.args.max_steps
  5. steps_in_epoch = self.args.max_steps
  6. else:
  7. len_dataloader = len(self.dataloader)
  8. num_examples = len(self.dataset)
  9. num_train_epochs = self.args.num_train_epochs
  10. max_steps = math.ceil(num_train_epochs * len_dataloader)
  11. steps_in_epoch = len_dataloader
  • 根据是否指定了最大步数 (max_steps) 来决定训练的迭代方式和周期。如果指定了最大步数,则使用最大步数来计算总例数和训练周期,否则根据数据加载器和数据集的大小计算。

python

复制

  1. self.state.max_steps = max_steps
  2. self.state.num_train_epochs = num_train_epochs
  3. self.state.is_local_process_zero = self.is_local_process_zero()
  4. self.state.is_world_process_zero = self.is_world_process_zero()
  • 初始化训练状态,包括最大步数、训练周期数以及是否是本地或全局的主进程(用于控制日志和模型保存等操作)。

python

复制

  1. if self.is_world_process_zero():
  2. ...
  • 如果当前进程是全局主进程,则记录训练的相关信息,包括例数、周期数、批次大小、梯度累积步数等。

python

复制

  1. unwrapped_model: "AutoModelForCausalLMWithValueHead" = self.accelerator.unwrap_model(self.model)
  2. dataiter = iter(self.dataloader)
  3. loss_meter = AverageMeter()
  4. reward_meter = AverageMeter()
  5. self.log_callback.on_train_begin(self.args, self.state, self.control)
  • 解包模型以便直接操作,初始化数据加载器的迭代器,创建损失和奖励的度量工具,并触发训练开始的回调。

python

复制

  1. for step in tqdm(range(max_steps), disable=not self.is_local_process_zero()):
  2. ...
  • 主训练循环,使用 tqdm 来展示进度条。循环将根据最大步数执行,并在本地主进程中显示进度。

接下来的代码块涉及处理数据批次、执行模型的推理和训练、记录统计数据、处理日志和保存模型的功能。由于篇幅限制,这里不对每一行代码进行详细解释,但大致流程包括:

  • 从数据加载器中获取批次数据
  • 将模型置于推理模式,处理输入数据,获取奖励。
  • 切换到训练模式,执行 PPO 训练步骤。
  • 更新损失和奖励的度量,记录日志和统计信息。
  • 根据设定的间隔保存模型和执行回调。

这个过程循环,直到达到最大步数或满足特定的停止条件。整个训练循环详细地处理了模型的运行状态、性能监控以及与分布式训练相关的特定操作。

  1. def create_optimizer(
  2. self,
  3. model: "AutoModelForCausalLMWithValueHead",
  4. training_args: "Seq2SeqTrainingArguments",
  5. finetuning_args: "FinetuningArguments",
  6. ) -> "torch.optim.Optimizer":
  7. optimizer = create_custom_optimzer(model, training_args, finetuning_args)
  8. if optimizer is None:
  9. decay_params, nodecay_params = [], []
  10. decay_param_names = self.get_decay_parameter_names(model)
  11. for name, param in model.named_parameters():
  12. if param.requires_grad:
  13. if name in decay_param_names:
  14. decay_params.append(param)
  15. else:
  16. nodecay_params.append(param)
  17. optim_class, optim_kwargs = Trainer.get_optimizer_cls_and_kwargs(training_args)
  18. param_groups = [
  19. dict(params=nodecay_params),
  20. dict(params=decay_params, weight_decay=training_args.weight_decay),
  21. ]
  22. optimizer = optim_class(param_groups, **optim_kwargs)
  23. return optimizer
  24. def create_scheduler(
  25. self, training_args: "Seq2SeqTrainingArguments", num_training_steps: int, optimizer: "torch.optim.Optimizer"
  26. ) -> "torch.optim.lr_scheduler.LRScheduler":
  27. create_custom_scheduler(training_args, num_training_steps, optimizer)
  28. lr_scheduler = get_scheduler(
  29. training_args.lr_scheduler_type,
  30. optimizer=optimizer,
  31. num_warmup_steps=training_args.get_warmup_steps(num_training_steps),
  32. num_training_steps=num_training_steps,
  33. )
  34. return lr_scheduler
  35. @torch.no_grad()
  36. def get_inputs(self, batch: Dict[str, torch.Tensor]) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
  37. r"""
  38. Generates model's responses given queries.
  39. """
  40. if self.model_args.upcast_layernorm:
  41. layernorm_params = dump_layernorm(self.model)
  42. if batch["input_ids"].size(0) == 1: # handle llama2 ppo with gradient accumulation > 1
  43. start_index = (batch["input_ids"][0] != self.tokenizer.pad_token_id).nonzero()[0].item()
  44. for k, v in batch.items():
  45. batch[k] = v[:, start_index:]
  46. unwrapped_model: "AutoModelForCausalLMWithValueHead" = self.accelerator.unwrap_model(self.model)
  47. generate_output: torch.Tensor = unwrapped_model.generate(
  48. generation_config=self.generation_config, logits_processor=get_logits_processor(), **batch
  49. )
  50. if self.model_args.upcast_layernorm:
  51. restore_layernorm(self.model, layernorm_params)
  52. query = batch["input_ids"].detach().cpu()
  53. response = generate_output[:, batch["input_ids"].size(-1) :].detach().cpu()
  54. queries, responses = [], []
  55. for i in range(len(query)):
  56. query_start_index = (query[i] != self.tokenizer.pad_token_id).nonzero()[0].item()
  57. response_index = (response[i] != self.tokenizer.pad_token_id).nonzero()
  58. if len(response_index) == 0:
  59. response_length = 1 # allow empty response
  60. else:
  61. response_length = response_index[-1].item() + 1
  62. queries.append(query[i, query_start_index:]) # remove padding from left
  63. responses.append(response[i, :response_length]) # remove padding from right
  64. return queries, responses
  65. @torch.no_grad()
  66. def get_rewards(
  67. self,
  68. queries: List[torch.Tensor],
  69. responses: List[torch.Tensor],
  70. unwrapped_model: "AutoModelForCausalLMWithValueHead",
  71. ) -> List[torch.Tensor]:
  72. r"""
  73. Computes scores using given reward model.
  74. Both inputs and outputs are put on CPU.
  75. """
  76. if self.finetuning_args.reward_model_type == "api":
  77. token_ids = [torch.cat((q, r), dim=-1).tolist() for q, r in zip(queries, responses)]
  78. messages = self.tokenizer.batch_decode(token_ids, skip_special_tokens=True)
  79. return get_rewards_from_server(self.reward_model, messages)
  80. if self.finetuning_args.reward_model_type == "lora":
  81. replace_model(unwrapped_model, target="reward")
  82. reward_model = self.model
  83. else:
  84. reward_model = self.reward_model
  85. batch = self.prepare_model_inputs(queries, responses)
  86. with torch.cuda.amp.autocast(dtype=self.model_args.compute_dtype): # support bf16
  87. _, _, values = reward_model(**batch, output_hidden_states=True, return_dict=True, use_cache=False)
  88. if getattr(unwrapped_model.config, "model_type", None) == "chatglm": # assume same architecture
  89. values = torch.transpose(values, 0, 1)
  90. rewards = []
  91. for i in range(values.size(0)):
  92. end_indexes = (batch["input_ids"][i] != self.tokenizer.pad_token_id).nonzero()
  93. end_index = end_indexes[-1].item() if len(end_indexes) else 0
  94. rewards.append(values[i, end_index].float().detach().cpu()) # use fp32 type
  95. if self.finetuning_args.reward_model_type == "lora":
  96. replace_model(unwrapped_model, target="default")
  97. return rewards

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/608079
推荐阅读
相关标签
  

闽ICP备14008679号