当前位置:   article > 正文

【机器学习】强化学习(四)-时序差分学习

时序差分学习

蒙特卡洛算法需要使用完整的片段进行计算,这在有些问题中是不现实的,尤其是对于没有终止状态的问题。时序差分算法对此进行了改进

5593ef44acfac8d37dabe9e9a713d401.png

7f4fe94283995533602016a8c17cbfd6.png

蒙特卡洛控制和时序差分学习有什么区别?

8f615d1a1ee7aa7986444e9464a94d11.png

四、时序差分算法(Temporal Difference Learning, TD 学习)

7edd4a11530e57e6dae72aa13befee1b.jpeg

58251638064ea782ac62d19423c1b663.png

4.1 时序差分(0)

1cbefc77fe6c1d2e9729a54b234376b2.png

5adec1391dfb5df56b152e2de764608f.png

cea763064e551d677df788f7573d8e79.png

4.2 Sarsa算法

d80eb1f2bd2ed77ede7d87c51fdfe8f3.png

076e50956c2f0df4bf3170ea54c7d170.png

e775b85dd2669c174e58ad26584df9f8.png

4.3 Q学习(Q-learning)

e0bb9c327c91b7a14f705f1f46e04656.png

adcfa8ffc425d9872cd8d9f3359fc34b.png

97541e4088b179157e3381776a9e7522.png

4.4 Sarsa和Q-learning有什么区别?

8ee0528240e51e90ad5f7c9deadc3ba0.png

4.5 示例代码

公共类:discrete.py  plotting.py

离散环境的类 discrete.py它继承自 gym 库的 Env 类,用于创建和管理强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如设置随机数种子,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息等。

  • 使用了 numpy 库,gym 库和 categorical_sample 函数来进行数值计算,环境管理和概率采样等操作。

  1. # 导入 numpy 库,用于进行数值计算
  2. import numpy as np
  3. # 导入 gym 库,用于创建和管理强化学习的环境
  4. from gym import Env, spaces
  5. # 导入 gym 库的 seeding 模块,用于设置随机数种子
  6. from gym.utils import seeding
  7. # 导入 gym 库的 toy_text 模块的 categorical_sample 函数,用于从一个概率分布中采样一个类别
  8. from gym.envs.toy_text.utils import categorical_sample
  9. # 定义一个离散环境的类,继承自 gym 库的 Env 类
  10. class DiscreteEnv(Env):
  11. """
  12. Has the following members
  13. - nS: number of states # 状态的数量
  14. - nA: number of actions # 动作的数量
  15. - P: transitions (*) # 状态转移的概率
  16. - isd: initial state distribution (**) # 初始状态的分布
  17. (*) dictionary of lists, where
  18. P[s][a] == [(probability, nextstate, reward, done), ...] # P[s][a] 是一个列表,表示在状态 s 下采取动作 a 后,可能的下一个状态,奖励和是否结束的概率
  19.     (**) list or array of length nS # isd 是一个长度为 nS 的列表或数组,表示每个状态作为初始状态的概率
  20. """
  21. # 定义初始化方法,接受四个参数:状态的数量,动作的数量,状态转移的概率,初始状态的分布
  22. def __init__(self, nS, nA, P, isd):
  23. self.P = P # 将状态转移的概率赋值给 self.P
  24. self.isd = isd # 将初始状态的分布赋值给 self.isd
  25. self.lastaction = None # for rendering # 定义一个属性,用于记录上一次的动作,用于渲染
  26. self.nS = nS # 将状态的数量赋值给 self.nS
  27. self.nA = nA # 将动作的数量赋值给 self.nA
  28. # 定义一个属性,表示动作的空间,是一个离散的空间,取值范围是 [0, nA-1]
  29. self.action_space = spaces.Discrete(self.nA)
  30. # 定义一个属性,表示状态的空间,是一个离散的空间,取值范围是 [0, nS-1]
  31. self.observation_space = spaces.Discrete(self.nS)
  32. self.seed() # 调用 seed 方法,设置随机数种子
  33. # 从初始状态的分布中采样一个状态,赋值给 self.s
  34. self.s = categorical_sample(self.isd, self.np_random)
  35. # 定义一个方法,用于设置随机数种子,接受一个参数:种子
  36. def seed(self, seed=None):
  37. # 调用 seeding 模块的 np_random 函数,根据种子生成一个随机数生成器,赋值给 self.np_random,并返回种子
  38. self.np_random, seed = seeding.np_random(seed)
  39. return [seed]
  40. # 定义一个方法,用于重置环境,返回初始状态
  41. def reset(self):
  42. # 从初始状态的分布中采样一个状态,赋值给 self.s
  43. self.s = categorical_sample(self.isd, self.np_random)
  44. self.lastaction = None # 将上一次的动作设为 None
  45. return int(self.s) # 返回初始状态,转换为整数类型
  46. # 定义一个方法,用于执行一个动作,返回下一个状态,奖励,是否结束和附加信息
  47. def step(self, a):
  48. # 根据当前状态和动作,从状态转移的概率中获取可能的转移列表,赋值给 transitions
  49. transitions = self.P[self.s][a]
  50. # 从转移列表中,根据转移的概率,采样一个转移的索引,赋值给 i
  51. i = categorical_sample([t[0] for t in transitions], self.np_random)
  52. # 根据转移的索引,获取转移的概率,下一个状态,奖励和是否结束,赋值给 p, s, r, d
  53. p, s, r, d = transitions[i]
  54. self.s = s # 将下一个状态赋值给 self.s
  55. self.lastaction = a # 将当前动作赋值给 self.lastaction
  56. # 返回下一个状态,奖励,是否结束和附加信息,其中附加信息是一个字典,包含转移的概率,下一个状态转换为整数类型
  57.         return (int(s), r, d, {"prob": p})

