Reinforcement Learning Explained

Below is a PPO (Proximal Policy Optimization) implementation for the continuous-action Pendulum environment; the explanation is carried in the docstrings and comments.
import gym
import torch
import torch.nn.functional as F
from tqdm import tqdm
import numpy as np


class ValueNet(torch.nn.Module):
    """Value network (critic)."""
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Input is a state; output is the estimated value of that state."""
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PolicyNetContinuous(torch.nn.Module):
    """Policy network (actor) for continuous actions."""
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """Input is a state; output is the mean (mu) and standard deviation (std) of the action distribution."""
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    """Main PPO class: holds the policy network (actor), the value network (critic), and the optimizers used to update their parameters."""
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        """
        Select an action for the current state.
        The state is converted to a tensor and passed through the policy network to obtain the
        mean (mu) and standard deviation (sigma) of the action distribution. A normal distribution
        is built from them, an action is sampled from it, and the action's value is returned.
        """
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        """
        Update the policy and value networks. The collected transitions are first converted to
        tensors, and the TD target (td_target) and advantage (advantage) are computed. The policy
        network then produces new means (mu) and standard deviations (std) for the current states,
        a normal distribution (action_dists) is built from them, and the old action
        log-probabilities (old_log_probs) are computed.
        """
        # Current states
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        # Actions taken in those states
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
        # Rewards obtained by executing the actions
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        # Next states reached after executing the actions
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        # Termination flags after executing the actions
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        rewards = (rewards + 8.0) / 8.0  # Rescale the rewards (as in TRPO) to make training easier
        # TD target: td_target = rewards + gamma * V(next_states) * (1 - dones)
        # rewards are the per-step rewards, gamma is the discount factor, V(next_states) is the
        # critic's value estimate for the next state, and dones marks terminal states.
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        # Advantage estimates: compute_advantage implements GAE (Generalized Advantage Estimation)
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        # Normal distribution built from the policy's mean (mu) and standard deviation (std),
        # used to evaluate action probability densities
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # Old action log-probabilities of the rollout actions, used later in the update step
        old_log_probs = action_dists.log_prob(actions)

        for _ in range(self.epochs):
            """
            In each update epoch, recompute the policy network's mean (mu) and standard deviation
            (std), rebuild the normal distribution (action_dists), and compute the new action
            log-probabilities (log_probs). Then compute the importance-sampling ratio (ratio) and
            the two surrogate terms (surr1 and surr2): surr1 is the unclipped term and surr2 is the
            clipped term, where torch.clamp restricts ratio to the interval [1 - eps, 1 + eps].
            Compute the policy loss (actor_loss) and the value loss (critic_loss), zero both
            networks' gradients, backpropagate each loss, and take an optimizer step.
            """
            mu, std = self.actor(states)
            # Build a new normal distribution from the updated policy network
            action_dists = torch.distributions.Normal(mu, std)
            # New action log-probabilities
            log_probs = action_dists.log_prob(actions)
            # Importance-sampling ratio: exponential of the log-probability difference
            ratio = torch.exp(log_probs - old_log_probs)
            # Surrogate terms (surr1 and surr2)
            surr1 = ratio * advantage
            # Clip the ratio to [1 - eps, 1 + eps] to improve training stability
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            # Policy loss: negative mean of the element-wise minimum of the two surrogate terms,
            # the core PPO objective
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            # Value loss: mean squared error between the critic's estimate and the TD target
            # (detach removes the TD target from the backward graph)
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [],
                                   'rewards': [], 'dones': []}
                # Reset the environment and get the initial state
                state = env.reset()
                done = False
                while not done:
                    # Choose an action for the current state using the current policy
                    action = agent.take_action(state)
                    # Execute the action and observe the next state, the reward, and the done flag
                    next_state, reward, done, _ = env.step(action)
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state  # Move on to the next state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def compute_advantage(gamma, lmbda, td_delta):
    """
    Estimate of the advantage function.
    :param gamma: discount factor
    :param lmbda: GAE parameter
    :param td_delta: TD errors
    :return: advantage estimates
    """
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    # For each TD error (delta), compute the advantage estimate from back to front
    # using the recursion: advantage = gamma * lmbda * advantage + delta
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()  # Restore chronological order
    return torch.tensor(advantage_list, dtype=torch.float)


actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]  # continuous action space
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                      lmbda, epochs, eps, gamma, device)
return_list = train_on_policy_agent(env, agent, num_episodes)
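The heart of the update loop is the clipped surrogate objective. As a standalone illustration (a minimal sketch; the ratio and advantage values below are made up rather than produced by the networks above), this shows how torch.clamp bounds the probability ratio to [1 - eps, 1 + eps] and how the element-wise minimum keeps the objective pessimistic:

import torch

ratio = torch.tensor([0.5, 0.9, 1.1, 1.6])       # hypothetical pi_new / pi_old ratios
advantage = torch.tensor([1.0, -1.0, 2.0, 1.0])  # hypothetical advantage estimates
eps = 0.2

surr1 = ratio * advantage                                 # unclipped term: [0.5, -0.9, 2.2, 1.6]
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantage  # ratio clipped to [0.8, 1.2]: [0.8, -0.9, 2.2, 1.2]
actor_loss = -torch.min(surr1, surr2).mean()              # -(0.5 - 0.9 + 2.2 + 1.2) / 4 = -0.75
print(actor_loss)  # tensor(-0.7500)

The last entry shows the effect of clipping: a ratio of 1.6 on a positive advantage is capped at 1.2, which is what limits how far one batch of rollout data can push the policy.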
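compute_advantage walks the TD errors backwards with the GAE recursion A_t = delta_t + gamma * lmbda * A_{t+1}. A quick hand-checkable call (toy TD errors, assuming the function defined above is in scope):

import torch

toy_td_delta = torch.tensor([0.5, -0.2, 1.0])  # made-up TD errors for a 3-step rollout
adv = compute_advantage(gamma=0.9, lmbda=0.9, td_delta=toy_td_delta)
# Backward pass with gamma * lmbda = 0.81:
#   A_2 = 1.0
#   A_1 = -0.2 + 0.81 * 1.0  = 0.61
#   A_0 =  0.5 + 0.81 * 0.61 ≈ 0.9941
print(adv)  # ≈ tensor([0.9941, 0.6100, 1.0000])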
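Note that the script targets the older gym API: Pendulum-v0, env.seed(...), reset() returning only the observation, and step() returning four values. On recent Gym/Gymnasium releases the environment calls differ; a rough sketch of the equivalents (assuming the gymnasium package is installed, with the training loop adjusted to match):

import gymnasium as gym

env = gym.make('Pendulum-v1')                 # Pendulum-v0 has been replaced by Pendulum-v1
state, _ = env.reset(seed=0)                  # reset() now returns (observation, info)
next_state, reward, terminated, truncated, _ = env.step(env.action_space.sample())
done = terminated or truncated                # the old done flag is split into two flags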