赞
踩
Actor-Critic(演员-评论家)是一种强化学习算法,通常用于解决连续动作空间的问题。其一个重要特点是它允许在学习过程中同时学习策略和价值函数,从而可以有效地处理连续动作空间和高维状态空间的问题。在本章的内容中,将详细讲解Actor-Critic算法的知识,为读者步入后面知识的学习打下基础。
Actor-Critic(演员-评论家)算法是一种强化学习算法,用于解决马尔可夫决策过程(MDP)中的连续动作空间问题。Actor-Critic结合了两个重要的组件:演员(Actor)和评论家(Critic),每个组件有不同的角色和功能。
强化学习中的策略梯度方法是一类用于训练策略(policy)的算法,其目标是最大化预期累积奖励。与值函数方法(如Q-learning和深度Q网络)不同,策略梯度方法直接对策略进行参数化,并通过梯度上升来更新策略,以找到最优策略。这些方法在处理连续动作空间和高维状态空间的问题时非常有用,因为它们可以直接输出动作的概率分布。下面是策略梯度方法的一些常用概念和算法:
1. 策略表示
在策略梯度方法中,策略通常用一个参数化函数来表示,例如神经网络。这个函数接受环境的状态作为输入,并输出在给定状态下采取各个动作的概率。
2. 策略梯度
策略梯度方法的核心思想是使用梯度上升法来更新策略的参数,以增加预期累积奖励。梯度上升的目标是最大化一个性能度量,通常是期望回报(expected return)。策略梯度表示通过增加选择高回报动作的概率,减少选择低回报动作的概率。
3. 策略梯度定理
策略梯度方法的理论基础是策略梯度定理(Policy Gradient Theorem),它描述了如何计算策略梯度。该定理表明,策略梯度可以通过对回报乘以动作概率的梯度来计算。
4. REINFORCE算法
REINFORCE是一种经典的策略梯度算法,它使用蒙特卡洛采样来估计策略梯度,通过采样多个轨迹并计算每个动作的梯度,然后更新策略参数。
5. Actor-Critic方法
Actor-Critic算法是一种策略梯度方法,同时结合了演员(Actor)和评论家(Critic)两个组件。演员负责学习策略,评论家负责学习值函数,演员根据评论家的反馈来更新策略。这种方法通常更稳定,因为评论家可以提供更精确的梯度信号。
6. TRPO和PPO
TRPO(Trust Region Policy Optimization)和PPO(Proximal Policy Optimization)是一些进阶的策略梯度方法,它们使用一些技巧来提高算法的稳定性和样本效率。TRPO引入了一个“信任区域”,以确保策略更新不会引起太大的性能下降,PPO使用剪切(clipping)策略梯度以防止太大的策略更新。
策略梯度方法在许多强化学习应用中表现出色,特别是在处理高维连续动作空间问题和在模拟环境中训练深度神经网络策略时。然而,它们通常需要更多的样本来收敛,并且在某些问题上可能会遇到局部最优解的问题。因此,研究人员一直在努力改进策略梯度方法,以提高其性能和稳定性。
Actor-Critic(演员-评论家)是一种强化学习框架,旨在解决马尔可夫决策过程(MDP)中的策略优化问题。该框架结合了两个关键组件:演员(Actor)和评论家(Critic),它们各自负责策略学习和价值评估。
1. 演员(Actor)
演员是策略的学习者,它的主要任务是选择动作,以最大化长期预期回报。演员通常使用一个参数化的策略函数,例如神经网络,将环境的状态作为输入,输出一个动作或动作概率分布。演员的目标是学习一个良好的策略,以在不同的状态下选择动作,以最大化总回报。
2. 评论家(Critic)
评论家是价值函数的学习者,它的任务是估计在给定状态下采取某个动作的预期累积奖励。评论家通常使用一个参数化的价值函数,例如神经网络,将状态和动作作为输入,输出预期回报的估计值。评论家的目标是学习一个准确的价值函数,以提供对演员策略的反馈,帮助演员改进策略。
Actor-Critic框架的工作流程如下:
Actor-Critic框架的关键思想是结合了策略优化(通过演员)和价值估计(通过评论家)两个方面,使得算法更稳定且能够更快地收敛到好的策略。这种框架在深度强化学习中得到了广泛的应用,尤其是处理连续动作空间和高维状态空间的问题时非常有用。不同的Actor-Critic变体和改进方法也在不断涌现,以进一步提高算法的性能和稳定性。
策略网络,通常称为演员(Actor),是强化学习中用于学习并表示策略的神经网络或函数。策略网络的主要工作是将环境的状态映射到采取动作的概率分布。
1. 输入和输出
策略网络接收环境的状态作为输入。状态可以是任何描述环境的信息,通常是一个向量。输出通常是一个动作空间中每个可能动作的概率分布。这可以是离散动作空间(例如,在游戏中选择不同的按键)或连续动作空间(例如,在机器人控制中选择连续的动作值)。
2. 参数化策略
策略网络是一个参数化函数,其参数可以是神经网络的权重和偏差。这些参数决定了策略的具体形状。通过训练,策略网络的参数将被调整,以便生成更优化的策略,以最大化预期回报。
3. 策略选择动作
一旦策略网络接收到环境的状态,它会计算每个可能动作的概率。然后,根据这些概率选择一个动作。通常,可以使用随机采样方法(如softmax函数)来根据概率分布选择动作,或者选择概率最高的动作。
4. 学习策略
策略网络的参数会随着时间的推移而不断更新,以最大化预期累积奖励。更新策略的目标是增加采取高回报动作的概率,并减少采取低回报动作的概率。这通常通过梯度上升法来实现,即根据策略梯度定理计算梯度并应用于策略网络的参数。
5. 探索与利用
策略网络在训练期间需要在探索和利用之间找到平衡。探索是为了发现新的、未知的高回报动作,而利用是为了利用已知的高回报动作。通常,策略网络的参数会随着时间逐渐从探索倾向于利用,以改善策略。
总的来说,策略网络(演员)的工作原理是通过神经网络表示策略,将环境状态映射到动作概率分布,并通过训练来调整策略,以在强化学习任务中最大化累积奖励。这种方法特别适用于处理连续动作空间和高维状态空间的问题,因为它可以学习复杂的策略。
值函数网络,通常称为评论家(Critic),在强化学习中用于估计在给定状态下采取某个动作的预期累积奖励。评论家的工作是帮助策略网络(演员)优化策略,提供对动作的价值估计。
1. 输入和输出
评论家网络接收环境的状态和所采取的动作作为输入。这些输入可以合并为一个状态-动作对。输出是一个值函数,用于估计在给定状态-动作对下的预期累积奖励,通常表示为Q值(或者是状态值V值,如果不考虑具体动作)。
2. 参数化值函数
评论家网络是一个参数化函数,其参数可以是神经网络的权重和偏差。这些参数决定了值函数的形状。通过训练,评论家网络的参数将被调整,以便生成更准确的值函数估计。
3. 价值估计
评论家网络的目标是学习一个准确的值函数,该函数估计在给定状态-动作对下的预期累积奖励。为了估计这个值,评论家网络学习从状态-动作对到预期回报的映射。
4. 训练和误差计算
评论家网络的参数通过最小化值函数估计误差来训练。这个误差通常是实际获得的奖励与值函数估计之间的差异,即时奖励与值函数之间的差异。最常见的目标是均方误差(Mean Squared Error),即将实际回报与值函数估计的平方差最小化。
5. 值函数反馈
一旦评论家网络被训练并产生了准确的值函数估计,这些估计值可以用来为策略网络(演员)提供反馈。评论家可以告诉演员哪些动作在某个状态下更有可能产生高回报,从而引导演员改进策略。
总的来说,值函数网络(评论家)的工作原理是通过神经网络表示值函数,将状态-动作对映射到预期累积奖励的估计,并通过训练来调整估计的准确性。评论家的价值估计可以帮助策略网络更有效地选择动作,从而改善策略的性能。这种框架通常在Actor-Critic算法中使用,以同时学习策略和值函数,以提高强化学习任务的性能。
CartPole是OpenAI gym中的一个游戏测试,目的是通过强化学习让Agent控制购物车cart,使Pole尽量长时间不倒。这个游戏很简单,将购物车往不同的方向推,最终让车爬到山顶。含义。本实例演示使用 TensorFlow实现Actor-Critic算法的过程,功能是在Open AI Gym CartPole-V0 环境中训练代理。在CartPole-v0 环境中,一根杆子连接到沿着无摩擦轨道移动的购物车上。杆是直立的,代理的目标是通过对购物车施加-1或+1的力来防止它翻倒。杆子保持直立的每一步都会得到+1的奖励。当 (1) 杆子与垂直方向的夹角超过 15 度或 (2) 购物车从中心移动超过 2.4 个单位时,一episode播放结束。当这一集的平均总奖励在100次连续试验中达到 195时,这个问题就被认为“解决了”。
实例10-1:手推购物车(实现CartPole游戏)(源码路径:daima\10\ping01.py和ping02.py)
实例文件ping01.py的具体实现流程如下所示:
(1)导入需要的库,然后创建使用CartPole-v0环境。代码如下:
- import collections
- import gym
- import numpy as np
- import statistics
- import tensorflow as tf
- import tqdm
-
- from matplotlib import pyplot as plt
- from tensorflow.keras import layers
- from typing import Any, List, Sequence, Tuple
-
- #创建环境
- env = gym.make("CartPole-v0")
-
- #设置训练数量
- seed = 42
- env.seed(seed)
- tf.random.set_seed(seed)
- np.random.seed(seed)
-
- # 稳定除法运算小值
- eps = np.finfo(np.float32).eps.item()
(2)使用 Actor-Critic算法开发神经网络,创建实现类ActorCritic。Actor和Critic可以分别使用生成行动概率和Critic值的一个神经网络来建模。在本实例中,使用模型子类化来定义模型。在前向传递期间,模型将状态作为输入,并输出动作概率和评论值 ,它对状态相关的价值函数进行建模。目标是训练一个基于策略选择动作的模型最大化预期回报。对于 Cartpole-v0游戏来说有四个代表购物车状态的值:分别是推车位置、推车速度、极角和极速。agent代理可以采取两个动作分别向左 (0) 和向右 (1) 推动购物车。代码如下:
- class ActorCritic(tf.keras.Model):
- """创建神经网络"""
-
- def __init__(
- self,
- num_actions: int,
- num_hidden_units: int):
- """初始化"""
- super().__init__()
-
- self.common = layers.Dense(num_hidden_units, activation="relu")
- self.actor = layers.Dense(num_actions)
- self.critic = layers.Dense(1)
-
- def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
- x = self.common(inputs)
- return self.actor(x), self.critic(x)
-
- num_actions = env.action_space.n # 2
- num_hidden_units = 128
-
- model = ActorCritic(num_actions, num_hidden_units)
(3)开始训练数据,需要按照以下步骤训练agent:
接下来我们首先收集训练数据,与监督学习一样,为了训练 Actor-Critic 模型,需要有训练数据。但是为了收集此类数据,模型需要在环境中“运行”。为每一episode收集训练数据。然后在每个时间步内,模型的前向传递将在环境状态下运行,以基于模型权重参数化的当前策略生成动作概率和评论值。然后将从模型生成的动作概率中采样下一个动作,并将其应用于环境,从而生成下一个状态和奖励。这个过程在函数run_episode()中实现,它使用 TensorFlow 操作,以便稍后可以将其编译成 TensorFlow 图以进行更快的训练。请注意,tf.TensorArrays 用于支持可变长度数组的张量迭代。代码如下:
- #将OpenAI Gym的'env.step'调用包装为TensorFlow函数中的操作。
- #这将允许它包含在可调用的TensorFlow图中
-
- def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
- """返回给定操作的状态、奖励和完成标志."""
-
- state, reward, done, _ = env.step(action)
- return (state.astype(np.float32),
- np.array(reward, np.int32),
- np.array(done, np.int32))
-
-
- def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
- return tf.numpy_function(env_step, [action],
- [tf.float32, tf.int32, tf.int32])
-
- def run_episode(
- initial_state: tf.Tensor,
- model: tf.keras.Model,
- max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
- """运行单个事件以收集培训数据."""
-
- action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
- values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
- rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)
-
- initial_state_shape = initial_state.shape
- state = initial_state
-
- for t in tf.range(max_steps):
- #将状态转换为批处理张量(批处理大小=1)
- state = tf.expand_dims(state, 0)
-
- #运行模型并获取行动概率和临界值
- action_logits_t, value = model(state)
-
- # 从动作概率分布中选取下一个动作
- action = tf.random.categorical(action_logits_t, 1)[0, 0]
- action_probs_t = tf.nn.softmax(action_logits_t)
-
- # 存储Critic值
- values = values.write(t, tf.squeeze(value))
-
- #存储所选操作的日志概率
- action_probs = action_probs.write(t, action_probs_t[0, action])
-
- #对环境应用操作以获得下一个状态和奖励
- state, reward, done = tf_env_step(action)
- state.set_shape(initial_state_shape)
-
- #存储奖励
- rewards = rewards.write(t, reward)
-
- if tf.cast(done, tf.bool):
- break
-
- action_probs = action_probs.stack()
- values = values.stack()
- rewards = rewards.stack()
-
- return action_probs, values, rewards
(4)计算预期回报
- def get_expected_return(
- rewards: tf.Tensor,
- gamma: float,
- standardize: bool = True) -> tf.Tensor:
- """计算每个时间步的预期回报."""
-
- n = tf.shape(rewards)[0]
- returns = tf.TensorArray(dtype=tf.float32, size=n)
-
- # 从'rewards'结尾开始,将奖励金额累积到'returns'数组中
- rewards = tf.cast(rewards[::-1], dtype=tf.float32)
- discounted_sum = tf.constant(0.0)
- discounted_sum_shape = discounted_sum.shape
- for i in tf.range(n):
- reward = rewards[i]
- discounted_sum = reward + gamma * discounted_sum
- discounted_sum.set_shape(discounted_sum_shape)
- returns = returns.write(i, discounted_sum)
- returns = returns.stack()[::-1]
-
- if standardize:
- returns = ((returns - tf.math.reduce_mean(returns)) /
- (tf.math.reduce_std(returns) + eps))
-
- return returns
(5)因为使用了Actor-Critic 模型,因此选择的损失函数是用于训练的 actor 和critic 损失的组合,如下所示:
是Huber loss,它对数据中的异常值比平方误差损失更不敏感。
- huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)
-
- def compute_loss(
- action_probs: tf.Tensor,
- values: tf.Tensor,
- returns: tf.Tensor) -> tf.Tensor:
- """计算actor-critic组合的损失"""
-
- advantage = returns - values
-
- action_log_probs = tf.math.log(action_probs)
- actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)
-
- critic_loss = huber_loss(values, returns)
-
- return actor_loss + critic_loss
(6)定义训练步骤以更新参数
上述所有步骤组合成一个训练步骤,每episode都会运行一次,这样损失函数的所有步骤都与tf.GradientTape上下文一起执行以实现自动微分。本实例使用 Adam 优化器将梯度应用于模型参数,在此步骤中,episode_reward还用于计算未折扣奖励的总和。tf.function上下文被施加到train_step功能,使得它可以被编译成一个可调用TensorFlow图,这可导致在训练10倍的加速。代码如下:
- optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
-
-
- @tf.function
- def train_step(
- initial_state: tf.Tensor,
- model: tf.keras.Model,
- optimizer: tf.keras.optimizers.Optimizer,
- gamma: float,
- max_steps_per_episode: int) -> tf.Tensor:
- """运行模型训练步骤"""
-
- with tf.GradientTape() as tape:
-
- #运行一集的模型以收集训练数据
- action_probs, values, rewards = run_episode(
- initial_state, model, max_steps_per_episode)
-
- #计算预期收益
- returns = get_expected_return(rewards, gamma)
-
- # 将训练数据转换为适当的TF张量形状
- action_probs, values, returns = [
- tf.expand_dims(x, 1) for x in [action_probs, values, returns]]
-
- #计算损失值以更新我们的网络
- loss = compute_loss(action_probs, values, returns)
-
- #根据损失计算梯度
- grads = tape.gradient(loss, model.trainable_variables)
-
- #将渐变应用于模型的参数
- optimizer.apply_gradients(zip(grads, model.trainable_variables))
-
- episode_reward = tf.math.reduce_sum(rewards)
-
- return episode_reward
(7)运行训练循环
通过运行训练步骤来执行训练,直到达到成功标准或最大episode数。将episode奖励的运行记录保存在队列中,一旦达到 100 次,最旧的奖励会从队列的左(尾)端移除,最新的奖励会被添加到头(右)。为了计算效率,还保持了奖励的运行总和。根据运行时间,可以在不到一分钟的时间内完成训练。代码如下:
- min_episodes_criterion = 100
- max_episodes = 10000
- max_steps_per_episode = 1000
-
- # 如果100次连续试验的平均奖励大于等于195,则认为Cartpole-v0
- reward_threshold = 195
- running_reward = 0
-
- #未来奖励的折扣系数
- gamma = 0.99
-
- #保留最后一集奖励
- episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)
-
- with tqdm.trange(max_episodes) as t:
- for i in t:
- initial_state = tf.constant(env.reset(), dtype=tf.float32)
- episode_reward = int(train_step(
- initial_state, model, optimizer, gamma, max_steps_per_episode))
-
- episodes_reward.append(episode_reward)
- running_reward = statistics.mean(episodes_reward)
-
- t.set_description(f'Episode {i}')
- t.set_postfix(
- episode_reward=episode_reward, running_reward=running_reward)
-
- #平均每10集播放一集奖励
- if i % 10 == 0:
- pass # print(f'Episode {i}: average reward: {avg_reward}')
-
- if running_reward > reward_threshold and i >= min_episodes_criterion:
- break
-
- print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')
执行后会输出:
- Episode 361: 4%|▎ | 361/10000 [01:16<34:10, 4.70it/s, episode_reward=182, running_reward=195]
- Solved at episode 361: average reward: 195.14!
- CPU times: user 2min 50s, sys: 40.3 s, total: 3min 30s
- Wall time: 1min 16s
Wall time: 1min 16s
(8)可视化
在训练完成后,建议可视化展示模型在环境中的表现。可以使用下面的代码以生成模型的一episode运行的 GIF 动画。请注意,需要为 OpenAI Gym 安装其他软件包,才能够在 Colab 中正确渲染环境图像。
- #渲染一episode并另存为GIF文件
-
- from IPython import display as ipythondisplay
- from PIL import Image
- from pyvirtualdisplay import Display
-
-
- display = Display(visible=0, size=(400, 300))
- display.start()
-
-
- def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int):
- screen = env.render(mode='rgb_array')
- im = Image.fromarray(screen)
-
- images = [im]
-
- state = tf.constant(env.reset(), dtype=tf.float32)
- for i in range(1, max_steps + 1):
- state = tf.expand_dims(state, 0)
- action_probs, _ = model(state)
- action = np.argmax(np.squeeze(action_probs))
-
- state, _, done, _ = env.step(action)
- state = tf.constant(state, dtype=tf.float32)
-
- # 每10步渲染一次屏幕
- if i % 10 == 0:
- screen = env.render(mode='rgb_array')
- images.append(Image.fromarray(screen))
-
- if done:
- break
-
- return images
-
-
- #保存为GIF格式的图像
- images = render_episode(env, model, max_steps_per_episode)
- image_file = 'cartpole-v0.gif'
- # loop=0:永远循环, duration=1: 每1ms播放1帧
- images[0].save(
- image_file, save_all=True, append_images=images[1:], loop=0, duration=1)
执行后的可视化效果如图10-1所示。
图10-1 手推购物车的可视化效果
通过上面对强化学习的了解,我们可以总结出强化学习包含了5个基本对象:
再看下面的实例文件ping02.py,使用强化学习实现了简易平衡杆功能。
- import tensorflow as tf
- import numpy as np
- import gym
- import random
- from collections import deque
-
- num_episodes = 500 # 游戏训练的总episode数量
- num_exploration_episodes = 100 # 探索过程所占的episode数量
- max_len_episode = 1000 # 每个episode的最大回合数
- batch_size = 32 # 批次大小
- learning_rate = 1e-3 # 学习率
- gamma = 1. # 折扣因子
- initial_epsilon = 1. # 探索起始时的探索率
- final_epsilon = 0.01 # 探索终止时的探索率
-
- class QNetwork(tf.keras.Model):
- def __init__(self):
- super().__init__()
- self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
- self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
- self.dense3 = tf.keras.layers.Dense(units=2)
-
- def call(self, inputs):
- x = self.dense1(inputs)
- x = self.dense2(x)
- x = self.dense3(x)
- return x
-
- def predict(self, inputs):
- q_values = self(inputs)
- return tf.argmax(q_values, axis=-1)
-
- if __name__ == '__main__':
- env = gym.make('CartPole-v1') # 实例化一个游戏环境,参数为游戏名称
- model = QNetwork()
- optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
- replay_buffer = deque(maxlen=10000) # 使用一个 deque 作为 Q Learning 的经验回放池
- epsilon = initial_epsilon
- for episode_id in range(num_episodes):
- state = env.reset() # 初始化环境,获得初始状态
- epsilon = max( # 计算当前探索率
- initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
- final_epsilon)
- for t in range(max_len_episode):
- env.render() # 对当前帧进行渲染,绘图到屏幕
- if random.random() < epsilon: # epsilon-greedy 探索策略,以 epsilon 的概率选择随机动作
- action = env.action_space.sample() # 选择随机动作(探索)
- else:
- action = model.predict(np.expand_dims(state, axis=0)).numpy() # 选择模型计算出的 Q Value 最大的动作
- action = action[0]
-
- # 让环境执行动作,获得执行完动作的下一个状态,动作的奖励,游戏是否已结束以及额外信息
- next_state, reward, done, info = env.step(action)
- # 如果游戏Game Over,给予大的负奖励
- reward = -10. if done else reward
- # 将(state, action, reward, next_state)的四元组(外加 done 标签表示是否结束)放入经验回放池
- replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
- # 更新当前 state
- state = next_state
-
- if done: # 游戏结束则退出本轮循环,进行下一个 episode
- print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
- break
-
- if len(replay_buffer) >= batch_size:
- # 从经验回放池中随机取一个批次的四元组,并分别转换为 NumPy 数组
- batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
- *random.sample(replay_buffer, batch_size))
- batch_state, batch_reward, batch_next_state, batch_done = \
- [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
- batch_action = np.array(batch_action, dtype=np.int32)
-
- q_value = model(batch_next_state)
- y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done) # 计算 y 值
- with tf.GradientTape() as tape:
- loss = tf.keras.losses.mean_squared_error( # 最小化 y 和 Q-value 的距离
- y_true=y,
- y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
- )
- grads = tape.gradient(loss, model.variables)
- optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables)) # 计算梯度并更新参数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。