赞
踩
大家好,今天和各位分享一下深度强化学习中的 Actor-Critic 演员评论家算法,Actor-Critic 算法是一种综合了策略迭代和价值迭代的集成算法。我将使用该模型结合 OpenAI 中的 Gym 环境完成一个小游戏,完整代码可以从我的 GitHub 中获得:
https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model
根据 agent 选择动作方法的不同,可以把强化学习方法分为三大类:行动者方法(Actor-only),评论家方法(Critic-only),行动者评论家方法(Actor-critic)。
行动者方法中不会对值函数进行估计,直接按照当前策略和环境进行交互。通过交互后得到的立即奖赏值直接优化当前策略。例如:Policy Gradients
评论家方法没有需要维护的策略,评论家方法的策略是直接通过当前的值函数获得的,并通过值函数获得的策略与环境交互。交互得到的立即奖赏值用来优化当前值函数。例如:DQN
行动者评论家方法是由行动者和评论家两个部分构成。行动者用于选择动作,评论家评论选择动作的好坏。行动者选择动作的方法不是依据当前的值函数,而是依据存储的策略。评论家的评论一般采用时间差分误差的形式,时间差分误差是根据当前的值函数计算获得的。时间差分误差是是评论家的唯一输出,并且驱动了行动者和评论家之间的所有学习。
根据策略梯度算法的定义,策略优化目标函数如下:
令 ,,称 为优势函数。采用 n 步时序差分法求解时, 可以表示如下:
当 n 为一个完整的状态序列大小时,该算法与蒙特卡洛算法等价。
Actor-Critic 算法一共分为两个部分,Critic 和 Actor 网络。
Critic 是评判网络,当输入为环境状态时,它可以评估当前状态的价值,当输入为环境状态和采取的动作时,它可以评估当前状态下采取该动作的价值。
Actor 为策略网络,以当前的状态作为输入,输出为动作的概率分布或者连续动作值,再由 Critic 网络来评价该动作的好坏从而调整策略。Actor-Critic算法将动作价值评估和策略更新过程分开,Actor 可以对当前环境进行充分探索并缓慢进行策略更新,Critic 只需要负责评价策略的好坏,所以这种集成算法有相对较好的性能。
Critic 网络的输入一般有两种形式,(1)如果输入为状态,则该评价网络的作用为评价当前状态价值;(2)如果输入为状态和动作,则该评价网络的作用为评价当前状态的动作价值。
如果评价网络 Critic 为状态价值 state value 的评价网络,输入为状态。Critic 网络的损失函数计算公式采用均方误差损失函数,即 TD 误差值的累计平方值的均值,表达式如下:
Actor 网络的优化目标可以如下:
其中, 代表最优策略,由于该公式表达的含义为当 TD 误差值大于 0 时增强该动作选择概率,当 TD 误差值小于 0 时减小该动作选择概率,所以目标为最小化损失函数
如果评价网络 Critic 为动作价值 action value 的评价网络,即输入为状态和动作,则Critic 网络的损失函数如下:
其中, 的表达式变换如下:
Actor-Critic 算法流程如下:
Actor-Critic 模型部分的实现方式如下:
- import torch
- from torch import nn
- from torch.nn import functional as F
- import numpy as np
-
- # ------------------------------------ #
- # 策略梯度Actor,动作选择
- # ------------------------------------ #
-
- class PolicyNet(nn.Module):
- def __init__(self, n_states, n_hiddens, n_actions):
- super(PolicyNet, self).__init__()
- self.fc1 = nn.Linear(n_states, n_hiddens)
- self.fc2 = nn.Linear(n_hiddens, n_actions)
- # 前向传播
- def forward(self, x):
- x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
- x = F.relu(x)
- x = self.fc2(x) # [b,n_hiddens]-->[b,n_actions]
- # 每个状态对应的动作的概率
- x = F.softmax(x, dim=1) # [b,n_actions]-->[b,n_actions]
- return x
-
- # ------------------------------------ #
- # 值函数Critic,动作评估输出 shape=[b,1]
- # ------------------------------------ #
-
- class ValueNet(nn.Module):
- def __init__(self, n_states, n_hiddens):
- super(ValueNet, self).__init__()
- self.fc1 = nn.Linear(n_states, n_hiddens)
- self.fc2 = nn.Linear(n_hiddens, 1)
- # 前向传播
- def forward(self, x):
- x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
- x = F.relu(x)
- x = self.fc2(x) # [b,n_hiddens]-->[b,1]
- return x
-
- # ------------------------------------ #
- # Actor-Critic
- # ------------------------------------ #
-
- class ActorCritic:
- def __init__(self, n_states, n_hiddens, n_actions,
- actor_lr, critic_lr, gamma):
- # 属性分配
- self.gamma = gamma
-
- # 实例化策略网络
- self.actor = PolicyNet(n_states, n_hiddens, n_actions)
- # 实例化价值网络
- self.critic = ValueNet(n_states, n_hiddens)
- # 策略网络的优化器
- self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
- # 价值网络的优化器
- self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
-
- # 动作选择
- def take_action(self, state):
- # 维度变换numpy[n_states]-->[1,n_sates]-->tensor
- state = torch.tensor(state[np.newaxis, :])
- # 动作价值函数,当前状态下各个动作的概率
- probs = self.actor(state)
- # 创建以probs为标准类型的数据分布
- action_dist = torch.distributions.Categorical(probs)
- # 随机选择一个动作 tensor-->int
- action = action_dist.sample().item()
- return action
-
- # 模型更新
- def update(self, transition_dict):
- # 训练集
- states = torch.tensor(transition_dict['states'], dtype=torch.float)
- actions = torch.tensor(transition_dict['actions']).view(-1,1)
- rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1)
- next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
- dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1)
-
- # 预测的当前时刻的state_value
- td_value = self.critic(states)
- # 目标的当前时刻的state_value
- td_target = rewards + self.gamma * self.critic(next_states) * (1-dones)
- # 时序差分的误差计算,目标的state_value与预测的state_value之差
- td_delta = td_target - td_value
-
- # 对每个状态对应的动作价值用log函数
- log_probs = torch.log(self.actor(states).gather(1, actions))
- # 策略梯度损失
- actor_loss = torch.mean(-log_probs * td_delta.detach())
- # 值函数损失,预测值和目标值之间
- critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
-
- # 优化器梯度清0
- self.actor_optimizer.zero_grad() # 策略梯度网络的优化器
- self.critic_optimizer.zero_grad() # 价值网络的优化器
- # 反向传播
- actor_loss.backward()
- critic_loss.backward()
- # 参数更新
- self.actor_optimizer.step()
- self.critic_optimizer.step()
我们使用 OpenAI 的 gym 库中的环境,完成一个小案例。我们的目的是左右移动黑色小车使得黄色的杆子保持竖直。状态 state 的维度为 4,动作 action 有 2 个。
环境交互与训练部分的代码如下:
- import numpy as np
- import matplotlib.pyplot as plt
- import gym
- import torch
- from RL_brain import ActorCritic
-
- # ----------------------------------------- #
- # 参数设置
- # ----------------------------------------- #
-
- num_episodes = 100 # 总迭代次数
- gamma = 0.9 # 折扣因子
- actor_lr = 1e-3 # 策略网络的学习率
- critic_lr = 1e-2 # 价值网络的学习率
- n_hiddens = 16 # 隐含层神经元个数
- env_name = 'CartPole-v1'
- return_list = [] # 保存每个回合的return
-
- # ----------------------------------------- #
- # 环境加载
- # ----------------------------------------- #
-
- env = gym.make(env_name, render_mode="human")
- n_states = env.observation_space.shape[0] # 状态数 4
- n_actions = env.action_space.n # 动作数 2
-
- # ----------------------------------------- #
- # 模型构建
- # ----------------------------------------- #
-
- agent = ActorCritic(n_states=n_states, # 状态数
- n_hiddens=n_hiddens, # 隐含层数
- n_actions=n_actions, # 动作数
- actor_lr=actor_lr, # 策略网络学习率
- critic_lr=critic_lr, # 价值网络学习率
- gamma=gamma) # 折扣因子
-
- # ----------------------------------------- #
- # 训练--回合更新
- # ----------------------------------------- #
-
- for i in range(num_episodes):
-
- state = env.reset()[0] # 环境重置
- done = False # 任务完成的标记
- episode_return = 0 # 累计每回合的reward
-
- # 构造数据集,保存每个回合的状态数据
- transition_dict = {
- 'states': [],
- 'actions': [],
- 'next_states': [],
- 'rewards': [],
- 'dones': [],
- }
-
- while not done:
- action = agent.take_action(state) # 动作选择
- next_state, reward, done, _, _ = env.step(action) # 环境更新
- # 保存每个时刻的状态\动作\...
- transition_dict['states'].append(state)
- transition_dict['actions'].append(action)
- transition_dict['next_states'].append(next_state)
- transition_dict['rewards'].append(reward)
- transition_dict['dones'].append(done)
- # 更新状态
- state = next_state
- # 累计回合奖励
- episode_return += reward
-
- # 保存每个回合的return
- return_list.append(episode_return)
- # 模型训练
- agent.update(transition_dict)
-
- # 打印回合信息
- print(f'iter:{i}, return:{np.mean(return_list[-10:])}')
-
- # -------------------------------------- #
- # 绘图
- # -------------------------------------- #
-
- plt.plot(return_list)
- plt.title('return')
- plt.show()
绘制每回合的回报 return
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。