用于绘制一些问题中的价值函数的图形的函数 plotting.py 。价值函数表示在不同的状态下,采取最优策略能够获得的期望回报。这些代码使用了matplotlib库,numpy库,pandas库和namedtuple来进行数据处理和图形绘制。代码中定义了三个函数,分别是:

  • plot_cost_to_go_mountain_car:这个函数用于绘制山地车问题的价值函数,山地车问题是一个连续状态空间的强化学习问题,目标是让一辆车在两座山之间来回移动,最终到达右边的山顶。这个函数接受一个环境对象,一个估计器对象和一个网格数作为参数,然后生成一个三维的曲面图,显示在不同的位置和速度下,采取最优动作的成本(负的价值)。

  • plot_value_function:这个函数用于绘制二十一点游戏的价值函数,二十一点游戏是一个离散状态空间的强化学习问题,目标是让玩家的牌的总和尽可能接近21,但不超过21,同时要比庄家的牌的总和大。这个函数接受一个价值函数字典和一个标题作为参数,然后分别绘制两个三维的曲面图,显示在不同的玩家总和和庄家显示牌下,有可用的Ace和没有可用的Ace的情况下的价值。

  • plot_episode_stats:这个函数用于绘制每个回合的统计信息,包括回合的长度,回合的奖励,回合的时间步数和回合的编号。这个函数接受一个命名元组,一个平滑窗口和一个是否显示图形的标志作为参数,然后分别绘制三个二维的折线图,显示回合的长度,回合的奖励和回合的时间步数随回合的编号的变化。这个函数返回三个图形对象。

  1. # 导入matplotlib库,用于绘制图形
  2. import matplotlib
  3. # 导入numpy库,用于进行数值计算
  4. import numpy as np
  5. # 导入pandas库,用于进行数据分析
  6. import pandas as pd
  7. # 导入namedtuple,用于创建命名元组
  8. from collections import namedtuple
  9. # 导入pyplot模块,用于绘制二维图形
  10. from matplotlib import pyplot as plt
  11. # 导入Axes3D模块,用于绘制三维图形
  12. from mpl_toolkits.mplot3d import Axes3D
  13. # 创建一个命名元组,用于存储每个回合的长度和奖励
  14. EpisodeStats = namedtuple("Stats",["episode_lengths", "episode_rewards"])
  15. # 定义一个函数,用于绘制山地车问题的价值函数
  16. def plot_cost_to_go_mountain_car(env, estimator, num_tiles=20):
  17. # 生成一个等差数列,表示状态空间中的位置范围
  18. x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=num_tiles)
  19. # 生成一个等差数列,表示状态空间中的速度范围
  20. y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=num_tiles)
  21. # 生成一个网格,表示状态空间中的所有可能组合
  22. X, Y = np.meshgrid(x, y)
  23. # 对每个状态,计算估计器预测的最大动作价值,并取负数,表示成本
  24. Z = np.apply_along_axis(lambda _: -np.max(estimator.predict(_)), 2, np.dstack([X, Y]))
  25. # 创建一个图形对象,设置大小为10*5
  26. fig = plt.figure(figsize=(10, 5))
  27. # 在图形对象上添加一个子图,设置为三维投影
  28. ax = fig.add_subplot(111, projection='3d')
  29. # 在子图上绘制一个曲面,表示价值函数
  30. surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
  31. cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
  32. # 设置子图的x轴标签为位置
  33. ax.set_xlabel('Position')
  34. # 设置子图的y轴标签为速度
  35. ax.set_ylabel('Velocity')
  36. # 设置子图的z轴标签为价值
  37. ax.set_zlabel('Value')
  38. # 设置子图的标题为山地车问题的成本函数
  39. ax.set_title("Mountain \"Cost To Go\" Function")
  40. # 在图形对象上添加一个颜色条,表示价值的范围
  41. fig.colorbar(surf)
  42. # 显示图形
  43. plt.show()
  44. # 定义一个函数,用于绘制价值函数的曲面图
  45. def plot_value_function(V, title="Value Function"):
  46. """
  47. Plots the value function as a surface plot.
  48. """
  49. # 找到价值函数中的最小和最大的玩家总和
  50. min_x = min(k[0] for k in V.keys())
  51. max_x = max(k[0] for k in V.keys())
  52. # 找到价值函数中的最小和最大的庄家显示牌
  53. min_y = min(k[1] for k in V.keys())
  54. max_y = max(k[1] for k in V.keys())
  55. # 生成一个等差数列,表示玩家总和的范围
  56. x_range = np.arange(min_x, max_x + 1)
  57. # 生成一个等差数列,表示庄家显示牌的范围
  58. y_range = np.arange(min_y, max_y + 1)
  59. # 生成一个网格,表示所有可能的状态组合
  60. X, Y = np.meshgrid(x_range, y_range)
  61. # 对每个状态,根据是否有可用的Ace,计算价值函数的值
  62. Z_noace = np.apply_along_axis(lambda _: V[(_[0], _[1], False)], 2, np.dstack([X, Y]))
  63. Z_ace = np.apply_along_axis(lambda _: V[(_[0], _[1], True)], 2, np.dstack([X, Y]))
  64. # 定义一个内部函数,用于绘制一个曲面图
  65. def plot_surface(X, Y, Z, title):
  66. # 创建一个图形对象,设置大小为20*10
  67. fig = plt.figure(figsize=(20, 10))
  68. # 在图形对象上添加一个子图,设置为三维投影
  69. ax = fig.add_subplot(111, projection='3d')
  70. # 在子图上绘制一个曲面,表示价值函数
  71. surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
  72. cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
  73. # 设置子图的x轴标签为玩家总和
  74. ax.set_xlabel('Player Sum')
  75. # 设置子图的y轴标签为庄家显示牌
  76. ax.set_ylabel('Dealer Showing')
  77. # 设置子图的z轴标签为价值
  78. ax.set_zlabel('Value')
  79. # 设置子图的标题
  80. ax.set_title(title)
  81. # 设置子图的视角
  82. ax.view_init(ax.elev, -120)
  83. # 在图形对象上添加一个颜色条,表示价值的范围
  84. fig.colorbar(surf)
  85. # 显示图形
  86. plt.show()
  87. # 调用内部函数,分别绘制没有可用Ace和有可用Ace的情况下的价值函数
  88. plot_surface(X, Y, Z_noace, "{} (No Usable Ace)".format(title))
  89. plot_surface(X, Y, Z_ace, "{} (Usable Ace)".format(title))
  90. # 定义一个函数,用于绘制每个回合的统计信息
  91. def plot_episode_stats(stats, smoothing_window=10, noshow=False):
  92. # 绘制每个回合的长度随时间的变化
  93. fig1 = plt.figure(figsize=(10,5))
  94. plt.plot(stats.episode_lengths)
  95. plt.xlabel("Episode")
  96. plt.ylabel("Episode Length")
  97. plt.title("Episode Length over Time")
  98. # 如果noshow为真,不显示图形,否则显示图形
  99. if noshow:
  100. plt.close(fig1)
  101. else:
  102. plt.show()#fig1
  103. # 绘制每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理
  104. fig2 = plt.figure(figsize=(10,5))
  105. rewards_smoothed = pd.Series(stats.episode_rewards).rolling(smoothing_window, min_periods=smoothing_window).mean()
  106. plt.plot(rewards_smoothed)
  107. plt.xlabel("Episode")
  108. plt.ylabel("Episode Reward (Smoothed)")
  109. plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
  110. # 如果noshow为真,不显示图形,否则显示图形
  111. if noshow:
  112. plt.close(fig2)
  113. else:
  114. plt.show()#fig2
  115. # 绘制每个回合的时间步数和回合数的关系
  116. fig3 = plt.figure(figsize=(10,5))
  117. plt.plot(np.cumsum(stats.episode_lengths), np.arange(len(stats.episode_lengths)))
  118. plt.xlabel("Time Steps")
  119. plt.ylabel("Episode")
  120. plt.title("Episode per time step")
  121. # 如果noshow为真,不显示图形,否则显示图形
  122. if noshow:
  123. plt.close(fig3)
  124. else:
  125. plt.show()#fig3
  126. # 返回三个图形对象
  127. return fig1, fig2, fig3

