赞
踩
近端策略优化(PPO),它的性能与最先进的方法相当或更好,同时更容易实现和调整。PPO因其易用性和良好的性能成为OpenAI默认的强化学习算法。(2017年,openAI官网发布)
# Proximal Policy Optimization (openai.com)
官方代码:
openAI论文https://arxiv.org/abs/1707.06347
具体步骤是怎样的?
PPO算法的具体步骤是基于对策略梯度方法的改进,它主要包括以下几个关键的步骤:
1、收集数据:通过在环境中执行当前策略(policy)来收集一组交互数据。这些数据包括状态(state)、动作(action)、奖励(reward)以及可能的下一个状态。
2、计算优势估计:为了评价一个动作相对于平均水平的好坏,需要计算优势函数(advantage function)。这通常是通过某种形式的时间差分(TD)估计或者广义优势估计(GAE)来完成的。
3、优化目标函数:PPO算法使用一个特殊设计的目标函数,这个函数涉及到概率比率
表示旧策略。目标函数的形式通常为:
其中,是优势函数的估计是一个小的正数(如0.1或0.2),clip 函数限制了概率比率
(θ)的变化范围,防止更新步骤过大。
4、更新策略:使用梯度上升方法来更新策略参数,即 ,其中是学习率。
5、重复步骤:使用新的策略参数重复以上步骤,直到满足某些停止准则,比如策略性能不再提升或者已经达到了一定的迭代次数。
PPO算法的关键之处在于它通过限制策略更新的幅度,使得学习过程更加稳定。在每次更新时,概率比率被限制在范围内,防止由于单个数据点导致的极端策略更新,这有助于避免策略性能的急剧下降。同时,PPO允许在每次迭代中使用相同的数据多次进行策略更新,这提高了数据效率。
在PG算法中,Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π,策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态state出发,一直到任务的结束,被称为一个完整的eposide,在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。这样,一个有T个时刻的eposide,Actor不断与环境交互,形成如下的序列τ:
这样一个序列τ是不确定的,因为Actor在不同state下所采取的action可能是不同的,一个序列τ发生的概率为:
序列τ所获得的奖励为每个阶段所得到的奖励的和,称为R(τ)。因此,在Actor的策略为π的情况下,所能获得的期望奖励为:
而我们的期望是调整Actor的策略π,使得期望奖励最大化,于是有了策略梯度的方法,既然期望函数已经有了,只要使用梯度提升的方法更新我们的网络参数θ(即更新策略π)就好了,所以问题的重点变为了求参数的梯度。梯度的求解过程如下:
上面的过程中,首先利用log函数求导的特点进行转化,随后用N次采样的平均值来近似期望,最后,将pθ展开(1-Tn),将与θ无关的项去掉,即得到了最终的结果。
所以,一个PG方法的完整过程如下:
首先采集数据,然后基于前面得到的梯度提升的式子更新参数,随后再根据更新后的策略再采集数据,再更新参数,如此循环进行。
注意到图中的大红字only used once,因为在更新参数后,策略已经变了,而先前的数据是基于更新参数前的策略得到的。
通过上面的介绍你可能发现了,PG方法在更新策略时,基本思想就是增加reward大的动作出现的概率,减小reward小的策略出现的概率。
假设现在有一种情况,我们的reward在无论何时都是正的,对于没有采样到的动作,它的reward是0。因此,如果一个比较好的动作没有被采样到,而采样到的不好的动作得到了一个比较小的正reward,那么没有被采样到的好动作的出现概率会越来越小,这显然是不合适的,因此我们需要增加一个奖励的基线,让reward有正有负。
一般增加的基线是所获得奖励的平均值:
这个很容易理解,就像买股票一样,未来的1块钱的价值要小于当前1块钱的价值,因此未来的1块钱变成现在的价值,需要进行一定的折扣
1.3、Tip3:使用优势函数
我们之前介绍的PG方法,对于同一个eposide中的所有数据,使用的奖励都是一样的,其实与st和at相关的。
这里我们使用的是优势函数,即Qπ(st,at) - Vπ(st)。其中Qπ(st,at)可以使用从当前状态开始到eposide结束的奖励折现和得到,Vπ(st)可以通过一个critic来计算得到。
接着上面的讲,PG方法一个很大的缺点就是参数更新慢,因为我们每更新一次参数都需要进行重新的采样,这其实是中on-policy的策略,即我们想要训练的agent和与环境进行交互的agent是同一个agent;与之对应的就是off-policy的策略,即想要训练的agent和与环境进行交互的agent不是同一个agent,简单来说,就是拿别人的经验来训练自己。
举个下棋的例子,如果你是通过自己下棋来不断提升自己的棋艺,那么就是on-policy的,如果是通过看别人下棋来提升自己,那么就是off-policy的。
那么为了提升我们的训练速度,让采样到的数据可以重复使用,我们可以将on-policy的方式转换为off-policy的方式。即我们的训练数据通过另一个Actor(对应的网络参数为θ'得到。这要怎么做呢?通过下面的思路:
通过这种方式,我们的p(x)和q(x)的分布不能差别太大,否则需要进行非常多次的采样,才能得到近似的结果:
如上图所示,很显然,在x服从p(x)分布时,f(x)的期望为负,此时我们从q(x)中来采样少数的x,那么我们采样到的x很有可能都分布在右半部分,此时f(x)大于0,我们很容易得到f(x)的期望为正的结论,这就会出现问题,因此需要进行大量的采样。
那么此时我们想要期望奖励最大化,则变为:
则梯度变为:
最后一项因为我们假设两个分布不能差太远,所以认为他们是相等的,为了求解方便,我们直接划掉。此时似然函数变为:
由梯度变为似然函数,使用的还是下面式子,可以自己手动算一下:
到这里,马上就要得到我们的PPO算法了,再坚持一下!
PPO 有一个前身:信任区域策略优化(trust region policy optimization,TRPO),TRPO 的式子如下式所示。
TRPO 与 PPO 不一样的地方是约束项的位置不一样,PPO 是直接把约束放到要优化的式子里,可以直接用梯度上升的方法最大化这个式子。但TRPO是把 KL 散度当作约束,它希望跟的 KL 散度小于一个。如果我们使用的是基于梯度的优化时,有约束是很难处理的,因为它把 KL 散度约束当做一个额外的约束,没有放目标里面。PPO 跟 TRPO 的性能差不多,但 PPO 在实现上比 TRPO 容易的多,所以我们一般就用 PPO,而不用TRPO。
我们前面介绍了,我们希望θ和θ'不能差太远,这并不是说参数的值不能差太多,而是说,输入同样的state,网络得到的动作的概率分布不能差太远。得到动作的概率分布的相似程度,我们可以用KL散度来计算,将其加入PPO模型的似然函数中,变为:
在实际中,我们会动态改变对θ和θ'分布差异的惩罚,如果KL散度值太大,我们增加这一部分惩罚,如果小到一定值,我们就减小这一部分的惩罚,基于此,我们得到了PPO算法的过程:
PPO算法还有另一种实现方式,不将KL散度直接放入似然函数中,而是进行一定程度的裁剪:
上图中,绿色的线代表min中的第一项,即不做任何处理,蓝色的线为第二项,如果两个分布差距太大,则进行一定程度的裁剪。最后对这两项再取min,防止了θ更新太快。
上式看起来很复杂,其实很简单,它想做的事情就是希望跟,也就是做示范的模型跟实际上学习的模型,在优化以后不要差距太大。
操作符min作用是在第一项和第二项中选择最小的。
第二项前面有个裁剪(clip)函数,裁剪函数是指:在括号里有三项,如果第一项小于第二项,则输出1 − ε;如果第一项大于第三项的话,则输出1 + ε。
ε 是一个超参数,要需要我们调整的,一般设置为0.1或0.2 。
用收集到的数据去训练。假设我们可以用收集到的数据去训练,意味着说我们可以把收集到的数据用很多次,也就是可以执行梯度上升好几次,更新参数好几次,这都只要用同一批数据就可以实现。那就只要采样一次,也许采样多一点的数据,让去更新很多次, 这样就会比较有效率。
那么问题来了, 我们怎么找到这样的一个演员,使其收集到的数据可以用于训练,且他们之间的差异可以被忽略不计呢?
首先我们先介绍一个名词,重要性采样(importance sampling)。 假设有一个函数,需要从分布中采样。我们应该如何怎么计算的期望值呢?假设分布不能做积分,那么我们可以从分布尽可能多采样更多的。这样就会得到更多的,取它的平均值就可以近似的期望值。
现在另外一个问题也来了,假设我们不能在分布中采样数据,只能从另外一个分布,中去采样数据.可以是任何分布。我们从中采样的话就不能直接套下面的式子。
因为上式是假设都是从采样出来的。如果我们想要在中采样的情况下带入上式,就需要做些变换。期望值
的另一种写法是,对其进行变换,如下式所示,
整理得下式,
这样就可以对分布中采样的取期望值。具体来说,计算,最后取期望值。
这边是从做采样,所以我们从里采样出来的每一笔数据,需要乘上一个重要性权重(importance weight)来修正这两个分布的差异。
可以是任何分布。重要性采样有一些问题。虽然我们可以把换成任何的。但是在实现上,
和不能差太多。差太多的话,会有一些问题。两个随机变量的平均值一样,并不代表它的方差一样,可以带入方差公式推导一下。
现在要做的事情就是把重要性采样用在off_policy的情况,把on-policy训练的算法改成off-olicy训练的算法。 怎么改呢,如下式所示,我们用另外一个策略,它就是另外一个演员,与环境做互动,采样出轨迹,计算。
实际在做策略梯度的时候,并不是给整个轨迹都一样的分数,而是每一个状态-动作的对会分开来计算。具体可参考上一篇PG的文章。实际上更新梯度的时候,如下式所示。
现在是跟环境互动以后所采样到的数据。但是拿来训练,要调整参数的模型是。因为
跟是不同的模型,所以需要用重要性采样技术去做修正。即把用采样出来的概率除掉
把用采样出来的概率。公式如下。
上式中的有一个上标,代表说是agent跟环境互动的时候所计算出来的结果。但实际上从换到的时候,应该改成。
接下来,我们可以拆解和,即
于是可得公式
这里需要做一件事情,假设模型是的时候,我们看到的概率,跟模型是的时候,看到的概率是差不多的,即=,实际上在更新参数 的时候,我们就是按照下式来更新参数。
我们有个策略的网络,输入状态,它会输出每一个的概率。所以和这两项,我们只要知道和的参数就可以算。
所以实际上,我们可以从梯度去反推原来的目标函数,可以用
来反推目标函数(由梯度变为似然函数)。
当使用重要性采样的时候,要去优化的目标函数如下式所示,我们把它记。括号里面的代表我们需要去优化的参数。用去做示范采样数据,采样出、以后,要去计算跟、的优势,再乘上 。
除了官网的,还有详解近端策略优化(ppo,干货满满) - 简书 (jianshu.com)
案例:倒立摆问题。钟摆以随机位置开始,目标是将其向上摆动,使其保持直立。 测试环境:Pendulum-v1
动作:往左转还是往右转,用力矩来衡量,即力乘以力臂。范围[-2,2]:(连续空间)
状态:cos(theta), sin(theta) , thetadot。
奖励:越直立拿到的奖励越高,越偏离,奖励越低。奖励的最大值为0。
- class FeedForwardNN(nn.Module):
-
- def __init__(self, in_dim, out_dim):
-
- super(FeedForwardNN, self).__init__()
-
- self.layer1 = nn.Linear(in_dim, 64)
- self.layer2 = nn.Linear(64, 64)
- self.layer3 = nn.Linear(64, out_dim)
-
- def forward(self, obs):
-
- if isinstance(obs, np.ndarray):
- obs = torch.tensor(obs, dtype=torch.float)
-
- activation1 = F.relu(self.layer1(obs))
- activation2 = F.relu(self.layer2(activation1))
- output = self.layer3(activation2)
-
- return output
- class PPO:
-
- def __init__(self, policy_class, env, **hyperparameters):
-
- # PPO 初始化用于训练的超参数
- self._init_hyperparameters(hyperparameters)
-
- # 提取环境信息
- self.env = env
- self.obs_dim = env.observation_space.shape[0]
- self.act_dim = env.action_space.shape[0]
-
- # 初始化演员和评论家网络
- self.actor = policy_class(self.obs_dim, self.act_dim)
- self.critic = policy_class(self.obs_dim, 1)
-
- # 为演员和评论家初始化优化器
- self.actor_optim = Adam(self.actor.parameters(), lr=self.lr)
- self.critic_optim = Adam(self.critic.parameters(), lr=self.lr)
-
- # 初始化协方差矩阵,用于查询actor网络的action
- self.cov_var = torch.full(size=(self.act_dim,), fill_value=0.5)
- self.cov_mat = torch.diag(self.cov_var)
-
- # 这个记录器将帮助我们打印出每个迭代的摘要
- self.logger = {
- 'delta_t': time.time_ns(),
- 't_so_far': 0, # 到目前为止的时间步数
- 'i_so_far': 0, # 到目前为止的迭代次数
- 'batch_lens': [], # 批次中的episodic长度
- 'batch_rews': [], # 批次中的rews回报
- 'actor_losses': [], # 当前迭代中演员网络的损失
- }
-
- def learn(self, total_timesteps):
-
- print(f"Learning... Running {self.max_timesteps_per_episode} timesteps per episode, ", end='')
- print(f"{self.timesteps_per_batch} timesteps per batch for a total of {total_timesteps} timesteps")
- t_so_far = 0 # 到目前为止仿真的时间步数
- i_so_far = 0 # 到目前为止,已运行的迭代次数
- while t_so_far < total_timesteps:
-
- # 收集批量实验数据
- batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens = self.rollout()
-
- # 计算收集这一批数据的时间步数
- t_so_far += np.sum(batch_lens)
-
- # 增加迭代次数
- i_so_far += 1
-
- # 记录到目前为止的时间步数和到目前为止的迭代次数
- self.logger['t_so_far'] = t_so_far
- self.logger['i_so_far'] = i_so_far
-
- # 计算第k次迭代的advantage
- V, _ = self.evaluate(batch_obs, batch_acts)
- A_k = batch_rtgs - V.detach()
-
- # 将优势归一化 在理论上不是必须的,但在实践中,它减少了我们优势的方差,使收敛更加稳定和快速。
- # 添加这个是因为在没有这个的情况下,解决一些环境的问题太不稳定了。
- A_k = (A_k - A_k.mean()) / (A_k.std() + 1e-10)
-
- # 在其中更新我们的网络。
- for _ in range(self.n_updates_per_iteration):
-
- V, curr_log_probs = self.evaluate(batch_obs, batch_acts)
-
- # 重要性采样的权重
- ratios = torch.exp(curr_log_probs - batch_log_probs)
-
- surr1 = ratios * A_k
- surr2 = torch.clamp(ratios, 1 - self.clip, 1 + self.clip) * A_k
-
- # 计算两个网络的损失。
- actor_loss = (-torch.min(surr1, surr2)).mean()
- critic_loss = nn.MSELoss()(V, batch_rtgs)
-
- # 计算梯度并对actor网络进行反向传播
- # 梯度清零
- self.actor_optim.zero_grad()
- # 反向传播,产生梯度
- actor_loss.backward(retain_graph=True)
- # 通过梯度下降进行优化
- self.actor_optim.step()
-
- # 计算梯度并对critic网络进行反向传播
- self.critic_optim.zero_grad()
- critic_loss.backward()
- self.critic_optim.step()
-
- self.logger['actor_losses'].append(actor_loss.detach())
-
- self._log_summary()
-
- if i_so_far % self.save_freq == 0:
- torch.save(self.actor.state_dict(), './ppo_actor.pth')
- torch.save(self.critic.state_dict(), './ppo_critic.pth')
-
- def rollout(self):
- """
- 这就是我们从实验中收集一批数据的地方。由于这是一个on-policy的算法,我们需要在每次迭代行为者/批评者网络时收集一批新的数据。
- """
- batch_obs = []
- batch_acts = []
- batch_log_probs = []
- batch_rews = []
- batch_rtgs = []
- batch_lens = []
-
- # 一回合的数据。追踪每一回合的奖励,在回合结束的时候会被清空,开始新的回合。
- ep_rews = []
-
- # 追踪到目前为止这批程序我们已经运行了多少个时间段
- t = 0
-
- # 继续实验,直到我们每批运行超过或等于指定的时间步数
- while t < self.timesteps_per_batch:
- ep_rews = [] 每回合收集的奖励
-
- # 重置环境
- obs = self.env.reset()
- done = False
-
- # 运行一个回合的最大时间为max_timesteps_per_episode的时间步数
- for ep_t in range(self.max_timesteps_per_episode):
-
- if self.render and (self.logger['i_so_far'] % self.render_every_i == 0) and len(batch_lens) == 0:
- self.env.render()
-
- # 递增时间步数,到目前为止已经运行了这批程序
- t += 1
-
- # 追踪本批中的观察结果
- batch_obs.append(obs)
-
- # 计算action,并在env中执行一次step。
- # 注意,rew是奖励的简称。
- action, log_prob = self.get_action(obs)
- obs, rew, done, _ = self.env.step(action)
-
- # 追踪最近的奖励、action和action的对数概率
- ep_rews.append(rew)
- batch_acts.append(action)
- batch_log_probs.append(log_prob)
-
- if done:
- break
-
- # 追踪本回合的长度和奖励
- batch_lens.append(ep_t + 1)
- batch_rews.append(ep_rews)
-
- # 将数据重塑为函数描述中指定形状的张量,然后返回
- batch_obs = torch.tensor(batch_obs, dtype=torch.float)
- batch_acts = torch.tensor(batch_acts, dtype=torch.float)
- batch_log_probs = torch.tensor(batch_log_probs, dtype=torch.float)
- batch_rtgs = self.compute_rtgs(batch_rews)
-
- # 在这批中记录回合的回报和回合的长度。
- self.logger['batch_rews'] = batch_rews
- self.logger['batch_lens'] = batch_lens
-
- return batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens
-
- def compute_rtgs(self, batch_rews):
-
- batch_rtgs = []
-
- # 遍历每一回合,一个回合有一批奖励
- for ep_rews in reversed(batch_rews):
- # 到目前为止的折扣奖励
- discounted_reward = 0
-
- # 遍历这一回合的所有奖励。我们向后退,以便更顺利地计算每一个折现的回报
- for rew in reversed(ep_rews):
-
- discounted_reward = rew + discounted_reward * self.gamma
- batch_rtgs.insert(0, discounted_reward)
-
- # 将每个回合的折扣奖励的数据转换成张量
- batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)
-
- return batch_rtgs
-
- def get_action(self, obs):
-
- mean = self.actor(obs)
-
- # 用上述协方差矩阵中的平均行动和标准差创建一个分布。
- dist = MultivariateNormal(mean, self.cov_mat)
- action = dist.sample()
- log_prob = dist.log_prob(action)
-
- return action.detach().numpy(), log_prob.detach()
-
- def evaluate(self, batch_obs, batch_acts):
- """
- 估算每个观察值,以及最近一批actor网络迭代中的每个action的对数prob。
- """
-
- # 为每个batch_obs查询critic网络的V值。V的形状应与batch_rtgs相同。
- V = self.critic(batch_obs).squeeze()
-
- # 使用最近的actor网络计算批量action的对数概率。
- mean = self.actor(batch_obs)
- dist = MultivariateNormal(mean, self.cov_mat)
- log_probs = dist.log_prob(batch_acts)
-
- # 返回批次中每个观察值的值向量V和批次中每个动作的对数概率log_probs
- return V, log_probs
PPO的特点:
PPO(Proximal Policy Optimization,近端策略优化)是一种强化学习算法,由John Schulman等人在2017年提出。PPO属于策略梯度方法,这类方法直接对策略(即模型的行为)进行优化,试图找到使得期望回报最大化的策略。PPO旨在改进和简化以前的策略梯度算法,如TRPO(Trust Region Policy Optimization,信任域策略优化),它通过几个关键的技术创新提高了训练的稳定性和效率。
PPO的主要特点包括:
裁剪的概率比率:PPO使用一个目标函数,其中包含了一个裁剪的概率比率,这个比率是旧策略和新策略产生动作概率的比值。这个比率被限制在一个范围内,防止策略在更新时做出太大的改变。
多次更新:在一个数据批次上可以安全地进行多次更新,这对于样本效率非常重要,尤其是在高维输入和实时学习环境中。
简单实现:与TRPO相比,PPO更容易实现和调整,因为它不需要复杂的数学运算来保证策略更新的安全性。
平衡探索与利用:PPO尝试在学习稳定性和足够的探索之间取得平衡,以避免局部最优并改进策略性能。
PPO已被广泛应用于各种强化学习场景,包括游戏、机器人控制以及自然语言处理中的序列决策问题。它是目前最流行的强化学习算法之一。
1、李宏毅老师的强化学习课程的前两讲,主要介绍了Policy Gradient算法和Proximal Policy Optimization算法,在此整理总结一下。
视频地址:Q-learning (Advanced Tips)_哔哩哔哩_bilibili
2、作者:文哥的学习日记
链接:https://www.jianshu.com/p/9f113adc0c50,来源:简书
3、作者:行者AI
链接:https://www.jianshu.com/p/8803cb2d4e30
来源:简书
4、原文链接:https://blog.csdn.net/u014386899/article/details/136474215【基础知识】什么是 PPO(Proximal Policy Optimization,近端策略优化)_schulman 提出ppo-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。