赞
踩
蒙特卡洛算法需要使用完整的片段进行计算,这在有些问题中是不现实的,尤其是对于没有终止状态的问题。时序差分算法对此进行了改进
蒙特卡洛控制和时序差分学习有什么区别?
四、时序差分算法(Temporal Difference Learning, TD 学习)
4.1 时序差分(0)
4.2 Sarsa算法
4.3 Q学习(Q-learning)
4.4 Sarsa和Q-learning有什么区别?
4.5 示例代码
公共类:discrete.py plotting.py
离散环境的类 discrete.py,它继承自 gym 库的 Env 类,用于创建和管理强化学习的环境。它的主要功能是:
定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,动作的空间,状态的空间等。
定义了环境的基本方法,如设置随机数种子,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息等。
使用了 numpy 库,gym 库和 categorical_sample 函数来进行数值计算,环境管理和概率采样等操作。
- # 导入 numpy 库,用于进行数值计算
- import numpy as np
-
-
- # 导入 gym 库,用于创建和管理强化学习的环境
- from gym import Env, spaces
- # 导入 gym 库的 seeding 模块,用于设置随机数种子
- from gym.utils import seeding
- # 导入 gym 库的 toy_text 模块的 categorical_sample 函数,用于从一个概率分布中采样一个类别
- from gym.envs.toy_text.utils import categorical_sample
-
-
- # 定义一个离散环境的类,继承自 gym 库的 Env 类
- class DiscreteEnv(Env):
-
-
- """
- Has the following members
- - nS: number of states # 状态的数量
- - nA: number of actions # 动作的数量
- - P: transitions (*) # 状态转移的概率
- - isd: initial state distribution (**) # 初始状态的分布
- (*) dictionary of lists, where
- P[s][a] == [(probability, nextstate, reward, done), ...] # P[s][a] 是一个列表,表示在状态 s 下采取动作 a 后,可能的下一个状态,奖励和是否结束的概率
- (**) list or array of length nS # isd 是一个长度为 nS 的列表或数组,表示每个状态作为初始状态的概率
- """
-
-
- # 定义初始化方法,接受四个参数:状态的数量,动作的数量,状态转移的概率,初始状态的分布
- def __init__(self, nS, nA, P, isd):
- self.P = P # 将状态转移的概率赋值给 self.P
- self.isd = isd # 将初始状态的分布赋值给 self.isd
- self.lastaction = None # for rendering # 定义一个属性,用于记录上一次的动作,用于渲染
- self.nS = nS # 将状态的数量赋值给 self.nS
- self.nA = nA # 将动作的数量赋值给 self.nA
-
-
- # 定义一个属性,表示动作的空间,是一个离散的空间,取值范围是 [0, nA-1]
- self.action_space = spaces.Discrete(self.nA)
- # 定义一个属性,表示状态的空间,是一个离散的空间,取值范围是 [0, nS-1]
- self.observation_space = spaces.Discrete(self.nS)
-
-
- self.seed() # 调用 seed 方法,设置随机数种子
- # 从初始状态的分布中采样一个状态,赋值给 self.s
- self.s = categorical_sample(self.isd, self.np_random)
-
-
- # 定义一个方法,用于设置随机数种子,接受一个参数:种子
- def seed(self, seed=None):
- # 调用 seeding 模块的 np_random 函数,根据种子生成一个随机数生成器,赋值给 self.np_random,并返回种子
- self.np_random, seed = seeding.np_random(seed)
- return [seed]
-
-
- # 定义一个方法,用于重置环境,返回初始状态
- def reset(self):
- # 从初始状态的分布中采样一个状态,赋值给 self.s
- self.s = categorical_sample(self.isd, self.np_random)
- self.lastaction = None # 将上一次的动作设为 None
- return int(self.s) # 返回初始状态,转换为整数类型
-
-
- # 定义一个方法,用于执行一个动作,返回下一个状态,奖励,是否结束和附加信息
- def step(self, a):
- # 根据当前状态和动作,从状态转移的概率中获取可能的转移列表,赋值给 transitions
- transitions = self.P[self.s][a]
- # 从转移列表中,根据转移的概率,采样一个转移的索引,赋值给 i
- i = categorical_sample([t[0] for t in transitions], self.np_random)
- # 根据转移的索引,获取转移的概率,下一个状态,奖励和是否结束,赋值给 p, s, r, d
- p, s, r, d = transitions[i]
- self.s = s # 将下一个状态赋值给 self.s
- self.lastaction = a # 将当前动作赋值给 self.lastaction
- # 返回下一个状态,奖励,是否结束和附加信息,其中附加信息是一个字典,包含转移的概率,下一个状态转换为整数类型
- return (int(s), r, d, {"prob": p})
用于绘制一些问题中的价值函数的图形的函数 plotting.py 。价值函数表示在不同的状态下,采取最优策略能够获得的期望回报。这些代码使用了matplotlib库,numpy库,pandas库和namedtuple来进行数据处理和图形绘制。代码中定义了三个函数,分别是:
plot_cost_to_go_mountain_car
:这个函数用于绘制山地车问题的价值函数,山地车问题是一个连续状态空间的强化学习问题,目标是让一辆车在两座山之间来回移动,最终到达右边的山顶。这个函数接受一个环境对象,一个估计器对象和一个网格数作为参数,然后生成一个三维的曲面图,显示在不同的位置和速度下,采取最优动作的成本(负的价值)。
plot_value_function
:这个函数用于绘制二十一点游戏的价值函数,二十一点游戏是一个离散状态空间的强化学习问题,目标是让玩家的牌的总和尽可能接近21,但不超过21,同时要比庄家的牌的总和大。这个函数接受一个价值函数字典和一个标题作为参数,然后分别绘制两个三维的曲面图,显示在不同的玩家总和和庄家显示牌下,有可用的Ace和没有可用的Ace的情况下的价值。
plot_episode_stats
:这个函数用于绘制每个回合的统计信息,包括回合的长度,回合的奖励,回合的时间步数和回合的编号。这个函数接受一个命名元组,一个平滑窗口和一个是否显示图形的标志作为参数,然后分别绘制三个二维的折线图,显示回合的长度,回合的奖励和回合的时间步数随回合的编号的变化。这个函数返回三个图形对象。
- # 导入matplotlib库,用于绘制图形
- import matplotlib
- # 导入numpy库,用于进行数值计算
- import numpy as np
- # 导入pandas库,用于进行数据分析
- import pandas as pd
- # 导入namedtuple,用于创建命名元组
- from collections import namedtuple
- # 导入pyplot模块,用于绘制二维图形
- from matplotlib import pyplot as plt
- # 导入Axes3D模块,用于绘制三维图形
- from mpl_toolkits.mplot3d import Axes3D
-
-
- # 创建一个命名元组,用于存储每个回合的长度和奖励
- EpisodeStats = namedtuple("Stats",["episode_lengths", "episode_rewards"])
-
-
- # 定义一个函数,用于绘制山地车问题的价值函数
- def plot_cost_to_go_mountain_car(env, estimator, num_tiles=20):
- # 生成一个等差数列,表示状态空间中的位置范围
- x = np.linspace(env.observation_space.low[0], env.observation_space.high[0], num=num_tiles)
- # 生成一个等差数列,表示状态空间中的速度范围
- y = np.linspace(env.observation_space.low[1], env.observation_space.high[1], num=num_tiles)
- # 生成一个网格,表示状态空间中的所有可能组合
- X, Y = np.meshgrid(x, y)
- # 对每个状态,计算估计器预测的最大动作价值,并取负数,表示成本
- Z = np.apply_along_axis(lambda _: -np.max(estimator.predict(_)), 2, np.dstack([X, Y]))
-
-
- # 创建一个图形对象,设置大小为10*5
- fig = plt.figure(figsize=(10, 5))
- # 在图形对象上添加一个子图,设置为三维投影
- ax = fig.add_subplot(111, projection='3d')
- # 在子图上绘制一个曲面,表示价值函数
- surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
- cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
- # 设置子图的x轴标签为位置
- ax.set_xlabel('Position')
- # 设置子图的y轴标签为速度
- ax.set_ylabel('Velocity')
- # 设置子图的z轴标签为价值
- ax.set_zlabel('Value')
- # 设置子图的标题为山地车问题的成本函数
- ax.set_title("Mountain \"Cost To Go\" Function")
- # 在图形对象上添加一个颜色条,表示价值的范围
- fig.colorbar(surf)
- # 显示图形
- plt.show()
-
-
-
-
- # 定义一个函数,用于绘制价值函数的曲面图
- def plot_value_function(V, title="Value Function"):
- """
- Plots the value function as a surface plot.
- """
- # 找到价值函数中的最小和最大的玩家总和
- min_x = min(k[0] for k in V.keys())
- max_x = max(k[0] for k in V.keys())
- # 找到价值函数中的最小和最大的庄家显示牌
- min_y = min(k[1] for k in V.keys())
- max_y = max(k[1] for k in V.keys())
-
-
- # 生成一个等差数列,表示玩家总和的范围
- x_range = np.arange(min_x, max_x + 1)
- # 生成一个等差数列,表示庄家显示牌的范围
- y_range = np.arange(min_y, max_y + 1)
- # 生成一个网格,表示所有可能的状态组合
- X, Y = np.meshgrid(x_range, y_range)
-
-
- # 对每个状态,根据是否有可用的Ace,计算价值函数的值
- Z_noace = np.apply_along_axis(lambda _: V[(_[0], _[1], False)], 2, np.dstack([X, Y]))
- Z_ace = np.apply_along_axis(lambda _: V[(_[0], _[1], True)], 2, np.dstack([X, Y]))
-
-
- # 定义一个内部函数,用于绘制一个曲面图
- def plot_surface(X, Y, Z, title):
- # 创建一个图形对象,设置大小为20*10
- fig = plt.figure(figsize=(20, 10))
- # 在图形对象上添加一个子图,设置为三维投影
- ax = fig.add_subplot(111, projection='3d')
- # 在子图上绘制一个曲面,表示价值函数
- surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1,
- cmap=matplotlib.cm.coolwarm, vmin=-1.0, vmax=1.0)
- # 设置子图的x轴标签为玩家总和
- ax.set_xlabel('Player Sum')
- # 设置子图的y轴标签为庄家显示牌
- ax.set_ylabel('Dealer Showing')
- # 设置子图的z轴标签为价值
- ax.set_zlabel('Value')
- # 设置子图的标题
- ax.set_title(title)
- # 设置子图的视角
- ax.view_init(ax.elev, -120)
- # 在图形对象上添加一个颜色条,表示价值的范围
- fig.colorbar(surf)
- # 显示图形
- plt.show()
-
-
- # 调用内部函数,分别绘制没有可用Ace和有可用Ace的情况下的价值函数
- plot_surface(X, Y, Z_noace, "{} (No Usable Ace)".format(title))
- plot_surface(X, Y, Z_ace, "{} (Usable Ace)".format(title))
-
-
- # 定义一个函数,用于绘制每个回合的统计信息
- def plot_episode_stats(stats, smoothing_window=10, noshow=False):
- # 绘制每个回合的长度随时间的变化
- fig1 = plt.figure(figsize=(10,5))
- plt.plot(stats.episode_lengths)
- plt.xlabel("Episode")
- plt.ylabel("Episode Length")
- plt.title("Episode Length over Time")
- # 如果noshow为真,不显示图形,否则显示图形
- if noshow:
- plt.close(fig1)
- else:
- plt.show()#fig1
-
-
- # 绘制每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理
- fig2 = plt.figure(figsize=(10,5))
- rewards_smoothed = pd.Series(stats.episode_rewards).rolling(smoothing_window, min_periods=smoothing_window).mean()
- plt.plot(rewards_smoothed)
- plt.xlabel("Episode")
- plt.ylabel("Episode Reward (Smoothed)")
- plt.title("Episode Reward over Time (Smoothed over window size {})".format(smoothing_window))
- # 如果noshow为真,不显示图形,否则显示图形
- if noshow:
- plt.close(fig2)
- else:
- plt.show()#fig2
-
-
- # 绘制每个回合的时间步数和回合数的关系
- fig3 = plt.figure(figsize=(10,5))
- plt.plot(np.cumsum(stats.episode_lengths), np.arange(len(stats.episode_lengths)))
- plt.xlabel("Time Steps")
- plt.ylabel("Episode")
- plt.title("Episode per time step")
- # 如果noshow为真,不显示图形,否则显示图形
- if noshow:
- plt.close(fig3)
- else:
- plt.show()#fig3
-
-
- # 返回三个图形对象
- return fig1, fig2, fig3
SARSA算法求解有风格子世界问题
有风格子世界环境的类 windy_gridworld,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:
定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,风的强度,动作的空间,状态的空间等。
定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,渲染环境等。
使用了 io 库,gym 库,numpy 库,sys 库和 discrete 模块来进行输入输出,环境管理,数值计算,系统操作和离散环境的管理等操作。
- # 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
- import io
- # 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口
- import gym
- # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
- import numpy as np
- # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
- import sys
-
-
- # 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
- from . import discrete
-
-
- # 定义四个常量,表示四个动作的编号
- UP = 0
- RIGHT = 1
- DOWN = 2
- LEFT = 3
-
-
- # 定义一个类,继承自DiscreteEnv类,用于实现有风格子世界问题的强化学习环境
- class WindyGridworldEnv(discrete.DiscreteEnv):
-
-
- # 定义一个元数据字典,表示该环境支持的渲染模式
- metadata = {'render.modes': ['human', 'ansi']}
-
-
- # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
- def _limit_coordinates(self, coord):
- # 将坐标的第一个分量限制在0到网格的行数减一之间
- coord[0] = min(coord[0], self.shape[0] - 1)
- coord[0] = max(coord[0], 0)
- # 将坐标的第二个分量限制在0到网格的列数减一之间
- coord[1] = min(coord[1], self.shape[1] - 1)
- coord[1] = max(coord[1], 0)
- # 返回限制后的坐标
- return coord
-
-
- # 定义一个私有方法,用于计算状态转移的概率,根据当前位置,动作的变化量,和风的强度
- def _calculate_transition_prob(self, current, delta, winds):
- # 计算新的位置,等于当前位置加上动作的变化量,再加上风的影响
- new_position = np.array(current) + np.array(delta) + np.array([-1, 0]) * winds[tuple(current)]
- # 限制新的位置的范围,转换为整数类型
- new_position = self._limit_coordinates(new_position).astype(int)
- # 计算新的状态,将新的位置转换为一维的索引
- new_state = np.ravel_multi_index(tuple(new_position), self.shape)
- # 判断是否达到目标位置,即(3, 7)
- is_done = tuple(new_position) == (3, 7)
- # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
- return [(1.0, new_state, -1.0, is_done)]
-
-
- # 定义一个构造方法,用于初始化环境的属性
- def __init__(self):
- # 定义网格的形状,为7行10列
- self.shape = (7, 10)
-
-
- # 计算状态空间的大小,为网格的元素个数
- nS = np.prod(self.shape)
- # 定义动作空间的大小,为4个动作
- nA = 4
-
-
- # 定义风的强度,为一个与网格形状相同的数组,某些列有不同的风力
- winds = np.zeros(self.shape)
- winds[:,[3,4,5,8]] = 1
- winds[:,[6,7]] = 2
-
-
- # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
- P = {}
- # 对每个状态进行循环
- for s in range(nS):
- # 将状态转换为二维的位置
- position = np.unravel_index(s, self.shape)
- # 初始化状态对应的字典,键为动作,值为一个空列表
- P[s] = { a : [] for a in range(nA) }
- # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
- P[s][UP] = self._calculate_transition_prob(position, [-1, 0], winds)
- P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1], winds)
- P[s][DOWN] = self._calculate_transition_prob(position, [1, 0], winds)
- P[s][LEFT] = self._calculate_transition_prob(position, [0, -1], winds)
-
-
- # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
- isd = np.zeros(nS)
- isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
-
-
- # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
- super(WindyGridworldEnv, self).__init__(nS, nA, P, isd)
-
-
- # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
- def render(self, mode='human', close=False):
- self._render(mode, close)
-
-
- # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
- def _render(self, mode='human', close=False):
- # 如果关闭标志为True,表示不需要渲染,直接返回
- if close:
- return
-
-
- # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
- outfile = io.StringIO() if mode == 'ansi' else sys.stdout
-
-
- # 对每个状态进行循环
- for s in range(self.nS):
- # 将状态转换为二维的位置
- position = np.unravel_index(s, self.shape)
- # print(self.s)
- # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,否则输出 o
- if self.s == s:
- output = " x "
- elif position == (3,7):
- output = " T "
- else:
- output = " o "
-
-
- # 如果位置在第一列,去掉输出符号的左边空格
- if position[1] == 0:
- output = output.lstrip()
- # 如果位置在最后一列,去掉输出符号的右边空格,并换行
- if position[1] == self.shape[1] - 1:
- output = output.rstrip()
- output += "\n"
-
-
- # 将输出符号写入文件对象
- outfile.write(output)
- # 在所有状态循环结束后,再换行
- outfile.write("\n")
测试程序 Cliff Environment Playground.py,用于在有风格子世界环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:
导入 gym 库,numpy 库,sys 库和 WindyGridworldEnv 类,用于创建和管理环境,进行数值计算,系统操作和有风格子世界的管理等操作。
如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。
创建一个有风格子世界的环境,赋值给 env。
调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。
调用 env 的 render 方法,渲染环境,显示出当前的位置。
调用 env 的 step 方法,执行一个向右的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。
调用 env 的 render 方法,渲染环境,显示出当前的位置。
重复上述两步,执行五次向右的动作和一次向下的动作,打印和渲染每一步的结果
- import gym
- import numpy as np
- import sys
-
-
- if "../" not in sys.path:
- sys.path.append("../")
-
-
- from lib.envs.windy_gridworld import WindyGridworldEnv
-
-
- # %%
- env = WindyGridworldEnv()
-
-
- print(env.reset())
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(2))
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(1))
- env.render()
实现SARSA算法 SARSA Solution.py,SARSA算法是一种基于时序差分学习的强化学习算法,可以找到最优的epsilon-贪婪策略。
代码使用了gym库,itertools库,matplotlib库,numpy库,pandas库和sys库来进行环境模拟,数据处理和图形绘制。代码中定义了两个函数,分别是:
make_epsilon_greedy_policy:这个函数用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略。这个函数接受一个状态到动作价值的字典,一个随机选择动作的概率,和一个环境中的动作数作为参数。这个函数返回一个函数,这个函数接受一个观察作为参数,返回一个长度为动作数的numpy数组,表示每个动作的概率。
sarsa:这个函数用于实现SARSA算法,找到最优的epsilon-贪婪策略。这个函数接受一个OpenAI环境,一个回合数,一个折扣因子,一个学习率,和一个epsilon参数作为参数。这个函数返回一个元组(Q, stats)。Q是最优的动作价值函数,一个状态到动作价值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别表示每个回合的长度和奖励。
代码的主要流程是:
1. 创建一个默认的动作价值函数,一个统计信息对象,和一个epsilon-贪婪策略。
2. 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:
a. 执行一个动作,观察下一个状态,奖励,和是否结束。
b. 选择下一个动作,根据当前的策略。
c. 更新统计信息。
d. 使用时序差分更新公式,更新动作价值函数。
e. 如果结束,跳出循环。
f. 更新当前的动作和状态。
3. 返回动作价值函数和统计信息对象。
4. 使用plotting模块,绘制统计信息的图形。
- # 导入gym库,这是一个用于强化学习的开源库,提供了多种环境和接口[^1^][1]
- import gym
- # 导入itertools库,这是一个用于创建迭代器的标准库,提供了多种迭代工具[^2^][2]
- import itertools
- # 导入matplotlib库,这是一个用于绘图的开源库,提供了多种图形和图表[^3^][3]
- import matplotlib
- # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能[^4^][4]
- import numpy as np
- # 导入pandas库,这是一个用于数据分析和处理的开源库,提供了DataFrame等数据结构[^5^][5]
- import pandas as pd
- # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
- import sys
-
-
- # 检查当前的系统路径中是否包含上一级目录,如果不包含,则将其添加到系统路径中
- # 这样做的目的是为了能够导入上一级目录中的lib文件夹中的模块
- if "../" not in sys.path:
- sys.path.append("../")
-
-
- # 从lib文件夹中的envs子文件夹中导入WindyGridworldEnv类,这是一个用于实现有风网格世界问题的强化学习环境
- from collections import defaultdict
- from lib.envs.windy_gridworld import WindyGridworldEnv
- # 从lib文件夹中导入plotting模块,这是一个用于绘制统计数据的模块
- from lib import plotting
-
-
- # 设置matplotlib的样式为ggplot,这是一种美观的绘图风格
- matplotlib.style.use('ggplot')
-
-
- # %%
- # 创建一个WindyGridworldEnv的实例对象,命名为env,这是一个7x10的网格世界,有一些单元格有风向和风力,智能体需要从起点走到终点,受到风的影响
- env = WindyGridworldEnv()
-
-
- # %%
- # 定义一个函数,用于根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略
- def make_epsilon_greedy_policy(Q, epsilon, nA):
- """
- 根据给定的Q函数和epsilon值,创建一个epsilon贪婪策略
- 参数:
- Q: 一个字典,映射从状态到动作值
- 每个值是一个长度为nA的numpy数组(见下文)
- epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数
- nA: 环境中的动作数量
- 返回:
- 一个函数,接受一个观察值作为参数,返回
- 每个动作的概率,以长度为nA的numpy数组的形式
- """
- # 定义一个内部函数,用于根据观察值,返回每个动作的概率
- def policy_fn(observation):
- # 创建一个长度为nA的numpy数组,每个元素的值为epsilon/nA,表示选择一个随机动作的概率
- A = np.ones(nA, dtype=float) * epsilon / nA
- # 根据Q函数,找到当前状态下最优的动作
- best_action = np.argmax(Q[observation])
- # 将最优动作的概率增加1-epsilon,表示选择最优动作的概率
- A[best_action] += (1.0 - epsilon)
- # 返回动作概率数组
- return A
- # 返回内部函数
- return policy_fn
-
-
- # %%
- # 定义一个函数,用于实现SARSA算法,即基于策略的时序差分控制,寻找最优的epsilon贪婪策略
- def sarsa(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
- """
- SARSA算法: 基于策略的时序差分控制,寻找最优的epsilon贪婪策略
- 参数:
- env: OpenAI环境
- num_episodes: 运行的回合数
- discount_factor: Gamma折扣因子
- alpha: 时序差分学习率
- epsilon: 选择一个随机动作的概率,介于0和1之间的浮点数
- 返回:
- 一个元组 (Q, stats)
- Q是最优的动作值函数,一个字典,映射从状态到动作值
- stats是一个EpisodeStats对象,包含两个numpy数组,分别记录每个回合的长度和奖励
- """
-
-
- # 最终的动作值函数
- # 一个嵌套的字典,映射从状态到(动作到动作值)
- # 使用defaultdict,当访问不存在的键时,返回一个长度为nA的零数组
- Q = defaultdict(lambda: np.zeros(env.action_space.n))
-
-
- # 跟踪有用的统计数据
- # 使用plotting模块中的EpisodeStats类,创建一个对象,包含两个长度为 num_episodes 的零数组,分别记录每个回合的长度和奖励
- stats = plotting.EpisodeStats(
- episode_lengths=np.zeros(num_episodes),
- episode_rewards=np.zeros(num_episodes))
-
-
- # 我们正在遵循的策略
- # 使用前面定义的函数,根据Q函数和epsilon值,创建一个epsilon贪婪策略
- policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
-
-
- # 对于每个回合
- for i_episode in range(num_episodes):
- # 打印出当前的回合数,方便调试
- if (i_episode + 1) % 100 == 0:
- print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
- sys.stdout.flush()
-
-
- # 重置环境,选择第一个动作
- state = env.reset()
- # 根据策略,得到当前状态下每个动作的概率
- action_probs = policy(state)
- # 根据动作概率,随机选择一个动作
- action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
-
-
- # 在环境中进行一步
- # 使用itertools库中的count函数,创建一个无限的计数器,表示每个回合的时间步数
- for t in itertools.count():
- # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
- next_state, reward, done, _ = env.step(action)
-
-
- # 根据当前的策略,选择下一个动作,这是一个概率性的选择,根据每个动作的概率分布
- next_action_probs = policy(next_state)
- next_action = np.random.choice(np.arange(len(next_action_probs)), p=next_action_probs)
-
-
- # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
- stats.episode_rewards[i_episode] += reward
- stats.episode_lengths[i_episode] = t
-
-
- # 使用时序差分更新公式,更新动作价值函数
- # 计算目标值,即当前的奖励加上折扣后的下一个状态和动作的价值
- td_target = reward + discount_factor * Q[next_state][next_action]
- # 计算误差,即目标值减去当前的状态和动作的价值
- td_delta = td_target - Q[state][action]
- # 用学习率乘以误差,更新当前的状态和动作的价值
- Q[state][action] += alpha * td_delta
-
-
- # 如果回合结束,跳出循环
- if done:
- break
-
-
- # 更新当前的动作和状态,为下一个时间步做准备
- action = next_action
- state = next_state
-
-
-
-
- # 返回动作价值函数和统计数据
- return Q, stats
-
-
- # %%
- # 调用sarsa函数,传入环境和回合数等参数,得到动作价值函数和统计数据
- Q, stats = sarsa(env, 200)
-
-
- # %%
- # 调用plotting模块中的plot_episode_stats函数,传入统计数据,绘制回合长度和回合奖励的图形
- plotting.plot_episode_stats(stats)
- # print("\nQ:\r{}".format(Q)) #输出最终价值函数
- # defaultdict(<function sarsa.<locals>.<lambda> at 0x000001CA7D86FF70>, {30: array([-16.62121833, -16.68188683, -16.73355158, -17.17868707]), 20: array([-15.85469654, -15.82517745, -16.65515113, -15.62910245]), 10: array([-15.38586765, -15.58275272, -15.2877975 , -15.24034802]), 0: array([-15.09297672, -15.06110493, -15.13709802, -14.7251808 ]), 1: array([-14.56234244, -14.54038227, -14.91194528, -14.92618857]), 2: array([-14.17266391, -13.80525148, -13.93608606, -13.78078067]), 3: array([-14.28053776, -13.74421705, -13.56175795, -14.14192259]), 4: array([-13.27142219, -12.79400183, -13.54938834, -13.71948143]), 5: array([-12.48366893, -11.3930307 , -12.77222323, -12.9336916 ]), 6: array([-11.91175853, -9.91457551, -11.78255518, -11.69363854]), 7: array([-10.77910057, -9.81671771, -10.98045998, -10.65801588]), 8: array([-10.74969529, -8.0102353 , -9.77162591, -11.43791039]), 9: array([ -8.33103372, -8.39513518, -6.32893384, -10.33290655]), 19: array([-8.84390206, -8.1119523 , -6.72538289, -9.03481089]), 12: array([-14.20294124, -13.97901573, -13.96065147, -15.22720425]), 11: array([-15.13194653, -14.98909044, -15.48678909, -15.04447429]), 13: array([-13.37943249, -13.53181049, -13.7098475 , -13.81976352]), 29: array([-8.55710389, -6.87412944, -4.85772855, -7.17036152]), 18: array([-8.94868871, -8.0233618 , -9.06798147, -9.26537925]), 22: array([-14.52083337, -14.19290266, -14.38417474, -14.27854828]), 21: array([-15.4374521 , -15.00991466, -15.5801335 , -15.60899897]), 23: array([-13.95910955, -13.42154284, -13.83539163, -14.19399668]), 39: array([-6.4785114 , -5.94494094, -3.99613988, -6.79803347]), 28: array([-7.66699712, -6.61983091, -7.60264565, -8.3233084 ]), 32: array([-14.70999481, -14.81900147, -14.63750975, -16.3293473 ]), 31: array([-15.94781378, -16.20896192, -16.07386806, -15.91479389]), 33: array([-13.86220924, -13.99533353, -14.58005824, -14.60042878]), 40: array([-16.37138175, -15.93543669, -16.00626584, -16.10155303]), 41: array([-15.11021159, -15.0313713 , -15.12066262, -16.1427075 ]), 42: array([-14.39118224, -14.04513618, -14.11888151, -15.06676474]), 43: array([-13.47905898, -13.3280808 , -13.36766233, -14.36908711]), 14: array([-13.24084653, -12.82979917, -12.7967486 , -12.89074042]), 51: array([-15.16797579, -14.42686921, -14.48158134, -14.3608748 ]), 50: array([-16.05803884, -14.96313727, -14.8700738 , -15.28846151]), 52: array([-13.58167077, -13.26323861, -13.58650088, -14.25622422]), 53: array([-12.36202997, -12.46939028, -12.89155887, -13.90026794]), 24: array([-12.75612708, -12.31330449, -12.97263319, -13.78728591]), 61: array([-14.51254789, -13.84352851, -14.20420505, -14.03010117]), 60: array([-14.67217839, -14.48765693, -14.49793836, -14.88186542]), 62: array([-13.10041286, -13.10038901, -13.11464442, -13.2568325 ]), 34: array([-12.82238768, -12.67668025, -13.06624469, -12.75352672]), 49: array([-5.44358852, -3.80098726, -3.26364328, -4.10764147]), 38: array([-7.86199643, -6.35308864, -6.60455766, -6.3621277 ]), 17: array([ -9.21518149, -9.06675019, -9.02526121, -10.003373 ]), 63: array([-12.62337832, -12.26700563, -12.85346847, -12.74301769]), 15: array([-11.70362604, -11.39586923, -12.27834132, -12.50392354]), 59: array([-2.81008911, -2.54296875, -2.81005859, -2.52370968]), 69: array([-1.875 , -2.52734375, -2.3359375 , -1.87109375]), 48: array([-6.67165589, -5.06948669, -1.75 , -1. ]), 27: array([-7.71386745, -6.34017058, -7.76830481, -6.69818919]), 58: array([-3.79243281, -2.51201346, -2.29996305, -1.76249076]), 37: array([0., 0., 0., 0.]), 44: array([-11.66811483, -11.73730811, -12.26177971, -11.80549878]), 54: array([-12.15950782, -11.33421547, -12.16753238, -12.14326109]), 25: array([-12.10995948, -10.72434757, -11.13916866, -11.16080802]), 16: array([-10.27994511, -9.72139598, -10.71965764, -11.12713914]), 35: array([-11.3114562 , -10.77935897, -11.14367845, -10.83009369]), 68: array([-1.46875 , -2.16746892, -1.5 , -0.9375 ]), 45: array([-11.21385201, -10.38149598, -11.15339582, -11.649213 ]), 47: array([-2.82864534, -4.36609306, -0.9375 , -5.39884604]), 36: array([ -9.49267328, -9.54858587, -10.25678092, -10.16425445]), 26: array([-10.2265589 , -9.85410817, -9.97563059, -10.89997015]), 57: array([-2.66771439, -2.42248535, -0.5 , -5.24217275])})
输出结果:
每个回合的长度随时间的变化
说明:共200回合,随着价值函数的更新,越往后的回合执行越少的动作就能抵达终点
每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理
说明:随着价值函数的更新越往后的回合得到的回报越高
每个回合的时间步数累加和回合数的关系
说明:每个回合的时间步数表示在一个回合中,执行了多少次动作。回合数表示完成了多少个回合。一个回合的结束条件是到达目标状态或者超过最大的时间步数。这个图反映了学习的效果和效率,如果回合数随着时间步数的增加而快速增加,说明学习的效果好,能够更快地找到最优的策略和动作。如果回合数随着时间步数的增加而缓慢增加,说明学习的效率低,需要更多的时间和尝试才能找到最优的策略和动作。
Q-Learning 算法求解悬崖行走问题
悬崖行走环境的类 cliff_walking.py,它继承自 discrete.DiscreteEnv 类,用于创建和管理一个强化学习的环境。它的主要功能是:
定义了环境的基本属性,如状态的数量,动作的数量,状态转移的概率,初始状态的分布,悬崖的位置,动作的空间,状态的空间等。
定义了环境的基本方法,如限制坐标的范围,计算转移的概率,重置环境,执行一个动作,返回下一个状态,奖励,是否结束和附加信息,渲染环境等。
使用了 io 库,numpy 库,sys 库和 discrete 模块来进行输入输出,数值计算,系统操作和离散环境的管理等操作。
- # 导入io模块,这是一个内置的模块,提供了与输入输出流相关的功能
- import io
- # 导入numpy库,这是一个用于科学计算的开源库,提供了多维数组和矩阵运算等功能
- import numpy as np
- # 导入sys模块,这是一个内置的模块,提供了一些与Python解释器和系统相关的变量和函数
- import sys
-
-
- # 从当前目录下的discrete模块中导入DiscreteEnv类,这是一个用于实现离散动作空间的环境的基类
- from . import discrete
-
-
- # 定义四个常量,表示四个动作的编号
- UP = 0
- RIGHT = 1
- DOWN = 2
- LEFT = 3
-
-
- # 定义一个类,继承自DiscreteEnv类,用于实现悬崖行走问题的强化学习环境
- class CliffWalkingEnv(discrete.DiscreteEnv):
- # 定义一个元数据字典,表示该环境支持的渲染模式
- metadata = {'render.modes': ['human', 'ansi']}
-
-
- # 定义一个私有方法,用于限制坐标的范围,使其不超过网格的边界
- def _limit_coordinates(self, coord):
- # 将坐标的第一个分量限制在0到网格的行数减一之间
- coord[0] = min(coord[0], self.shape[0] - 1)
- coord[0] = max(coord[0], 0)
- # 将坐标的第二个分量限制在0到网格的列数减一之间
- coord[1] = min(coord[1], self.shape[1] - 1)
- coord[1] = max(coord[1], 0)
- # 返回限制后的坐标
- return coord
-
-
- # 定义一个私有方法,用于计算状态转移的概率,根据当前位置和动作的变化量
- def _calculate_transition_prob(self, current, delta):
- # 计算新的位置,等于当前位置加上动作的变化量
- new_position = np.array(current) + np.array(delta)
- # 限制新的位置的范围,转换为整数类型
- new_position = self._limit_coordinates(new_position).astype(int)
- # 计算新的状态,将新的位置转换为一维的索引
- new_state = np.ravel_multi_index(tuple(new_position), self.shape)#给定一个多维数组的形状和一个多维的坐标,返回一个整数,表示该坐标在多维数组中对应的一维索引。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 new_position 是 (0, 0) 对应的 new_state 是 0,表示网格的左上角,new_position 是 (3, 11) 对应的 new_state 是 47,表示网格的右下角
- # 判断是否落入悬崖,如果是,奖励为-100,否则为-1
- reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
- # 判断是否达到目标位置或落入悬崖,如果是,回合结束
- is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
- # 返回一个列表,包含一个元组,表示状态转移的概率,新的状态,即时奖励,和是否结束的标志
- return [(1.0, new_state, reward, is_done)]
-
-
- # 定义一个构造方法,用于初始化环境的属性
- def __init__(self):
- # 定义网格的形状,为4行12列
- self.shape = (4, 12)
-
-
- # 计算状态空间的大小,为网格的元素个数
- nS = np.prod(self.shape)
- # 定义动作空间的大小,为4个动作
- nA = 4
-
-
- # 定义悬崖的位置,为一个与网格形状相同的布尔数组,第四行的第二列到倒数第二列为True,表示悬崖
- self._cliff = np.zeros(self.shape, dtype=bool)
- self._cliff[3, 1:-1] = True #是悬崖
-
-
- # 计算状态转移的概率,用一个字典表示,键为状态,值为另一个字典,键为动作,值为一个列表,包含状态转移的元组
- P = {}
- # 对每个状态进行循环
- for s in range(nS):
- # 将状态转换为二维的位置
- position = np.unravel_index(s, self.shape) #给定一个多维数组的形状和一个一维的索引,返回一个元组,表示该索引在多维数组中对应的坐标。例如,如果 self.shape 是 (4, 12),表示环境是一个 4 行 12 列的网格,那么 s = 0 对应的 position 是 (0, 0),表示网格的左上角,s = 47 对应的 position 是 (3, 11),表示网格的右下角。这个函数可以方便地将状态的表示从一维转换为二维,便于进行坐标的运算和渲染。
- # 初始化状态对应的字典,键为动作,值为一个空列表
- P[s] = { a : [] for a in range(nA) }
- # 对每个动作进行循环,分别计算状态转移的概率,调用之前定义的私有方法
- P[s][UP] = self._calculate_transition_prob(position, [-1, 0]) #行-1
- P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])# 列+1
- P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])#行+1
- P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])#列-1
-
-
- # 定义初始状态分布,为一个与状态空间大小相同的数组,只有(3, 0)位置的概率为1,其他为0
- isd = np.zeros(nS)
- isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
-
-
- # 调用父类的构造方法,传入状态空间大小,动作空间大小,状态转移概率,和初始状态分布
- super(CliffWalkingEnv, self).__init__(nS, nA, P, isd)
-
-
- # 定义一个方法,用于渲染环境,根据模式和关闭标志,调用另一个私有方法
- def render(self, mode='human', close=False):
- self._render(mode, close)
-
-
- # 定义一个私有方法,用于渲染环境,根据模式和关闭标志,输出或显示网格世界的图形界面
- def _render(self, mode='human', close=False):
- # 如果关闭标志为True,表示不需要渲染,直接返回
- if close:
- return
-
-
- # 根据模式,选择输出的文件对象,如果是ansi模式,使用io模块中的StringIO对象,如果是human模式,使用系统的标准输出
- outfile = io.StringIO() if mode == 'ansi' else sys.stdout
-
-
- # 对每个状态进行循环
- for s in range(self.nS):
- # 将状态转换为二维的位置
- position = np.unravel_index(s, self.shape)
- # print(self.s)
- # 根据位置,选择输出的符号,如果是当前状态,输出 x,如果是目标位置,输出 T,如果是悬崖位置,输出 C,否则输出 o
- if self.s == s:
- output = " x "
- elif position == (3,11):
- output = " T "
- elif self._cliff[position]:
- output = " C "
- else:
- output = " o "
-
-
- # 如果位置在第一列,去掉输出符号的左边空格
- if position[1] == 0:
- output = output.lstrip()
- # 如果位置在最后一列,去掉输出符号的右边空格,并换行
- if position[1] == self.shape[1] - 1:
- output = output.rstrip()
- output += "\n"
-
-
- # 将输出符号写入文件对象
- outfile.write(output)
- # 在所有状态循环结束后,再换行
- outfile.write("\n")
测试程序 Cliff Environment Playground.py ,用于在悬崖行走环境中进行一些动作,并打印出环境的状态和渲染结果。它的主要功能是:
导入 gym 库,numpy 库,sys 库和 CliffWalkingEnv 类,用于创建和管理环境,进行数值计算,系统操作和悬崖行走的管理等操作。
如果当前路径中没有 “../”,则将其添加到路径中,方便导入其他模块。
创建一个悬崖行走的环境,赋值给 env。
调用 env 的 reset 方法,重置环境,返回初始状态,并打印出来。
调用 env 的 render 方法,渲染环境,显示出当前的位置。
调用 env 的 step 方法,执行一个向上的动作,返回下一个状态,奖励,是否结束和附加信息,并打印出来。
调用 env 的 render 方法,渲染环境,显示出当前的位置。
重复上述两步,执行两次向右的动作和一次向下的动作,打印和渲染每一步的结果。
- import gym
- import numpy as np
- import sys
-
-
- if "../" not in sys.path:
- sys.path.append("../")
-
-
- from lib.envs.cliff_walking import CliffWalkingEnv
-
-
- env = CliffWalkingEnv()
-
-
- print(env.reset())
- env.render()
-
-
- print(env.step(0))
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(1))
- env.render()
-
-
- print(env.step(2))
- env.render()
Q-learning 算法 求解悬崖行走问题的代码Q-Learning Solution.py
该代码使用了一个名为悬崖行走的OpenAI环境,该环境是一个4x12的网格世界,其中代理人从左下角的起点开始,目标是到达右下角的终点,而不掉入悬崖中。
该代码定义了一个函数make_epsilon_greedy_policy
,该函数根据给定的Q函数和epsilon值,创建一个epsilon-贪婪策略。该函数返回一个函数,该函数接受一个观察值作为参数,并返回每个动作的概率,形式为一个长度为nA的numpy数组。
该代码定义了一个函数q_learning
,该函数实现了Q-Learning算法,即离策略的TD控制算法。该算法在遵循一个epsilon-贪婪策略的同时,寻找最优的贪婪策略。该函数接受以下参数:
env
: OpenAI环境。
num_episodes
: 运行的回合数。
discount_factor
: Gamma折扣因子。
alpha
: TD学习率。
epsilon
: 选择随机动作的概率,介于0和1之间的浮点数。
该函数返回一个元组(Q, stats)。Q是最优的动作值函数,是一个映射状态到动作值的字典。stats是一个EpisodeStats对象,包含两个numpy数组,分别记录了每个回合的长度和奖励。
该函数的主要步骤如下:
重置环境并选择第一个动作。
对于每个时间步:
执行一个动作,并观察下一个状态,奖励,是否结束,以及其他信息。
更新统计信息。
TD更新:根据下一个状态的最优动作,计算TD目标。计算TD误差。更新Q中当前状态和动作的值。
如果结束,跳出循环。
更新当前状态为下一个状态。
初始化一个空的Q字典,用于存储每个状态的动作值。
初始化一个stats对象,用于记录有用的统计信息。
根据Q和epsilon,创建一个epsilon-贪婪策略。
对于每个回合:
该代码使用了q_learning
函数来求解悬崖行走问题,设置了500个回合,其他参数使用默认值。
该代码使用了plotting.plot_episode_stats
函数来绘制每个回合的长度和奖励的图表,以及每个状态的动作值的热力图。
- # 导入gym库,用于提供强化学习的环境
- import gym
- # 导入itertools库,用于提供一些迭代器的工具函数
- import itertools
- # 导入matplotlib库,用于提供图形绘制的功能
- import matplotlib
- # 导入numpy库,用于提供数组和数学运算的功能
- import numpy as np
- # 导入pandas库,用于提供数据分析和处理的功能
- import pandas as pd
- # 导入sys库,用于提供系统相关的功能
- import sys
-
-
-
-
- # 判断当前的路径中是否包含"../",如果不包含,就添加到路径中,用于导入lib模块
- if "../" not in sys.path:
- sys.path.append("../")
-
-
- # 导入defaultdict类,用于创建一个默认字典,即一个可以使用任意不存在的键访问的字典,如果访问一个不存在的键,它会自动创建一个默认值
- from collections import defaultdict
- # 导入CliffWalkingEnv类,用于创建一个悬崖行走的环境,这是一个网格世界,目标是从起点走到终点,中间有一些悬崖,如果掉入悬崖,就会返回起点并受到惩罚
- from lib.envs.cliff_walking import CliffWalkingEnv
- # 导入plotting模块,用于提供一些图形绘制的函数,例如plot_episode_stats函数
- from lib import plotting
-
-
- # 设置图形的风格为ggplot,一种流行的图形风格
- matplotlib.style.use('ggplot')
-
-
- # %%
- # 创建一个悬崖行走的环境对象,用于与智能体进行交互
- env = CliffWalkingEnv()
-
-
- # %%
- # 定义一个函数,叫做make_epsilon_greedy_policy,用于根据给定的Q函数和epsilon参数,创建一个epsilon-贪婪策略
- def make_epsilon_greedy_policy(Q, epsilon, nA):
- """
- Creates an epsilon-greedy policy based on a given Q-function and epsilon.
-
- Args:
- Q: A dictionary that maps from state -> action-values.
- Each value is a numpy array of length nA (see below)
- epsilon: The probability to select a random action. Float between 0 and 1.
- nA: Number of actions in the environment.
-
- Returns:
- A function that takes the observation as an argument and returns
- the probabilities for each action in the form of a numpy array of length nA.
-
- """
- # 定义一个函数,叫做policy_fn,用于根据一个观察,返回一个动作的概率分布
- def policy_fn(observation):
- # 创建一个全为epsilon/nA的数组,表示每个动作的初始概率,其中epsilon是随机选择动作的概率,nA是动作数
- A = np.ones(nA, dtype=float) * epsilon / nA
- # 找到Q函数中对应于当前状态的最大动作价值的动作,即最优动作
- best_action = np.argmax(Q[observation])
- # 给最优动作的概率增加1-epsilon,表示最优动作被选择的概率更高
- A[best_action] += (1.0 - epsilon)
- # 返回这个数组,表示当前状态下的策略
- return A
- # 返回这个函数,作为epsilon-贪婪策略
- return policy_fn
-
-
- # %%
- # 定义一个函数,叫做q_learning,用于实现Q学习算法,找到最优的贪婪策略,同时遵循一个epsilon-贪婪策略
- def q_learning(env, num_episodes, discount_factor=1.0, alpha=0.5, epsilon=0.1):
- """
- Q-Learning algorithm: Off-policy TD control. Finds the optimal greedy policy
- while following an epsilon-greedy policy
-
- Args:
- env: OpenAI environment.
- num_episodes: Number of episodes to run for.
- discount_factor: Gamma discount factor.
- alpha: TD learning rate.
- epsilon: Chance to sample a random action. Float between 0 and 1.
-
- Returns:
- A tuple (Q, episode_lengths).
- Q is the optimal action-value function, a dictionary mapping state -> action values.
- stats is an EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
- """
-
- # 创建一个默认字典,用于存储最终的动作价值函数,即在不同的状态下,每个动作能够获得的期望回报
- # 这个字典的键是状态,它的值是一个长度为动作数的零数组,表示在该状态下,每个动作的价值都是零
- # 如果访问一个不存在的状态,它会自动创建一个对应的零数组作为值
- Q = defaultdict(lambda: np.zeros(env.action_space.n))
-
-
- # 创建一个EpisodeStats对象,用于记录每个回合的长度和奖励的numpy数组
- stats = plotting.EpisodeStats(
- episode_lengths=np.zeros(num_episodes),
- episode_rewards=np.zeros(num_episodes))
-
- # 创建一个epsilon-贪婪策略,用于在每个状态下,以一定的概率epsilon随机选择一个动作,否则选择当前最优的动作,即具有最大的动作价值的动作
- policy = make_epsilon_greedy_policy(Q, epsilon, env.action_space.n)
-
- # 对于每个回合,重置环境,选择第一个动作,然后循环执行以下步骤:
- for i_episode in range(num_episodes):
- # 打印出当前的回合数,用于调试
- if (i_episode + 1) % 100 == 0:
- print("\rEpisode {}/{}.".format(i_episode + 1, num_episodes), end="")
- sys.stdout.flush()
-
- # 重置环境,返回初始状态
- state = env.reset()
-
- # 在环境中执行一个步骤
- # total_reward = 0.0
- for t in itertools.count():
-
- # 选择一个动作,根据当前的策略,这是一个概率性的选择,根据每个动作的概率分布
- action_probs = policy(state)
- action = np.random.choice(np.arange(len(action_probs)), p=action_probs)
- # 执行一个动作,观察下一个状态,奖励,是否结束,和其他信息
- next_state, reward, done, _ = env.step(action)
-
-
- # 更新统计信息,累加每个回合的奖励,记录每个回合的长度
- stats.episode_rewards[i_episode] += reward
- stats.episode_lengths[i_episode] = t
-
- # 使用时序差分更新公式,更新动作价值函数
- # 找到下一个状态中,具有最大动作价值的动作,即最优动作
- best_next_action = np.argmax(Q[next_state])
- # 计算目标值,即当前的奖励加上折扣后的下一个状态和最优动作的价值
- td_target = reward + discount_factor * Q[next_state][best_next_action]
- # 计算误差,即目标值减去当前的状态和动作的价值
- td_delta = td_target - Q[state][action]
- # 用学习率乘以误差,更新当前的状态和动作的价值
- Q[state][action] += alpha * td_delta
-
- # 如果回合结束,跳出循环
- if done:
- break
-
- # 更新当前的状态,为下一个步骤做准备
- state = next_state
-
- # 返回最终的动作价值函数和统计信息
- return Q, stats
-
-
- # 调用q_learning函数,传入环境对象,回合数,折扣因子,学习率,和epsilon参数,返回最终的动作价值函数和统计信息
- Q, stats = q_learning(env, 500)
-
-
- # %%
- # 调用plotting模块中的plot_episode_stats函数,传入统计信息,绘制每个回合的长度,每个回合的奖励,和每个回合的时间步数与回合数的关系的图形
- plotting.plot_episode_stats(stats)
输出结果:
每个回合的长度随时间的变化
每个回合的奖励随时间的变化,使用平滑窗口进行平滑处理
每个回合的时间步数和回合数的关系
最终价值函数
最终价值函数Q:在学习过程中,通过不断地更新和优化,最终收敛到一个稳定的价值函数,即最接近真实的价值函数的价值函数。训练最终的价值函数有以下的用途:
训练最终的价值函数可以反映出最优的策略,即在每个状态下,选择哪个动作能够获得最大的价值。我们可以根据训练最终的价值函数,制定出最优的决策规则,从而在环境中表现出最佳的行为。
训练最终的价值函数可以评估出不同的状态的重要性,即哪些状态能够带来更高的回报,哪些状态应该避免。我们可以根据训练最终的价值函数,分析出环境中的特征和规律,从而提高我们对环境的理解和掌握。
训练最终的价值函数可以作为一种性能指标,即我们可以通过比较训练最终的价值函数和真实的价值函数,或者不同的训练方法和参数下的价值函数,来评估我们的学习效果和效率,从而优化我们的学习过程和方法。
The End
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。