SARSA算法求解有风格子世界问题

有风格子世界环境的类 windy_gridworld,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,风的强度,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,渲染环境等。

  • 使用了 io 库,gym 库,numpy 库,sys 库和 discrete 模块来进行输入输出,环境管理,数值计算,系统操作和离散环境的管理等操作。

  1. # 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
  2. import io
  3. # 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口
  4. import gym
  5. # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
  6. import numpy as np
  7. # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
  8. import sys
  9. # 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
  10. from . import discrete
  11. # 定义四个常量,表示四个动作的编号
  12. UP = 0
  13. RIGHT = 1
  14. DOWN = 2
  15. LEFT = 3
  16. # 定义一个类,继承自DiscreteEnv类,用于实现有风格子世界问题的强化学习环境
  17. class WindyGridworldEnv(discrete.DiscreteEnv):
  18. # 定义一个元数据字典,表示该环境支持的渲染模式
  19. metadata = {'render.modes': ['human', 'ansi']}
  20. # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
  21. def _limit_coordinates(self, coord):
  22. # 将坐标的第一个分量限制在0到网格的行数减一之间
  23. coord[0] = min(coord[0], self.shape[0] - 1)
  24. coord[0] = max(coord[0], 0)
  25. # 将坐标的第二个分量限制在0到网格的列数减一之间
  26. coord[1] = min(coord[1], self.shape[1] - 1)
  27. coord[1] = max(coord[1], 0)
  28. # 返回限制后的坐标
  29. return coord
  30. # 定义一个私有方法,用于计算状态转移的概率,根据当前位置,动作的变化量,和风的强度
  31. def _calculate_transition_prob(self, current, delta, winds):
  32. # 计算新的位置,等于当前位置加上动作的变化量,再加上风的影响
  33. new_position = np.array(current) + np.array(delta) + np.array([-1, 0]) * winds[tuple(current)]
  34. # 限制新的位置的范围,转换为整数类型
  35. new_position = self._limit_coordinates(new_position).astype(int)
  36. # 计算新的状态,将新的位置转换为一维的索引
  37. new_state = np.ravel_multi_index(tuple(new_position), self.shape)
  38. # 判断是否达到目标位置,即(3, 7)
  39. is_done = tuple(new_position) == (3, 7)
  40. # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
  41. return [(1.0, new_state, -1.0, is_done)]
  42. # 定义一个构造方法,用于初始化环境的属性
  43. def __init__(self):
  44. # 定义网格的形状,为7行10列
  45. self.shape = (7, 10)
  46. # 计算状态空间的大小,为网格的元素个数
  47. nS = np.prod(self.shape)
  48. # 定义动作空间的大小,为4个动作
  49. nA = 4
  50. # 定义风的强度,为一个与网格形状相同的数组,某些列有不同的风力
  51. winds = np.zeros(self.shape)
  52. winds[:,[3,4,5,8]] = 1
  53. winds[:,[6,7]] = 2
  54. # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
  55. P = {}
  56. # 对每个状态进行循环
  57. for s in range(nS):
  58. # 将状态转换为二维的位置
  59. position = np.unravel_index(s, self.shape)
  60. # 初始化状态对应的字典,键为动作,值为一个空列表
  61. P[s] = { a : [] for a in range(nA) }
  62. # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
  63. P[s][UP] = self._calculate_transition_prob(position, [-1, 0], winds)
  64. P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1], winds)
  65. P[s][DOWN] = self._calculate_transition_prob(position, [1, 0], winds)
  66. P[s][LEFT] = self._calculate_transition_prob(position, [0, -1], winds)
  67. # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
  68. isd = np.zeros(nS)
  69. isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
  70. # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
  71. super(WindyGridworldEnv, self).__init__(nS, nA, P, isd)
  72. # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
  73. def render(self, mode='human', close=False):
  74. self._render(mode, close)
  75. # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
  76. def _render(self, mode='human', close=False):
  77. # 如果关闭标志为True,表示不需要渲染,直接返回
  78. if close:
  79. return
  80. # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
  81. outfile = io.StringIO() if mode == 'ansi' else sys.stdout
  82. # 对每个状态进行循环
  83. for s in range(self.nS):
  84. # 将状态转换为二维的位置
  85. position = np.unravel_index(s, self.shape)
  86. # print(self.s)
  87. # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,否则输出 o
  88. if self.s == s:
  89. output = " x "
  90. elif position == (3,7):
  91. output = " T "
  92. else:
  93. output = " o "
  94. # 如果位置在第一列,去掉输出符号的左边空格
  95. if position[1] == 0:
  96. output = output.lstrip()
  97. # 如果位置在最后一列,去掉输出符号的右边空格,并换行
  98. if position[1] == self.shape[1] - 1:
  99. output = output.rstrip()
  100. output += "\n"
  101. # 将输出符号写入文件对象
  102. outfile.write(output)
  103. # 在所有状态循环结束后,再换行
  104. outfile.write("\n")

