赞
踩
近端策略优化(PPO),它的性能与最先进的方法相当或更好,同时更容易实现和调整。PPO因其易用性和良好的性能成为OpenAI默认的强化学习算法。(2017年,openAI官网发布)
# Proximal Policy Optimization (openai.com)
官方代码:
openAI论文https://arxiv.org/abs/1707.06347
具体步骤是怎样的?
PPO算法的具体步骤是基于对策略梯度方法的改进,它主要包括以下几个关键的步骤:
1、收集数据:通过在环境中执行当前策略(policy)来收集一组交互数据。这些数据包括状态(state)、动作(action)、奖励(reward)以及可能的下一个状态。
2、计算优势估计:为了评价一个动作相对于平均水平的好坏,需要计算优势函数(advantage function)。这通常是通过某种形式的时间差分(TD)估计或者广义优势估计(GAE)来完成的。
3、优化目标函数:PPO算法使用一个特殊设计的目标函数,这个函数涉及到概率比率
表示旧策略。目标函数的形式通常为:
其中,是优势函数的估计
是一个小的正数(如0.1或0.2),clip 函数限制了概率比率
(θ)的变化范围,防止更新步骤过大。
4、更新策略:使用梯度上升方法来更新策略参数,即
,其中
是学习率。
5、重复步骤:使用新的策略参数重复以上步骤,直到满足某些停止准则,比如策略性能不再提升或者已经达到了一定的迭代次数。
PPO算法的关键之处在于它通过限制策略更新的幅度,使得学习过程更加稳定。在每次更新时,概率比率被限制在
范围内,防止由于单个数据点导致的极端策略更新,这有助于避免策略性能的急剧下降。同时,PPO允许在每次迭代中使用相同的数据多次进行策略更新,这提高了数据效率。
在PG算法中,Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π,策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态state出发,一直到任务的结束,被称为一个完整的eposide,在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。这样,一个有T个时刻的eposide,Actor不断与环境交互,形成如下的序列τ:
这样一个序列τ是不确定的,因为Actor在不同state下所采取的action可能是不同的,一个序列τ发生的概率为:
序列τ所获得的奖励为每个阶段所得到的奖励的和,称为R(τ)。因此,在Actor的策略为π的情况下,所能获得的期望奖励为:
而我们的期望是调整Actor的策略π,使得期望奖励最大化,于是有了策略梯度的方法,既然期望函数已经有了,只要使用梯度提升的方法更新我们的网络参数θ(即更新策略π)就好了,所以问题的重点变为了求参数的梯度。梯度的求解过程如下:
上面的过程中,首先利用log函数求导的特点进行转化,随后用N次采样的平均值来近似期望,最后,将pθ展开(1-Tn),将与θ无关的项去掉,即得到了最终的结果。
所以,一个PG方法的完整过程如下:
首先采集数据,然后基于前面得到的梯度提升的式子更新参数,随后再根据更新后的策略再采集数据,再更新参数,如此循环进行。
注意到图中的大红字only used once,因为在更新参数后,策略已经变了,而先前的数据是基于更新参数前的策略得到的。
通过上面的介绍你可能发现了,PG方法在更新策略时,基本思想就是增加reward大的动作出现的概率,减小reward小的策略出现的概率。
假设现在有一种情况,我们的reward在无论何时都是正的,对于没有采样到的动作,它的reward是0。因此,如果一个比较好的动作没有被采样到,而采样到的不好的动作得到了一个比较小的正reward,那么没有被采样到的好动作的出现概率会越来越小,这显然是不合适的,因此我们需要增加一个奖励的基线,让reward有正有负。
一般增加的基线是所获得奖励的平均值:
这个很容易理解,就像买股票一样,未来的1块钱的价值要小于当前1块钱的价值,因此未来的1块钱变成现在的价值,需要进行一定的折扣
1.3、Tip3:使用优势函数
我们之前介绍的PG方法,对于同一个eposide中的所有数据,使用的奖励都是一样的,其实与st和at相关的。
这里我们使用的是优势函数,即Qπ(st,at) - Vπ(st)。其中Qπ(st,at)可以使用从当前状态开始到eposide结束的奖励折现和得到,Vπ(st)可以通过一个critic来计算得到。
接着上面的讲,PG方法一个很大的缺点就是参数更新慢,因为我们每更新一次参数都需要进行重新的采样,这其实是中on-policy的策略,即我们想要训练的agent和与环境进行交互的agent是同一个agent;与之对应的就是off-policy的策略,即想要训练的agent和与环境进行交互的agent不是同一个agent,简单来说,就是拿别人的经验来训练自己。
举个下棋的例子,如果你是通过自己下棋来不断提升自己的棋艺,那么就是on-policy的,如果是通过看别人下棋来提升自己,那么就是off-policy的。
那么为了提升我们的训练速度,让采样到的数据可以重复使用,我们可以将on-policy的方式转换为off-policy的方式。即我们的训练数据通过另一个Actor(对应的网络参数为θ'得到。这要怎么做呢?通过下面的思路:
通过这种方式,我们的p(x)和q(x)的分布不能差别太大,否则需要进行非常多次的采样,才能得到近似的结果:
如上图所示,很显然,在x服从p(x)分布时,f(x)的期望为负,此时我们从q(x)中来采样少数的x,那么我们采样到的x很有可能都分布在右半部分,此时f(x)大于0,我们很容易得到f(x)的期望为正的结论,这就会出现问题,因此需要进行大量的采样。
那么此时我们想要期望奖励最大化,则变为:
则梯度变为:
最后一项因为我们假设两个分布不能差太远,所以认为他们是相等的,为了求解方便,我们直接划掉。此时似然函数变为:
由梯度变为似然函数,使用的还是下面式子,可以自己手动算一下:
到这里,马上就要得到我们的PPO算法了,再坚持一下!
PPO 有一个前身:信任区域策略优化(trust region policy optimization,TRPO),TRPO 的式子如下式所示。
TRPO 与 PPO 不一样的地方是约束项的位置不一样,PPO 是直接把约束放到要优化的式子里,可以直接用梯度上升的方法最大化这个式子。但TRPO是把 KL 散度当作约束,它希望跟
的 KL 散度小于一个
。如果我们使用的是基于梯度的优化时,有约束是很难处理的,因为它把 KL 散度约束当做一个额外的约束,没有放目标里面。PPO 跟 TRPO 的性能差不多,但 PPO 在实现上比 TRPO 容易的多,所以我们一般就用 PPO,而不用TRPO。
我们前面介绍了,我们希望θ和θ'不能差太远,这并不是说参数的值不能差太多,而是说,输入同样的state,网络得到的动作的概率分布不能差太远。得到动作的概率分布的相似程度,我们可以用KL散度来计算,将其加入PPO模型的似然函数中,变为:
在实际中,我们会动态改变对θ和θ'分布差异的惩罚,如果KL散度值太大,我们增加这一部分惩罚,如果小到一定值,我们就减小这一部分的惩罚,基于此,我们得到了PPO算法的过程:
PPO算法还有另一种实现方式,不将KL散度直接放入似然函数中,而是进行一定程度的裁剪:
上图中,绿色的线代表min中的第一项,即不做任何处理,蓝色的线为第二项,如果两个分布差距太大,则进行一定程度的裁剪。最后对这两项再取min,防止了θ更新太快。
上式看起来很复杂,其实很简单,它想做的事情就是希望跟
,也就是做示范的模型跟实际上学习的模型,在优化以后不要差距太大。
操作符min作用是在第一项和第二项中选择最小的。
第二项前面有个裁剪(clip)函数,裁剪函数是指:在括号里有三项,如果第一项小于第二项,则输出1 − ε;如果第一项大于第三项的话,则输出1 + ε。
ε 是一个超参数,要需要我们调整的,一般设置为0.1或0.2 。
用收集到的数据去训练
。假设我们可以用
收集到的数据去训练
,意味着说我们可以把
收集到的数据用很多次,也就是
可以执行梯度上升好几次,
更新参数好几次,这都只要用同一批数据就可以实现。那
就只要采样一次,也许采样多一点的数据,让
去更新很多次, 这样就会比较有效率。
那么问题来了, 我们怎么找到这样的一个演员,使其收集到的数据可以用于训练
,且他们之间的差异可以被忽略不计呢?
首先我们先介绍一个名词,重要性采样(importance sampling)。 假设有一个函数,
需要从分布
中采样。我们应该如何怎么计算
的期望值呢?假设分布
不能做积分,那么我们可以从分布
尽可能多采样更多的
。这样就会得到更多的
,取它的平均值就可以近似
的期望值。
现在另外一个问题也来了,假设我们不能在分布中采样数据,只能从另外一个分布
,中去采样数据.
可以是任何分布。我们从
中采样
的话就不能直接套下面的式子。
因为上式是假设都是从
采样出来的。如果我们想要在
中采样的情况下带入上式,就需要做些变换。期望值
的另一种写法是
,对其进行变换,如下式所示,
整理得下式,
这样就可以对分布中采样的
取期望值。具体来说,计算
,最后取期望值。
这边是从做采样,所以我们从
里采样出来的每一笔数据,需要乘上一个重要性权重(importance weight)
来修正这两个分布的差异。
可以是任何分布。重要性采样有一些问题。虽然我们可以把
换成任何的
。但是在实现上,
和
不能差太多。差太多的话,会有一些问题。两个随机变量的平均值一样,并不代表它的方差一样,可以带入方差公式
推导一下。
现在要做的事情就是把重要性采样用在off_policy的情况,把on-policy训练的算法改成off-olicy训练的算法。 怎么改呢,如下式所示,我们用另外一个策略,它就是另外一个演员,与环境做互动,采样出轨迹
,计算
。
实际在做策略梯度的时候,并不是给整个轨迹都一样的分数,而是每一个状态-动作的对会分开来计算。具体可参考上一篇PG的文章。实际上更新梯度的时候,如下式所示。
现在是
跟环境互动以后所采样到的数据。但是拿来训练,要调整参数的模型是
。因为
跟
是不同的模型,所以需要用重要性采样技术去做修正。即把
用
采样出来的概率除掉
把用
采样出来的概率。公式如下。
上式中的有一个上标
,代表说是agent
跟环境互动的时候所计算出来的结果。但实际上从
换到
的时候,应该改成
。
接下来,我们可以拆解和
,即
于是可得公式
这里需要做一件事情,假设模型是的时候,我们看到
的概率,跟模型是
的时候,看到
的概率是差不多的,即
=
,实际上在更新参数 的时候,我们就是按照下式来更新参数。
我们有个策略的网络,输入状态,它会输出每一个
的概率。所以
和
这两项,我们只要知道
和
的参数就可以算。
所以实际上,我们可以从梯度去反推原来的目标函数,可以用
来反推目标函数(由梯度变为似然函数)。
当使用重要性采样的时候,要去优化的目标函数如下式所示,我们把它记。括号里面的
代表我们需要去优化的参数。用
去做示范采样数据,采样出
、
以后,要去计算跟
、
的优势,再乘上
。
除了官网的,还有详解近端策略优化(ppo,干货满满) - 简书 (jianshu.com)
案例:倒立摆问题。钟摆以随机位置开始,目标是将其向上摆动,使其保持直立。 测试环境:Pendulum-v1
动作:往左转还是往右转,用力矩来衡量,即力乘以力臂。范围[-2,2]:(连续空间)
状态:cos(theta), sin(theta) , thetadot。
奖励:越直立拿到的奖励越高,越偏离,奖励越低。奖励的最大值为0。
- class FeedForwardNN(nn.Module):
-
- def __init__(self, in_dim, out_dim):
-
- super(FeedForwardNN, self).__init__()
-
- self.layer1 = nn.Linear(in_dim, 64)
- self.layer2 = nn.Linear(64, 64)
- self.layer3 = nn.Linear(64, out_dim)
-
- def forward(self, obs):
-
- if isinstance(obs, np.ndarray):
- obs = torch.tensor(obs, dtype=torch.float)
-
- activation1 = F.relu(self.layer1(obs))
- activation2 = F.relu(self.layer2(activation1))
- output = self.layer3(activation2)
-
- return output

- class PPO:
-
- def __init__(self, policy_class, env, **hyperparameters):
-
- # PPO 初始化用于训练的超参数
- self._init_hyperparameters(hyperparameters)
-
- # 提取环境信息
- self.env = env
- self.obs_dim = env.observation_space.shape[0]
- self.act_dim = env.action_space.shape[0]
-
- # 初始化演员和评论家网络
- self.actor = policy_class(self.obs_dim, self.act_dim)
- self.critic = policy_class(self.obs_dim, 1)
-
- # 为演员和评论家初始化优化器
- self.actor_optim = Adam(self.actor.parameters(), lr=self.lr)
- self.critic_optim = Adam(self.critic.parameters(), lr=self.lr)
-
- # 初始化协方差矩阵,用于查询actor网络的action
- self.cov_var = torch.full(size=(self.act_dim,), fill_value=0.5)
- self.cov_mat = torch.diag(self.cov_var)
-
- # 这个记录器将帮助我们打印出每个迭代的摘要
- self.logger = {
- 'delta_t': time.time_ns(),
- 't_so_far': 0, # 到目前为止的时间步数
- 'i_so_far': 0, # 到目前为止的迭代次数
- 'batch_lens': [], # 批次中的episodic长度
- 'batch_rews': [], # 批次中的rews回报
- 'actor_losses': [], # 当前迭代中演员网络的损失
- }
-
- def learn(self, total_timesteps):
-
- print(f"Learning... Running {self.max_timesteps_per_episode} timesteps per episode, ", end='')
- print(f"{self.timesteps_per_batch} timesteps per batch for a total of {total_timesteps} timesteps")
- t_so_far = 0 # 到目前为止仿真的时间步数
- i_so_far = 0 # 到目前为止,已运行的迭代次数
- while t_so_far < total_timesteps:
-
- # 收集批量实验数据
- batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens = self.rollout()
-
- # 计算收集这一批数据的时间步数
- t_so_far += np.sum(batch_lens)
-
- # 增加迭代次数
- i_so_far += 1
-
- # 记录到目前为止的时间步数和到目前为止的迭代次数
- self.logger['t_so_far'] = t_so_far
- self.logger['i_so_far'] = i_so_far
-
- # 计算第k次迭代的advantage
- V, _ = self.evaluate(batch_obs, batch_acts)
- A_k = batch_rtgs - V.detach()
-
- # 将优势归一化 在理论上不是必须的,但在实践中,它减少了我们优势的方差,使收敛更加稳定和快速。
- # 添加这个是因为在没有这个的情况下,解决一些环境的问题太不稳定了。
- A_k = (A_k - A_k.mean()) / (A_k.std() + 1e-10)
-
- # 在其中更新我们的网络。
- for _ in range(self.n_updates_per_iteration):
-
- V, curr_log_probs = self.evaluate(batch_obs, batch_acts)
-
- # 重要性采样的权重
- ratios = torch.exp(curr_log_probs - batch_log_probs)
-
- surr1 = ratios * A_k
- surr2 = torch.clamp(ratios, 1 - self.clip, 1 + self.clip) * A_k
-
- # 计算两个网络的损失。
- actor_loss = (-torch.min(surr1, surr2)).mean()
- critic_loss = nn.MSELoss()(V, batch_rtgs)
-
- # 计算梯度并对actor网络进行反向传播
- # 梯度清零
- self.actor_optim.zero_grad()
- # 反向传播,产生梯度
- actor_loss.backward(retain_graph=True)
- # 通过梯度下降进行优化
- self.actor_optim.step()
-
- # 计算梯度并对critic网络进行反向传播
- self.critic_optim.zero_grad()
- critic_loss.backward()
- self.critic_optim.step()
-
- self.logger['actor_losses'].append(actor_loss.detach())
-
- self._log_summary()
-
- if i_so_far % self.save_freq == 0:
- torch.save(self.actor.state_dict(), './ppo_actor.pth')
- torch.save(self.critic.state_dict(), './ppo_critic.pth')
-
- def rollout(self):
- """
- 这就是我们从实验中收集一批数据的地方。由于这是一个on-policy的算法,我们需要在每次迭代行为者/批评者网络时收集一批新的数据。
- """
- batch_obs = []
- batch_acts = []
- batch_log_probs = []
- batch_rews = []
- batch_rtgs = []
- batch_lens = []
-
- # 一回合的数据。追踪每一回合的奖励,在回合结束的时候会被清空,开始新的回合。
- ep_rews = []
-
- # 追踪到目前为止这批程序我们已经运行了多少个时间段
- t = 0
-
- # 继续实验,直到我们每批运行超过或等于指定的时间步数
- while t < self.timesteps_per_batch:
- ep_rews = [] 每回合收集的奖励
-
- # 重置环境
- obs = self.env.reset()
- done = False
-
- # 运行一个回合的最大时间为max_timesteps_per_episode的时间步数
- for ep_t in range(self.max_timesteps_per_episode):
-
- if self.render and (self.logger['i_so_far'] % self.render_every_i == 0) and len(batch_lens) == 0:
- self.env.render()
-
- # 递增时间步数,到目前为止已经运行了这批程序
- t += 1
-
- # 追踪本批中的观察结果
- batch_obs.append(obs)
-
- # 计算action,并在env中执行一次step。
- # 注意,rew是奖励的简称。
- action, log_prob = self.get_action(obs)
- obs, rew, done, _ = self.env.step(action)
-
- # 追踪最近的奖励、action和action的对数概率
- ep_rews.append(rew)
- batch_acts.append(action)
- batch_log_probs.append(log_prob)
-
- if done:
- break
-
- # 追踪本回合的长度和奖励
- batch_lens.append(ep_t + 1)
- batch_rews.append(ep_rews)
-
- # 将数据重塑为函数描述中指定形状的张量,然后返回
- batch_obs = torch.tensor(batch_obs, dtype=torch.float)
- batch_acts = torch.tensor(batch_acts, dtype=torch.float)
- batch_log_probs = torch.tensor(batch_log_probs, dtype=torch.float)
- batch_rtgs = self.compute_rtgs(batch_rews)
-
- # 在这批中记录回合的回报和回合的长度。
- self.logger['batch_rews'] = batch_rews
- self.logger['batch_lens'] = batch_lens
-
- return batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens
-
- def compute_rtgs(self, batch_rews):
-
- batch_rtgs = []
-
- # 遍历每一回合,一个回合有一批奖励
- for ep_rews in reversed(batch_rews):
- # 到目前为止的折扣奖励
- discounted_reward = 0
-
- # 遍历这一回合的所有奖励。我们向后退,以便更顺利地计算每一个折现的回报
- for rew in reversed(ep_rews):
-
- discounted_reward = rew + discounted_reward * self.gamma
- batch_rtgs.insert(0, discounted_reward)
-
- # 将每个回合的折扣奖励的数据转换成张量
- batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)
-
- return batch_rtgs
-
- def get_action(self, obs):
-
- mean = self.actor(obs)
-
- # 用上述协方差矩阵中的平均行动和标准差创建一个分布。
- dist = MultivariateNormal(mean, self.cov_mat)
- action = dist.sample()
- log_prob = dist.log_prob(action)
-
- return action.detach().numpy(), log_prob.detach()
-
- def evaluate(self, batch_obs, batch_acts):
- """
- 估算每个观察值,以及最近一批actor网络迭代中的每个action的对数prob。
- """
-
- # 为每个batch_obs查询critic网络的V值。V的形状应与batch_rtgs相同。
- V = self.critic(batch_obs).squeeze()
-
- # 使用最近的actor网络计算批量action的对数概率。
- mean = self.actor(batch_obs)
- dist = MultivariateNormal(mean, self.cov_mat)
- log_probs = dist.log_prob(batch_acts)
-
- # 返回批次中每个观察值的值向量V和批次中每个动作的对数概率log_probs
- return V, log_probs

