当前位置:   article > 正文

Python神经网络学习(七)--强化学习--使用神经网络_python 强化学习

python 强化学习

前言

前面说到了强化学习,但是仅仅是使用了一个表格,这也是属于强化学习的范畴了,毕竟强化学习属于在试错中学习的。

但是现在有一些问题,如果这个表格非常大呢?悬崖徒步仅仅是一个长12宽4,每个位置4个动作的表格而已,如果游戏是英雄联盟,那么多的位置,每个位置那么多的可能动作,画出一个表格简直是不可想象的。

但其实,如果把这个表格看作一个数学函数,他的输入是坐标,输出是一个动作(或者每个动作对应的价值):

那也就是说,只要我们有一个坐标,得到一个动作,中间什么过程是可以不用管的,还记得这篇文章中说过:神经元(函数)+神经元(函数) = 神经网络(人工神经网络),那么,中间这一块也就可以使用神经网络代替,这也就是深度强化学习。

论文(Playing Atari with Deep Reinforcement Learning)地址:https://arxiv.org/abs/1312.5602

 设置环境

注意:今天的环境代码我修改过了,跟上一篇的不一样,所以大家还是要先读一下环境代码。

本次环境代码中添加了对于棋盘大小的设置,修复了一些bug。

  1. # -*- coding: utf-8 -*-
  2. """
  3. 作者:CSDN,chuckiezhu
  4. 作者地址:https://blog.csdn.net/qq_38431572
  5. 本文可用作学习使用,交流代码时需要附带本出处声明
  6. """
  7. import random
  8. import numpy as np
  9. from gym import spaces
  10. """
  11. nrows
  12. 0 1 2 3 4 5 6 7 8 9 10 11 ncols
  13. ---------------------------------------
  14. 0 | | | | | | | | | | | | |
  15. ---------------------------------------
  16. 1 | | | | | | | | | | | | |
  17. ---------------------------------------
  18. 2 | | | | | | | | | | | | |
  19. ---------------------------------------
  20. 3 * | cliff | ^ |
  21. *: start point
  22. cliff: cliff
  23. ^: goal
  24. """
  25. class CustomCliffWalking(object):
  26. def __init__(self, stepReward: int=-1, cliffReward: int=-10, goalReward: int=10, col=12, row=4) -> None:
  27. self.sr = stepReward
  28. self.cr = cliffReward
  29. self.gr = goalReward
  30. self.col = col
  31. self.row = row
  32. self.action_space = spaces.Discrete(4) # 上下左右
  33. self.reward_range = (cliffReward, goalReward)
  34. self.pos = np.array([row-1, 0], dtype=np.int8) # agent 在3,0处出生,掉到悬崖内就会死亡,触发done和cliffReward
  35. self.die_pos = []
  36. for c in range(1, self.col-1):
  37. self.die_pos.append([self.row-1, c])
  38. print("die pos: ", self.die_pos)
  39. print("goal pos: ", [[self.row-1, self.col-1]])
  40. self.reset()
  41. def reset(self, random_reset=False):
  42. """
  43. 初始化agent的位置
  44. random: 是否随机出生, 如果设置random为True, 则出生点会随机产生
  45. """
  46. x, y = self.row-1, 0
  47. if random_reset:
  48. y = random.randint(0, self.col-1)
  49. if y == 0:
  50. x = random.randint(0, self.row-1)
  51. else: # 除了正常坐标之外,还有一个不正常坐标:(3, 0)
  52. x = random.randint(0, self.row-2)
  53. # 严格来讲,cliff和goal不算在坐标体系内
  54. # agent 在3,0处出生,掉到悬崖内就会死亡,触发done和cliffReward
  55. self.pos = np.array([x, y], dtype=np.int8)
  56. # print("reset at:", self.pos)
  57. def step(self, action: int) -> list[list, int, bool, bool, dict]:
  58. """
  59. 执行一个动作
  60. action:
  61. 0: 上
  62. 1: 下
  63. 2: 左
  64. 3: 右
  65. """
  66. move = [
  67. np.array([-1, 0], dtype=np.int8), # 向上,就是x-1, y不动,
  68. np.array([ 1, 0], dtype=np.int8), # 向下,就是x+1, y不动,
  69. np.array([0, -1], dtype=np.int8), # 向左,就是y-1, x不动,
  70. np.array([0, 1], dtype=np.int8), # 向右,就是y+1, x不动,
  71. ]
  72. new_pos = self.pos + move[action]
  73. # 上左不能小于0
  74. new_pos[new_pos < 0] = 0 # 超界的处理,比如0, 0 处向上或者向右走,处理完还是0,0
  75. # 上右不能超界
  76. if new_pos[0] > self.row-1:
  77. new_pos[0] = self.row-1 # 超界处理
  78. if new_pos[1] > self.col-1:
  79. new_pos[1] = self.col-1
  80. reward = self.sr # 每走一步的奖励
  81. die = False
  82. win = False
  83. info = {
  84. "reachGoal": False,
  85. "fallCliff": False,
  86. }
  87. if self.__is_pos_die(new_pos.tolist()):
  88. die = True
  89. info["fallCliff"] = True
  90. reward = self.cr
  91. elif self.__is_pos_win(new_pos.tolist()):
  92. win = True
  93. info["reachGoal"] = True
  94. reward = self.gr
  95. self.pos = new_pos # 更新坐标
  96. return new_pos, reward, die, win, info
  97. def __is_pos_die(self, pos: list[int, int]) -> bool:
  98. """判断自己的这个状态是不是已经结束了"""
  99. return pos in self.die_pos
  100. def __is_pos_win(self, pos: list[int, int]) -> bool:
  101. """判断自己的这个状态是不是已经结束了"""
  102. return pos in [
  103. [self.row-1, self.col-1],
  104. ]