测试程序 Cliff Environment Playground.py,用于在有风格子世界环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:

  • 导入 gym 库,numpy 库,sys 库和 WindyGridworldEnv 类,用于创建和管理环境,进行数值计算,系统操作和有风格子世界的管理等操作。

  • 如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。

  • 创建一个有风格子世界的环境,赋值给 env。

  • 调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 调用 env 的 step 方法,执行一个向右的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 重复上述两步,执行五次向右的动作和一次向下的动作,打印和渲染每一步的结果

  1. import gym
  2. import numpy as np
  3. import sys
  4. if "../" not in sys.path:
  5. sys.path.append("../")
  6. from lib.envs.windy_gridworld import WindyGridworldEnv
  7. # %%
  8. env = WindyGridworldEnv()
  9. print(env.reset())
  10. env.render()
  11. print(env.step(1))
  12. env.render()
  13. print(env.step(1))
  14. env.render()
  15. print(env.step(1))
  16. env.render()
  17. print(env.step(2))
  18. env.render()
  19. print(env.step(1))
  20. env.render()
  21. print(env.step(1))
  22. env.render()

9da5ebae868ca7744882ef2e91d3387e.png

实现SARSA算法 SARSA Solution.py,SARSA算法是一种基于时序差分学习的强化学习算法,可以找到最优的epsilon-贪婪策略。

代码使用了gym库,itertools库,matplotlib库,numpy库,pandas库和sys库来进行环境模拟,数据处理和图形绘制。代码中定义了两个函数,分别是:

make_epsilon_greedy_policy:这个函数用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略。这个函数接受一个状态到动作价值的字典,一个随机选择动作的概率,和一个环境中的动作数作为参数。这个函数返回一个函数,这个函数接受一个观察作为参数,返回一个长度为动作数的numpy数组,表示每个动作的概率。

sarsa:这个函数用于实现SARSA算法,找到最优的epsilon-贪婪策略。这个函数接受一个OpenAI环境,一个回合数,一个折扣因子,一个学习率,和一个epsilon参数作为参数。这个函数返回一个元组(Q, stats)。Q是最优的动作价值函数,一个状态到动作价值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别表示每个回合的长度和奖励。

