赞
踩
强化学习是智能体(Agent)不断地与环境交互、获取奖励、更新自身策略的迭代过程,目标是学习到能够使整体回报最大化的策略。总的来说,强化学习相比机器学习的监督学习方法有如下特点:1)定义模型所需的约束更少,大大降低定义问题的难度;2)强化学习更重视整个过程的回报,不局限于单步行动。强化学习方法类似于人类学习决策的过程,在预先设定的规则下,不断地尝试各种行为,在奖励的指导下最终找到最优策略来完成任务,该思维模式也是实现人工智能的重要发展方向。
本文的参考书籍:《强化学习精要:核心算法与TensorFlow实现》
强化学习一般以马尔科夫决策过程(Markov decision process,MDP)作为其形式化的手段。这里先简单介绍一种棋类游戏——蛇棋。类似于常见的飞行棋,玩家通过掷骰子,决定棋子前进的步数;与飞行棋不同的是,棋盘上某些格子之间用梯子相连,棋子走到有梯子的格子中会自动到达与之相连的格子中;抵达终点后如果步数有剩余,必须反向前进;最先抵达终点的棋子获胜。可以看出,想要获胜必须以尽可能少的次数抵达终点。这里为了适配算法,对游戏进行一些修改,假设玩家有两种不同的骰子可以选择,一种是正常的1-6骰子,另一种是1-3骰子(即4,5,6被改为1,2,3)。
以蛇棋(后续会详细介绍并给出对应的代码实现)为例,其实有两个主要因素决定最终挖到的总分数:选择什么样的骰子以及骰子掷出的数目。第一个因素是玩家决定的,也是其唯一可以决定的。第二个因素是游戏本身设定。假定用
s
t
s_t
st表示
t
t
t时刻游戏状态的观测值(棋子的位置),用
a
t
a_t
at表示当前的决策(选择哪一种骰子),整个游戏过程可以用一条状态-行动链条表示:
s
0
,
a
0
,
s
1
,
a
1
,
.
.
.
,
s
t
−
1
,
a
t
−
1
,
s
t
,
a
t
s_0,a_0,s_1,a_1,...,s_{t-1}, a_{t-1}, s_t, a_t
s0,a0,s1,a1,...,st−1,at−1,st,at
上述链条包含了两种状态转换,一种是从状态到行动的转换,即策略,Agent 根据当前状态信息选取自认为最好的行动方式。从数学角度分析,策略将 Agent 的状态值映射到其行动集合的概率分布(离散)或概率密度函数(连续)上。如果对于每一个行动,Agent都有一定的概率取执行,且行动的评价越高,该行动被选择的概率越大。最优策略是在当前状态选择概率最高的行动,即
a
t
∗
=
a
r
g
m
a
x
a
i
p
(
a
i
∣
s
t
)
a_t^*=argmax_{a_i}p(a_i|s_t)
at∗=argmaxaip(ai∣st)
实际上,上述公式成立时有一个重要前提:序列的马尔科夫性,即下一时刻的行动只和当前时刻的状态有关,而与更早时刻的状态无关。如果行动集合是离散有限的,可以将选择行动的问题变成一个多分类问题。
第二种转换是Agent 在环境中的状态转换。这里也用到马尔科夫性,这里的状态转换能以概率的形式表现为 p ( s t + 1 ∣ s t , a t ) p(s_{t+1}|s_t, a_t) p(st+1∣st,at),还是以蛇棋为例,假设骰子投掷结果是均匀的,棋子将等概率地前往各自步数对应的位置。实际中需要考虑棋盘中每一个格子的概率,即便绝大多数格子的概率为0。这部分属于环境内部的信息,在蛇棋游戏中是公开的,即状态转换是已知的,但是生活中也有不少问题,环境的信息是不公开或者不全面的,后续对强化学习算法分类时还会提到这一点。
MDP包含以下三层含义:
整个游戏的关键在于策略,理想状态下每一个行动都要为最终的目标——最大化长期回报努力,那么只需找到一种方法,量化每一个行动对实现最终目标贡献的价值,Agent就可以根据该量化指标做出明智的判断。其实,交互环境给了Agent很大的提示,它提供了可以量化某一时刻的回报(奖励)值 r r r。虽然与最终目标不同,但可以利用它,将其扩展为需要的目标。还是以蛇棋为例,可以根据棋子的位置来判断玩家行动的奖励值。策略等同于给出行动,使整个游戏的回报最大化。
计算整个游戏过程的累积回报并不简单,主要反映在两方面:计算的时间跨度。如果游戏的时间是有限的,表示Agent可以在有限的步数内完成游戏,计算固然复杂但至少可以计算。如果游戏可以无限进行下去,那计算累积回报就没有意义。为了解决该问题,使无穷数列的和收敛,需要降低未来回报对当前时刻状态的影响,即对未来回报乘以一个0-1的系数,使长期累积回报变得有意义。好比钱存在银行中,会缓慢升值,未来的钱换算到当下就需要打个折扣。这样一来,长期回报数列和就变得有界。可以算出具体值。此时,我们将当前状态之后所有的回报取出,分别乘以对应的打折率,加起来得到汇总的值称为长期回报。
R
=
∑
k
=
0
γ
k
r
t
+
k
+
1
R=\sum_{k=0} \gamma^k r_{t+k+1}
R=k=0∑γkrt+k+1
解决长期回报的表示问题后,另一个困难也出现了,求和形式表示长期回报比较复杂,而且它与Agent选择的策略强相关。换言之,我们需要定义策略的价值。之前提及的MDP,其中从状态到行动的转换可以通过某个确定策略决定,但由于环境的原因,从行动到下一时刻的状态并不能确定。因此衡量价值时需要考虑每一种状态转移的影响,也就是基于状态转换求解长期回报的期望。假设
τ
\tau
τ为Agent采用某个策略与环境交互的序列,价值的公式定义为
υ
π
(
s
t
)
=
E
s
,
a
∈
τ
[
∑
k
=
0
γ
k
r
t
+
k
+
1
]
\upsilon_\pi(s_t)=E_{s,a\in\tau}[\sum_{k=0} \gamma^k r_{t+k+1}]
υπ(st)=Es,a∈τ[k=0∑γkrt+k+1]
价值函数(值函数)一般可以分为两种类型:
采用上述表达式,计算价值仍然是很复杂的事。计算从某个状态出发的值函数,相当于按照某个策略,把所有从这个状态出发的可能路径走一遍,将这些路径的长期回报按照各自概率求期望:
υ
π
(
s
t
)
=
∑
τ
p
(
τ
)
∑
k
=
0
γ
k
r
t
+
k
+
1
\upsilon_\pi(s_t)=\sum_{\tau}p(\tau) \sum_{k=0} \gamma^k r_{t+k+1}
υπ(st)=τ∑p(τ)k=0∑γkrt+k+1
上式中,
τ
\tau
τ表示从状态
s
t
s_t
st出发的某条路径。
假设游戏过程符合MDP模型,将路径部分展开为:
υ
π
(
s
t
)
=
∑
(
s
t
,
a
t
,
.
.
.
)
π
(
a
t
∣
s
t
)
p
(
s
t
+
1
∣
s
t
,
a
t
)
.
.
.
∑
k
=
0
γ
k
r
t
+
k
+
1
\upsilon_\pi(s_t)=\sum_{(s_t,a_t,...)}\pi(a_t|s_t)p(s_{t+1}|s_t,a_t) ... \sum_{k=0} \gamma^k r_{t+k+1}
υπ(st)=(st,at,...)∑π(at∣st)p(st+1∣st,at)...k=0∑γkrt+k+1
观察上述公式,其实它具有递归的性质,具体推导这里不展开,给出书中的结论:
υ
π
(
s
t
)
=
∑
τ
π
(
a
t
∣
s
t
)
p
(
s
t
+
1
∣
s
t
,
a
t
)
.
.
.
∑
k
=
0
γ
k
r
t
+
k
+
1
=
∑
a
t
π
(
a
t
∣
s
t
)
∑
s
t
+
1
p
(
s
t
+
1
∣
s
t
,
a
t
)
[
r
t
+
1
+
∑
τ
π
(
a
t
+
1
∣
s
t
+
1
)
.
.
.
∑
k
=
1
γ
k
r
t
+
k
+
1
]
=
∑
a
t
π
(
a
t
∣
s
t
)
∑
s
t
+
1
p
(
s
t
+
1
∣
s
t
,
a
t
)
[
r
t
+
1
+
υ
π
(
s
t
+
1
)
]
\upsilon_\pi(s_t) =\sum_{\tau}\pi(a_t|s_t)p(s_{t+1}|s_t,a_t) ... \sum_{k=0} \gamma^k r_{t+k+1} \\ =\sum_{a_t}\pi(a_t|s_t)\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{t+1}+\sum_{\tau}\pi(a_{t+1}|s_{t+1}) ... \sum_{k=1} \gamma^k r_{t+k+1} ] \\=\sum_{a_t}\pi(a_t|s_t)\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{t+1}+\upsilon_\pi(s_{t+1})]
υπ(st)=τ∑π(at∣st)p(st+1∣st,at)...k=0∑γkrt+k+1=at∑π(at∣st)st+1∑p(st+1∣st,at)[rt+1+τ∑π(at+1∣st+1)...k=1∑γkrt+k+1]=at∑π(at∣st)st+1∑p(st+1∣st,at)[rt+1+υπ(st+1)]
假设值函数已经稳定,任意一个状态的价值可以由其他状态的价值得到,这个公式就称为贝尔曼公式(Bellman Equation),同样的,状态-行动值函数有一个类似的公式:
q
π
(
s
t
,
a
t
)
=
∑
s
t
+
1
p
(
s
t
+
1
∣
s
t
,
a
t
)
∑
a
t
+
1
p
(
a
t
+
1
∣
s
t
+
1
)
[
r
t
+
1
+
q
π
(
s
t
+
1
,
a
t
+
1
)
]
q_\pi(s_t, a_t)=\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)\sum_{a_{t+1}}p(a_{t+1}|s_{t+1})[r_{t+1}+q_\pi(s_{t+1},a_{t+1})]
qπ(st,at)=st+1∑p(st+1∣st,at)at+1∑p(at+1∣st+1)[rt+1+qπ(st+1,at+1)]
该公式的推导方法与状态值函数类似。这两个公式可以说是强化学习理论的基石,非常重要。
此处还有一个结论:在状态转移中,Agent可能在不同时刻遇到同一个状态,这两个状态的价值是相等的。从公式中可以看出,值函数并没有对状态的时刻有特殊要求。其次,MDP的状态转移过程持续时间足够长,最终每个状态、行动的转移进入稳态,每个状态有其固定不变的价值,证明状态的价值与时间无关。
这里大概介绍一下表格式Agent需要的基本数据结构。从上两节的内容可以看出,MDP模型通常可以运用五元数组来描述 { S , A , R , P , γ } \{S,A,R,P,\gamma\} {S,A,R,P,γ},其中 S S S表示Agent状态集合, A A A表示行动集合, R R R表示在状态 s t s_t st采取行动 a t a_t at后从环境中获得的回报, P P P表示从状态 s t s_t st转移到 s t + 1 s_{t+1} st+1的概率, γ \gamma γ是计算长期回报时的折扣系数。
表格式如何理解:以蛇棋为例,假设棋盘上共有100个格子,棋子一共有100个离散的状态。玩家只能选择两种骰子来投掷,因此也可以通过离散的方式表达出来。因此,在这个问题中,状态和行动都是离散且有限的,可以用N维张量的形式表达。例如,策略 π ( a t ∣ s t ) \pi(a_t|s_t) π(at∣st)是一个条件概率分布,则这个条件概率可以由一个 ∣ S ∣ ∗ ∣ A ∣ |S|*|A| ∣S∣∗∣A∣的矩阵表示,矩阵中每一个数值都处于0-1,且每一行的数值之和为1。而对于状态转移来说,条件概率分布可以由一个 ∣ S ∣ ∗ ∣ S ∣ ∗ ∣ A ∣ |S|*|S|*|A| ∣S∣∗∣S∣∗∣A∣的张量表示。
通常情况下,表格式Agent还有一个条件约束:环境的状态转移概率需要对Agent公开,Agent就能利用这些信息进行更好地决策。还是以蛇棋为例,骰子每一面朝上的概率是均匀的,棋盘上的每一个梯子都是可见的,可以计算出状态转移概率。
表格式Agent需要的数据结构大致如下:
蛇棋规则如下:
1)棋盘一共100个格子, 10*10, 并有若干个梯子;
2)玩家拥有两种骰子, 一种均匀投出1-6的数字,另一种均匀投出1-3的数字;
3)玩家持有的棋子每次根据骰子的点数向前行进相应的步数;
4)如果棋子落入有梯子的格子中,自动走到梯子对应的另一个格子中;
5)棋子的终点是位置为100的格子处,但如果到达时掷出的点数加上当前位置超过100,则棋子到达100后反向前进。
游戏环境采用pygame、numpy第三方库。具体代码如下:
import sys import math import pygame import numpy as np # 定义全局变量 UNIT = 50 # 每个格子的大小 MARGIN = 10 # 棋盘的边距 WHITE = 255, 255, 255 BLACK = 0, 0, 0 BLUE = 0, 0, 255 GREEN = 0, 255, 0 RED = 255, 0, 0 class SnakeEnv(object): def __init__(self, size=(520, 520)): self.grid_size = 10 # 棋盘大小10*10 # 定义相互连接的格子 self.ladders = { 3: 20, 20: 3, 6: 14, 14: 6, 11: 28, 28: 11, 15: 34, 34: 15, 17: 74, 74: 17, 22: 37, 37: 22, 36: 59, 59: 36, 49: 67, 67: 49, 57: 76, 76: 57, 61: 78, 78: 61, 73: 86, 86: 73, 89: 91, 91: 89, 81: 98, 98: 81 } self.ladder_num = len(self.ladders) / 2 # 梯子的数量 # 行动空间 self.action_space = np.array([0, 1], dtype=np.int32) # 状态空间 self.state_space = np.arange(1, 101).astype(np.int32) # 行动对应的骰子点数最大值,0表示选用1-6的骰子 self.dices = {0: 7, 1: 4} # 棋子的位置,初始值默认为1 self.pos = 1 self.chessman = {'x': 0, 'y': 9} # 窗口信息初始化 pygame.init() self.screen_width, self.screen_height = size self.window = pygame.display.set_mode((self.screen_width, self.screen_height)) # 设置标题 pygame.display.set_caption('蛇棋游戏') # 字体初始化, 选用微软雅黑, 默认大小为24 self.font = pygame.font.SysFont('simhei', 24) self.fclock = pygame.time.Clock() self.fps = 3 def reset(self): self.pos = 1 self.chessman = {'x': 0, 'y': 9} return self.pos def step(self, a, show=False): if a not in self.action_space: a = 0 # 掷骰子 number = np.random.randint(1, self.dices[a]) # 需要显示 reserve = False for step in range(number): if self.chessman['x'] == 0 and self.chessman['y'] == 0: reserve = True # 偶数行, 前进等于往左走 if self.chessman['y'] % 2 == 0: if self.chessman['x'] == 0 and not reserve: self.chessman['y'] -= 1 else: if reserve: self.chessman['x'] += 1 else: self.chessman['x'] -= 1 else: # 奇数行, 前进等于往右走 if self.chessman['x'] == self.grid_size - 1: self.chessman['y'] -= 1 else: self.chessman['x'] += 1 if show: self.render() # 根据点数前进 self.pos += number if self.pos == 100: return 100, 100, 1 elif self.pos > 100: # 超过100反向前进 self.pos = 200 - self.pos # 落入梯子的格子中,则移动到另一端 if self.pos in self.ladders.keys(): self.pos = self.ladders[self.pos] # 计算起点的索引 self.chessman['y'] = self.grid_size - ((self.pos - 1) // self.grid_size + 1) if self.chessman['y'] % 2 == 0: self.chessman['x'] = self.grid_size - 1 - ((self.pos - 1) % self.grid_size) else: self.chessman['x'] = (self.pos - 1) % self.grid_size if show: self.render() return self.pos, -1, 0 def draw_chessboard(self): # 先画10条横线 for i in range(self.grid_size + 1): # 绘制直线 surface, color, start_pos, end_pos, blend pygame.draw.line(self.window, BLUE, (MARGIN, i * UNIT + MARGIN), (self.screen_width - MARGIN, i * UNIT + MARGIN), 2) # 画10条竖线 for i in range(self.grid_size + 1): pygame.draw.line(self.window, BLUE, (i * UNIT + MARGIN, MARGIN), (i * UNIT + MARGIN, self.screen_height - MARGIN), 2) # 绘制格子中的数字 for row in range(self.grid_size): for col in range(self.grid_size): # 偶数行从右往左 if row % 2 == 0: text = str((self.grid_size - row - 1) * self.grid_size + self.grid_size - col) else: # 奇数行从左往右 text = str((self.grid_size - row - 1) * self.grid_size + col + 1) text_surface = self.font.render(text, True, BLACK) text_width = text_surface.get_width() text_height = text_surface.get_height() text_x = round(MARGIN + UNIT * col + (UNIT - text_width) / 2) text_y = round(MARGIN + UNIT * row + (UNIT - text_height) / 2) self.window.blit(text_surface, (text_x, text_y)) # ---画梯子---# for index, (key, value) in enumerate(self.ladders.items()): if index % 2 != 0: continue # 计算格子的行和列索引 # 计算起点的索引 row_s = self.grid_size - ((key - 1) // self.grid_size + 1) if row_s % 2 == 0: col_s = self.grid_size - 1 - ((key - 1) % self.grid_size) else: col_s = (key - 1) % self.grid_size # 计算终点的索引 row_e = self.grid_size - ((value - 1) // self.grid_size + 1) if row_e % 2 == 0: col_e = self.grid_size - 1 - ((value - 1) % self.grid_size) else: col_e = (value - 1) % self.grid_size # 计算起点格子和终点格子中心点在屏幕中的坐标 start_x, start_y = MARGIN + col_s * UNIT + UNIT / 2, MARGIN + row_s * UNIT + UNIT / 2 end_x, end_y = MARGIN + col_e * UNIT + UNIT / 2, MARGIN + row_e * UNIT + UNIT / 2 # 计算中心点连线的航向 yaw = math.atan2(end_y - start_y, end_x - start_x) # 依次计算出梯子的4个顶点 points = [] points.append((round(start_x - UNIT / 4 * math.sin(yaw)), round(start_y + UNIT / 4 * math.cos(yaw)))) points.append((round(start_x + UNIT / 4 * math.sin(yaw)), round(start_y - UNIT / 4 * math.cos(yaw)))) points.append((round(end_x + UNIT / 4 * math.sin(yaw)), round(end_y - UNIT / 4 * math.cos(yaw)))) points.append((round(end_x - UNIT / 4 * math.sin(yaw)), round(end_y + UNIT / 4 * math.cos(yaw)))) # 绘制多边形 surface, color, pointlist, width, 画个四个顶点围起来的矩形 pygame.draw.polygon(self.window, GREEN, points, 2) def render(self): self.window.fill(WHITE) # 先画棋盘 self.draw_chessboard() # 再画棋子 pygame.draw.circle(self.window, RED, (MARGIN + UNIT * (self.chessman['x'] + 0.5), MARGIN + UNIT * (self.chessman['y'] + 0.5)), 15) pygame.display.update() # 退出程序 for event in pygame.event.get(): if event.type == pygame.QUIT: sys.exit() self.fclock.tick(self.fps) def query_transition(self, s, a): # 计算转移概率 p = np.ones(self.dices[a] - 1) * (1 / (self.dices[a] - 1)) # 计算下一状态 s_ = np.ones(self.dices[a] - 1) * s + np.arange(1, self.dices[a]) # 找到超过100的数值 s_g_100 = np.where(s_ > 100) s_[s_g_100] = 200 - s_[s_g_100] s_ = s_.astype(np.int32) # 验证梯子, vectorize将函数向量化, 方便传递一个向量,函数负责对每一个元素操作, 返回结果仍是向量 ladder_move = np.vectorize(lambda x: self.ladders[x] if x in self.ladders.keys() else x) s_ = ladder_move(s_) # 计算奖励 query_reward = np.vectorize(lambda x: 100 if x == 100 else -1) r = query_reward(s_) r = r.astype(np.float32) return s_, r, p
主函数代码如下:
if __name__ == '__main__': snake = SnakeEnv() while True: snake.reset() total_r = 0 while True: snake.render() a = np.random.randint(0, 2) s_, r, done = snake.step(a, show=True) total_r += r if done: print('total rewards: ', total_r) break for event in pygame.event.get(): if event.type == pygame.QUIT: sys.exit()
本文主要介绍了强化学习使用的MDP模型概念、值函数的由来与形式、表格式Agent的数据结构及条件约束。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。