至于讲解这个环境,我觉得这个注释还是比较清楚的,如果有不明白的,请评论留言告知我。

制作网络

首先,我们先把自己代入表格,如果我们站到某个坐标,那么我们应该知道四个方向上的奖励,所以,网络可以有两种方式;

方式一、

网络输入是坐标和方向,输出是对应的奖励。

方式二、

网络输入是坐标,输出是四个方向对应的奖励。

这里我要来一句场外推理:方式一真的很麻烦,并且选择动作的时候,有多少个动作需要经过多少次网络。所以方式二是比较好的选择。

  1. class Qac(nn.Module):
  2. def __init__(self, in_shape, out_shape) -> None:
  3. super(Qac, self).__init__()
  4. self.in_shape = in_shape # 就是 智能体 现在的坐标
  5. self.action_space = out_shape # 上0下1左2右3
  6. self.dense1 = nn.Linear(self.in_shape, self.action_space)
  7. # 输出就是每个动作的价值
  8. self.lrelu = nn.LeakyReLU() # 换用tanh
  9. self.softmax = nn.Softmax(-1)
  10. def forward(self, x) -> torch.Tensor:
  11. x = self.dense1(x)
  12. return x
  13. def sample_action(self, action_value: torch.Tensor, epsilon: float):
  14. """从产生的动作概率中采样一个动作,利用epsilon贪心"""
  15. if random.random() < epsilon:
  16. # 随机选择
  17. action = random.randint(0, self.action_space-1)
  18. action = torch.tensor(action)
  19. else:
  20. action = torch.argmax(action_value)
  21. return action
  22. def load_model(self, modelpath):
  23. """加载模型"""
  24. tmp = torch.load(modelpath)
  25. self.load_state_dict(tmp["model"])
  26. def save_model(self, modelpath):
  27. """保存模型"""
  28. tmp = {
  29. "model": self.state_dict(),
  30. }
  31. torch.save(tmp, modelpath)

细心的人可能发现了,这个网络只有一层,非常简单,好像没有所谓的“特征提取”就直接到输出层了。这里有一个小技巧,就是我手动把坐标转成了onehot向量,可以认为是手动提取了特征。

  1. def num_to_onehot(pos: torch.Tensor) -> torch.Tensor:
  2. """把坐标转成one_hot向量"""
  3. n = int((pos[0] * 12 + pos[1]).item())
  4. return nn.functional.one_hot(torch.tensor(n), num_classes=48)