代码的主要流程是:

        1. 创建一个默认的动作价值函数,一个统计信息对象,和一个epsilon-贪婪策略。

        2. 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:

                a. 执行一个动作,观察下一个状态,奖励,和是否结束。

                b. 选择下一个动作,根据当前的策略。

                c. 更新统计信息。

                d. 使用时序差分更新公式,更新动作价值函数。

                e. 如果结束,跳出循环。

                f. 更新当前的动作和状态。

        3. 返回动作价值函数和统计信息对象。

        4. 使用plotting模块,绘制统计信息的图形。

  1. # 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口[^1^][1]
  2. import gym
  3. # 导入itertools库,这是一个用于创建迭代器的标准库,提供了多种迭代工具[^2^][2]
  4. import itertools
  5. # 导入matplotlib库,这是一个用于绘图的开源库,提供了多种图形和图表[^3^][3]
  6. import matplotlib
  7. # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能[^4^][4]
  8. import numpy as np
  9. # 导入pandas库,这是一个用于数据分析和处理的开源库,提供了DataFrame等数据结构[^5^][5]
  10. import pandas as pd
  11. # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
  12. import sys
  13. # 检查当前的系统路径中是否包含上一级目录,如果不包含,则将其添加到系统路径中
  14. # 这样做的目的是为了能够导入上一级目录中的lib文件夹中的模块
  15. if "../" not in sys.path:
  16. sys.path.append("../")
  17. # 从lib文件夹中的envs子文件夹中导入WindyGridworldEnv类,这是一个用于实现有风网格世界问题的强化学习环境
  18. from collections import defaultdict
  19. from lib.envs.windy_gridworld import WindyGridworldEnv
  20. # 从lib文件夹中导入plotting模块,这是一个用于绘制统计数据的模块
  21. from lib import plotting
  22. # 设置matplotlib的样式为ggplot,这是一种美观的绘图风格
  23. matplotlib.style.use('ggplot')
  24. # %%
  25. # 创建一个WindyGridworldEnv的实例对象,命名为env,这是一个7x10的网格世界,有一些单元格有风向和风力,智能体需要从起点走到终点,受到风的影响
  26. env = WindyGridworldEnv()
  27. # %%
  28. # 定义一个函数,用于根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略
  29. def make_epsilon_greedy_policy(Q, epsilon, nA):
  30. """
  31. 根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略
  32. 参数:
  33. Q: 一个字典,映射从状态到动作值
  34. 每个值是一个长度为nA的numpy数组(见下文)
  35. epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数
  36. nA: 环境中的动作数量
  37. 返回:
  38. 一个函数,接受一个观察值作为参数,返回
  39. 每个动作的概率,以长度为nA的numpy数组的形式
  40. """
  41. # 定义一个内部函数,用于根据观察值,返回每个动作的概率
  42. def policy_fn(observation):
  43. # 创建一个长度为nA的numpy数组,每个元素的值为epsilon/nA,表示选择一个随机动作的概率
  44. A = np.ones(nA, dtype=float) * epsilon / nA
  45. # 根据Q函数,找到当前状态下最优的动作
  46. best_action = np.argmax(Q[observation])
  47. # 将最优动作的概率增加1-epsilon,表示选择最优动作的概率
  48. A[best_action] += (1.0 - epsilon)
  49. # 返回动作概率数组
  50. return A
  51. # 返回内部函数
  52. return policy_fn
  53. # %%
  54. # 定义一个函数,用于实现SARSA算法,即基于策略的时序差分控制,寻找最优的epsilon贪婪策略
  55. def sarsa(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
  56. """
  57. SARSA算法: 基于策略的时序差分控制,寻找最优的epsilon贪婪策略
  58. 参数:
  59. env: OpenAI环境
  60. num_episodes: 运行的回合数
  61. discount_factor: Gamma折扣因子
  62. alpha: 时序差分学习率
  63. epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数
  64. 返回:
  65. 一个元组 (Q, stats)
  66. Q是最优的动作值函数,一个字典,映射从状态到动作值
  67. stats是一个EpisodeStats对象,包含两个numpy数组,分别记录每个回合的长度和奖励
  68. """
  69. # 最终的动作值函数
  70. # 一个嵌套的字典,映射从状态到(动作到动作值)
  71. # 使用defaultdict,当访问不存在的键时,返回一个长度为nA的零数组
  72. Q = defaultdict(lambda: np.zeros(env.action_space.n))
  73. # 跟踪有用的统计数据
  74. # 使用plotting模块中的EpisodeStats类,创建一个对象,包含两个长度为 num_episodes 的零数组,分别记录每个回合的长度和奖励
  75. stats = plotting.EpisodeStats(
  76. episode_lengths=np.zeros(num_episodes),
  77. episode_rewards=np.zeros(num_episodes))
  78. # 我们正在遵循的策略
  79. # 使用前面定义的函数,根据Q函数和epsilon值,创建一个epsilon贪婪策略
  80. policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
  81. # 对于每个回合
  82. for i_episode in range(num_episodes):
  83. # 打印出当前的回合数,方便调试
  84. if (i_episode + 1) % 100 == 0:
  85. print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
  86. sys.stdout.flush()
  87. # 重置环境,选择第一个动作
  88. state = env.reset()
  89. # 根据策略,得到当前状态下每个动作的概率
  90. action_probs = policy(state)
  91. # 根据动作概率,随机选择一个动作
  92. action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
  93. # 在环境中进行一步
  94. # 使用itertools库中的count函数,创建一个无限的计数器,表示每个回合的时间步数
  95. for t in itertools.count():
  96. # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
  97. next_state, reward, done, _ = env.step(action)
  98. # 根据当前的策略,选择下一个动作,这是一个概率性的选择,根据每个动作的概率分布
  99. next_action_probs = policy(next_state)
  100. next_action = np.random.choice(np.arange(len(next_action_probs)), p=next_action_probs)
  101. # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
  102. stats.episode_rewards[i_episode] += reward
  103. stats.episode_lengths[i_episode] = t
  104. # 使用时序差分更新公式,更新动作价值函数
  105. # 计算目标值,即当前的奖励加上折扣后的下一个状态和动作的价值
  106. td_target = reward + discount_factor * Q[next_state][next_action]
  107. # 计算误差,即目标值减去当前的状态和动作的价值
  108. td_delta = td_target - Q[state][action]
  109. # 用学习率乘以误差,更新当前的状态和动作的价值
  110. Q[state][action] += alpha * td_delta
  111. # 如果回合结束,跳出循环
  112. if done:
  113. break
  114. # 更新当前的动作和状态,为下一个时间步做准备
  115. action = next_action
  116. state = next_state
  117. # 返回动作价值函数和统计数据
  118. return Q, stats
  119. # %%
  120. # 调用sarsa函数,传入环境和回合数等参数,得到动作价值函数和统计数据
  121. Q, stats = sarsa(env, 200)
  122. # %%
  123. # 调用plotting模块中的plot_episode_stats函数,传入统计数据,绘制回合长度和回合奖励的图形
  124. plotting.plot_episode_stats(stats)
  125. # print("\nQ:\r{}".format(Q)) #输出最终价值函数
  126. # defaultdict(<function sarsa.<locals>.<lambda> at 0x000001CA7D86FF70>, {30: array([-16.62121833, -16.68188683, -16.73355158, -17.17868707]), 20: array([-15.85469654, -15.82517745, -16.65515113, -15.62910245]), 10: array([-15.38586765, -15.58275272, -15.2877975 , -15.24034802]), 0: array([-15.09297672, -15.06110493, -15.13709802, -14.7251808 ]), 1: array([-14.56234244, -14.54038227, -14.91194528, -14.92618857]), 2: array([-14.17266391, -13.80525148, -13.93608606, -13.78078067]), 3: array([-14.28053776, -13.74421705, -13.56175795, -14.14192259]), 4: array([-13.27142219, -12.79400183, -13.54938834, -13.71948143]), 5: array([-12.48366893, -11.3930307 , -12.77222323, -12.9336916 ]), 6: array([-11.91175853, -9.91457551, -11.78255518, -11.69363854]), 7: array([-10.77910057, -9.81671771, -10.98045998, -10.65801588]), 8: array([-10.74969529, -8.0102353 , -9.77162591, -11.43791039]), 9: array([ -8.33103372, -8.39513518, -6.32893384, -10.33290655]), 19: array([-8.84390206, -8.1119523 , -6.72538289, -9.03481089]), 12: array([-14.20294124, -13.97901573, -13.96065147, -15.22720425]), 11: array([-15.13194653, -14.98909044, -15.48678909, -15.04447429]), 13: array([-13.37943249, -13.53181049, -13.7098475 , -13.81976352]), 29: array([-8.55710389, -6.87412944, -4.85772855, -7.17036152]), 18: array([-8.94868871, -8.0233618 , -9.06798147, -9.26537925]), 22: array([-14.52083337, -14.19290266, -14.38417474, -14.27854828]), 21: array([-15.4374521 , -15.00991466, -15.5801335 , -15.60899897]), 23: array([-13.95910955, -13.42154284, -13.83539163, -14.19399668]), 39: array([-6.4785114 , -5.94494094, -3.99613988, -6.79803347]), 28: array([-7.66699712, -6.61983091, -7.60264565, -8.3233084 ]), 32: array([-14.70999481, -14.81900147, -14.63750975, -16.3293473 ]), 31: array([-15.94781378, -16.20896192, -16.07386806, -15.91479389]), 33: array([-13.86220924, -13.99533353, -14.58005824, -14.60042878]), 40: array([-16.37138175, -15.93543669, -16.00626584, -16.10155303]), 41: array([-15.11021159, -15.0313713 , -15.12066262, -16.1427075 ]), 42: array([-14.39118224, -14.04513618, -14.11888151, -15.06676474]), 43: array([-13.47905898, -13.3280808 , -13.36766233, -14.36908711]), 14: array([-13.24084653, -12.82979917, -12.7967486 , -12.89074042]), 51: array([-15.16797579, -14.42686921, -14.48158134, -14.3608748 ]), 50: array([-16.05803884, -14.96313727, -14.8700738 , -15.28846151]), 52: array([-13.58167077, -13.26323861, -13.58650088, -14.25622422]), 53: array([-12.36202997, -12.46939028, -12.89155887, -13.90026794]), 24: array([-12.75612708, -12.31330449, -12.97263319, -13.78728591]), 61: array([-14.51254789, -13.84352851, -14.20420505, -14.03010117]), 60: array([-14.67217839, -14.48765693, -14.49793836, -14.88186542]), 62: array([-13.10041286, -13.10038901, -13.11464442, -13.2568325 ]), 34: array([-12.82238768, -12.67668025, -13.06624469, -12.75352672]), 49: array([-5.44358852, -3.80098726, -3.26364328, -4.10764147]), 38: array([-7.86199643, -6.35308864, -6.60455766, -6.3621277 ]), 17: array([ -9.21518149, -9.06675019, -9.02526121, -10.003373 ]), 63: array([-12.62337832, -12.26700563, -12.85346847, -12.74301769]), 15: array([-11.70362604, -11.39586923, -12.27834132, -12.50392354]), 59: array([-2.81008911, -2.54296875, -2.81005859, -2.52370968]), 69: array([-1.875 , -2.52734375, -2.3359375 , -1.87109375]), 48: array([-6.67165589, -5.06948669, -1.75 , -1. ]), 27: array([-7.71386745, -6.34017058, -7.76830481, -6.69818919]), 58: array([-3.79243281, -2.51201346, -2.29996305, -1.76249076]), 37: array([0., 0., 0., 0.]), 44: array([-11.66811483, -11.73730811, -12.26177971, -11.80549878]), 54: array([-12.15950782, -11.33421547, -12.16753238, -12.14326109]), 25: array([-12.10995948, -10.72434757, -11.13916866, -11.16080802]), 16: array([-10.27994511, -9.72139598, -10.71965764, -11.12713914]), 35: array([-11.3114562 , -10.77935897, -11.14367845, -10.83009369]), 68: array([-1.46875 , -2.16746892, -1.5 , -0.9375 ]), 45: array([-11.21385201, -10.38149598, -11.15339582, -11.649213 ]), 47: array([-2.82864534, -4.36609306, -0.9375 , -5.39884604]), 36: array([ -9.49267328, -9.54858587, -10.25678092, -10.16425445]), 26: array([-10.2265589 , -9.85410817, -9.97563059, -10.89997015]), 57: array([-2.66771439, -2.42248535, -0.5 , -5.24217275])})

输出结果:

fffeac1ef92a59c74142b960f57881f8.png

每个回合的长度随时间的变化

说明:共200回合,随着价值函数的更新,越往后的回合执行越少的动作就能抵达终点

de3a3ba05a335db849aa1c9032650c68.png

每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理

说明:随着价值函数的更新越往后的回合得到的回报越高

beff9ff101b5b65770cb14cab032fb00.png

每个回合的时间步数累加和回合数的关系

说明:每个回合的时间步数表示在一个回合中,执行了多少次动作。回合数表示完成了多少个回合。一个回合的结束条件是到达目标状态或者超过最大的时间步数。这个图反映了学习的效果和效率,如果回合数随着时间步数的增加而快速增加,说明学习的效果好,能够更快地找到最优的策略和动作。如果回合数随着时间步数的增加而缓慢增加,说明学习的效率低,需要更多的时间和尝试才能找到最优的策略和动作。

Q-Learning 算法求解悬崖行走问题

悬崖行走环境的类 cliff_walking.py,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:

  • 定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,悬崖的位置,动作的空间,状态的空间等。

  • 定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息,渲染环境等。

  • 使用了 io 库,numpy 库,sys 库和 discrete 模块来进行输入输出,数值计算,系统操作和离散环境的管理等操作。

  1. # 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
  2. import io
  3. # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
  4. import numpy as np
  5. # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
  6. import sys
  7. # 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
  8. from . import discrete
  9. # 定义四个常量,表示四个动作的编号
  10. UP = 0
  11. RIGHT = 1
  12. DOWN = 2
  13. LEFT = 3
  14. # 定义一个类,继承自DiscreteEnv类,用于实现悬崖行走问题的强化学习环境
  15. class CliffWalkingEnv(discrete.DiscreteEnv):
  16. # 定义一个元数据字典,表示该环境支持的渲染模式
  17. metadata = {'render.modes': ['human', 'ansi']}
  18. # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
  19. def _limit_coordinates(self, coord):
  20. # 将坐标的第一个分量限制在0到网格的行数减一之间
  21. coord[0] = min(coord[0], self.shape[0] - 1)
  22. coord[0] = max(coord[0], 0)
  23. # 将坐标的第二个分量限制在0到网格的列数减一之间
  24. coord[1] = min(coord[1], self.shape[1] - 1)
  25. coord[1] = max(coord[1], 0)
  26. # 返回限制后的坐标
  27. return coord
  28. # 定义一个私有方法,用于计算状态转移的概率,根据当前位置和动作的变化量
  29. def _calculate_transition_prob(self, current, delta):
  30. # 计算新的位置,等于当前位置加上动作的变化量
  31. new_position = np.array(current) + np.array(delta)
  32. # 限制新的位置的范围,转换为整数类型
  33. new_position = self._limit_coordinates(new_position).astype(int)
  34. # 计算新的状态,将新的位置转换为一维的索引
  35. new_state = np.ravel_multi_index(tuple(new_position), self.shape)#给定一个多维数组的形状和一个多维的坐标,返回一个整数,表示该坐标在多维数组中对应的一维索引。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 new_position 是 (0, 0) 对应的 new_state 是 0,表示网格的左上角,new_position 是 (3, 11) 对应的 new_state 是 47,表示网格的右下角
  36. # 判断是否落入悬崖,如果是,奖励为-100,否则为-1
  37. reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
  38. # 判断是否达到目标位置或落入悬崖,如果是,回合结束
  39. is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
  40. # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
  41. return [(1.0, new_state, reward, is_done)]
  42. # 定义一个构造方法,用于初始化环境的属性
  43. def __init__(self):
  44. # 定义网格的形状,为4行12列
  45. self.shape = (4, 12)
  46. # 计算状态空间的大小,为网格的元素个数
  47. nS = np.prod(self.shape)
  48. # 定义动作空间的大小,为4个动作
  49. nA = 4
  50. # 定义悬崖的位置,为一个与网格形状相同的布尔数组,第四行的第二列到倒数第二列为True,表示悬崖
  51. self._cliff = np.zeros(self.shape, dtype=bool)
  52. self._cliff[3, 1:-1] = True #是悬崖
  53. # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
  54. P = {}
  55. # 对每个状态进行循环
  56. for s in range(nS):
  57. # 将状态转换为二维的位置
  58. position = np.unravel_index(s, self.shape) #给定一个多维数组的形状和一个一维的索引,返回一个元组,表示该索引在多维数组中对应的坐标。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 s = 0 对应的 position 是 (0, 0),表示网格的左上角,s = 47 对应的 position 是 (3, 11),表示网格的右下角。这个函数可以方便地将状态的表示从一维转换为二维,便于进行坐标的运算和渲染。
  59. # 初始化状态对应的字典,键为动作,值为一个空列表
  60. P[s] = { a : [] for a in range(nA) }
  61. # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
  62. P[s][UP] = self._calculate_transition_prob(position, [-1, 0]) #行-1
  63. P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])# 列+1
  64. P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])#行+1
  65. P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])#列-1
  66. # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
  67. isd = np.zeros(nS)
  68. isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
  69. # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
  70. super(CliffWalkingEnv, self).__init__(nS, nA, P, isd)
  71. # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
  72. def render(self, mode='human', close=False):
  73. self._render(mode, close)
  74. # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
  75. def _render(self, mode='human', close=False):
  76. # 如果关闭标志为True,表示不需要渲染,直接返回
  77. if close:
  78. return
  79. # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
  80. outfile = io.StringIO() if mode == 'ansi' else sys.stdout
  81. # 对每个状态进行循环
  82. for s in range(self.nS):
  83. # 将状态转换为二维的位置
  84. position = np.unravel_index(s, self.shape)
  85. # print(self.s)
  86. # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,如果是悬崖位置,输出 C,否则输出 o
  87. if self.s == s:
  88. output = " x "
  89. elif position == (3,11):
  90. output = " T "
  91. elif self._cliff[position]:
  92. output = " C "
  93. else:
  94. output = " o "
  95. # 如果位置在第一列,去掉输出符号的左边空格
  96. if position[1] == 0:
  97. output = output.lstrip()
  98. # 如果位置在最后一列,去掉输出符号的右边空格,并换行
  99. if position[1] == self.shape[1] - 1:
  100. output = output.rstrip()
  101. output += "\n"
  102. # 将输出符号写入文件对象
  103. outfile.write(output)
  104. # 在所有状态循环结束后,再换行
  105. outfile.write("\n")

