赞
踩
概述
Deep Q-Learning(深度 Q 学习)是一种强化学习算法,用于解决决策问题,其中代理(agent)通过学习在不同环境中采取行动来最大化累积奖励。Lunar Lander 是一个经典的强化学习问题,其中代理的任务是控制一个着陆舱在月球表面着陆,最小化着陆过程中的燃料消耗。
以下是使用 Deep Q-Learning 解决 Lunar Lander 问题的基本步骤:
环境建模: 首先,需要对 Lunar Lander 环境进行建模。这包括定义状态空间、动作空间、奖励函数等。在 Lunar Lander 中,状态可以包括着陆舱的位置、速度、角度等信息,动作可以是推力引擎的火力等。
深度 Q 网络: 创建一个深度神经网络,该网络将输入状态,并输出每个可能动作的 Q 值。Q 值表示在给定状态下采取某个动作的累积奖励的估计。网络的目标是通过学习调整 Q 值以最大化累积奖励。
经验回放: 使用经验回放机制来改善学习稳定性。这涉及到存储代理先前的经验,并从中随机抽样进行训练。这有助于解决样本相关性问题,提高算法的收敛性。
ε-贪婪策略: 引入 ε-贪婪策略,以平衡探索和利用。在一部分情况下,代理将以高概率选择当前认为最佳的动作(贪婪),而在其他情况下,它会以较高概率选择一个随机动作(探索)。
目标 Q 值计算: 使用目标 Q 值来更新网络参数。目标 Q 值通过将当前状态的即时奖励与下一个状态的最大 Q 值相结合得到。这有助于更有效地传播奖励信号。
训练: 通过与环境的交互,不断地更新深度 Q 网络的参数。代理通过学习来优化其行为,以最大化预期的累积奖励。
调优: 对算法的超参数进行调优,包括学习率、折扣因子、神经网络结构等。这有助于提高算法的性能和稳定性。
使用 Deep Q-Learning 解决 Lunar Lander 问题是一个复杂的任务,需要仔细调整和实验。算法的性能可能受到许多因素的影响,包括网络结构的选择、超参数的设置以及环境的建模等。
这个环境是 Box2D 环境的一部分,其中包含有关环境的一般信息。
描述 Description
该环境是一个经典的火箭轨迹优化问题。根据庞特里亚金最大值原理,最佳的方法是全速点火或关闭引擎。这就是为什么这个环境具有离散动作的原因:引擎开启或关闭。
有两个环境版本:离散或连续。着陆点始终位于坐标 (0,0)。坐标是状态向量中的前两个数字。在着陆点外着陆是可能的。燃料是无限的,因此代理可以学会飞行,然后在第一次尝试时着陆。
要查看启发式着陆,请运行:
python gymnasium/envs/box2d/lunar_lander.py
动作空间 Action Space
有四个可用的离散动作:
0:什么都不做
1:点火左定向引擎
2:点火主引擎
3:点火右定向引擎
观察空间 Observation Space
状态是一个8维向量:着陆器在 x 和 y 上的坐标,它在 x 和 y 上的线速度,它的角度,它的角速度,以及两个表示每条腿是否与地面接触的布尔值。
奖励 Rewards
每一步都会获得一个奖励。一个 episode(回合)总奖励是该 episode 中所有步骤的奖励之和。
对于每一步,奖励:
着陆器离着陆点越近/远,奖励增加/减少。
着陆器移动越慢/快,奖励增加/减少。
着陆器倾斜度越大,奖励减少(角度非水平)。
每个与地面接触的腿奖励增加10分。
每帧侧引擎点火,奖励减少0.03分。
每帧主引擎点火,奖励减少0.3分。
episode 因坠毁或安全着陆而额外获得-100或+100分的奖励。
如果一个 episode 得分至少为200分,则认为它是一个解决方案。
起始状态 Starting State
着陆器位于视口的顶部中心,对其质心施加随机初始力。
回合终止条件 Episode Termination
如果满足以下条件回合结束:
着陆器坠毁(着陆器本体与月球接触);
着陆器超出视口范围(x 坐标大于1);
着陆器处于非唤醒状态。根据 Box2D 文档,处于非唤醒状态的身体是不移动且不与任何其他身体碰撞的身体:
当 Box2D 确定一个身体(或一组身体)已经停止时,该身体进入了一种几乎没有 CPU 开销的休眠状态。如果一个身体醒着并与休眠中的身体发生碰撞,那么休眠中的身体会醒来。如果连接到它们的关节或接触被销毁,身体也会醒来。
参数 Arguments
要使用连续环境,您需要指定 continuous=True 参数,如下所示:
- import gymnasium as gym
- env = gym.make(
- "LunarLander-v2",
- continuous: bool = False,
- gravity: float = -10.0,
- enable_wind: bool = False,
- wind_power: float = 15.0,
- turbulence_power: float = 1.5,
- )
如果传递 continuous=True,将使用连续动作(对应于引擎的油门),并且动作空间将是 Box(-1, +1, (2,), dtype=np.float32)。动作的第一个坐标确定主引擎的油门,而第二个坐标指定侧推器的油门。给定一个动作 np.array([main, lateral]),如果 main < 0,则主引擎将完全关闭,并且油门在 0 <= main <= 1 时按比例从 50% 缩放到 100%(特别是,主引擎在功率低于 50% 时不起作用)。同样,如果 -0.5 < lateral < 0.5,则侧推器将不会点火。如果 lateral < -0.5,则左推进器将点火,如果 lateral > 0.5,则右推进器将点火。同样,油门在 -1 到 -0.5 之间(以及 0.5 到 1 之间)按比例从 50% 缩放到 100%。
gravity 确定了重力常数,它被限制在 0 和 -12 之间。
如果传递 enable_wind=True,则着陆器将受到风的影响。风是使用函数 tanh(sin(2 k (t+C)) + sin(pi k (t+C))) 生成的。k 设置为 0.01。C 在 -9999 到 9999 之间随机抽样。
wind_power 确定了施加在飞行器上的线性风的最大幅度。wind_power 的推荐值在 0.0 到 20.0 之间。turbulence_power 确定了施加在飞行器上的旋转风的最大幅度。turbulence_power 的推荐值在 0.0 到 2.0 之间。
版本历史 Version History
v2:计算能量消耗,并在 v0.24 版本中,添加了具有风力和 turbulence_power 参数的湍流
v1:在状态向量中添加了与地面接触的腿;与地面接触会奖励 +10 分,如果失去接触则减去 -10 分;奖励重新调整为 200 分;更难的初始随机推力。
v0:初始版本
备注 Notes
在环境实现中存在一些意外的错误。
着陆器身体上侧推器的位置会随着着陆器的方向而变化。这反过来导致对着陆器施加方向依赖性扭矩。
状态的单位不一致。即:
角速度以每秒 0.4 弧度为单位。为了转换为每秒弧度,需要将该值乘以 2.5 的因子。
对于 VIEWPORT_W、VIEWPORT_H、SCALE 和 FPS 的默认值,比例因子相等:'x': 10 'y': 6.666 'vx': 5 'vy': 7.5 'angle': 1 'angular velocity': 2.5
在进行更正后,状态的单位如下:'x':(单位) 'y':(单位) 'vx':(单位/秒) 'vy':(单位/秒) 'angle':(弧度) 'angular velocity':(弧度/秒)
示例代码
- ################ 完整代码
- import gymnasium as gym
- from gym.wrappers.monitoring.video_recorder import VideoRecorder
- from IPython.display import HTML, display
- import imageio
- import base64
- import io
- import glob
- import os
- import random
- import numpy as np
- import torch
- import torch.nn as nn
- import torch.optim as optim
- import torch.nn.functional as F
- import torch.autograd as autograd
- from torch.autograd import Variable
- from collections import deque, namedtuple
-
-
- ######## 创建网络架构
- class Network(nn.Module):
- def __init__(self, state_size, action_size, seed=42):
- super(Network, self).__init__()
- self.seed = torch.manual_seed(seed)
- self.fc1 = nn.Linear(state_size, 64)
- self.fc2 = nn.Linear(64, 64)
- self.fc3 = nn.Linear(64, action_size)
-
-
- def forward(self, state):
- x = self.fc1(state)
- x = F.relu(x)
- x = self.fc2(x)
- x = F.relu(x)
- return self.fc3(x)
-
-
- ######## 设置环境:使用Gymnasium创建了LunarLander环境
- state_shape = env.observation_space.shape
- state_size = env.observation_space.shape[0]
- number_actions = env.action_space.n
- print('State shape: ', state_shape)
- print('State size: ', state_size)
- print('Number of actions: ', number_actions)
- ######## 初始化超参数:定义了学习率、批处理大小、折扣因子等超参数。
- learning_rate = 5e-4
- minibatch_size = 100
- discount_factor = 0.99
- replay_buffer_size = int(1e5)
- interpolation_parameter = 1e-3
-
-
- ####### 实现经验回放:实现了经验回放(Experience Replay)的类 ReplayMemory,用于存储和采样Agent的经验
- class ReplayMemory(object):
- def __init__(self, capacity):
- self.device = torch.device(
- "cuda:0" if torch.cuda.is_available() else "cpu")
- self.capacity = capacity
- self.memory = []
-
-
- def push(self, event):
- self.memory.append(event)
- if len(self.memory) > self.capacity:
- del self.memory[0]
-
-
- def sample(self, batch_size):
- experiences = random.sample(self.memory, k=batch_size)
- states = torch.from_numpy(np.vstack(
- [e[0] for e in experiences if e is not None])).float().to(self.device)
- actions = torch.from_numpy(
- np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
- rewards = torch.from_numpy(np.vstack(
- [e[2] for e in experiences if e is not None])).float().to(self.device)
- next_states = torch.from_numpy(np.vstack(
- [e[3] for e in experiences if e is not None])).float().to(self.device)
- dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(
- np.uint8)).float().to(self.device)
- return states, next_states, actions, rewards, dones
-
-
- ########## 实现 DQN 代理:创建了一个Agent类,包含本地Q网络和目标Q网络,包含了采取动作、学习、软更新等方法。
- class Agent():
- # 初始化函数,参数为状态大小和动作大小
- def __init__(self, state_size, action_size):
- self.device = torch.device(
- "cuda:0" if torch.cuda.is_available() else "cpu")
- self.state_size = state_size
- self.action_size = action_size
- self.local_qnetwork = Network(state_size, action_size).to(self.device)
- self.target_qnetwork = Network(state_size, action_size).to(self.device)
- self.optimizer = optim.Adam(
- self.local_qnetwork.parameters(), lr=learning_rate)
- self.memory = ReplayMemory(replay_buffer_size)
- self.t_step = 0
- # 定义一个函数,用于存储经验并决定何时从中学习
- def step(self, state, action, reward, next_state, done):
- self.memory.push((state, action, reward, next_state, done))
- self.t_step = (self.t_step + 1) % 4
- if self.t_step == 0:
- if len(self.memory.memory) > minibatch_size:
- experiences = self.memory.sample(100)
- self.learn(experiences, discount_factor)
- # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
- def act(self, state, epsilon=0.):
- state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
- self.local_qnetwork.eval()
- with torch.no_grad():
- action_values = self.local_qnetwork(state)
- self.local_qnetwork.train()
- if random.random() > epsilon:
- return np.argmax(action_values.cpu().data.numpy())
- else:
- return random.choice(np.arange(self.action_size))
- # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
- def learn(self, experiences, discount_factor):
- states, next_states, actions, rewards, dones = experiences
- next_q_targets = self.target_qnetwork(
- next_states).detach().max(1)[0].unsqueeze(1)
- q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
- q_expected = self.local_qnetwork(states).gather(1, actions)
- loss = F.mse_loss(q_expected, q_targets)
- self.optimizer.zero_grad()
- loss.backward()
- self.optimizer.step()
- self.soft_update(self.local_qnetwork,
- self.target_qnetwork, interpolation_parameter)
- # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
- def soft_update(self, local_model, target_model, interpolation_parameter):
- for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
- target_param.data.copy_(interpolation_parameter * local_param.data + (
- 1.0 - interpolation_parameter) * target_param.data)
-
-
- ####### 训练DQN代理
- agent = Agent(state_size, number_actions)
-
-
- number_episodes = 2000
- maximum_number_timesteps_per_episode = 1000
- epsilon_starting_value = 1.0
- epsilon_ending_value = 0.01
- epsilon_decay_value = 0.995
- epsilon = epsilon_starting_value
- scores_on_100_episodes = deque(maxlen=100)
-
-
- for episode in range(1, number_episodes + 1):
- state, _ = env.reset()
- score = 0
- for t in range(maximum_number_timesteps_per_episode):
- action = agent.act(state, epsilon)
- next_state, reward, done, _, _ = env.step(action)
- agent.step(state, action, reward, next_state, done)
- state = next_state
- score += reward
- if done:
- break
- scores_on_100_episodes.append(score)
- epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
- print('\rEpisode {}\tAverage Score: {:.2f}'.format(
- episode, np.mean(scores_on_100_episodes)), end="")
- if episode % 100 == 0:
- print('\rEpisode {}\tAverage Score: {:.2f}'.format(
- episode, np.mean(scores_on_100_episodes)))
- if np.mean(scores_on_100_episodes) >= 200.0:
- print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
- episode - 100, np.mean(scores_on_100_episodes)))
- torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
- break
-
-
- ####### 可视化结果
- def show_video_of_model(agent, env_name):
- env = gym.make(env_name, render_mode='rgb_array')
- state, _ = env.reset()
- done = False
- frames = []
- while not done:
- frame = env.render()
- frames.append(frame)
- action = agent.act(state)
- state, reward, done, _, _ = env.step(action.item())
- env.close()
- imageio.mimsave('video.mp4', frames, fps=30)
-
-
-
-
- show_video_of_model(agent, 'LunarLander-v2')
-
-
-
-
- def show_video():
- mp4list = glob.glob('*.mp4')
- if len(mp4list) > 0:
- mp4 = mp4list[0]
- video = io.open(mp4, 'r+b').read()
- encoded = base64.b64encode(video)
- display(HTML(data='''<video alt="test" autoplay
- loop controls style="height: 400px;">
- <source src="data:video/mp4;base64,{0}" type="video/mp4" />
- </video>'''.format(encoded.decode('ascii'))))
- else:
- print("Could not find video")
-
-
- show_video()
终端输出:
- State shape: (8,)
- State size: 8
- Number of actions: 4
- Episode 100 Average Score: -174.79
- Episode 200 Average Score: -102.59
- Episode 300 Average Score: -68.797
- Episode 400 Average Score: -38.18
- Episode 500 Average Score: 24.301
- Episode 600 Average Score: 149.24
- Episode 700 Average Score: 134.89
- Episode 800 Average Score: 185.41
- Episode 826 Average Score: 200.92
- Environment solved in 726 episodes! Average Score: 200.92
- IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (600, 400) to (608, 400) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).
- [swscaler @ 000001e276cb1f00] Warning: data is not aligned! This can lead to a speed loss
- <IPython.core.display.HTML object>
总结:以上代码是使用深度 Q 学习(DQN)算法训练一个智能体在月球着陆环境中控制火箭的示例。代码分为以下几个部分:
• 导入所需的库和模块,包括 gymnasium(一个开源的强化学习环境库),torch(一个开源的深度学习框架),以及一些辅助的库和模块,如 imageio(用于处理图像和视频),base64(用于编码和解码数据),deque(用于实现双端队列),namedtuple(用于创建命名元组)等。
- import io # 导入io模块,用于读写文件
- import glob # 导入glob模块,用于查找文件
- import gymnasium as gym # 导入环境库
- import os # os 是操作系统相关的库,用来处理文件和目录等。
- import random # random 是随机数生成和操作的库,用来实现 epsilon 贪心策略等。
- import numpy as np # numpy 是科学计算的库,用来处理多维数组和矩阵等。
- import torch # torch 是 PyTorch 框架的主要库,用来实现张量和神经网络等。
- import torch.nn as nn # torch.nn 是 PyTorch 框架的神经网络模块,用来定义神经网络的层和损失函数等。
- import torch.optim as optim # torch.optim 是 PyTorch 框架的优化器模块,用来定义优化算法和更新参数等。
- # torch.nn.functional 是 PyTorch 框架的函数式接口,用来实现激活函数和池化等。
- import torch.nn.functional as F
- # torch.autograd 是 PyTorch 框架的自动微分模块,用来实现反向传播和梯度计算等。
- import torch.autograd as autograd
- # torch.autograd.Variable 是 PyTorch 框架的变量类,用来封装张量和梯度等。
- from torch.autograd import Variable
- # collections 是 Python 的内置模块,用来实现特殊的容器类型,如双端队列和命名元组等。
- from collections import deque, namedtuple
• 定义 Network 类,继承自 torch.nn.Module 类,用来构建神经网络模型,包括三个全连接层和一个前向传播函数,输入是状态的维度,输出是动作的个数。
- class Network(nn.Module): # 继承nn模块中的Module类
- def __init__(self, state_size, action_size, seed=42): # 初始化函数,参数为状态大小(8),动作大小(4),随机种子(42)
- super(Network, self).__init__() # 调用父类的初始化函数
- self.seed = torch.manual_seed(seed) # 设置随机种子
- # 创建第一个全连接层,输入为状态大小,输出为64个神经元,这里64是为了适应月球着陆的问题
- self.fc1 = nn.Linear(state_size, 64)
- self.fc2 = nn.Linear(64, 64) # 创建第二个全连接层,输入为64个神经元,输出为64个神经元
- self.fc3 = nn.Linear(64, action_size) # 创建第三个全连接层,输入为64个神经元,输出为动作大小
- # 完成神经网络的构建
-
-
- def forward(self, state): # 前向传播函数
- x = self.fc1(state) # 从输入层接收状态
- x = F.relu(x) # 使用激活函数
- x = self.fc2(x) # 从第一个隐藏层接收输出
- x = F.relu(x) # 使用激活函数
- return self.fc3(x) # 返回最后一层的输出
• 创建月球着陆环境,使用 gym.make 函数,获取环境的状态空间和动作空间的属性,如状态的形状,状态的维度,动作的个数等。
- # https://gymnasium.farama.org/environments/box2d/lunar_lander/
- env = gym.make('LunarLander-v2')
- # 导入月球着陆环境
- state_shape = env.observation_space.shape # 状态的形状,这里是8维向量
- state_size = env.observation_space.shape[0] # 状态的大小,这里是8个元素,包括坐标,速度等
- number_actions = env.action_space.n # 动作的数量,这里是4个
- print('State shape: ', state_shape)
- print('State size: ', state_size)
- print('Number of actions: ', number_actions)
• 定义一些超参数,如学习率,批次大小,折扣因子,回放缓冲区的容量,软更新的插值参数等。
- learning_rate = 5e-4 # 学习率,这里是0.00005
- minibatch_size = 100 # 批次大小,用于更新参数
- discount_factor = 0.99 # 折扣因子,用于计算未来奖励,越接近1越考虑未来,越接近0越考虑当前
- replay_buffer_size = int(1e5) # 重放缓冲区的大小,用于存储人工智能的经验,稳定和改善学习,这里是10万个经验
- interpolation_parameter = 1e-3 # 插值参数,用于更新目标网络,这里是0.001
- # 所有的参数都是通过实验得到的最优值
• 定义 ReplayMemory 类,用来实现经验回放机制,包括一个初始化函数,一个存储经验的函数,一个采样经验的函数,使用双端队列来存储经验元组,使用随机采样的方法来获取一批经验,将经验转换为 PyTorch 张量并发送到 cpu 或 gpu 上。
- # 定义一个重放缓冲区的类
- class ReplayMemory(object):
- def __init__(self, capacity): # 初始化函数,参数为缓冲区的容量
- # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
- self.device = torch.device(
- "cuda:0" if torch.cuda.is_available() else "cpu")
- self.capacity = capacity # 设置缓冲区的容量
- self.memory = [] # 创建一个列表,用于存储经验,每个经验包括状态,动作,奖励,下一个状态,是否结束等
-
-
- def push(self, event): # 定义一个函数,用于向缓冲区中添加经验
- self.memory.append(event) # 将经验添加到列表中
- if len(self.memory) > self.capacity: # 如果列表的长度超过了容量
- del self.memory[0] # 删除最旧的经验
-
-
- def sample(self, batch_size): # 定义一个函数,用于从缓冲区中随机抽取一批经验
- experiences = random.sample(self.memory, k=batch_size) # 从列表中随机抽取k个经验
- # 从经验中提取每个元素,并将它们堆叠在一起
- # 使用列表推导式,从每个经验中提取状态,使用np.vstack将它们垂直堆叠,然后转换为pytorch张量,使用.float()将它们转换为浮点数,使用.to(self.device)将它们发送到cpu或gpu
- states = torch.from_numpy(np.vstack(
- [e[0] for e in experiences if e is not None])).float().to(self.device)
- actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long(
- ).to(self.device) # 同理,从每个经验中提取动作,使用.long()将它们转换为整数
- rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float(
- ).to(self.device) # 同理,从每个经验中提取奖励,使用.float()将它们转换为浮点数
- next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float(
- ).to(self.device) # 同理,从每个经验中提取下一个状态,使用.float()将它们转换为浮点数
- dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float(
- ).to(self.device) # 同理,从每个经验中提取是否结束的标志,使用.astype(np.uint8)将它们转换为无符号整数,使用.float()将它们转换为浮点数
- return states, next_states, actions, rewards, dones # 返回这些元素,注意顺序要一致
• 定义 Agent 类,用来实现深度 Q 学习的智能体,包括一个初始化函数,一个执行一步操作的函数,一个选择动作的函数,一个学习的函数,一个软更新的函数,使用两个神经网络模型,一个是本地 Q 网络,用来选择和评估动作,一个是目标 Q 网络,用来计算目标 Q 值,使用 Adam 优化器来更新本地 Q 网络的参数,使用 ReplayMemory 类的实例来存储和采样经验,使用时间步来控制何时学习,使用 epsilon 贪心策略来平衡探索和利用,使用均方误差损失函数来计算预期 Q 值和目标 Q 值之间的差异,使用软更新的方法来更新目标 Q 网络的参数。
- # 定义一个代理类
- class Agent():
- def __init__(self, state_size, action_size): # 初始化函数,参数为状态大小和动作大小
- # 判断是否有cuda可用,如果有则使用cuda,否则使用cpu
- self.device = torch.device(
- "cuda:0" if torch.cuda.is_available() else "cpu")
- self.state_size = state_size # 创建对象变量,存储状态大小
- self.action_size = action_size # 创建对象变量,存储动作大小
- # Q学习 --
- self.local_qnetwork = Network(state_size, action_size).to(
- self.device) # 创建一个本地网络,用于选择动作,将其发送到设备上,随机种子已经在之前提供
- self.target_qnetwork = Network(state_size, action_size).to(
- self.device) # 创建一个目标网络,用于计算目标值,将其发送到设备上
-
-
- # 创建一个优化器,用于更新本地网络的参数,使用Adam算法,学习率为之前定义的值
- self.optimizer = optim.Adam(
- self.local_qnetwork.parameters(), lr=learning_rate)
- # 创建一个重放缓冲区,用于存储人工智能的经验,容量为之前定义的值
- self.memory = ReplayMemory(replay_buffer_size)
- self.t_step = 0 # 时间步数
-
-
- def step(self, state, action, reward, next_state, done): # 定义一个函数,用于存储经验并决定何时从中学习
- self.memory.push((state, action, reward, next_state, done)) # 将经验推入缓冲区
- self.t_step = (self.t_step + 1) % 4 # 时间步数计数器(每4步学习一次)
- if self.t_step == 0: # 如果时间步数为0
- # 如果缓冲区中的经验数量大于批次大小,self.memory是重放缓冲区的实例,memory属性是__init__中定义的列表
- if len(self.memory.memory) > minibatch_size:
- experiences = self.memory.sample(100) # 从缓冲区中随机抽取100个经验
- self.learn(experiences, discount_factor) # 从经验中学习,使用之前定义的折扣因子
-
-
- def act(self, state, epsilon=0.): # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数
- # 将状态转换为pytorch张量,增加一个维度,表示批次的维度,0表示批次维度的索引,将其放在最前面,将张量发送到设备上
- state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
- self.local_qnetwork.eval() # 将本地网络设置为评估模式,不更新梯度,本地网络是代理类的属性,继承自nn.Module类,有eval()方法
- with torch.no_grad(): # 不计算梯度,检查是否处于推理模式而不是训练模式
- # 使用本地网络对状态进行前向传播,得到每个动作的价值,这些价值将被epsilon贪婪策略选择(这里我们得到的不是最终的值,而是对应于状态的q值)
- action_values = self.local_qnetwork(state)
- self.local_qnetwork.train() # 将本地网络设置为训练模式,更新梯度,本地网络是代理类的属性,继承自nn.Module类,有train()方法
- # Eplison贪婪动作选择策略 --(用于探索和利用的平衡)
- if random.random() > epsilon: # 如果随机数大于epsilon,random.random()是random库的方法,返回一个0到1之间的随机数
- # 返回价值最大的动作,np.argmax是numpy库的方法,返回最大值的索引,action_values发送到cpu上,因为它是简单的,data.numpy()将格式转换为numpy的数据格式
- return np.argmax(action_values.cpu().data.numpy())
- else: # 否则
- # 返回随机的动作,random.choice是random库的方法,从给定的列表中随机选择一个元素,np.arange是numpy库的方法,返回一个从0到动作大小的数组
- return random.choice(np.arange(self.action_size))
-
-
- def learn(self, experiences, discount_factor): # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子
- states, next_states, actions, rewards, dones = experiences # 从经验中解包元素
- # 从目标网络中获取下一个状态的最大q值,self.target_qnetwork(next_states)返回每个动作的价值,.detach()将张量从计算图中分离,我们不会在反向传播中使用这些值,.max(1)沿着第一个维度取最大值,得到两个张量(最大值和索引),因此我们添加.max[1][0]只获取最大值,.unsqueeze(1)增加一个维度,表示批次的维度,但这次在第一个位置
- next_q_targets = self.target_qnetwork(
- next_states).detach().max(1)[0].unsqueeze(1) # 从目标Q网络的输出中,选择每个下一个状态对应的最大Q值,然后将其整理成一个列向量 next_q_targets。这个向量将用于计算Q-learning的目标值,以便更新本地Q网络
- q_targets = rewards + discount_factor * next_q_targets * \
- (1 - dones) # 计算当前状态的目标值,使用公式:奖励加上折扣后的未来最大值,乘以1减去结束标志
- q_expected = self.local_qnetwork(states).gather(
- 1, actions) # 获取当前状态的预期值,.gather(1, actions)根据动作选择对应的价值
- loss = F.mse_loss(q_expected, q_targets) # 计算预期值和目标值之间的均方误差
- self.optimizer.zero_grad() # 清空优化器的梯度,zero_grad()是Adam的方法
- loss.backward() # 反向传播误差
- self.optimizer.step() # 更新本地网络的参数,step()是优化器的方法
- # 使用软更新的方法更新目标网络的参数,参数为本地网络,目标网络和插值参数
- self.soft_update(self.local_qnetwork,
- self.target_qnetwork, interpolation_parameter)
-
-
- # 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数
- def soft_update(self, local_model, target_model, interpolation_parameter):
- # 遍历目标模型和本地模型的参数,zip()是一个函数,用于将参数打包成元组,parameters()是nn.Module的方法,返回模型的参数
- for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
- # 使用插值参数更新目标模型的参数,将本地模型的参数乘以插值参数,再加上目标模型的参数乘以1减去插值参数,.copy_()是一个方法,用于复制张量的数据
- target_param.data.copy_(interpolation_parameter * local_param.data + (
- 1.0 - interpolation_parameter) * target_param.data)
• 创建 Agent 类的实例,传入状态维度和动作个数。
agent = Agent(state_size, number_actions) # 创建一个代理或人工智能,参数为状态大小和动作数量
• 定义训练过程,包括回合数,每个回合的最大时间步数,epsilon 值的初始值,终止值,衰减值,记录最近 100 个回合的分数,遍历每个回合,重置环境,初始化分数,遍历每个时间步,选择动作,执行动作,存储经验,更新状态,累加奖励,判断是否结束,更新 epsilon 值,打印当前回合和平均分数,判断是否达到目标分数,保存模型参数。
- # ### 训练DQN代理
- # %%
- number_episodes = 2000 # 我们想要训练的次数
- # 我们不想让一个回合卡住,所以设置一个最大的时间步数(在月球上着陆的尝试最多为1000个时间步)
- maximum_number_timesteps_per_episode = 1000
- epsilon_starting_value = 1.0 # epsilon的初始值
- epsilon_ending_value = 0.01 # epsilon的结束值
- epsilon_decay_value = 0.995 # epsilon的衰减率,按照(1*0.995 , 1*0.995*0.995,...)的方式递减
- epsilon = epsilon_starting_value # epsilon的变量
- scores_on_100_episodes = deque(maxlen=100) # 最近100个回合的分数(列表) 创建了一个双端队列,最大长度为100
-
-
- for episode in range(1, number_episodes + 1): # 循环直到2000次(上限固定)
- state, _ = env.reset() # 重置环境(这里返回状态和观察值)
- score = 0 # 每个回合的累积分数
- for t in range(maximum_number_timesteps_per_episode): # 循环每个时间步
- action = agent.act(state, epsilon) # 根据当前状态和epsilon贪婪策略选择一个动作
- # 根据动作执行环境的步骤,返回下一个状态,奖励,是否结束等值,_是丢弃不需要的值
- next_state, reward, done, _, _ = env.step(action)
- agent.step(state, action, reward, next_state, done) # 调用代理的学习方法
- state = next_state # 更新状态
- score += reward # 累加奖励
- if done: # 如果回合结束
- break # 跳出循环
- scores_on_100_episodes.append(score) # 将最近的回合分数添加到列表中
- epsilon = max(epsilon_ending_value, epsilon_decay_value *
- epsilon) # 更新epsilon的值,使其按照衰减率递减,但不低于结束值
- print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(
- scores_on_100_episodes)), end="") # \r覆盖之前的输出,\t制表符,.2f保留两位小数,打印当前的回合数和平均分数,end表示不换行
- if episode % 100 == 0: # 每100个回合
- print('\rEpisode {}\tAverage Score: {:.2f}'.format(
- episode, np.mean(scores_on_100_episodes))) # \r覆盖之前的输出,打印当前的回合数和平均分数,换行
- if np.mean(scores_on_100_episodes) >= 200.0: # 如果达到胜利的条件
- print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(
- episode - 100, np.mean(scores_on_100_episodes))) # \n换行,:d表示整数,打印环境在多少个回合内解决,以及平均分数
- torch.save(agent.local_qnetwork.state_dict(),
- 'checkpoint.pth') # 将当前Agent的本地Q网络(agent.local_qnetwork)的参数保存到文件中
- break # 跳出循环
• 定义展示模型表现的函数,使用 imageio 库将每一帧的图像保存为视频文件,使用 HTML 标签和 base64 编码在网页中显示视频。
• 调用展示模型表现的函数,传入智能体和环境的名称。
- from gym.wrappers.monitoring.video_recorder import VideoRecorder # 导入gym模块,用于录制视频
- from IPython.display import HTML, display # 导入IPython模块,用于显示HTML和视频
- import imageio # 导入imageio模块,用于处理图像
- import base64 # 导入base64模块,用于编码和解码
- import io # 导入io模块,用于读写文件
- import glob # 导入glob模块,用于查找文件
- import gymnasium as gym # 导入环境库
- # %%
-
-
-
-
- def show_video_of_model(agent, env_name): # 定义一个函数,用于展示代理在环境中的表现
- env = gym.make(env_name, render_mode='rgb_array') # 创建一个环境,渲染模式为rgb数组
- state, _ = env.reset() # 重置环境,获取初始状态
- done = False # 设置结束标志为False
- frames = [] # 创建一个列表,用于存储每一帧的图像
- while not done: # 循环直到结束
- frame = env.render() # 渲染环境,获取当前帧的图像
- frames.append(frame) # 将图像添加到列表中
- action = agent.act(state) # 根据当前状态选择一个动作
- state, reward, done, _, _ = env.step(
- action.item()) # 根据动作执行环境的步骤,获取下一个状态,奖励,是否结束等值
- env.close() # 关闭环境
- # 使用imageio模块,将列表中的图像保存为视频,帧率为30
- imageio.mimsave('video.mp4', frames, fps=30)
-
-
-
-
- show_video_of_model(agent, 'LunarLander-v2') # 调用函数,展示代理在月球着陆环境中的表现
-
-
-
-
- def show_video(): # 定义一个函数,用于显示视频
- mp4list = glob.glob('*.mp4') # 使用glob模块,查找当前目录下的所有mp4文件
- if len(mp4list) > 0: # 如果找到了
- mp4 = mp4list[0] # 取第一个文件
- video = io.open(mp4, 'r+b').read() # 使用io模块,以二进制模式读取文件
- encoded = base64.b64encode(video) # 使用base64模块,对文件进行编码
- display(HTML(data='''<video alt="test" autoplay
- loop controls style="height: 400px;">
- <source src="data:video/mp4;base64,{0}" type="video/mp4" />
- </video>'''.format(encoded.decode('ascii')))) # 使用IPython模块,显示HTML格式的视频,使用base64编码的数据作为源
- else: # 如果没有找到
- print("Could not find video") # 打印提示信息
- show_video() # 调用函数,显示视频
笔记
参考网址:
https://gymnasium.farama.org/environments/box2d/lunar_lander/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。