如果大家使用两层神经网络,直接输入坐标,中间层是48,然后是一个输出层,也可以, 但是我试了,训练很慢,效果不好。不如这样直接手动编码了。

训练

整个训练的代码我直接贴在这里了:

  1. # -*- coding: utf-8 -*-
  2. """
  3. 利用DQN实现
  4. """
  5. """
  6. 作者:CSDN,chuckiezhu
  7. 作者地址:https://blog.csdn.net/qq_38431572
  8. 本文可用作学习使用,交流代码时需要附带本出处声明
  9. """
  10. import os
  11. import random
  12. import torch
  13. import numpy as np
  14. from torch import nn
  15. from matplotlib import pyplot as plt
  16. from cliff_walking_env import CustomCliffWalking
  17. nepisodes = 10000 # total 1w episodes
  18. epsilon = 1.0 # epsilon greedy policy
  19. epsilon_min = 0.05
  20. epsilon_decay = 0.9975
  21. gamma = 0.9 # discount factor
  22. lr = 0.001
  23. random_reset = False
  24. seed = 42
  25. normalization = torch.tensor([3, 11], dtype=torch.float)
  26. sr = -1
  27. cr = -10
  28. gr = 10
  29. class Qac(nn.Module):
  30. def __init__(self, in_shape, out_shape) -> None:
  31. super(Qac, self).__init__()
  32. self.in_shape = in_shape # 就是智能体现在的坐标
  33. self.action_space = out_shape # 上0下1左2右3
  34. self.dense1 = nn.Linear(self.in_shape, self.action_space)
  35. # 输出就是每个动作的价值
  36. self.lrelu = nn.LeakyReLU() # 换用tanh
  37. self.softmax = nn.Softmax(-1)
  38. def forward(self, x) -> torch.Tensor:
  39. x = self.dense1(x)
  40. return x
  41. def sample_action(self, action_value: torch.Tensor, epsilon: float):
  42. """从产生的动作概率中采样一个动作,利用epsilon贪心"""
  43. if random.random() < epsilon:
  44. # 随机选择
  45. action = random.randint(0, self.action_space-1)
  46. action = torch.tensor(action)
  47. else:
  48. action = torch.argmax(action_value)
  49. return action
  50. def load_model(self, modelpath):
  51. """加载模型"""
  52. tmp = torch.load(modelpath)
  53. self.load_state_dict(tmp["model"])
  54. def save_model(self, modelpath):
  55. """保存模型"""
  56. tmp = {
  57. "model": self.state_dict(),
  58. }
  59. torch.save(tmp, modelpath)
  60. def num_to_onehot(pos: torch.Tensor) -> torch.Tensor:
  61. """把坐标转成one_hot向量"""
  62. n = int((pos[0] * 12 + pos[1]).item())
  63. return nn.functional.one_hot(torch.tensor(n), num_classes=48)
  64. def main():
  65. global epsilon
  66. random.seed(seed)
  67. torch.manual_seed(seed=seed)
  68. plt.ion()
  69. os.makedirs("./out/ff_DQN/")
  70. # cw = gym.make("CliffWalking-v0", render_mode="human")
  71. cw = CustomCliffWalking(stepReward=sr, goalReward=gr, cliffReward=cr)
  72. # 专程onehot了
  73. Q = Qac(in_shape=48, out_shape=cw.action_space.n)
  74. optimizer = torch.optim.Adam(Q.parameters(), lr=lr)
  75. loss_fn = torch.nn.MSELoss()
  76. win_1000 = [] # 记录最近一千场赢的几率
  77. total_win = 0
  78. for i in range(1, nepisodes+1):
  79. cw.reset(random_reset=random_reset) # 重置环境
  80. steps = 0
  81. while True:
  82. steps += 1
  83. state_now = torch.tensor(cw.pos, dtype=torch.float)
  84. state_now = num_to_onehot(state_now).unsqueeze_(0).to(torch.float)
  85. action_values = Q(state_now)
  86. action_values = action_values.squeeze()
  87. action_now = Q.sample_action(action_value=action_values, epsilon=epsilon)
  88. action_now_value = action_values[action_now] # 这个是采取这个动作的预测奖励
  89. state_next, reward_now, terminated, truncated, info = cw.step(action=action_now.item()) # 执行动作
  90. state_next = num_to_onehot(state_next).unsqueeze_(0).to(torch.float)
  91. with torch.no_grad():
  92. next_values = Q(state_next)
  93. next_values = next_values.squeeze()
  94. # 得到下一个的动作,(同一个策略下,因为这是onpolicy的sarsa
  95. action_next = Q.sample_action(action_value=action_values, epsilon=epsilon)
  96. action_next_value = next_values[action_next] # 计算下一个动作的预期价值
  97. # 计算 instantR + gamma * value_next,这个是实际上这个动作带来的预期收益
  98. discounted_reward = reward_now + gamma * action_next_value * (1 - terminated) * (1 - truncated)
  99. # 计算误差
  100. loss = loss_fn(action_now_value, discounted_reward)
  101. optimizer.zero_grad()
  102. loss.backward()
  103. optimizer.step()
  104. if terminated or truncated:
  105. if terminated:
  106. win_1000.append(0)
  107. if truncated:
  108. win_1000.append(1)
  109. total_win += 1
  110. break
  111. epsilon = epsilon * epsilon_decay
  112. epsilon = max(epsilon, epsilon_min) # 衰减学习旅
  113. win_1000 = win_1000[-1000:]
  114. win_rate = sum(win_1000)/1000.0
  115. print("{}/{}, 当前探索率: {}, 是否成功: {}, 千场胜率:{}.".format(i, nepisodes, epsilon, truncated, win_rate), flush=True)
  116. if i % 10000 == 0:
  117. Q.save_model("./out/ff_DQN/Qac_{}_{}_{}_{}.pth".format(i, gr, cr, win_rate))
  118. print("total win: ", total_win)
  119. # 收尾测试看看能不能通关
  120. path = np.zeros((4, 12), dtype=np.float64)
  121. cw.reset(random_reset=False)
  122. steps = 0
  123. while steps <= 48: # 走,48步走不到头就不会走到了
  124. steps += 1
  125. state_now = torch.tensor(cw.pos, dtype=torch.float)
  126. state_now = num_to_onehot(state_now).unsqueeze_(0).to(torch.float)
  127. action_values = Q(state_now).squeeze()
  128. # 贪心算法选择动作
  129. action_now = Q.sample_action(action_values, 0)
  130. print(cw.pos[0], cw.pos[1], action_now)
  131. new_pos, _, die, win, _ = cw.step(action=action_now)
  132. if win:
  133. print("[+] you win!")
  134. break
  135. if die:
  136. print("[+] you lose!")
  137. break
  138. x = new_pos[0]
  139. y = new_pos[1]
  140. if x >= 0 and x <= 3 and y >= 0 and y <= 11:
  141. path[x, y] = 1.0
  142. plt.imshow(path)
  143. plt.colorbar()
  144. plt.savefig("./out/ff_DQN/path_sarsa_"+str(sr)+"_"+str(gr)+"_"+str(cr)+".png")
  145. if __name__ == "__main__":
  146. main()

上面的代码我测试没问题,如果不修改直接使用是完全可以的,目录结构是这样的:

那两个文件夹都是自动生成的,不需要手动建立。 

网络结构分析

这是上面代码的网络结构和更新流程。注意:实线代表有梯度,虚线代表无梯度。

每次由环境产生一个状态,先转成一个one_hot向量,作为网络的输入,得到四个动作分别价值多少。然后采样到的动作得到当前的Q(s, a)值,也就是action_value。

另一方面,采样得到的动作送入环境,环境给出下一个状态和立即奖励。下一个状态送入网络(没有梯度的计算),同样得到四个动作的价值。由于代码使用的是SARSA算法,所以需要按照同样的策略采样一个动作,同时得到动作的价值。也就是next_action_value。

这个时候,就可以根据环境的立即奖励reward_now和下一个状态的动作的价值next_action_value得到一个ground truth,而action_value作为网络的预测值,这两个可以用于计算损失。

损失的反向传播就是沿着实现传递到顶。实现网络的更新。

 

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号