We experiment with the REINFORCE algorithm in the CartPole environment:
import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import rl_utils

First we define the policy network `PolicyNet`. Its input is a state and its output is the probability distribution over actions in that state; a softmax over the discrete action space implements a learnable categorical distribution.
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

Next we define the REINFORCE algorithm. In its `take_action()` method we sample a discrete action from the action probability distribution. In the update step, following the algorithm, the per-step loss is written as the negative of the return-weighted log-probability of the policy, $-G_t \log \pi_\theta(a_t|s_t)$; differentiating it with respect to $\theta$ and applying gradient descent updates the policy.
class REINFORCE:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
                 device):
        self.policy_net = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(),
                                          lr=learning_rate)  # Adam optimizer
        self.gamma = gamma  # discount factor
        self.device = device

    def take_action(self, state):  # sample an action from the action distribution
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.policy_net(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        reward_list = transition_dict['rewards']
        state_list = transition_dict['states']
        action_list = transition_dict['actions']

        G = 0
        self.optimizer.zero_grad()
        for i in reversed(range(len(reward_list))):  # start from the last step
            reward = reward_list[i]
            state = torch.tensor([state_list[i]],
                                 dtype=torch.float).to(self.device)
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
            log_prob = torch.log(self.policy_net(state).gather(1, action))
            G = self.gamma * G + reward
            loss = -log_prob * G  # loss for this step
            loss.backward()  # backpropagate to accumulate gradients
        self.optimizer.step()  # gradient descent
learning_rate = 1e-3
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = "CartPole-v0"
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = REINFORCE(state_dim, hidden_dim, action_dim, learning_rate, gamma, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            transition_dict = {
                'states': [],
                'actions': [],
                'next_states': [],
                'rewards': [],
                'dones': []
            }
            state = env.reset()
            env.render()  # render the environment (optional; slows training down)
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, _ = env.step(action)
                transition_dict['states'].append(state)
                transition_dict['actions'].append(action)
                transition_dict['next_states'].append(next_state)
                transition_dict['rewards'].append(reward)
                transition_dict['dones'].append(done)
                state = next_state
                episode_return += reward
            return_list.append(episode_return)
            agent.update(transition_dict)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return': '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

In the CartPole-v0 environment the maximum score is 200, and we find that REINFORCE works well and reaches 200. Next we plot how the return of each trajectory changes during training. Because per-episode returns fluctuate a lot, the curve is usually smoothed.
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()

As more and more trajectories are collected, REINFORCE effectively learns the optimal policy. However, compared with the DQN algorithm discussed earlier, REINFORCE needs many more episodes, because it is an on-policy algorithm: previously collected trajectory data is never reused. Its performance also fluctuates noticeably, mainly because the return of each sampled trajectory has high variance; this is the main weakness of REINFORCE.
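The helper `rl_utils.moving_average` is not reproduced in this article. The following is a minimal sketch of a window smoother consistent with how it is called above; it is an assumption about `rl_utils`, not its actual source.

# Hypothetical stand-in for rl_utils.moving_average: smooth a return curve with a
# fixed-size trailing window while keeping the output the same length as the input.
# This is only a sketch of the idea, not the real rl_utils implementation.
import numpy as np

def moving_average(values, window_size):
    values = np.asarray(values, dtype=np.float64)
    smoothed = np.empty_like(values)
    for i in range(len(values)):
        lo = max(0, i - window_size + 1)      # average over the trailing window
        smoothed[i] = values[lo:i + 1].mean()
    return smoothed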
We stay in the CartPole environment and now experiment with the Actor-Critic algorithm.
import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils

We define the policy network PolicyNet, identical to the one used in REINFORCE.
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

Actor-Critic additionally introduces a value network. The next block defines the value network ValueNet: its input is a state and its output is the value of that state.
class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

Now we define the ActorCritic algorithm itself. It mainly consists of two methods: taking an action and updating the network parameters.
class ActorCritic:
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  # value network
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)  # value network optimizer
        self.gamma = gamma
        self.device = device

    def take_action(self, state):
        # move the input to the same device as the networks
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)

        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)  # TD target
        td_delta = td_target - self.critic(states)  # TD error
        log_probs = torch.log(self.actor(states).gather(1, actions))
        actor_loss = torch.mean(-log_probs * td_delta.detach())
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))  # mean squared error loss
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward()  # compute the policy network gradients
        critic_loss.backward()  # compute the value network gradients
        self.actor_optimizer.step()  # update the policy network parameters
        self.critic_optimizer.step()  # update the value network parameters
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = ActorCritic(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                    gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Actor-Critic on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Actor-Critic on {}'.format(env_name))
plt.show()
From the results we can see that Actor-Critic quickly converges to the optimal policy, and training is very stable: the oscillation is clearly reduced compared with REINFORCE, thanks to the variance reduction brought by the value function.
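The training loop `rl_utils.train_on_policy_agent` is also not reproduced here. A minimal sketch consistent with how it is called, and with the explicit REINFORCE loop shown earlier, might look like the following; this is an assumption about `rl_utils`, not its actual source.

# Hypothetical stand-in for rl_utils.train_on_policy_agent: collect one episode at a
# time and let the agent update on it, mirroring the explicit REINFORCE loop above.
from tqdm import tqdm
import numpy as np

def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                transition_dict = {'states': [], 'actions': [], 'next_states': [],
                                   'rewards': [], 'dones': []}
                state = env.reset()
                done = False
                episode_return = 0
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done, _ = env.step(action)
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)  # on-policy: update on the fresh episode only
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({
                        'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                        'return': '%.3f' % np.mean(return_list[-10:])
                    })
                pbar.update(1)
    return return_list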
Policy-based methods: policy gradient and Actor-Critic (AC). These algorithms are simple and intuitive, but in practice training can be unstable. Policy-based methods parameterize the agent's policy, design an objective that measures how good the policy is, and maximize that objective by gradient ascent so that the policy becomes optimal. Concretely, let $\theta$ denote the parameters of the policy $\pi_\theta$ and define the objective $J(\theta) = \mathbb{E}_{s_0}\big[V^{\pi_\theta}(s_0)\big]$. Policy-based methods such as REINFORCE aim to find $\theta^* = \arg\max_\theta J(\theta)$. The policy gradient algorithm iteratively updates the policy parameters $\theta$ along the direction $\nabla_\theta J(\theta)$:

$$\theta \leftarrow \theta + \alpha \nabla_\theta J(\theta).$$

This approach has an obvious drawback: when the policy network is a deep model, a step along the policy gradient may well use too large a step size, so the policy suddenly becomes significantly worse and training suffers.
To address this problem, the idea is to find a trust region for each update: a region in which updating the policy comes with some guarantee of safe performance. This is the main idea behind Trust Region Policy Optimization (TRPO). In theory TRPO guarantees monotonic improvement of policy performance, and in practice it achieves better results than the plain policy gradient algorithm.
Suppose the current policy is $\pi_\theta$ with parameters $\theta$. We want to use the current $\theta$ to find better parameters $\theta'$ such that $J(\theta') \ge J(\theta)$. Because the distribution of the initial state $s_0$ does not depend on the policy, the objective under the old policy $\pi_\theta$ can be written as an expectation under the new policy $\pi_{\theta'}$:

$$
J(\theta) = \mathbb{E}_{s_0}\big[V^{\pi_\theta}(s_0)\big]
= \mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t V^{\pi_\theta}(s_t) - \sum_{t=1}^{\infty}\gamma^t V^{\pi_\theta}(s_t)\Big]
= -\mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t\big(\gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)\big)\Big].
$$

Based on this identity, we can derive the gap between the objectives of the new and old policies:

$$
\begin{aligned}
J(\theta') - J(\theta)
&= \mathbb{E}_{s_0}\big[V^{\pi_{\theta'}}(s_0)\big] - \mathbb{E}_{s_0}\big[V^{\pi_\theta}(s_0)\big] \\
&= \mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t r(s_t,a_t)\Big] + \mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t\big(\gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)\big)\Big] \\
&= \mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t\big(r(s_t,a_t) + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)\big)\Big].
\end{aligned}
$$

Defining the temporal-difference residual as the advantage function $A^{\pi_\theta}(s_t,a_t) = r(s_t,a_t) + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)$, we obtain

$$
J(\theta') - J(\theta)
= \mathbb{E}_{\pi_{\theta'}}\Big[\sum_{t=0}^{\infty}\gamma^t A^{\pi_\theta}(s_t,a_t)\Big]
= \sum_{t=0}^{\infty}\gamma^t\,\mathbb{E}_{s_t\sim P_t^{\pi_{\theta'}}}\,\mathbb{E}_{a_t\sim\pi_{\theta'}(\cdot|s_t)}\big[A^{\pi_\theta}(s_t,a_t)\big]
= \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_{\theta'}}}\,\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\big[A^{\pi_\theta}(s,a)\big].
$$

The last equality uses the definition of the state visitation distribution, $\nu^{\pi}(s) = (1-\gamma)\sum_{t=0}^{\infty}\gamma^t P(s_t = s)$. Therefore, as long as we find a new policy such that $\mathbb{E}_{s\sim\nu^{\pi_{\theta'}}}\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\big[A^{\pi_\theta}(s,a)\big] \ge 0$, policy performance is guaranteed to increase monotonically, i.e. $J(\theta') \ge J(\theta)$.
But solving this directly is very hard, because $\pi_{\theta'}$ is exactly the policy we are trying to find, yet we would need it to collect samples. Enumerating all candidate new policies, collecting data with each, and then checking which ones satisfy the condition is clearly infeasible. TRPO therefore makes one approximation concerning the state visitation distribution: it ignores the change in state visitation between the two policies and simply uses the state distribution of the old policy $\pi_\theta$, defining the following surrogate objective:

$$
L_\theta(\theta') = J(\theta) + \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_\theta}}\,\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\big[A^{\pi_\theta}(s,a)\big].
$$

When the new and old policies are very close, their state visitation distributions change little, so this approximation is reasonable. Note that actions are still sampled from the new policy $\pi_{\theta'}$; this is handled with importance sampling over the action distribution:

$$
L_\theta(\theta') = J(\theta) + \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_\theta}}\,\mathbb{E}_{a\sim\pi_\theta(\cdot|s)}\Big[\frac{\pi_{\theta'}(a|s)}{\pi_\theta(a|s)}A^{\pi_\theta}(s,a)\Big].
$$

In this way we can estimate and optimize the new policy $\pi_{\theta'}$ using data already sampled from the old policy $\pi_\theta$. To keep the new policy sufficiently close to the old one, TRPO measures the distance between policies with the Kullback-Leibler (KL) divergence, giving the overall optimization problem:

$$
\max_{\theta'}\; L_\theta(\theta')
\quad\text{s.t.}\quad
\mathbb{E}_{s\sim\nu^{\pi_\theta}}\big[D_{KL}\big(\pi_\theta(\cdot|s),\,\pi_{\theta'}(\cdot|s)\big)\big] \le \delta .
$$

The inequality constraint defines a Kullback-Leibler ball in policy space, called the trust region. Within this region, the state distribution encountered when the current learned policy interacts with the environment can be treated as identical to the state distribution last sampled by the previous policy, so one-step importance sampling over actions lets the learned policy improve steadily.
Solving this constrained optimization problem directly is cumbersome, so TRPO's implementation uses approximations to solve it quickly. For convenience, from here on we write $\theta_k$ instead of $\theta$ for the policy parameters after the $k$-th iteration. First, the objective and the constraint are Taylor-expanded around $\theta_k$, using a first-order and a second-order approximation respectively:

$$
\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\,\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\Big[\frac{\pi_{\theta'}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)\Big] \approx g^\top(\theta'-\theta_k),
$$

$$
\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\big[D_{KL}\big(\pi_{\theta_k}(\cdot|s),\pi_{\theta'}(\cdot|s)\big)\big] \approx \tfrac{1}{2}(\theta'-\theta_k)^\top H(\theta'-\theta_k),
$$

where $g$ denotes the gradient of the objective and $H$ denotes the Hessian matrix of the average KL divergence between the policies.

The optimization problem then becomes

$$
\theta_{k+1} = \arg\max_{\theta'}\; g^\top(\theta'-\theta_k)
\quad\text{s.t.}\quad
\tfrac{1}{2}(\theta'-\theta_k)^\top H(\theta'-\theta_k) \le \delta .
$$

Applying the KKT conditions yields the solution in closed form:

$$
\theta_{k+1} = \theta_k + \sqrt{\frac{2\delta}{g^\top H^{-1}g}}\,H^{-1}g .
$$
In general, a policy represented by a neural network has an enormous number of parameters, so computing and storing the inverse of the Hessian matrix $H$ would consume huge amounts of memory and time. TRPO sidesteps this with the conjugate gradient method: its core idea is to compute $x = H^{-1}g$ directly, where $x$ is the parameter update direction. Suppose the maximum step length that still satisfies the KL constraint is $\beta$. The constraint then gives $\tfrac{1}{2}(\beta x)^\top H(\beta x) \le \delta$; solving for $\beta$ yields $\beta = \sqrt{2\delta/(x^\top Hx)}$. The parameter update is therefore

$$
\theta_{k+1} = \theta_k + \sqrt{\frac{2\delta}{x^\top Hx}}\,x .
$$

So as long as we can compute $x = H^{-1}g$, we can update the parameters with this formula, and the problem reduces to solving the linear system $Hx = g$. Since $H$ is a symmetric positive-definite matrix, the conjugate gradient method applies. Its procedure is as follows (a small standalone check of this routine is sketched right after the list):

- Initialize $x_0 = 0$, residual $r_0 = g$, and search direction $p_0 = r_0$.
- For $k = 0, 1, \dots, N$: compute $\alpha_k = \dfrac{r_k^\top r_k}{p_k^\top Hp_k}$, set $x_{k+1} = x_k + \alpha_k p_k$ and $r_{k+1} = r_k - \alpha_k Hp_k$; stop if $r_{k+1}^\top r_{k+1}$ is sufficiently small; otherwise set $\beta_k = \dfrac{r_{k+1}^\top r_{k+1}}{r_k^\top r_k}$ and $p_{k+1} = r_{k+1} + \beta_k p_k$.
- Output $x_{N+1}$ as the approximate solution of $Hx = g$.
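As a quick, self-contained illustration (not part of the original article), the routine below solves $Hx=g$ for a small symmetric positive-definite matrix using only matrix-vector products and then checks the residual; the `matvec` callback plays the role that the Hessian-vector product plays in TRPO.

# Illustrative sketch: conjugate gradient on a toy SPD system, using only H @ v products.
import torch

def conjugate_gradient(matvec, g, n_iters=10, tol=1e-10):
    x = torch.zeros_like(g)
    r = g.clone()           # residual r_0 = g - H x_0 = g, since x_0 = 0
    p = r.clone()           # initial search direction
    rdotr = torch.dot(r, r)
    for _ in range(n_iters):
        Hp = matvec(p)
        alpha = rdotr / torch.dot(p, Hp)
        x += alpha * p
        r -= alpha * Hp
        new_rdotr = torch.dot(r, r)
        if new_rdotr < tol:
            break
        p = r + (new_rdotr / rdotr) * p
        rdotr = new_rdotr
    return x

A = torch.randn(5, 5)
H = A @ A.t() + 5 * torch.eye(5)    # symmetric positive definite by construction
g = torch.randn(5)
x = conjugate_gradient(lambda v: H @ v, g)
print(torch.norm(H @ x - g))        # residual norm, should be close to zero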
During the conjugate gradient iterations, computing $\alpha_k$ and $r_{k+1}$ directly would require forming and storing the Hessian matrix $H$. To avoid materializing such a large matrix, we only ever compute Hessian-vector products $Hv$, never $H$ itself. This is easy to do, because for any column vector $v$ it is straightforward to verify that

$$
Hv = \nabla_\theta\Big[\big(\nabla_\theta D_{KL}\big)^\top v\Big],
$$

that is, we first take the dot product of the KL gradient with the vector $v$ and then differentiate that scalar again.
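To make this double-differentiation trick concrete, here is a minimal standalone sketch (illustrative, not from the original text) on a toy quadratic whose Hessian is known, so the result can be checked by hand; the TRPO implementations below apply the same trick to the average KL divergence.

# Hessian-vector product via double backprop on f(theta) = 0.5 * theta^T M theta,
# whose Hessian is exactly M, so H @ v can be verified by inspection.
import torch

theta = torch.randn(3, requires_grad=True)
M = torch.tensor([[2.0, 0.0, 0.0],
                  [0.0, 3.0, 0.0],
                  [0.0, 0.0, 4.0]])
f = 0.5 * theta @ M @ theta                                   # scalar function of theta

grad = torch.autograd.grad(f, theta, create_graph=True)[0]    # first derivative: M @ theta
v = torch.tensor([1.0, 1.0, 1.0])
grad_v = torch.dot(grad, v)                                   # dot the gradient with v first
hvp = torch.autograd.grad(grad_v, theta)[0]                   # differentiate again -> H @ v
print(hvp)                                                    # equals M @ v = [2., 3., 4.]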
Because TRPO relies on first- and second-order Taylor approximations, the solution is not exact: $\theta_{k+1}$ may not actually be better than $\theta_k$, and it may violate the KL constraint. TRPO therefore performs a line search at the end of each iteration to make sure a satisfactory parameter vector is found. Concretely, it finds the smallest non-negative integer $i$ such that

$$
\theta_{k+1} = \theta_k + \alpha^i\sqrt{\frac{2\delta}{x^\top Hx}}\,x
$$

still satisfies the original KL constraint and actually improves the surrogate objective $L_{\theta_k}$, where $\alpha \in (0,1)$ is a hyperparameter that determines the length of the line search.
We have not yet said how to estimate the advantage function $A$. A commonly used method is Generalized Advantage Estimation (GAE). Let $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ denote the temporal-difference error, where $V$ is an already-learned state-value function. Following the idea of multi-step temporal difference, we have

$$
\begin{aligned}
A_t^{(1)} &= \delta_t = -V(s_t) + r_t + \gamma V(s_{t+1}),\\
A_t^{(2)} &= \delta_t + \gamma\delta_{t+1} = -V(s_t) + r_t + \gamma r_{t+1} + \gamma^2 V(s_{t+2}),\\
&\;\;\vdots\\
A_t^{(k)} &= \sum_{l=0}^{k-1}\gamma^l\delta_{t+l} = -V(s_t) + r_t + \gamma r_{t+1} + \dots + \gamma^{k-1}r_{t+k-1} + \gamma^k V(s_{t+k}).
\end{aligned}
$$

GAE then takes an exponentially weighted average of these multi-step advantage estimates:

$$
A_t^{GAE} = (1-\lambda)\big(A_t^{(1)} + \lambda A_t^{(2)} + \lambda^2 A_t^{(3)} + \dots\big)
= (1-\lambda)\big(\delta_t(1+\lambda+\lambda^2+\dots) + \gamma\delta_{t+1}(\lambda+\lambda^2+\dots) + \dots\big)
= \sum_{l=0}^{\infty}(\gamma\lambda)^l\delta_{t+l},
$$

where $\lambda \in [0,1]$ is an extra hyperparameter introduced by GAE.

- When $\lambda = 0$, $A_t^{GAE} = \delta_t$: only the one-step TD advantage is used.
- When $\lambda = 1$, $A_t^{GAE} = \sum_{l=0}^{\infty}\gamma^l\delta_{t+l}$: the advantages obtained from every number of TD steps are fully averaged.
def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
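A quick, hypothetical usage example of compute_advantage (not from the original text): with lmbda = 0 the output reduces to the raw TD errors, and with lmbda = 1 each entry becomes the discounted sum of the subsequent TD errors, matching the two limiting cases discussed above.

# Illustrative check of compute_advantage on hand-picked TD errors.
td_delta = torch.tensor([1.0, 2.0, 3.0])
print(compute_advantage(gamma=0.98, lmbda=0.0, td_delta=td_delta))  # tensor([1., 2., 3.])
print(compute_advantage(gamma=0.98, lmbda=1.0, td_delta=td_delta))
# [1 + 0.98*2 + 0.98^2*3, 2 + 0.98*3, 3] ≈ tensor([5.8412, 4.9400, 3.0000])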
Discrete action space (CartPole environment):
import torch
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch.nn.functional as F
import rl_utils
import copy


def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class TRPO:
    """ TRPO algorithm """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.n
        # the policy network's parameters are not updated by an optimizer
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda  # GAE parameter
        self.kl_constraint = kl_constraint  # maximum allowed KL divergence
        self.alpha = alpha  # line search parameter
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def hessian_matrix_vector_product(self, states, old_action_dists, vector):
        # compute the product of the Hessian matrix and a vector
        new_action_dists = torch.distributions.Categorical(self.actor(states))
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))  # average KL divergence
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # first take the dot product of the KL gradient with the vector
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat([grad.view(-1) for grad in grad2])
        return grad2_vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        # solve the linear system Hx = g with the conjugate gradient method
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):  # main conjugate gradient loop
            Hp = self.hessian_matrix_vector_product(states, old_action_dists, p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):
        # compute the surrogate policy objective
        log_probs = torch.log(actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):
        # line search
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):  # main line search loop
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            new_action_dists = torch.distributions.Categorical(new_actor(states))
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):
        # update the policy network
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        # use the conjugate gradient method to compute x = H^(-1)g
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)  # line search
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())  # set the parameters found by line search

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()
        old_action_dists = torch.distributions.Categorical(
            self.actor(states).detach())
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()  # update the value function
        # update the policy function
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)


num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
critic_lr = 1e-2
kl_constraint = 0.0005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPO(hidden_dim, env.observation_space, env.action_space, lmbda,
             kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Iteration 0: 100%|██████████| 50/50 [00:16<00:00, 3.08it/s, episode=50, return=122.200] Iteration 1: 100%|██████████| 50/50 [00:17<00:00, 2.84it/s, episode=100, return=130.700] Iteration 2: 100%|██████████| 50/50 [00:26<00:00, 1.88it/s, episode=150, return=174.800] Iteration 3: 100%|██████████| 50/50 [00:24<00:00, 2.01it/s, episode=200, return=173.300] Iteration 4: 100%|██████████| 50/50 [00:32<00:00, 1.55it/s, episode=250, return=178.300] Iteration 5: 100%|██████████| 50/50 [00:33<00:00, 1.49it/s, episode=300, return=178.900] Iteration 6: 100%|██████████| 50/50 [00:28<00:00, 1.73it/s, episode=350, return=181.700] Iteration 7: 100%|██████████| 50/50 [00:29<00:00, 1.72it/s, episode=400, return=184.500] Iteration 8: 100%|██████████| 50/50 [00:22<00:00, 2.26it/s, episode=450, return=179.000] Iteration 9: 100%|██████████| 50/50 [00:22<00:00, 2.20it/s, episode=500, return=188.600]
Continuous action space (Pendulum environment):
import torch
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch.nn.functional as F
import rl_utils
import copy


def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std  # mean and standard deviation of the Gaussian


class TRPOContinuous:
    """ TRPO for continuous actions """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.shape[0]
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,
                                         action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.kl_constraint = kl_constraint
        self.alpha = alpha
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)
        action = action_dist.sample()
        return [action.item()]

    def hessian_matrix_vector_product(self, states, old_action_dists, vector,
                                      damping=0.1):
        mu, std = self.actor(states)
        new_action_dists = torch.distributions.Normal(mu, std)
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat(
            [grad.contiguous().view(-1) for grad in grad2])
        return grad2_vector + damping * vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):
            Hp = self.hessian_matrix_vector_product(states, old_action_dists, p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):
        mu, std = actor(states)
        action_dists = torch.distributions.Normal(mu, std)
        log_probs = action_dists.log_prob(actions)
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            mu, std = new_actor(states)
            new_action_dists = torch.distributions.Normal(mu, std)
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        old_action_dists = torch.distributions.Normal(mu.detach(),
                                                      std.detach())
        old_log_probs = old_action_dists.log_prob(actions)
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)


num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
critic_lr = 1e-2
kl_constraint = 0.00005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPOContinuous(hidden_dim, env.observation_space, env.action_space,
                       lmbda, kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Iteration 0: 100%|██████████| 200/200 [00:23<00:00, 8.34it/s, episode=200, return=-1245.402] Iteration 1: 100%|██████████| 200/200 [00:23<00:00, 8.57it/s, episode=400, return=-1258.636] Iteration 2: 100%|██████████| 200/200 [00:23<00:00, 8.52it/s, episode=600, return=-1195.327] Iteration 3: 100%|██████████| 200/200 [00:23<00:00, 8.61it/s, episode=800, return=-1109.290] Iteration 4: 100%|██████████| 200/200 [00:23<00:00, 8.66it/s, episode=1000, return=-984.067] Iteration 5: 100%|██████████| 200/200 [00:22<00:00, 8.76it/s, episode=1200, return=-709.332] Iteration 6: 100%|██████████| 200/200 [00:22<00:00, 8.78it/s, episode=1400, return=-620.103] Iteration 7: 100%|██████████| 200/200 [00:23<00:00, 8.36it/s, episode=1600, return=-426.315] Iteration 8: 100%|██████████| 200/200 [00:23<00:00, 8.66it/s, episode=1800, return=-468.169] Iteration 9: 100%|██████████| 200/200 [00:22<00:00, 8.78it/s, episode=2000, return=-363.595]
TRPO has been applied successfully in many scenarios, but its computation is quite involved: every update step is expensive. In 2017 an improved version, PPO, was proposed. PPO builds on TRPO's ideas but is much simpler to implement: it strips out TRPO's complex computations, and in experiments it usually performs at least as well, so it is now commonly used as a baseline algorithm. Both TRPO and PPO are on-policy algorithms: even though the objective contains an importance-sampling ratio, they only use data collected by the previous round's policy, not data from all past policies. Extensive experimental results show that PPO learns as well as TRPO (and often faster), which has made it a very popular reinforcement learning algorithm. If you want to try reinforcement learning in a new environment, PPO is a good first algorithm to reach for.
TRPO's optimization objective:

$$
\max_{\theta}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\,\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\Big[\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)\Big]
\quad\text{s.t.}\quad
\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\big[D_{KL}\big(\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)\big)\big]\le\delta .
$$

TRPO solves this directly using Taylor approximations, conjugate gradients, and a line search. PPO keeps the same optimization objective but solves it with simpler techniques. Specifically, PPO comes in two forms: PPO-penalty and PPO-clip.
PPO-penalty: PPO-penalty uses a Lagrangian relaxation to move the KL constraint into the objective as a penalty term, turning the problem into an unconstrained one, and keeps adjusting the coefficient in front of the KL term during training:

$$
\arg\max_{\theta}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\,\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\Big[\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)-\beta\,D_{KL}\big(\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)\big)\Big].
$$

Let $d_k = D_{KL}^{\nu^{\pi_{\theta_k}}}(\pi_{\theta_k},\pi_\theta)$. The update rule for $\beta$ is:

(1) If $d_k < \delta/1.5$, then $\beta_{k+1} = \beta_k/2$.

(2) If $d_k > 1.5\,\delta$, then $\beta_{k+1} = 2\beta_k$.

(3) Otherwise $\beta_{k+1} = \beta_k$.

Here $\delta$ is a preset hyperparameter that limits how far the learned policy may move away from the previous round's policy. A minimal sketch of this rule is given below.
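The implementations in this article use PPO-clip only; the following is a minimal, hypothetical sketch of the PPO-penalty objective and the adaptive coefficient update described above. The names (ppo_penalty_loss, update_beta, kl_div, delta) are illustrative and not taken from the original code.

# Hypothetical sketch of PPO-penalty: surrogate objective with a KL penalty, plus the
# adaptive update of the penalty coefficient beta. Not used by the code in this article.
import torch

def ppo_penalty_loss(log_probs, old_log_probs, advantage, kl_div, beta):
    # surrogate objective minus the KL penalty; negated so it can be minimized as a loss
    ratio = torch.exp(log_probs - old_log_probs)
    return -(torch.mean(ratio * advantage) - beta * kl_div)

def update_beta(beta, kl_div, delta):
    # adapt the penalty coefficient so the measured KL stays near the target delta
    if kl_div < delta / 1.5:
        return beta / 2.0
    elif kl_div > delta * 1.5:
        return beta * 2.0
    return beta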
PPO-clip: the other form, PPO-clip, is more direct. It puts the restriction inside the objective itself, so that the new and old parameters cannot drift too far apart:

$$
\arg\max_{\theta}\;\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\,\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\Big[\min\Big(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a),\;\mathrm{clip}\Big(\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\Big)A^{\pi_{\theta_k}}(s,a)\Big)\Big],
$$

where $\mathrm{clip}(x,l,r) := \max(\min(x,r),l)$ restricts $x$ to the interval $[l,r]$, and $\epsilon$ is a hyperparameter that sets the clipping range.

If $A^{\pi_{\theta_k}}(s,a) > 0$, the action is better than average, so maximizing this objective increases $\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}$, but never beyond $1+\epsilon$. Conversely, if $A^{\pi_{\theta_k}}(s,a) < 0$, maximizing the objective decreases $\frac{\pi_\theta(a|s)}{\pi_{\theta_k}(a|s)}$, but never below $1-\epsilon$.
Discrete action space:
import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO algorithm, clipped-objective variant '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training epochs over one batch of episode data
        self.eps = eps  # clipping range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,
                                               td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()

        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps,
                                1 + self.eps) * advantage  # clipping
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss
            critic_loss = torch.mean(
                F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
            epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()
Continuous action space:
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    ''' PPO for continuous actions '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,
                                         action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),
                                                lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards as in TRPO to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,
                                               td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # actions follow a normal distribution
        old_log_probs = action_dists.log_prob(actions)

        for _ in range(self.epochs):
            mu, std = self.actor(states)
            action_dists = torch.distributions.Normal(mu, std)
            log_probs = action_dists.log_prob(actions)
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            critic_loss = torch.mean(
                F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]  # continuous action space
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                      lmbda, epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 21)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()