赞
踩
在CartPole中,一般的DQN网络如下所示
Dueling Q-Network的结构如下:
优势函数:
A
(
s
,
r
i
g
h
t
)
=
Q
(
s
,
r
i
g
h
t
)
−
V
(
s
)
A(s,right)=Q(s,right)-V(s)
A(s,right)=Q(s,right)−V(s)
在CartPole任务中,动作价值函数
Q
Q
Q与状态
s
s
s有关,可以获得动作的累计折扣奖励。例如可以取向右推或者向左推的动作使杆子跌倒所获得的总回报非常小。换句话说,
Q
Q
Q函数所具有的信息分成仅有状态
s
s
s组成的部分,和该动作确定的部分。因此Dueling Q-Network将Q函数分离为仅有状态
s
s
s确定的部分
V
(
s
)
V(s)
V(s)和根据动作
a
a
a确定的Advantage
A
(
s
,
a
)
A(s,a)
A(s,a),在最终的输出层中将
V
(
s
)
V(s)
V(s)和
A
(
s
,
a
)
A(s,a)
A(s,a)相加求得
Q
(
s
,
a
)
Q(s,a)
Q(s,a)。
Dueling Q-Network和DQN相比,优点是无论动作 a a a如何,都可以逐步学习与 V ( s ) V(s) V(s)相关的网络连接参数,学习所需的实验轮数比DQN更少,随着动作选择的增加,优势更加明显。
Dueling Network的实现只需重写上一节中DDQN代码中构建神经网络的Net类,如下所示。
from torch import nn
import torch.nn.functional as F
class Net(nn.Module):
def __init__(self, n_in, n_mid,n_out):
super(Net,self).__init__()
self.fc1 = nn.Linear(n_in,n_mid)
self.fc2 = nn.Linear(n_mid,n_mid)
#Dueling Network
self.fc3_adv = nn.Linear(n_mid,n_out) #advantage
self.fc3_v = nn.Linear(n_mid,1) #v部分
def forward(self,x):
h1 = F.relu(self.fc1(x))
h2 = F.relu(self.fc2(h1))
adv = self.fc3_adv(h2)
val = self.fc3_v(h2).expand(-1,adv.size(1))
output = val + adv -adv.mean(1,keepdim=True).expand(-1,adv.size(1))
return output
Dueling Network的实现是用PyTorch设计神经网络的一个很好的实践。在Net类的初始化函数中,与DDQN类似地构建输入层fc1和第一个隐藏层fc2,在最后创建与Advantage有关的层fc3_adv和与状态价值有关的层fc3_v。fc3_adv的输出数是可选动作的数量n_out。fc3_v表示状态价值,因此输出数为1。
在Net类的forward计算中,输入x经过fc1层,其输出经由ReLU传递给fc2,再通过ReLU传递给变量h2,h2再进入fc3_adv和fc3_v层。fc3_adv和fc3_v的输出不经过ReLU,分别作为变量adv和val保留。最后输出的动作价值通过adv和val的和来计算,用变量output表示。实际上,adv的大小是[minibatch的大小×动作类型的数量],val的大小是[minibatch的大小×1]。因此,在求val时,使用expand调整它的大小,使其成为[minibatch的大小×动作的数量]。
请注意,在计算输出时,要从输出中减去Advantage的平均值。在其后的实现代码中,为了使得平均值的大小与变量adv的大小相匹配,执行了expand操作。我们接下来解释为什么使用val+adv-adv.mean而不是简单地输出val+adv。
减去平均值是因为如果只是简单地相加,由于动作的类型不同,其具有不同的偏置量,直接进行相加可能无法很好地完成学习。例如,假设Advantage在向右时偏差为b0。在这种状态下,为了通过相加正确地计算Q(s,右),需要将-b0的偏置施加到V(s)上以抵消偏置b0。换句话说,偏置可以通过V(s)和Adv(s,右)消除,因此即使有一定偏置,也可以学习。
另一方面,如果向左的偏置是b1,则对应于状态价值的V(s)部分需要施加偏置-b1。也就是说,根据动作的类型,需要分别施加不同的偏置-b0和-b1到V(s)上。这是使学习不稳定的一个重要因素。为了尽可能避免这种情况,Dueling Network从输出中减去动作的平均值。
如果减去平均值,以向右为例,它将表示为
Q
(
s
,
右
)
=
V
(
s
)
+
A
d
v
(
s
,
右
)
−
(
A
d
v
(
s
,
右
)
+
A
d
u
(
s
,
左
)
)
/
2
Q(s,右)=V(s)+Adv(s,右)-(Adv(s,右)+Adu(s,左))/2
Q(s,右)=V(s)+Adv(s,右)−(Adv(s,右)+Adu(s,左))/2
除了网络不同,其他和DDQN代码一样。
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
import gym
import random
import numpy as np
# 动画显示函数
def display_frames_as_gif(frames):
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
anim.save('./image/movie_cartpole_DQN.mp4')
plt.close() # 防止显示两个输出
return HTML(anim.to_jshtml())
from collections import namedtuple
#namedtuple就是有名字的元组,使得元组有键名,以便在DQN访问状态和动作值
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
ENV = 'CartPole-v0'
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
#为了实现小批量学习,实现了内存类ReplayMemory来存储经验数据。ReplayMemory
class ReplayMemory:
'''push函数保存步骤中的transition,随机选择的sample函数'''
def __init__(self,CAPACITY) -> None:
self.capacity = CAPACITY
self.memory = []
self.index = 0
def push(self,state,action,state_next,reward):
'''将transition(state,action,state_next,reward)保存在存储器中'''
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state,action,state_next,reward)
self.index = (self.index + 1) % self.capacity #保存的index移动一位
def sample(self,batch_size):
return random.sample(self.memory,batch_size)
def __len__(self):
'''返回当前memory长度'''
return len(self.memory)
# 这是一个类,充当代理的大脑,执行 DDQN
import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
BATCH_SIZE = 32
CAPACITY = 10000
class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions # 获取 CartPole 的两个动作(向右或向左推)
# 创建用于存储经验的内存对象
self.memory = ReplayMemory(CAPACITY)
# 构建神经网络
n_in, n_mid, n_out = num_states, 32, num_actions
self.main_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类
self.target_q_network = Net(n_in, n_mid, n_out) # 使用 Net 类
print(self.main_q_network) # 输出网络结构
# 设置优化方法
self.optimizer = optim.Adam(
self.main_q_network.parameters(), lr=0.0001)
def replay(self):
'''使用 Experience Replay 学习网络的连接参数'''
# 1. 检查内存大小
if len(self.memory) < BATCH_SIZE:
return
# 2. 创建小批量
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()
# 3. 求出作为教师信号的 Q(s_t, a_t) 值
self.expected_state_action_values = self.get_expected_state_action_values()
# 4. 更新连接参数
self.update_main_q_network()
def decide_action(self, state, episode):
'''根据当前状态决定动作'''
# 逐渐只采取最优动作的 ε-greedy 法
epsilon = 0.5 * (1 / (episode + 1))
if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # 将网络切换到推理模式
with torch.no_grad():
action = self.main_q_network(state).max(1)[1].view(1, 1)
# 提取网络输出的最大值的索引 = max(1)[1]
# .view(1,1) 将 [torch.LongTensor of size 1] 转换为 size 1x1
else:
# 随机返回 0 或 1 动作
action = torch.LongTensor(
[[random.randrange(self.num_actions)]]) # 随机返回 0 或 1 动作
# action 将是 [torch.LongTensor of size 1x1] 的形状
return action
def make_minibatch(self):
'''2. 创建小批量'''
# 2.1 从内存中取出小批量数据
transitions = self.memory.sample(BATCH_SIZE)
# 2.2 将各变量转换为小批量相对应的形状
# transitions 包含 BATCH_SIZE 个 (state, action, state_next, reward)
# 也就是说,有 (state, action, state_next, reward)×BATCH_SIZE
# 我们想要将其转换为小批量的形式
# 也就是 (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)
batch = Transition(*zip(*transitions))
# 2.3 将每个变量的元素转换为小批量对应的形状,并转换为 Variable 以便网络处理
# 例如,对于 state,将 BATCH_SIZE 个 [torch.FloatTensor of size 1x4] 转换为 [torch.FloatTensor of size BATCH_SIZEx4]
# 创建状态、动作、奖励和非终止状态的小批量 Variable
# cat 是 Concatenates(连接)的意思
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
# 仅将存在的下一个状态进行连接,排除了 None 值
return batch, state_batch, action_batch, reward_batch, non_final_next_states
def get_expected_state_action_values(self):
'''3. 计算作为教师信号的 Q(s_t, a_t) 值'''
# 3.1 切换网络到推理模式
self.main_q_network.eval()
self.target_q_network.eval()
# 3.2 计算网络输出的 Q(s_t, a_t)
# self.model(state_batch) 输出左右两个动作的 Q 值
# 形状为 [torch.FloatTensor of size BATCH_SIZEx2]
# 为了得到执行动作 a_t 对应的 Q 值,我们根据 action_batch 中执行的动作是左还是右
# 通过 gather 提取相应的 Q 值
self.state_action_values = self.main_q_network(
self.state_batch).gather(1, self.action_batch)
# 3.3 计算 max{Q(s_{t+1}, a)} 值,注意检查下一个状态是否存在
# 创建一个检查 cartpole 是否未完成并且 next_state 存在的索引掩码
non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
self.batch.next_state)))
# 先将所有值设为 0
next_state_values = torch.zeros(BATCH_SIZE)
a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
# 从 Main Q-Network 获取下一状态的最大 Q 值动作 a_m
# 最后的 [1] 返回对应的动作索引
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states).detach().max(1)[1]
# 筛选出存在的下一状态并将其尺寸从 32 转换为 32×1
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
# 从 Target Q-Network 获取对应 a_m 动作的 Q 值
# 使用 detach() 提取
# 使用 squeeze() 将尺寸从 [minibatch×1] 转换为 [minibatch]
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()
# 3.4 根据 Q 学习公式计算作为教师的 Q(s_t, a_t) 值
expected_state_action_values = self.reward_batch + GAMMA * next_state_values
return expected_state_action_values
def update_main_q_network(self):
'''4. 更新连接参数'''
# 4.1 切换网络到训练模式
self.main_q_network.train()
# 4.2 计算损失函数(smooth_l1_loss 是 Huber 损失)
# expected_state_action_values 的尺寸为 [minibatch],所以使用 unsqueeze 将其转换为 [minibatch x 1]
loss = F.smooth_l1_loss(self.state_action_values,
self.expected_state_action_values.unsqueeze(1))
# 4.3 更新连接参数
self.optimizer.zero_grad() # 重置梯度
loss.backward() # 计算反向传播
self.optimizer.step() # 更新参数
def update_target_q_network(self): # DDQN 新增
'''使 Target Q-Network 与 Main Q-Network 相同'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())
class Agent:
'''CartPole智能体,带有杆的小车'''
def __init__(self,num_states,num_actions) -> None:
self.brain = Brain(num_states,num_actions)
#为智能体创建大脑以做出决策
def update_q_function(self):
'''Q函数的更新'''
self.brain.replay()
def get_action(self,state,episode):
'''动作的确定'''
action = self.brain.decide_action(state,episode)
return action
def memorize(self,state,action,state_next,reward):
self.brain.memory.push(state,action,state_next,reward)
def update_target_q_function(self):
self.brain.update_target_q_network()
# 这是一个执行CartPole的环境类
class Environment:
def __init__(self):
self.env = gym.make(ENV) # 设定要执行的任务
num_states = self.env.observation_space.shape[0] # 设定任务状态和动作的数量
num_actions = self.env.action_space.n # CartPole的动作(向做或向右)数量为2
self.agent = Agent(num_states, num_actions) # 创建Agent在环境中执行的动作
def run(self):
'''执行'''
episode_10_list = np.zeros(10) # 存储10个试验的连续站立步骤数,并使用平均步骤数进行输出
complete_episodes = 0 # 持续站立195步或更多的试验次数
episode_final = False # 最终尝试目标
frames = [] # 用于存储图像的变量,以使最后一轮成为动画
for episode in range(NUM_EPISODES): # 重复试验次数
observation = self.env.reset() # 环境初始化
state = observation # 直接使用观测作为状态state使用
state = torch.from_numpy(state).type(
torch.FloatTensor) # 将NumPy变量转换为PyTorch Tensor
state = torch.unsqueeze(state, 0) # size 4转换为size 1x4
for step in range(MAX_STEPS): # 1 episode(轮)循环
# if episode_final is True: # 在最终试验中,将各时刻图像添加到帧中
# frames.append(self.env.render(mode='rgb_array'))
action = self.agent.get_action(state, episode) # 求取动作
# 通过执行动作a_t求s_{t+1}和done标志
# 从acttion中指定.item()并获取内容
observation_next, _, done, _ = self.env.step(
action.item()) # 使用'_'是因为在面的流程中不适用reward和info
# 给予奖励。另外,设置episode和state_next的结束评估
if done: # 如果step不超过200,或者如果倾斜超过某个角度,则done为true
state_next = None # 没有下一个状态,因此存储为None
# 添加到最近的10轮的站立步数列表中
episode_10_list = np.hstack(
(episode_10_list[1:], step + 1))
if step < 195:
reward = torch.FloatTensor(
[-1.0]) # 如果您在途中倒下,给予奖励-1作为惩罚
complete_episodes = 0 # 重置连续成功记录
else:
reward = torch.FloatTensor([1.0]) # 一直站立直到结束时奖励为1
complete_episodes = complete_episodes + 1 # 更新连续记录
else:
reward = torch.FloatTensor([0.0]) # 普通奖励为0
state_next = observation_next # 保持观察不变
state_next = torch.from_numpy(state_next).type(
torch.FloatTensor) # 将numpy变量转换为PyTorch Tensor
state_next = torch.unsqueeze(state_next, 0) # size 4转换为size 1x4
# 向经验池中添加经验
self.agent.memorize(state, action, state_next, reward)
# Experience Replay中更新Q函数
self.agent.update_q_function()
# 更新观测值
state = state_next
# 结束处理
if done:
print('%d Episode: Finished after %d steps:10次试验的平均step数 = %.1lf' % (
episode, step + 1, episode_10_list.mean()))
if(episode % 2 == 0):
self.agent.update_target_q_function()
break
if episode_final is True:
# 保存并绘制动画
# display_frames_as_gif(frames)
break
# 连续十轮成功
if complete_episodes >= 100:
print('10轮连续成功')
episode_final = True # 使下一次尝试成为最终绘制的动画
cartpole_env = Environment()
cartpole_env.run()
Net(
(fc1): Linear(in_features=4, out_features=32, bias=True)
(fc2): Linear(in_features=32, out_features=32, bias=True)
(fc3_adv): Linear(in_features=32, out_features=2, bias=True)
(fc3_v): Linear(in_features=32, out_features=1, bias=True)
)
0 Episode: Finished after 15 steps:10次试验的平均step数 = 1.5
1 Episode: Finished after 8 steps:10次试验的平均step数 = 2.3
2 Episode: Finished after 10 steps:10次试验的平均step数 = 3.3
3 Episode: Finished after 12 steps:10次试验的平均step数 = 4.5
4 Episode: Finished after 11 steps:10次试验的平均step数 = 5.6
5 Episode: Finished after 9 steps:10次试验的平均step数 = 6.5
6 Episode: Finished after 12 steps:10次试验的平均step数 = 7.7
7 Episode: Finished after 10 steps:10次试验的平均step数 = 8.7
8 Episode: Finished after 9 steps:10次试验的平均step数 = 9.6
……
489 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
490 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
491 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
492 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
493 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
494 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
10轮连续成功
495 Episode: Finished after 200 steps:10次试验的平均step数 = 200.0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。