当前位置:   article > 正文

强化学习PPO代码解释以及流程_ppo lstm

ppo lstm

强化学习解释

import gym
import torch
import torch.nn.functional as F
from tqdm import tqdm
import numpy as np


class ValueNet(torch.nn.Module):
    """价值网络模型"""

    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """输入是状态(state),输出是状态的价值(value)"""
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PolicyNetContinuous(torch.nn.Module):
    """策略网络模型"""

    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """输入是状态(state),输出是一个动作(action)的均值(mu)和标准差(std)"""
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    """PPO算法的主要类,包含了策略网络(actor)和价值网络(critic)以及用于优化网络参数的优化器"""

    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        """
        根据当前状态(state)选择一个动作(action)。
        首先将状态转换为张量,并通过策略网络获得动作的均值(mu)和标准差(sigma)。然后,根据均值和标准差创建一个正态分布对象,并从中采样一个动作,最后返回动作的值。
        """
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        """
        用于更新策略和价值网络。首先将经验转换为张量,并计算目标值(td_target)和优势函数(advantage)。
        然后根据当前状态使用策略网络得到新的均值(mu)和标准差(std),构建正态分布(action_dists)。计算旧的动作对数概率(old_log_probs)
        """
        # 当前状态state
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        # 当前状态state选择的动作
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
        # 执行actions得到的奖励reward
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        # 执行actions得到的下一个状态next_state
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        # 执行actions得到的是否终止done
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        rewards = (rewards + 8.0) / 8.0  # 和TRPO一样,对奖励进行修改,方便训练

        # TD目标
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        # td_target = rewards + gamma * V(next_states) * (1 - dones)
        # rewards是当前回合的奖励,gamma是折扣因子,V(next_states)是下一个状态对应的价值网络的估计值,dones表示是否到达终止状态。

        td_delta = td_target - self.critic(states)
        # 优势函数
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        # advantage = compute_advantage(gamma, lmbda, td_delta)
        # compute_advantage函数根据GAE(Generalized Advantage Estimation)算法计算优势函数

        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())  # 用于计算动作的概率密度

        # 旧的动作对数概率,将在更新步骤中用到
        old_log_probs = action_dists.log_prob(actions)
        # ction_dists是根据策略网络输出的均值(mu)和标准差(std)构建的正态分布,actions是当前回合选择的动作

        for _ in range(self.epochs):
            """
            在每次更新中,重新计算策略网络的均值(mu)和标准差(std),构建新的正态分布(action_dists),计算新的动作对数概率(log_probs)。
            计算重要性采样比率(ratio)和两个损失函数的修剪项(surr1和surr2)。其中,surr1是未修剪的损失,surr2是经过修剪的损失,
            通过使用torch.clamp函数将ratio限制在区间[1-eps, 1+eps]内。计算策略网络的损失函数(actor_loss)和价值网络的损失函数(critic_loss)。
            将两个网络的梯度清零,然后分别对两个损失函数进行反向传播并进行优化。
            """
            mu, std = self.actor(states)
            # 创建新的正态分布对象
            action_dists = torch.distributions.Normal(mu, std)

            # 新的动作对数概率
            log_probs = action_dists.log_prob(actions)
            # action_dists是根据更新后的策略网络重新构建的正态分布

            # 重要性采样比率,新步骤中需要用到的比率(ratio)
            ratio = torch.exp(log_probs - old_log_probs)
            # exp表示指数函数

            # 修剪项(surr1和surr2)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # 利用截断函数来约束比率的范围,以增加训练的稳定性
            # clamp函数用于将ratio限制在区间[1-eps, 1+eps]内

            # 策略网络的损失函数
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # 取两个损失的最小值的负平均,PPO算法核心损失函数
            # min表示取最小值,mean表示取平均值

            # 价值网络的损失函数
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))  # 使用均方误差损失函数来度量评论家网络的误差
            # MSE表示均方误差,detach表示断开反向传播的计算图

            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
                # 重置环境并获取初始状态
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)  # 根据当前状态state使用当前策略选择一个动作action
                    next_state, reward, done, _ = env.step(action)  # 执行选择的动作action,并观察下一个状态next_state、奖励reward和是否终止done
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state   # 更新当前状态为下一个状态
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                                      'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def compute_advantage(gamma, lmbda, td_delta):
    """
    势函数的估计值
    :param gamma: 折扣因子
    :param lmbda: GAE参数
    :param td_delta: TD误差
    :return: 优势函数的估计值
    """
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        """对于每个TD误差(delta),从后往前计算优势函数的估计值,使用公式:advantage = gamma * lmbda * advantage + delta"""
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()  # 将优势列表反转
    return torch.tensor(advantage_list, dtype=torch.float)


actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]  # 连续动作空间
agent = PPOContinuous(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
return_list = train_on_policy_agent(env, agent, num_episodes)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/171445
推荐阅读
相关标签
  

闽ICP备14008679号