测试程序 Cliff Environment Playground.py ,用于在悬崖行走环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:

  • 导入 gym 库,numpy 库,sys 库和 CliffWalkingEnv 类,用于创建和管理环境,进行数值计算,系统操作和悬崖行走的管理等操作。

  • 如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。

  • 创建一个悬崖行走的环境,赋值给 env。

  • 调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 调用 env 的 step 方法,执行一个向上的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。

  • 调用 env 的 render 方法,渲染环境,显示出当前的位置。

  • 重复上述两步,执行两次向右的动作和一次向下的动作,打印和渲染每一步的结果。

  1. import gym
  2. import numpy as np
  3. import sys
  4. if "../" not in sys.path:
  5. sys.path.append("../")
  6. from lib.envs.cliff_walking import CliffWalkingEnv
  7. env = CliffWalkingEnv()
  8. print(env.reset())
  9. env.render()
  10. print(env.step(0))
  11. env.render()
  12. print(env.step(1))
  13. env.render()
  14. print(env.step(1))
  15. env.render()
  16. print(env.step(2))
  17. env.render()

7f38f608cba9669c2360faa96a4dabcd.png

Q-learning 算法 求解悬崖行走问题的代码Q-Learning Solution.py

  • 该代码使用了一个名为悬崖行走的OpenAI环境,该环境是一个4x12的网格世界,其中代理人从左下角的起点开始,目标是到达右下角的终点,而不掉入悬崖中。

  • 该代码定义了一个函数make_epsilon_greedy_policy,该函数根据给定的Q函数和epsilon值,创建一个epsilon-贪婪策略。该函数返回一个函数,该函数接受一个观察值作为参数,并返回每个动作的概率,形式为一个长度为nA的numpy数组。

  • 该代码定义了一个函数q_learning,该函数实现了Q-Learning算法,即离策略的TD控制算法。该算法在遵循一个epsilon-贪婪策略的同时,寻找最优的贪婪策略。该函数接受以下参数:

    • env: OpenAI环境。

    • num_episodes: 运行的回合数。

    • discount_factor: Gamma折扣因子。

    • alpha: TD学习率。

    • epsilon: 选择随机动作的概率,介于0和1之间的浮点数。

  • 该函数返回一个元组(Q, stats)。Q是最优的动作值函数,是一个映射状态到动作值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别记录了每个回合的长度和奖励。

  • 该函数的主要步骤如下:

    • 重置环境并选择第一个动作。

    • 对于每个时间步:

    • 执行一个动作,并观察下一个状态,奖励,是否结束,以及其他信息。

    • 更新统计信息。

    • TD更新:根据下一个状态的最优动作,计算TD目标。计算TD误差。更新Q中当前状态和动作的值。

    • 如果结束,跳出循环。

    • 更新当前状态为下一个状态。

    • 初始化一个空的Q字典,用于存储每个状态的动作值。

    • 初始化一个stats对象,用于记录有用的统计信息。

    • 根据Q和epsilon,创建一个epsilon-贪婪策略。

    • 对于每个回合:

  • 该代码使用了q_learning函数来求解悬崖行走问题,设置了500个回合,其他参数使用默认值。

  • 该代码使用了plotting.plot_episode_stats函数来绘制每个回合的长度和奖励的图表,以及每个状态的动作值的热力图。

  1. # 导入gym库,用于提供强化学习的环境
  2. import gym
  3. # 导入itertools库,用于提供一些迭代器的工具函数
  4. import itertools
  5. # 导入matplotlib库,用于提供图形绘制的功能
  6. import matplotlib
  7. # 导入numpy库,用于提供数组和数学运算的功能
  8. import numpy as np
  9. # 导入pandas库,用于提供数据分析和处理的功能
  10. import pandas as pd
  11. # 导入sys库,用于提供系统相关的功能
  12. import sys
  13. # 判断当前的路径中是否包含"../",如果不包含,就添加到路径中,用于导入lib模块
  14. if "../" not in sys.path:
  15. sys.path.append("../")
  16. # 导入defaultdict类,用于创建一个默认字典,即一个可以使用任意不存在的键访问的字典,如果访问一个不存在的键,它会自动创建一个默认值
  17. from collections import defaultdict
  18. # 导入CliffWalkingEnv类,用于创建一个悬崖行走的环境,这是一个网格世界,目标是从起点走到终点,中间有一些悬崖,如果掉入悬崖,就会返回起点并受到惩罚
  19. from lib.envs.cliff_walking import CliffWalkingEnv
  20. # 导入plotting模块,用于提供一些图形绘制的函数,例如plot_episode_stats函数
  21. from lib import plotting
  22. # 设置图形的风格为ggplot,一种流行的图形风格
  23. matplotlib.style.use('ggplot')
  24. # %%
  25. # 创建一个悬崖行走的环境对象,用于与智能体进行交互
  26. env = CliffWalkingEnv()
  27. # %%
  28. # 定义一个函数,叫做make_epsilon_greedy_policy,用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略
  29. def make_epsilon_greedy_policy(Q, epsilon, nA):
  30. """
  31. Creates an epsilon-greedy policy based on a given Q-function and epsilon.
  32. Args:
  33. Q: A dictionary that maps from state -> action-values.
  34. Each value is a numpy array of length nA (see below)
  35. epsilon: The probability to select a random action. Float between 0 and 1.
  36. nA: Number of actions in the environment.
  37. Returns:
  38. A function that takes the observation as an argument and returns
  39. the probabilities for each action in the form of a numpy array of length nA.
  40. """
  41. # 定义一个函数,叫做policy_fn,用于根据一个观察,返回一个动作的概率分布
  42. def policy_fn(observation):
  43. # 创建一个全为epsilon/nA的数组,表示每个动作的初始概率,其中epsilon是随机选择动作的概率,nA是动作数
  44. A = np.ones(nA, dtype=float) * epsilon / nA
  45. # 找到Q函数中对应于当前状态的最大动作价值的动作,即最优动作
  46. best_action = np.argmax(Q[observation])
  47. # 给最优动作的概率增加1-epsilon,表示最优动作被选择的概率更高
  48. A[best_action] += (1.0 - epsilon)
  49. # 返回这个数组,表示当前状态下的策略
  50. return A
  51. # 返回这个函数,作为epsilon-贪婪策略
  52. return policy_fn
  53. # %%
  54. # 定义一个函数,叫做q_learning,用于实现Q学习算法,找到最优的贪婪策略,同时遵循一个epsilon-贪婪策略
  55. def q_learning(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
  56. """
  57. Q-Learning algorithm: Off-policy TD control. Finds the optimal greedy policy
  58. while following an epsilon-greedy policy
  59. Args:
  60. env: OpenAI environment.
  61. num_episodes: Number of episodes to run for.
  62. discount_factor: Gamma discount factor.
  63. alpha: TD learning rate.
  64. epsilon: Chance to sample a random action. Float between 0 and 1.
  65. Returns:
  66. A tuple (Q, episode_lengths).
  67. Q is the optimal action-value function, a dictionary mapping state -> action values.
  68. stats is an EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
  69. """
  70. # 创建一个默认字典,用于存储最终的动作价值函数,即在不同的状态下,每个动作能够获得的期望回报
  71. # 这个字典的键是状态,它的值是一个长度为动作数的零数组,表示在该状态下,每个动作的价值都是零
  72. # 如果访问一个不存在的状态,它会自动创建一个对应的零数组作为值
  73. Q = defaultdict(lambda: np.zeros(env.action_space.n))
  74. # 创建一个EpisodeStats对象,用于记录每个回合的长度和奖励的numpy数组
  75. stats = plotting.EpisodeStats(
  76. episode_lengths=np.zeros(num_episodes),
  77. episode_rewards=np.zeros(num_episodes))
  78. # 创建一个epsilon-贪婪策略,用于在每个状态下,以一定的概率epsilon随机选择一个动作,否则选择当前最优的动作,即具有最大的动作价值的动作
  79. policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
  80. # 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:
  81. for i_episode in range(num_episodes):
  82. # 打印出当前的回合数,用于调试
  83. if (i_episode + 1) % 100 == 0:
  84. print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
  85. sys.stdout.flush()
  86. # 重置环境,返回初始状态
  87. state = env.reset()
  88. # 在环境中执行一个步骤
  89. # total_reward = 0.0
  90. for t in itertools.count():
  91. # 选择一个动作,根据当前的策略,这是一个概率性的选择,根据每个动作的概率分布
  92. action_probs = policy(state)
  93. action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
  94. # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
  95. next_state, reward, done, _ = env.step(action)
  96. # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
  97. stats.episode_rewards[i_episode] += reward
  98. stats.episode_lengths[i_episode] = t
  99. # 使用时序差分更新公式,更新动作价值函数
  100. # 找到下一个状态中,具有最大动作价值的动作,即最优动作
  101. best_next_action = np.argmax(Q[next_state])
  102. # 计算目标值,即当前的奖励加上折扣后的下一个状态和最优动作的价值
  103. td_target = reward + discount_factor * Q[next_state][best_next_action]
  104. # 计算误差,即目标值减去当前的状态和动作的价值
  105. td_delta = td_target - Q[state][action]
  106. # 用学习率乘以误差,更新当前的状态和动作的价值
  107. Q[state][action] += alpha * td_delta
  108. # 如果回合结束,跳出循环
  109. if done:
  110. break
  111. # 更新当前的状态,为下一个步骤做准备
  112. state = next_state
  113. # 返回最终的动作价值函数和统计信息
  114. return Q, stats
  115. # 调用q_learning函数,传入环境对象,回合数,折扣因子,学习率,和epsilon参数,返回最终的动作价值函数和统计信息
  116. Q, stats = q_learning(env, 500)
  117. # %%
  118. # 调用plotting模块中的plot_episode_stats函数,传入统计信息,绘制每个回合的长度,每个回合的奖励,和每个回合的时间步数与回合数的关系的图形
  119. plotting.plot_episode_stats(stats)

输出结果:

9da64358d98779aaa2c0ee1b95220ada.png

每个回合的长度随时间的变化

6871be9a3eca629605fc78a5a7a406fc.png

每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理

986d8f87da77ebe7ddecc604cc9426f6.png

每个回合的时间步数和回合数的关系

最终价值函数

最终价值函数Q:在学习过程中,通过不断地更新和优化,最终收敛到一个稳定的价值函数,即最接近真实的价值函数的价值函数。训练最终的价值函数有以下的用途:

  • 训练最终的价值函数可以反映出最优的策略,即在每个状态下,选择哪个动作能够获得最大的价值。我们可以根据训练最终的价值函数,制定出最优的决策规则,从而在环境中表现出最佳的行为。

  • 训练最终的价值函数可以评估出不同的状态的重要性,即哪些状态能够带来更高的回报,哪些状态应该避免。我们可以根据训练最终的价值函数,分析出环境中的特征和规律,从而提高我们对环境的理解和掌握。

  • 训练最终的价值函数可以作为一种性能指标,即我们可以通过比较训练最终的价值函数和真实的价值函数,或者不同的训练方法和参数下的价值函数,来评估我们的学习效果和效率,从而优化我们的学习过程和方法。

The End

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/798082
推荐阅读
相关标签
  

闽ICP备14008679号