当前位置:   article > 正文

强化学习DQN代码解析——自动驾驶_dqn代码逐行解读

dqn代码逐行解读

        本次将介绍一下强化学习当中的DQN (Deep Q-Learning) 算法,并通过一个自动驾驶的案例(pytorch)来具体说明,代码如下,包含代码运行过程中会出现的一些错误的解决方法。

一、DQN算法

        关于DQN算法的原理介绍,网上有很多详细的解读,在此不具体展开,只提及一些个人认为有助于理解和比较关键的点。

        首先,理解DQN算法之前,我们可以先了解一下强化学习和Q-learning算法的实现原理。强化学习可以理解为与环境的交互学习。其中,最关键的即是:

        状态(state)、策略(policy)、动作(action)、reward(奖励)

        对于每一个状态,我们会给出一个策略,智能体会执行一个动作,并得到相应的奖励。我们训练的目的则是使得累计的奖励最大。

    Q-learning算法:简单来说,是在策略的选择上进行展开的。该算法中存在一张表格,记录着每一个状态下执行每一个动作所得到的Q值,在选取动作时会进行表格的查阅,然后选取Q值最大的动作。

        DQN算法:了解完Q-learning算法后,我们再来考虑DQN算法出现的初衷。当状态和动作为连续的、无限的,我们通过表格的方式记录就不合理了。这时我们采用神经网络来替代表格,输入为状态,输出为动作。这其中涉及几个关键的改变:

        1、经验池的储存:既然存在神经网络,我们就需要有样本进行训练,由于输入是状态输出是动作,那么我们可以将每一次的【当前状态,动作,奖励,下一状态】储存进经验池,当经验池满容量时,进行训练,此后有新的样本将不断替换经验池中的原有样本,反复训练。

        2、双网络:在DQN中存在两个神经网络,一个是Q-target,一个是Q-eval。其中Q-eval网络会不断进行训练和Q值的更新,而Q-target则是相对固定,只有经过固定训练轮次后才会与Q-eval同步。损失函数就是两个网络的Q值之差,我们需要使其差值越来越小。如果两个网络都在改变,很有可能会“错过”,当固定一个网络时,网络的训练会更有目标性,即更容易收敛。

        3、e-greedy算法:上述我们在选择动作时,一般是选择Q值最大的那个动作。但如果有的执行动作不被选择过,它的Q值将始终是0,因此可能永远都不会被选择。所以我们引入动作选择时的随机性,使其具有一定概率去随机选择动作而不是完全依靠Q值的大小选择最优动作。

二、自动驾驶环境配置

        我们使用Edouard Leurent发布在github上的highway-env包。

        包下载链接:项目目录预览 - highway-env - GitCode  

  1. import torch
  2. import torch.nn as nn
  3. from torch.autograd import Variable
  4. from torch import FloatTensor, LongTensor, ByteTensor
  5. from collections import namedtuple
  6. import random
  7. import gymnasium as gym
  8. import highway_env
  9. from matplotlib import pyplot as plt
  10. import numpy as np

首先导入所需要的库。