PPO的特点:
PPO(Proximal Policy Optimization,近端策略优化)是一种强化学习算法,由John Schulman等人在2017年提出。PPO属于策略梯度方法,这类方法直接对策略(即模型的行为)进行优化,试图找到使得期望回报最大化的策略。PPO旨在改进和简化以前的策略梯度算法,如TRPO(Trust Region Policy Optimization,信任域策略优化),它通过几个关键的技术创新提高了训练的稳定性和效率。
PPO的主要特点包括:
裁剪的概率比率:PPO使用一个目标函数,其中包含了一个裁剪的概率比率,这个比率是旧策略和新策略产生动作概率的比值。这个比率被限制在一个范围内,防止策略在更新时做出太大的改变。
多次更新:在一个数据批次上可以安全地进行多次更新,这对于样本效率非常重要,尤其是在高维输入和实时学习环境中。
简单实现:与TRPO相比,PPO更容易实现和调整,因为它不需要复杂的数学运算来保证策略更新的安全性。
平衡探索与利用:PPO尝试在学习稳定性和足够的探索之间取得平衡,以避免局部最优并改进策略性能。
PPO已被广泛应用于各种强化学习场景,包括游戏、机器人控制以及自然语言处理中的序列决策问题。它是目前最流行的强化学习算法之一。
1、李宏毅老师的强化学习课程的前两讲,主要介绍了Policy Gradient算法和Proximal Policy Optimization算法,在此整理总结一下。
视频地址:Q-learning (Advanced Tips)_哔哩哔哩_bilibili
2、作者:文哥的学习日记
链接:https://www.jianshu.com/p/9f113adc0c50,来源:简书
3、作者:行者AI
链接:https://www.jianshu.com/p/8803cb2d4e30
来源:简书
4、原文链接:https://blog.csdn.net/u014386899/article/details/136474215【基础知识】什么是 PPO(Proximal Policy Optimization,近端策略优化)_schulman 提出ppo-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。