赞
踩
PPO 算法是一个 on-policy 的算法
。123
PPO算法是一种基于策略的强化学习算法,它可以处理连续动作空间的问题。PPO算法是一种在线算法,也就是说它需要用当前的策略产生数据,并用这些数据更新策略。PPO算法不能直接使用经验回放,因为经验回放中的数据可能是由不同的策略产生的,这会导致策略梯度的偏差12。
但是,PPO算法可以使用一种技术叫做重要性采样(importance sampling),来利用之前的数据进行多步更新23。重要性采样的思想是给每个数据加上一个权重,表示目标策略和行为策略的比例24。这样,PPO算法可以在一定程度上提高数据的利用效率,而不影响策略梯度的正确性23。
PPO 算法的原理是在每一步更新策略时,尽量减小代价函数,同时保证新策略和旧策略的差异不要太大。为了做到这一点,PPO 算法使用了一个特殊的目标函数,它包含了一个截断的比率因子,用来限制新策略和旧策略的比例。1
参考链接点击
重要性采样是一种调整数据权重的方法,它可以用于在线算法,也可以用于离线算法。PPO算法使用了重要性采样,但它并没有使用经验回放中的随机抽样和淘汰机制,而是按照时间顺序使用数据,并在一定次数后丢弃数据 。
PPO 算法只使用当前策略产生的经验来更新网络,而不使用历史策略产生的经验。这意味着 PPO 算法需要在每次更新策略后丢弃之前收集的经验,因为它们不再适用于新的策略。这样做的好处是 PPO 算法可以保证策略和值函数之间的一致性,也就是说,值函数可以很好地估计当前策略的性能。2
虽然 PPO 算法可以使用经验回放缓冲区来存储和重用历史经验,但这并不改变它是一个 on-policy 的算法。因为 PPO 算法在使用缓冲区中的数据时,仍然需要计算新策略和旧策略的比例,并且使用截断的比率因子来限制更新幅度。这样做相当于对 off-policy 的数据进行了校正,使得它们更接近 on-policy 的数据。4
如果 PPO 算法使用经验回放,那么它需要对 off-policy 的数据进行一些校正,以减小偏差和方差。一种常用的校正方法是使用重要性采样权重 (ISW),它可以衡量数据的分布和当前策略的分布之间的差异,并对损失函数或者优势函数进行加权。12
另一种校正方法是使用优先级经验回放 (PER),它可以根据数据的价值或者优势来给数据分配优先级,并按照优先级来采样数据。这样可以使得更有价值或者更有优势的数据被更频繁地采样,从而提高学习效率。2
# 定义折扣因子和平滑因子 gamma = 0.99 lambda = 0.95 # 初始化优势函数估计和回报估计为空列表 advantages = [] returns = [] # 初始化时间差分误差为0 delta = 0 # 从最后一个时间步开始反向遍历经验 for state, action, reward, next_state, done in reversed(experiences): # 如果是终止状态,那么下一个状态的值为0,否则用值函数网络预测 if done: next_value = 0 else: next_value = value_network.predict(next_state) # 计算当前状态的值 value = value_network.predict(state) # 计算时间差分误差 delta = reward + gamma * next_value - value # 计算优势函数估计,使用时间差分误差和上一个时间步的优势函数估计 advantage = delta + gamma * lambda * advantage # 计算回报估计,使用奖励和下一个状态的值 return = reward + gamma * next_value # 将优势函数估计和回报估计插入到列表的开头 advantages.insert(0, advantage) returns.insert(0, return) # 将优势函数估计和回报估计转换为张量 advantages = torch.tensor(advantages) returns = torch.tensor(returns)
注意一下几点:
''' @Author :Yan JP @Created on Date:2023/4/19 17:31 ''' # https://blog.csdn.net/dgvv4/article/details/129496576?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-3-129496576-blog-117329002.235%5Ev29%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-3-129496576-blog-117329002.235%5Ev29%5Epc_relevant_default_base3&utm_relevant_index=6 # 代码用于离散环境的模型 import numpy as np import torch from torch import nn from torch.nn import functional as F # ----------------------------------- # # 构建策略网络--actor # ----------------------------------- # class PolicyNet(nn.Module): def __init__(self, n_states, n_hiddens, n_actions): super(PolicyNet, self).__init__() self.fc1 = nn.Linear(n_states, n_hiddens) self.fc2 = nn.Linear(n_hiddens, n_actions) def forward(self, x): x = self.fc1(x) # [b,n_states]-->[b,n_hiddens] x = F.relu(x) x = self.fc2(x) # [b, n_actions] x = F.softmax(x, dim=1) # [b, n_actions] 计算每个动作的概率 return x # ----------------------------------- # # 构建价值网络--critic # ----------------------------------- # class ValueNet(nn.Module): def __init__(self, n_states, n_hiddens): super(ValueNet, self).__init__() self.fc1 = nn.Linear(n_states, n_hiddens) self.fc2 = nn.Linear(n_hiddens, 1) def forward(self, x): x = self.fc1(x) # [b,n_states]-->[b,n_hiddens] x = F.relu(x) x = self.fc2(x) # [b,n_hiddens]-->[b,1] 评价当前的状态价值state_value return x # ----------------------------------- # # 构建模型 # ----------------------------------- # class PPO: def __init__(self, n_states, n_hiddens, n_actions, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device): # 实例化策略网络 self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device) # 实例化价值网络 self.critic = ValueNet(n_states, n_hiddens).to(device) # 策略网络的优化器 self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) # 价值网络的优化器 self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) self.gamma = gamma # 折扣因子 self.lmbda = lmbda # GAE优势函数的缩放系数 self.epochs = epochs # 一条序列的数据用来训练轮数 self.eps = eps # PPO中截断范围的参数 self.device = device # 动作选择 def take_action(self, state): # 维度变换 [n_state]-->tensor[1,n_states] state = torch.tensor(state[np.newaxis, :]).to(self.device) # 当前状态下,每个动作的概率分布 [1,n_states] probs = self.actor(state) # 创建以probs为标准的概率分布 action_list = torch.distributions.Categorical(probs) # 依据其概率随机挑选一个动作 action = action_list.sample().item() return action # 训练 def learn(self, transition_dict): # 提取数据集 states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) actions = torch.tensor(transition_dict['actions']).to(self.device).view(-1, 1) rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).to(self.device).view(-1, 1) next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) dones = torch.tensor(transition_dict['dones'], dtype=torch.float).to(self.device).view(-1, 1) # 目标,下一个状态的state_value [b,1] next_q_target = self.critic(next_states) # 目标,当前状态的state_value [b,1] td_target = rewards + self.gamma * next_q_target * (1 - dones) # 预测,当前状态的state_value [b,1] td_value = self.critic(states) # 目标值和预测值state_value之差 [b,1] td_delta = td_target - td_value # 时序差分值 tensor-->numpy [b,1] td_delta = td_delta.cpu().detach().numpy() advantage = 0 # 优势函数初始化 advantage_list = [] # 计算优势函数 for delta in td_delta[::-1]: # 逆序时序差分值 axis=1轴上倒着取 [], [], [] # 优势函数GAE的公式 :计算优势函数估计,使用时间差分误差和上一个时间步的优势函数估计 advantage = self.gamma * self.lmbda * advantage + delta advantage_list.append(advantage) # 正序 advantage_list.reverse() # numpy --> tensor [b,1] advantage = torch.tensor(advantage_list, dtype=torch.float).to(self.device) # 策略网络给出每个动作的概率,根据action得到当前时刻下该动作的概率 old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach() # 一组数据训练 epochs 轮 for _ in range(self.epochs): # 每一轮更新一次策略网络预测的状态 log_probs = torch.log(self.actor(states).gather(1, actions)) # 新旧策略之间的比例 ratio = torch.exp(log_probs - old_log_probs) # 近端策略优化裁剪目标函数公式的左侧项 surr1 = ratio * advantage # 公式的右侧项,ratio小于1-eps就输出1-eps,大于1+eps就输出1+eps surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage # 策略网络的损失函数 actor_loss = torch.mean(-torch.min(surr1, surr2)) # 价值网络的损失函数,当前时刻的state_value - 下一时刻的state_value critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) # 梯度清0 self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() # 反向传播 actor_loss.backward() critic_loss.backward() # 梯度更新 self.actor_optimizer.step() self.critic_optimizer.step() import matplotlib.pyplot as plt import gym import torch if __name__ == '__main__': device = torch.device('cuda') if torch.cuda.is_available() \ else torch.device('cpu') # ----------------------------------------- # # 参数设置 # ----------------------------------------- # num_episodes = 300 # 总迭代次数 gamma = 0.9 # 折扣因子 actor_lr = 1e-3 # 策略网络的学习率 critic_lr = 1e-2 # 价值网络的学习率 n_hiddens = 16 # 隐含层神经元个数 env_name = 'CartPole-v0' return_list = [] # 保存每个回合的return # ----------------------------------------- # # 环境加载 # ----------------------------------------- # env = gym.make(env_name) n_states = env.observation_space.shape[0] # 状态数 4 n_actions = env.action_space.n # 动作数 2 # ----------------------------------------- # # 模型构建 # ----------------------------------------- # agent = PPO(n_states=n_states, # 状态数 n_hiddens=n_hiddens, # 隐含层数 n_actions=n_actions, # 动作数 actor_lr=actor_lr, # 策略网络学习率 critic_lr=critic_lr, # 价值网络学习率 lmbda=0.95, # 优势函数的缩放因子 epochs=10, # 一组序列训练的轮次 eps=0.2, # PPO中截断范围的参数 gamma=gamma, # 折扣因子 device=device ) # ----------------------------------------- # # 训练--回合更新 on_policy # ----------------------------------------- # for i in range(num_episodes): state = env.reset() # 环境重置 done = False # 任务完成的标记 episode_return = 0 # 累计每回合的reward # 构造数据集,保存每个回合的状态数据 transition_dict = { 'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': [], } while not done: action = agent.take_action(state) # 动作选择 next_state, reward, done, _ = env.step(action) # 环境更新 # 保存每个时刻的状态\动作\... transition_dict['states'].append(state) transition_dict['actions'].append(action) transition_dict['next_states'].append(next_state) transition_dict['rewards'].append(reward) transition_dict['dones'].append(done) # 更新状态 state = next_state # 累计回合奖励 episode_return += reward # 保存每个回合的return return_list.append(episode_return) # 模型训练 agent.learn(transition_dict) # 打印回合信息 print(f'iter:{i}, return:{np.mean(return_list[-10:])}') # -------------------------------------- # # 绘图 # -------------------------------------- # plt.plot(return_list) plt.title('return') plt.show()
PPO 有一个价值网络,用来估计状态值,并用于计算优势函数或者回报估计。理论上,PPO 也可以使用目标网络来生成目标状态值,从而稳定价值网络的训练。但是,PPO 是一个 on-policy 的算法,它只使用当前策略产生的经验来更新网络,而不使用历史策略产生的经验。这意味着 PPO 需要在每次更新策略后丢弃之前收集的经验,因为它们不再适用于新的策略。这样做的好处是 PPO 可以保证策略和值函数之间的一致性,也就是说,值函数可以很好地估计当前策略的性能。
如果 PPO 使用目标网络,那么目标网络的参数会滞后于价值网络的参数,这可能导致目标状态值和价值网络输出之间的不一致性,也就是说,目标状态值可能不能很好地估计当前策略的性能。这样做的代价是 PPO 可能降低学习效率,因为它不能及时反映环境和策略的变化。
因此,PPO 通常不使用目标网络,而是直接使用价值网络来生成目标状态值。这样做的好处是 PPO 可以提高学习效率,因为它可以及时反映环境和策略的变化。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。