2.1 初始化环境

  1. # 初始化环境
  2. config = \
  3. {
  4. "observation":
  5. {
  6. "type": "Kinematics",
  7. "vehicles_count": 5,
  8. "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
  9. "features_range":
  10. {
  11. "x": [-100, 100],
  12. "y": [-100, 100],
  13. "vx": [-20, 20],
  14. "vy": [-20, 20]
  15. },
  16. "absolute": False,
  17. "order": "sorted"
  18. },
  19. "simulation_frequency": 8, # [Hz]
  20. "policy_frequency": 2, # [Hz]
  21. }
  22. highway_env.register_highway_envs() # 解决Environment `highway` doesn't exist.
  23. env = gym.make("highway-v0", render_mode='rgb_array')
  24. env.configure(config)

 其中,highway_env.register_highway_envs()可以解决由于gym版本带来的以下问题:          (Environment `highway` doesn't exist.)

如果出现:OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.

可以参考:关于OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.错误解决方法 - 知乎 (zhihu.com)

2.2 网络的建立

  1. # DQN网络模型基本参数
  2. Tensor = FloatTensor
  3. GAMMA = 0.9
  4. TARGET_NETWORK_REPLACE_FREQ = 50 # target网络更新频率
  5. MEMORY_CAPACITY = 200 # 经验库容量
  6. BATCH_SIZE = 100 # 批量训练
  7. LR = 0.01 # 学习率
  8. # DQNNET的建立
  9. class DQNNet(nn.Module):
  10. def __init__(self):
  11. super(DQNNet, self).__init__()
  12. self.linear1 = nn.Linear(35, 35)
  13. self.linear2 = nn.Linear(35, 5)
  14. def forward(self, s):
  15. s = torch.FloatTensor(s)
  16. s = s.view(s.size(0), 1, 35)
  17. s = self.linear1(s)
  18. s = self.linear2(s)
  19. return s

        此小节定义了网络参数以及网络结构。在此我们只定义了两层线性全连接层,当然也可以根据自己的需求进行修改。 

2.3  DQN算法

  1. class DQN(object):
  2. def __init__(self):
  3. self.net, self.target_net = DQNNet(), DQNNet()
  4. self.learn_step_counter = 0
  5. self.memory = []
  6. self.position = 0
  7. self.capacity = MEMORY_CAPACITY
  8. self.optimizer = torch.optim.Adam(self.net.parameters(), lr=LR)
  9. self.loss_func = nn.MSELoss()
  10. # 动作选择函数,动作的选择遵循e-greedy算法
  11. def choose_action(self, s, e):
  12. x = np.expand_dims(s, axis=0) # 拓展数组的形状和维度,使其满足输入格式
  13. if np.random.uniform() < 1 - e: # 从(0,1)范围内随机取值
  14. actions_value = self.net.forward(x) # 向前传播
  15. action = torch.max(actions_value, -1)[1].data.numpy()
  16. action = action.max() # 选取Q值最大的一个动作
  17. else:
  18. action = np.random.randint(0, 5) # 随机选取动作
  19. return action
  20. # 放入经验库
  21. def push_memory(self, s, a, r, s_):
  22. if len(self.memory) < self.capacity:
  23. self.memory.append(None)
  24. self.memory[self.position] = Transition(torch.unsqueeze(torch.FloatTensor(s), 0),
  25. torch.unsqueeze(torch.FloatTensor(s_), 0),
  26. torch.from_numpy(np.array([a])),
  27. torch.from_numpy(np.array([r], dtype='float32'))) #
  28. self.position = (self.position + 1) % self.capacity
  29. # 如果有超过经验库容量的样本,则从头开始替换经验库里的样本
  30. # 随机选取batch个样本进行训练
  31. def get_sample(self, batch_size):
  32. sample = random.sample(self.memory, batch_size)
  33. return sample
  34. # 训练
  35. def learn(self):
  36. if self.learn_step_counter % TARGET_NETWORK_REPLACE_FREQ == 0: # 检查是否到达更新频率
  37. self.target_net.load_state_dict(self.net.state_dict()) # 将eval网络参数同步到target网络
  38. self.learn_step_counter += 1
  39. transitions = self.get_sample(BATCH_SIZE) # 选择批量训练样本
  40. batch = Transition(*zip(*transitions))
  41. b_s = Variable(torch.cat(batch.state))
  42. b_s_ = Variable(torch.cat(batch.next_state))
  43. b_a = Variable(torch.cat(batch.action))
  44. b_r = Variable(torch.cat(batch.reward))
  45. q_eval = self.net.forward(b_s).squeeze(1).gather(1, b_a.unsqueeze(1).to(torch.int64))
  46. q_next = self.target_net.forward(b_s_).detach() #
  47. q_target = b_r + GAMMA * q_next.squeeze(1).max(1)[0].view(BATCH_SIZE, 1).t()
  48. loss = self.loss_func(q_eval, q_target.t())
  49. self.optimizer.zero_grad()
  50. loss.backward()
  51. self.optimizer.step()
  52. return loss

        此小节定义了DQN类。其中包含动作的选择、经验库的增加、训练样本的提取、训练过程。 

 2.4 模型训练

  1. Transition = namedtuple('Transition', ('state', 'next_state', 'action', 'reward'))
  2. # 训练模型
  3. dqn = DQN()
  4. count = 0
  5. sum_reward = 0
  6. all_reward = []
  7. all_loss = []
  8. for episode in range(100): # 100次碰撞
  9. done = False
  10. s = env.reset()[0] # 初始化环境,得到初始状态
  11. reward = []
  12. while not done:
  13. e = np.exp(-count / 300) # 随机选择action的概率,随着训练次数增多逐渐降低
  14. a = dqn.choose_action(s, e) # 动作选择
  15. s_, r, done, info, _ = env.step(a) # 环境交互,得到下一步的状态
  16. env.render() # 渲染
  17. dqn.push_memory(s, a, r, s_) # 将该行为放入经验池
  18. if (dqn.position != 0) & (dqn.position % 99 == 0): # if 经验池容量已满或者全部重新更新
  19. loss_ = dqn.learn() # 训练、损失函数
  20. all_loss.append(loss_)
  21. count += 1
  22. print('trained times:', count)
  23. s = s_ # 更新状态
  24. reward.append(r)
  25. sum_reward = np.sum(reward)
  26. all_reward.append(sum_reward)
  27. plt.plot(all_reward, 'b*--', alpha=0.5, linewidth=1, label='acc')
  28. plt.show()
  29. a1 = torch.Tensor(all_loss)
  30. plt.plot(a1.detach().numpy(), 'b*--', alpha=0.5, linewidth=1, label='acc')
  31. plt.show()

env.step()返回值的含义:
        观察(Observation): 新的观察表示汽车的新位置、速度等,即状态。这些信息会用于决定下一步的动作;
        奖励(Reward): 如果汽车成功避开了障碍物并靠近了目的地,你可能会得到正的奖励。反之,如果碰到了障碍物,可能是负的奖励;
        完成标记(Done): 如果汽车到达目的地或者碰到障碍物导致任务失败,done会被设为 True,表示这一回合结束;
        额外信息(Info): 这里可能包含模拟器内部的一些额外信息,比如是否违反了交通规则。但这通常不用于模型训练;

其中,s = env.reset()    →   s = env.reset()[0]

可以解决问题 :expected sequence of length 5 at dim 1 (got 4).

s_, r, done, info= env.step(a)    →   s_, r, done, info, _ = env.step(a)

可以解决问题 :too many values to unpack (expected 4).

最后画出了累计奖励和损失函数图。后续我们可以自行调参训练,本文参数无参考意义。大部分代码问题和版本有关,如果运行本代码还是有问题,可以尝试将本文做的一些修改给改回去。

附上原文链接,但跑原文代码时遇到些许问题,在此分享个人的一些解决方法和理解。

仅供学习用途,侵权删。

原链接

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/601516
推荐阅读
相关标签
  

闽ICP备14008679号