PPO is a simplified version of TRPO; compared with TRPO it is both simpler and, in practice, more effective.
The goal of PPO is to optimize a clipped surrogate loss function.
In that loss, r denotes the probability of choosing action a in state s under the current policy divided by the probability of choosing the same action in the same state under the old policy; this ratio is called the importance sampling ratio.
A is the advantage estimate, which is computed from temporal-difference (TD) errors: each TD error is the difference between the value of taking action a in state s and the average value of state s under the current policy, so A measures how much better action a is than average. If A is greater than 0, action a is better than average; otherwise it is worse.
clip is a clipping function whose job is to bound how far the new-to-old probability ratio can move within one update, which keeps performance stable. The objective is usually written in the form shown below.
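Written out, with clipping range ε, discount γ and GAE coefficient λ (the code below uses ε = 0.2, γ = 0.98 and λ = 0.95), the ratio, the clipped surrogate objective and the advantage recursion take the standard PPO/GAE form:

$$
r_t(\theta)=\frac{\pi_\theta(a_t\mid s_t)}{\pi_{\theta_{\text{old}}}(a_t\mid s_t)},\qquad
L^{\mathrm{CLIP}}(\theta)=\mathbb{E}_t\Big[\min\big(r_t(\theta)A_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,A_t\big)\Big]
$$

$$
\delta_t = R_t + \gamma V(s_{t+1}) - V(s_t),\qquad
A_t = \delta_t + \gamma\lambda\,\delta_{t+1} + (\gamma\lambda)^2\delta_{t+2} + \cdots
$$

In the code, the loss is the negative of this objective, so minimizing the loss maximizes the clipped surrogate.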
Next, both the policy and the advantage (value) estimate are implemented with neural networks, which we use to play the CartPole balancing game:
```python
import gym
from matplotlib import pyplot as plt
import torch
import random
import numpy as np
from IPython import display

# Create the environment.
# Note: this code assumes the old gym API (gym < 0.26), where env.reset()
# returns only the state and env.step() returns 4 values.
env = gym.make('CartPole-v1')
env.reset()

# Render the current frame of the game
def show():
    plt.imshow(env.render(mode='rgb_array'))
    plt.axis('off')
    plt.show()

# Define the models
# Policy network: maps the 4-dimensional state to probabilities over the 2 actions
model = torch.nn.Sequential(
    torch.nn.Linear(4, 128),
    torch.nn.ReLU(),
    torch.nn.Linear(128, 2),
    torch.nn.Softmax(dim=1),
)

# Value network used for the temporal-difference targets
model_td = torch.nn.Sequential(
    torch.nn.Linear(4, 128),
    torch.nn.ReLU(),
    torch.nn.Linear(128, 1),
)
```
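As a quick sanity check (not part of the original program), feeding one dummy state through both networks should give a 1×2 probability vector and a 1×1 value:

```python
dummy_state = torch.zeros(1, 4)
print(model(dummy_state).shape)     # torch.Size([1, 2]) -> action probabilities
print(model_td(dummy_state).shape)  # torch.Size([1, 1]) -> state value
```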
```python
# Sample an action from the current policy
def get_action(state):
    state = torch.FloatTensor(state).reshape(1, 4)
    with torch.no_grad():
        prob = model(state)

    # Re-normalize to guard against floating-point drift before np.random.choice
    prob_normalized = prob[0].tolist()
    prob_sum = sum(prob_normalized)
    prob_normalized = [p / prob_sum for p in prob_normalized]

    action = np.random.choice(range(2), p=prob_normalized, size=1)[0]

    return action
```
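For example, sampling one action from a freshly reset environment (a usage sketch, assuming the same old gym API as above):

```python
state = env.reset()
print(get_action(state))  # 0 or 1
```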
```python
# Play one episode with the current policy and collect the transitions
def get_Date():
    states = []
    rewards = []
    actions = []
    next_states = []
    overs = []

    state = env.reset()

    over = False
    while not over:
        action = get_action(state)
        next_state, reward, over, _ = env.step(action)

        states.append(state)
        rewards.append(reward)
        actions.append(action)
        next_states.append(next_state)
        overs.append(over)

        state = next_state

    states = torch.FloatTensor(np.array(states)).reshape(-1, 4)
    rewards = torch.FloatTensor(rewards).reshape(-1, 1)
    actions = torch.LongTensor(actions).reshape(-1, 1)  # action indices, for gather()
    next_states = torch.FloatTensor(np.array(next_states)).reshape(-1, 4)
    overs = torch.FloatTensor(overs).reshape(-1, 1)
    return states, rewards, actions, next_states, overs
```
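Calling it once yields one tensor row per step of the episode; the episode length varies, so the first dimension does too:

```python
states, rewards, actions, next_states, overs = get_Date()
print(states.shape, actions.shape)  # e.g. torch.Size([31, 4]) torch.Size([31, 1])
```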
```python
# Run one episode and return the total reward; with play=True,
# occasionally render frames so the game can be watched
def test(play):
    state = env.reset()

    reward_sum = 0
    over = False
    while not over:
        action = get_action(state)

        state, reward, over, _ = env.step(action)
        reward_sum += reward
        if play and random.random() < 0.2:
            display.clear_output(wait=True)
            show()
    return reward_sum

# Advantage function: accumulate the TD errors backwards with gamma*lambda = 0.98*0.95
def get_advantages(deltas):
    advantages = []
    # Traverse the TD errors in reverse order
    s = 0.0
    for delta in deltas[::-1]:
        s = 0.98 * 0.95 * s + delta
        advantages.append(s)

    # Restore chronological order
    advantages.reverse()
    return advantages
```
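A tiny worked example of the recursion, using three TD errors of 1.0: the last advantage is 1.0, the middle one is 1.0 + 0.931·1.0 = 1.931, and the first is 1.0 + 0.931·1.931 ≈ 2.798.

```python
print(get_advantages([1.0, 1.0, 1.0]))
# approximately [2.798, 1.931, 1.0] (up to floating-point rounding)
```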
```python
def train():
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    optimizer_td = torch.optim.Adam(model_td.parameters(), lr=1e-2)
    loss_fn = torch.nn.MSELoss()

    # Play n episodes; train m times on each episode's data
    for epoch in range(500):
        states, rewards, actions, next_states, overs = get_Date()

        # Compute values and TD targets
        values = model_td(states)

        targets = model_td(next_states).detach()
        targets = targets * 0.98          # discount factor gamma
        targets = targets * (1 - overs)   # no bootstrapping past terminal states
        targets += rewards

        # TD errors and the resulting advantages
        deltas = (targets - values).squeeze(dim=1).tolist()
        advantages = get_advantages(deltas)
        advantages = torch.FloatTensor(advantages).reshape(-1, 1)

        # Probabilities of the taken actions under the old (pre-update) policy
        old_probs = model(states)
        old_probs = old_probs.gather(dim=1, index=actions)
        old_probs = old_probs.detach()

        for _ in range(10):
            new_probs = model(states)
            new_probs = new_probs.gather(dim=1, index=actions)

            ratios = new_probs / old_probs
            # Compute the clipped and unclipped surrogate terms and take the minimum
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 0.8, 1.2) * advantages

            loss = -torch.min(surr1, surr2)
            loss = loss.mean()

            # Recompute the values and the temporal-difference loss
            values = model_td(states)
            loss_td = loss_fn(values, targets)

            # Update the parameters of both networks
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            optimizer_td.zero_grad()
            loss_td.backward()
            optimizer_td.step()

        if epoch % 50 == 0:
            test_result = sum([test(play=False) for _ in range(10)]) / 10
            print(epoch, test_result)
```
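To see what the clipping does, here is a small standalone check with made-up numbers: once the ratio exceeds 1.2 with a positive advantage, the clipped term is the smaller one, and its gradient with respect to the ratio is zero, so the update stops pushing the ratio further away from 1.

```python
ratio = torch.tensor([1.5])
advantage = torch.tensor([2.0])
surr1 = ratio * advantage                          # 3.0
surr2 = torch.clamp(ratio, 0.8, 1.2) * advantage   # 1.2 * 2.0 = 2.4
print(torch.min(surr1, surr2))                     # tensor([2.4000])
```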
```python
train()
test(play=True)
```
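If you want to keep the trained networks, a minimal save/load sketch (the file names here are placeholders, not from the original code):

```python
torch.save(model.state_dict(), 'ppo_policy.pt')    # policy network weights
torch.save(model_td.state_dict(), 'ppo_value.pt')  # value network weights

# Later: recreate the same architectures, then load the weights back
model.load_state_dict(torch.load('ppo_policy.pt'))
model_td.load_state_dict(torch.load('ppo_value.pt'))
```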