
Deep Q-Networks (Q-Learning + CNN) in Deep Reinforcement Learning: Explanation and Hands-On Atari Experiments (Very Detailed, with Source Code)


If you need the source code, please like, save, and follow, then leave your QQ in the comments~~~

Deep reinforcement learning combines the perception (prediction) ability of deep learning with the decision-making ability of reinforcement learning. By exploiting deep neural networks' capacity to recognize high-dimensional data effectively, it makes reinforcement learning algorithms far more effective on tasks with high-dimensional state spaces.

I. A Brief Introduction to the DQN Algorithm

1: Core Idea

The Deep Q-Network algorithm (DQN) is a classic value-based deep reinforcement learning algorithm. It combines a convolutional neural network with the Q-Learning algorithm: exploiting the CNN's strong ability to represent images, it feeds video frames into the network as the state in reinforcement learning, the network outputs the action-value function over the discrete actions, and the Agent then selects an action according to those action values.

Because DQN feeds raw image data into the CNN, the same algorithm and model can achieve good learning results across a wide range of problems without relying on anything specific to a particular task; it is commonly used on Atari games.
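
To make the core idea concrete: with the CNN (parameters θ) acting as the approximator of the action-value function, Q-Learning trains the network by minimizing the squared temporal-difference (TD) error over sampled transitions (s, a, r, s'). This is the standard DQN formulation rather than anything specific to this post:

L(\theta) = \mathbb{E}_{(s,a,r,s')}\left[\left(r + \gamma \max_{a'} Q(s',a';\theta) - Q(s,a;\theta)\right)^2\right]

Here s is the stack of preprocessed frames, a is one of the discrete actions, r is the reward, and γ is the discount factor; the network outputs Q(s, ·) for every action in a single forward pass.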

2: Model Architecture

The input to the DQN model is the four most recent consecutive preprocessed frames. This input passes through three convolutional layers and two fully connected layers of nonlinear transformations, which turn it into a low-dimensional, abstract feature representation, and the output layer finally produces a Q value for each action.

The concrete architecture is as follows (a shape-trace sketch follows the list):

1: Input layer
2: A convolution applied to the input layer
3: A convolution applied to the output of the first hidden layer
4: A convolution applied to the output of the second hidden layer
5: A fully connected operation between the third and fourth hidden layers
6: A fully connected operation between the fourth hidden layer and the output layer
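
To make the layer dimensions concrete, here is a minimal shape-trace sketch in PyTorch. It assumes the standard Nature-DQN kernel sizes and strides (8×8 stride 4, 4×4 stride 2, 3×3 stride 1), which are consistent with the 7 × 7 × 64 fully connected input used in the code later in this post; activations are omitted because they do not change the shapes, and the number of actions (6) is only an example:

import torch
import torch.nn as nn

x = torch.zeros(1, 4, 84, 84)                      # input: the 4 most recent 84x84 frames
x = nn.Conv2d(4, 32, kernel_size=8, stride=4)(x)   # -> [1, 32, 20, 20]
x = nn.Conv2d(32, 64, kernel_size=4, stride=2)(x)  # -> [1, 64, 9, 9]
x = nn.Conv2d(64, 64, kernel_size=3, stride=1)(x)  # -> [1, 64, 7, 7]
x = x.view(1, -1)                                  # -> [1, 7*7*64] = [1, 3136]
x = nn.Linear(7 * 7 * 64, 512)(x)                  # -> [1, 512]
q = nn.Linear(512, 6)(x)                           # -> [1, num_actions]
print(q.shape)                                     # torch.Size([1, 6])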

3: Data Preprocessing

It includes the following parts (a rough sketch follows the list):

1: Image preprocessing
2: Preprocessing of dynamic (motion) information
3: Preprocessing of the game score
4: Preprocessing for random game starts
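
The code in this post delegates all of these steps to the standard make_atari / wrap_deepmind wrappers. Purely as an illustration of what they do (a rough sketch under that assumption, not the exact wrapper implementation; preprocess_frame and clip_score are hypothetical names, and OpenCV is used here only for convenience):

import numpy as np
import cv2  # assumption: OpenCV used only for this illustration

def preprocess_frame(rgb_frame):
    # image preprocessing: convert to grayscale and downsample to 84x84
    gray = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2GRAY)
    return cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA).astype(np.uint8)

def clip_score(reward):
    # game-score preprocessing: clip rewards to {-1, 0, +1} so scales are comparable across games
    return float(np.sign(reward))

# dynamic information: the 4 most recent preprocessed frames are stacked as the network input
# random starts: each episode begins with a random number of no-op actions before the Agent takes control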

II. The Training Algorithm

DQN manages to combine deep learning and reinforcement learning so well because it introduces three core techniques:

1: Objective Function

A convolutional neural network followed by fully connected layers is used as the approximator of the action-value function, giving an end-to-end pipeline: the input is the raw video frames and the output is the value of each of a finite number of actions.

2: Target Network

A separate target network is used to compute the TD error, which keeps the target values relatively stable.
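
Written out (standard DQN practice, not specific to this post), the target network keeps a separate copy θ⁻ of the parameters that is used only to compute the TD target and is re-synchronized with the online network every C steps (C = 10,000 in the experiments described below, 1,000 in the sample code):

y = r + \gamma \max_{a'} Q(s', a'; \theta^{-}), \qquad \theta^{-} \leftarrow \theta \ \text{every } C \text{ steps}

Because θ⁻ is frozen between synchronizations, the regression target does not shift at every gradient step, which is what stabilizes training.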

3: Experience Replay

Experience replay effectively deals with the correlation between samples and the non-stationarity of the data, so that the inputs to the network approximately satisfy the i.i.d. assumption.
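
A minimal sketch of the mechanism (the full Memory_Buffer class used in this post appears in the code section below; store and sample_batch are illustrative names, and the capacity of 100,000 mirrors max_buff in that code):

import random
from collections import deque

replay = deque(maxlen=100000)  # fixed capacity: the oldest transitions are discarded automatically

def store(state, action, reward, next_state, done):
    replay.append((state, action, reward, next_state, done))

def sample_batch(batch_size=32):
    # uniform random sampling breaks the temporal correlation between consecutive frames,
    # so each training batch looks approximately i.i.d.
    return random.sample(replay, batch_size)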

The DQN training flowchart is shown below.

III. Pros and Cons of the DQN Algorithm

The strengths of DQN: the algorithm is highly general, works end to end, and can generate large numbers of samples for supervised learning. Its weaknesses: it cannot be applied to continuous action control; it can only handle problems that require short-term memory, not those that require long-term memory; and the algorithm is not guaranteed to converge, so careful hyperparameter tuning is required.

IV. DQN on the Breakout and Asterix Games

Next, the Breakout and Asterix games from the Atari 2600 suite are used to validate the performance of the DQN algorithm.

During training the Agent follows an ε-greedy policy, with ε starting at 1, while interacting with the environment, and the interaction samples are saved in the experience replay buffer. For each Atari game, DQN is trained for 1,000,000 time steps; every 10,000 time steps the Agent copies the parameters of the behavior network to the target network, and every 1,000 time steps the policy's performance is evaluated.
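
In the accompanying code, the exploration rate is annealed exponentially from ε_max = 1 toward ε_min as a function of the time step t (see epsilon_by_frame in the code section):

\varepsilon(t) = \varepsilon_{\min} + (\varepsilon_{\max} - \varepsilon_{\min})\, e^{-t/\mathrm{decay}}

with ε_min = 0.01 and decay = 30,000 in the code below.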

The visualization is shown below.

The experimental data from the training phase are shown below.

As the figures show, a Q-network with a fixed target improves the stability and convergence of training.

The loss curve is shown below.

 

V. Code

Part of the code is shown below.

import gym, random, pickle, os.path, math, glob

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from atari_wrappers import make_atari, wrap_deepmind, LazyFrames
from IPython.display import clear_output
from tensorboardX import SummaryWriter

from gym import envs
env_names = [spec for spec in envs.registry]
for name in sorted(env_names):
    print(name)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class DQN(nn.Module):
    def __init__(self, in_channels=4, num_actions=5):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc4 = nn.Linear(7 * 7 * 64, 512)
        self.fc5 = nn.Linear(512, num_actions)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.fc4(x.view(x.size(0), -1)))  # flatten to [x.size(0), 7*7*64] before the fully connected layer
        return self.fc5(x)


class Memory_Buffer(object):
    def __init__(self, memory_size=1000):
        self.buffer = []
        self.memory_size = memory_size
        self.next_idx = 0

    def push(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        if len(self.buffer) < self.memory_size:  # buffer not full
            self.buffer.append(data)
        else:                                    # buffer is full: overwrite the oldest slot
            self.buffer[self.next_idx] = data
        self.next_idx = (self.next_idx + 1) % self.memory_size

    def sample(self, batch_size):
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in range(batch_size):
            idx = random.randint(0, self.size() - 1)
            data = self.buffer[idx]
            state, action, reward, next_state, done = data
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(done)
        return np.concatenate(states), actions, rewards, np.concatenate(next_states), dones

    def size(self):
        return len(self.buffer)


class DQNAgent:
    def __init__(self, in_channels=1, action_space=[], USE_CUDA=False, memory_size=10000, epsilon=1, lr=1e-4):
        self.epsilon = epsilon
        self.action_space = action_space
        self.memory_buffer = Memory_Buffer(memory_size)
        self.DQN = DQN(in_channels=in_channels, num_actions=action_space.n)
        self.DQN_target = DQN(in_channels=in_channels, num_actions=action_space.n)
        self.DQN_target.load_state_dict(self.DQN.state_dict())
        self.USE_CUDA = USE_CUDA
        if USE_CUDA:
            self.DQN = self.DQN.to(device)
            self.DQN_target = self.DQN_target.to(device)
        self.optimizer = optim.RMSprop(self.DQN.parameters(), lr=lr, eps=0.001, alpha=0.95)

    def observe(self, lazyframe):
        # from LazyFrames to a [1, C, H, W] tensor scaled to [0, 1]
        state = torch.from_numpy(lazyframe._force().transpose(2, 0, 1)[None] / 255).float()
        if self.USE_CUDA:
            state = state.to(device)
        return state

    def value(self, state):
        q_values = self.DQN(state)
        return q_values

    def act(self, state, epsilon=None):
        """
        sample actions with epsilon-greedy policy
        recap: with p = epsilon pick random action, else pick action with highest Q(s,a)
        """
        if epsilon is None:
            epsilon = self.epsilon
        q_values = self.value(state).cpu().detach().numpy()
        if random.random() < epsilon:
            action = random.randrange(self.action_space.n)
        else:
            action = q_values.argmax(1)[0]
        return action

    def compute_td_loss(self, states, actions, rewards, next_states, is_done, gamma=0.99):
        actions = torch.tensor(actions).long()              # shape: [batch_size]
        rewards = torch.tensor(rewards, dtype=torch.float)  # shape: [batch_size]
        is_done = torch.tensor(is_done, dtype=torch.bool)   # shape: [batch_size]
        if self.USE_CUDA:
            actions = actions.to(device)
            rewards = rewards.to(device)
            is_done = is_done.to(device)

        # get q-values for all actions in current states
        predicted_qvalues = self.DQN(states)  # [batch_size, num_actions]

        # select q-values for chosen actions
        predicted_qvalues_for_actions = predicted_qvalues[range(states.shape[0]), actions]

        # compute q-values for all actions in next states with the target network
        predicted_next_qvalues = self.DQN_target(next_states)

        # compute V*(next_states) using predicted next q-values
        next_state_values = predicted_next_qvalues.max(-1)[0]

        # compute "target q-values" for the loss
        target_qvalues_for_actions = rewards + gamma * next_state_values

        # at the last state use the simplified formula Q(s,a) = r(s,a) since s' doesn't exist
        target_qvalues_for_actions = torch.where(is_done, rewards, target_qvalues_for_actions)

        # Huber loss between predicted and target q-values
        # (an alternative is the mean squared error:
        #  loss = torch.mean((predicted_qvalues_for_actions - target_qvalues_for_actions.detach()) ** 2))
        loss = F.smooth_l1_loss(predicted_qvalues_for_actions, target_qvalues_for_actions.detach())
        return loss

    def sample_from_buffer(self, batch_size):
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for i in range(batch_size):
            idx = random.randint(0, self.memory_buffer.size() - 1)
            data = self.memory_buffer.buffer[idx]
            frame, action, reward, next_frame, done = data
            states.append(self.observe(frame))
            actions.append(action)
            rewards.append(reward)
            next_states.append(self.observe(next_frame))
            dones.append(done)
        return torch.cat(states), actions, rewards, torch.cat(next_states), dones

    def learn_from_experience(self, batch_size):
        if self.memory_buffer.size() > batch_size:
            states, actions, rewards, next_states, dones = self.sample_from_buffer(batch_size)
            td_loss = self.compute_td_loss(states, actions, rewards, next_states, dones)
            self.optimizer.zero_grad()
            td_loss.backward()
            for param in self.DQN.parameters():
                param.grad.data.clamp_(-1, 1)  # gradient clipping to prevent exploding gradients
            self.optimizer.step()
            return td_loss.item()
        else:
            return 0


def plot_training(frame_idx, rewards, losses):
    pd.DataFrame(rewards, columns=['Reward']).to_csv(idname, index=False)
    clear_output(True)
    plt.figure(figsize=(20, 5))
    plt.subplot(131)
    plt.title('frame %s. reward: %s' % (frame_idx, np.mean(rewards[-10:])))
    plt.plot(rewards)
    plt.subplot(132)
    plt.title('loss')
    plt.plot(losses)
    plt.show()


# Training DQN in PongNoFrameskip-v4
idname = 'PongNoFrameskip-v4'
env = make_atari(idname)
env = wrap_deepmind(env, scale=False, frame_stack=True)

gamma = 0.99
epsilon_max = 1
epsilon_min = 0.01
eps_decay = 30000
frames = 2000000
USE_CUDA = True
learning_rate = 2e-4
max_buff = 100000
update_tar_interval = 1000
batch_size = 32
print_interval = 1000
log_interval = 1000
learning_start = 10000
win_reward = 18  # Pong-v4
win_break = True

action_space = env.action_space
action_dim = env.action_space.n
state_dim = env.observation_space.shape[0]
state_channel = env.observation_space.shape[2]
agent = DQNAgent(in_channels=state_channel, action_space=action_space,
                 USE_CUDA=USE_CUDA, memory_size=max_buff, lr=learning_rate)

frame = env.reset()
episode_reward = 0
all_rewards = []
losses = []
episode_num = 0
is_win = False

# tensorboard
summary_writer = SummaryWriter(log_dir="DQN_stackframe", comment="good_makeatari")

# e-greedy decay
epsilon_by_frame = lambda frame_idx: epsilon_min + (epsilon_max - epsilon_min) * math.exp(-1. * frame_idx / eps_decay)
plt.plot([epsilon_by_frame(i) for i in range(10000)])

for i in range(frames):
    epsilon = epsilon_by_frame(i)

    # interact with the environment and store the transition
    state_tensor = agent.observe(frame)
    action = agent.act(state_tensor, epsilon)
    next_frame, reward, done, _ = env.step(action)
    episode_reward += reward
    agent.memory_buffer.push(frame, action, reward, next_frame, done)
    frame = next_frame

    loss = 0
    if agent.memory_buffer.size() >= learning_start:
        loss = agent.learn_from_experience(batch_size)
    losses.append(loss)

    if i % print_interval == 0:
        print("frames: %5d, reward: %5f, loss: %4f, epsilon: %5f, episode: %4d" %
              (i, np.mean(all_rewards[-10:]), loss, epsilon, episode_num))
        summary_writer.add_scalar("Temporal Difference Loss", loss, i)
        summary_writer.add_scalar("Mean Reward", np.mean(all_rewards[-10:]), i)
        summary_writer.add_scalar("Epsilon", epsilon, i)

    if i % update_tar_interval == 0:
        agent.DQN_target.load_state_dict(agent.DQN.state_dict())

    if done:
        frame = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0
        episode_num += 1
        avg_reward = float(np.mean(all_rewards[-100:]))

summary_writer.close()

# save the trained network parameters
# torch.save(agent.DQN.state_dict(), "trained model/DQN_dict.pth.tar")

plot_training(i, all_rewards, losses)

Writing this takes effort; if you found it helpful, please like, follow, and save~~~
