赞
踩
由于我自己并没有系统的学习过强化学习的理论知识,所以这部分不涉及数学上的严格证明,只从直观上给出讲一讲我自己对强化学习基本框架的理解,最终还是要落实到程序实现中。故对于这部分的内容的理解可能比较浅薄,望见谅。强化学习(Reinforcement Learning)作为机器学习的一种手段,十分适合应用在各种智能决策中。下面我将从一个初次接触强化学习小白的角度来谈谈我对深度学习的认识。
首先,想要了解强化学习是什么、背后的原理是怎样的、学习机制是如何运行的,就必须对强化学习的几个基本要素有一定认识。
环境的状态S(State)
智能体一定是工作在一定的环境之中的,无论这个环境是虚拟的还是物理现实的,一定会有一个表征系统状态的量,这个就是所谓环境的状态。以路径规划为例,环境的状态则是智能体的位置信息–可以用坐标加以表示。
Agent的动作A(Action)
智能体要在环境中做出决策并执行动作以改变环境的状态。同样以路径规划为例,Agent的动作就是运动方向的选择–上下左右等等。
环境的即时奖励R(Reward)
指的是在Agent执行动作后,环境即时反馈给Agent的奖励。如在路径规划中,为了让Agent学习到最短路径,可以将每一步的动作奖励都设为-1,这样在后面优化过程中,Agent为了获得最大奖励将向路径最短的方向收敛。
个体的策略
π
\pi
π
所谓的策略表现为在某个环境状态下Agent执行某个动作的概率。记
π
(
a
∣
s
)
\pi(a|s)
π(a∣s)表示在状态S下执行动作a的概率。以路径规划为例,在各个坐标下向各个方向行进的概率集合就称为个体的策略
π
\pi
π 。
状态价值
v
π
(
s
)
v_\pi(s)
vπ(s)
从日常经验来看,决策的好坏不单单体现在当前动作所带来价值,还体现在动作所造成的状态的后续价值。就像在棋类运动中,吃子的即时奖励很高,但未必是好的决策。因此,引入状态价值函数
v
π
(
s
)
v_\pi(s)
vπ(s)来表达在策略
π
\pi
π 下,状态的好坏之分。
奖励衰减因子
γ
\gamma
γ
在状态价值的计算和迭代中需要引入奖励衰减因子
γ
\gamma
γ 的概念,在具体操作中简单来说体现为
v
π
(
s
)
=
R
s
′
s
+
γ
∗
v
π
(
s
)
v_\pi(s)=R_{s's}+\gamma*v_\pi(s)
vπ(s)=Rs′s+γ∗vπ(s),即状态价值函数的更新方式为状态价值等于即时奖励加上下一个状态的价值函数乘上奖励衰减因子。这个要素从直观上来看是一个权衡当前奖励和延后奖励权重的要素。当
γ
\gamma
γ为0时,意味着学习过程是贪婪的,Agent只看重眼前利益;相反,当
γ
\gamma
γ为1时,意味着Agent对即时奖励和延时奖励一视同仁。
状态转换模型
所谓状态转换模型指的是在Agent在状态
s
s
s下执行动作
a
a
a时,系统状态转换到
s
′
s'
s′的概率,记作
P
s
s
′
a
P_{ss'}^a
Pss′a。在简单的路径规划问题中,状态转化不是以概率来表示的,如在状态(0,0)时,执行动作UP,则下个状态必然是(0,1)。但有一部分系统的状态转换模型并不是确定的,所以引入该要素来加以描述。
探索率
ϵ
\epsilon
ϵ
这个要素涉及到后续模型的迭代学习。简单来说模型会不断选择价值最大的动作来执行并更新价值,这本质是一个贪心算法。但为了增加模型对环境的探索率,增加最优解获得的可能性,故引入 探索率
ϵ
\epsilon
ϵ的概念,在模型进行迭代时,算法会以
ϵ
\epsilon
ϵ的概率进行贪婪,而以
1
−
ϵ
1-\epsilon
1−ϵ的概率随机选择动作。
强化学习的核心在于通过优化个体策略
π
\pi
π使得价值函数
v
π
(
s
)
v_\pi(s)
vπ(s)达到最大值。
而
v
π
(
s
)
v_\pi(s)
vπ(s)又被称为状态价值函数,这个价值是依据策略
π
\pi
π来进行行动而得到的回报的期望。
由上面的定义不难得出:
v
π
(
s
)
=
∑
a
π
(
a
∣
s
)
∗
Q
π
(
s
,
a
)
v_\pi(s)=\sum_a\pi(a|s)*Q_\pi(s,a)
vπ(s)=a∑π(a∣s)∗Qπ(s,a)
其中
Q
π
(
s
,
a
)
Q_\pi(s,a)
Qπ(s,a)被称为动作价值函数,即在某一状态采取某一动作收获的的价值的期望。
由上述对于强化学习要素的讨论不难得出:
Q
π
(
s
,
a
)
=
∑
s
′
P
s
s
′
a
[
R
s
s
′
+
γ
v
π
(
s
′
)
]
Q_\pi(s,a)=\sum_{s'}P_{ss'}^a[R_{ss'}+\gamma v_\pi (s')]
Qπ(s,a)=s′∑Pss′a[Rss′+γvπ(s′)]
简单解释一下上式,从动作价值函数定义的角度出发,与状态价值函数不同的是状态价值函数仅仅是对一个状态的价值的评估。而动作价值函数则更进一步,不仅固定了状态,还固定了该状态对应的动作,我们知道,在某一状态下采取某一动作的收益来自两个方面:其一是环境的即时奖励R;另一个则是延时奖励
v
π
(
s
′
)
v_\pi(s')
vπ(s′)。而对于一个普遍的系统来说,即使确定了动作,下一个状态也是以概率的形式出现的,故动作价值函数也要以期望的形式表现。上式中的
[
R
s
s
′
+
γ
v
π
(
s
′
)
]
[R_{ss'}+\gamma v_\pi (s')]
[Rss′+γvπ(s′)]就是在不考虑状态转换模型不定是的动作价值函数递推表达式。
结合上述两式可得:
v
π
(
s
)
=
∑
a
π
(
a
∣
s
)
∗
∑
s
′
P
s
s
′
a
[
R
s
s
′
+
γ
v
π
(
s
′
)
]
v_\pi(s)=\sum_a\pi(a|s)*\sum_{s'}P_{ss'}^a[R_{ss'}+\gamma v_\pi (s')]
vπ(s)=a∑π(a∣s)∗s′∑Pss′a[Rss′+γvπ(s′)]
强化学习的优化目标就是:
v
∗
(
s
)
=
max
π
v
π
(
s
)
v^*(s)=\max_{\pi}{v_\pi(s)}
v∗(s)=πmaxvπ(s)
Q
∗
(
s
,
a
)
=
max
π
Q
π
(
s
,
a
)
Q_*(s,a)=\max_\pi Q_\pi(s,a)
Q∗(s,a)=πmaxQπ(s,a)
最优策略为:
π
∗
(
s
,
a
)
=
{
1
if
a
=
a
r
g
max
a
∈
A
Q
∗
(
s
,
a
)
0
else
\pi_*(s,a)=
总结来说,强化学习的目标就是最优化价值函数,而后将最优策略定为每个状态下动作价值函数最大值所对应的动作即可。
通过上面的分析不难看出,只要优化
Q
π
(
s
,
a
)
Q_\pi(s,a)
Qπ(s,a)就可以得到最优策略,从数学上直接求取优化解是非常困难的。常用的优化方法非常类似于神经网络中的梯度下降,优化的迭代表达式如下:
Q
π
(
s
,
a
)
⟵
Q
π
(
s
,
a
)
+
l
r
∗
[
T
−
Q
π
(
s
,
a
)
]
Q_\pi(s,a)\longleftarrow Q_\pi(s,a)+lr*[T-Q_\pi(s,a)]
Qπ(s,a)⟵Qπ(s,a)+lr∗[T−Qπ(s,a)]
l
r
lr
lr为学习率,
T
T
T为TD目标,一般形式为
R
+
γ
Q
′
R+\gamma Q'
R+γQ′。有关TD目标的具体形式的不同也就形成了所谓Q_Learning和Sarsa的区别。
关于上式的收敛性证明由于时间有限,没有详细查找资料,这里CY,后续记得查看收敛性证明。
接下来的内容是我个人对于时间差分法、蒙特卡洛法、动态规划之间的区别理解:
时间差分法是一个单步更新的迭代算法–即执行一个动作,获取一次TD目标,更新一次Q。
而蒙特卡洛法在更新Q值时,则必须快速走完所有状态直到结束才会返回G值从而进行Q值的更新。
动态规划法则更像是堆积木,由底层开始逐步向顶层更新,只有底层的Q值更新完成才会更新上一层的Q值。
分享一个详细介绍三者区别的帖子:时序差分
Q_Learning的具体更新策略:
Q
π
(
s
,
a
)
⟵
Q
π
(
s
,
a
)
+
l
r
∗
[
R
s
s
′
+
γ
Q
max
a
(
s
′
,
a
)
−
Q
π
(
s
,
a
)
]
Q_\pi(s,a)\longleftarrow Q_\pi(s,a)+lr*[R_{ss'}+\gamma Q_{\max a}(s',a)-Q_\pi(s,a)]
Qπ(s,a)⟵Qπ(s,a)+lr∗[Rss′+γQmaxa(s′,a)−Qπ(s,a)]
在算法实现时,对于Agent当前所处的状态,算法会以Q值为参照按照
ϵ
\epsilon
ϵ-greedy思想选取动作。确定该动作的下一个状态后选取下一个状态的最大Q值作为
Q
max
a
(
s
′
,
a
)
Q_{\max a}(s',a)
Qmaxa(s′,a)来更新本次状态-动作对所对应的Q值。
Sarsa的具体更新策略:
Q
π
(
s
,
a
)
⟵
Q
π
(
s
,
a
)
+
l
r
∗
[
R
s
s
′
+
γ
Q
(
s
′
,
a
′
)
−
Q
π
(
s
,
a
)
]
Q_\pi(s,a)\longleftarrow Q_\pi(s,a)+lr*[R_{ss'}+\gamma Q(s',a')-Q_\pi(s,a)]
Qπ(s,a)⟵Qπ(s,a)+lr∗[Rss′+γQ(s′,a′)−Qπ(s,a)]
在算法实现时,对于Agent当前所处的状态,算法会以Q值为参照按照
ϵ
\epsilon
ϵ-greedy思想选取动作。确定该动作的下一个状态后再次按照
ϵ
\epsilon
ϵ-greedy思想选取动作
a
′
a'
a′,并以
s
′
s'
s′和
a
′
a'
a′所对应的Q值来更新上一个状态-动作对的Q值。
下面通过两个具体的例子来看一下上述算法的实现问题。
算法核心思想:
构造一个Q_table。记录每个状态下每个动作的动作价值函数
Q
π
(
s
,
a
)
Q_\pi(s,a)
Qπ(s,a),
ϵ
\epsilon
ϵ-greedy选取动作并按照
Q
π
(
s
,
a
)
⟵
Q
π
(
s
,
a
)
+
l
r
∗
[
R
s
s
′
+
γ
Q
max
a
(
s
′
,
a
)
−
Q
π
(
s
,
a
)
]
Q_\pi(s,a)\longleftarrow Q_\pi(s,a)+lr*[R_{ss'}+\gamma Q_{\max a}(s',a)-Q_\pi(s,a)]
Qπ(s,a)⟵Qπ(s,a)+lr∗[Rss′+γQmaxa(s′,a)−Qπ(s,a)]
更新Q_table,进行一定次数的更新后,从初始状态开始选择Q值最大的动作依次执行,直到最终目标。
算法具体实现:
1、初始化Q_table,用DataFrame实现,列族为代表动作的行动序列[0 1 2 3],行索引为Agent状态。
2、按照
ϵ
\epsilon
ϵ-greedy策略选取动作并执行,得到Agent的下一个状态。
3、检验新状态是否在Q_table中:如果不在则在Q_table中增加信息。随后检验下一个状态是否为目标状态或是强制结束状态(障碍物),若是,则返回结束标志Done。依据不同的状态返回R。
在本次测试中,R的设置情况如下:
R
=
{
−
100
if nextState in obstacle
100
if nextState is goal
−
1
else
R=
将nextState加入到路径字典d中。
4、依据Q_learning的更新策略更新Q_table。
5、依据返回的Done标志选择是否继续本次循环。若结束循环则将状态置为初始状态并跳到步骤2继续执行。如未结束循环则跳到步骤2继续执行。
源码如下:
from env import Environment from env import final_states from plotting import Plotting from agent_brain import SarsaTable # Resulted list for the plotting Episodes via Steps steps = [] # Summed costs for all episodes in resulted list all_costs = [] for episode in range(1000):#进行多次学习过程 # Initial Observation observation = env.reset() #将机器人放在(0,0)处并清空路径字典 # Updating number of Steps for each Episode i = 0 # Updating the cost for each episode cost = 0 while True:#重复执行直到达到目标节点或障碍物 # RL chooses action based on observation observation当前机器人的坐标位置 action = RL.choose_action(str(observation)) #寻找动作的依据为以一定概率选择目前状态下动作值函数最大的动作,以一定概率随机选择(随机选择的目的是增加探索率) # RL takes an action and get the next observation and reward observation_, reward, done = env.step(action) #将该动作执行,得到奖励值,下个状态以及是否结束寻路标志 # RL learns from this transition and calculating the cost cost += RL.learn(str(observation), action, reward, str(observation_))#计算整个过程中的cost,与算法无关,用作后续查看算法执行情况,不过在learn函数中完成了Q_table的更新 # Swapping the observations - current and next observation = observation_ #状态转移 # Calculating number of Steps in the current Episode i += 1 # Break while loop when it is the end of current Episode # When agent reached the goal or obstacle if done: steps += [i] all_costs += [cost] break # Showing the final route env.final() # Showing the Q-table with values for each action RL.print_q_table() # Plotting the results RL.plot_results(steps, all_costs) if __name__ == "__main__": x_start = (2, 2) # Starting node x_goal = (30, 20) # Goal node env = Environment(x_start,x_goal) #环境初始化 RL = SarsaTable(actions=list(range(env.n_actions)), learning_rate=0.1, reward_decay=0.9, e_greedy=0.9) #初始化 update() #学习过程 plotting = Plotting(x_start, x_goal,env) #初始化绘图工具 path=list(final_states().values()) #获得路径 path.insert(0,x_start) if path: plotting.animation([], path, "Q_Learning", True) else: print("No Path Found!")
算法关键步骤全部体现在update()函数中,代码内已做详细注释。
下面的是每个过程具体实现的代码
# Importing libraries import numpy as np import pandas as pd import matplotlib.pyplot as plt # Importing function from the env.py from env import final_states # Creating class for the Q-learning table class QLearningTable: def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): # List of actions self.actions = actions # Learning rate self.lr = learning_rate # Value of gamma self.gamma = reward_decay # Value of epsilon self.epsilon = e_greedy # Creating full Q-table for all cells self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64) # Creating Q-table for cells of the final route self.q_table_final = pd.DataFrame(columns=self.actions, dtype=np.float64) # Function for choosing the action for the agent def choose_action(self, observation): # Checking if the state exists in the table self.check_state_exist(observation) # Selection of the action - 90 % according to the epsilon == 0.9 # Choosing the best action if np.random.uniform() < self.epsilon: state_action = self.q_table.loc[observation, :] #提取当前状态下所有动作的价值函数 # print(state_action) state_action = state_action.reindex(np.random.permutation(state_action.index))#打乱顺序,避免每次选择的动作都为序号偏前的动作 # print(state_action) action = state_action.idxmax() else: # Choosing random action - left 10 % for choosing randomly action = np.random.choice(self.actions) return action # Function for learning and updating Q-table with new knowledge def learn(self, state, action, reward, next_state): # Checking if the next step exists in the Q-table 如果不在Q-table中则将其加入到Q-table中 self.check_state_exist(next_state) # Current state in the current position q_predict = self.q_table.loc[state, action] #预测的Q值,即目前Q_table内存储的Q值 # Checking if the next state is free or it is obstacle or goal if next_state != 'goal' or next_state != 'obstacle': q_target = reward + self.gamma * self.q_table.loc[next_state, :].max() #实际最大值 由动作奖励以及下一状态的最大Q值×折损率组成 else: q_target = reward # Updating Q-table with new knowledge self.q_table.loc[state, action] += self.lr * (q_target - q_predict) #更新Q值 return self.q_table.loc[state, action] # Adding to the Q-table new states def check_state_exist(self, state): if state not in self.q_table.index: self.q_table = self.q_table.append( pd.Series( [0]*len(self.actions), index=self.q_table.columns, name=state, ) ) # Printing the Q-table with states def print_q_table(self): # Getting the coordinates of final route from env.py e = final_states() # Comparing the indexes with coordinates and writing in the new Q-table values for i in range(len(e)): state = str(e[i]) # state = '[5.0, 40.0]' # Going through all indexes and checking for j in range(len(self.q_table.index)): if self.q_table.index[j] == state: self.q_table_final.loc[state, :] = self.q_table.loc[state, :] print() print('Length of final Q-table =', len(self.q_table_final.index)) print('Final Q-table with values from the final route:') print(self.q_table_final) print() print('Length of full Q-table =', len(self.q_table.index)) print('Full Q-table:') print(self.q_table) # Plotting the results for the number of steps def plot_results(self, steps, cost): # f, (ax1, ax2) = plt.subplots(nrows=1, ncols=2) # ax1.plot(np.arange(len(steps)), steps, 'b') ax1.set_xlabel('Episode') ax1.set_ylabel('Steps') ax1.set_title('Episode via steps') # ax2.plot(np.arange(len(cost)), cost, 'r') ax2.set_xlabel('Episode') ax2.set_ylabel('Cost') ax2.set_title('Episode via cost') plt.tight_layout() # Function to make distance between figures # plt.figure() plt.plot(np.arange(len(steps)), steps, 'b') plt.title('Episode via steps') plt.xlabel('Episode') plt.ylabel('Steps') # plt.figure() plt.plot(np.arange(len(cost)), cost, 'r') plt.title('Episode via cost') plt.xlabel('Episode') plt.ylabel('Cost') # Showing the plots plt.show()
环境:
Importing libraries import math import config # Global variable for dictionary with coordinates for the final route a = {} # Creating class for the environment class Environment: def __init__(self,start,goal): self.action_space = ['up', 'down', 'left', 'right'] self.n_actions = len(self.action_space) self.build_environment() self.start=start self.goal=goal self.coords=start # Dictionaries to draw the final route self.d = {} self.f = {} # Key for the dictionaries self.i = 0 # Writing the final dictionary first time self.c = True # Showing the steps for longest found route self.longest = 0 # Showing the steps for the shortest route self.shortest = 0 # Function to build the environment def build_environment(self): #配置文件 self.con=config.Config() #环境的x范围 self.x_range = eval(self.con.range['x']) #环境的y范围 self.y_range = eval(self.con.range['y']) #环境的边界 self.obs_boundary = eval(self.con.obs['bound']) #环境的矩形障碍 self.obs_circle = eval(self.con.obs['cir']) #环境的圆形障碍 self.obs_rectangle = eval(self.con.obs['rec']) # Function to reset the environment and start new Episode def reset(self): # Updating agent self.coords=self.start #将坐标置为起点 # # Clearing the dictionary and the i self.d = {} self.i = 0 # Return observation return self.coords # Function to get the next observation and reward by doing next step def step(self, action): # Current state of the agent state = self.coords base_action = [0,0] # Updating next state according to the action # Action 'up' if action == 0: if state[1]<self.obs_boundary[1][1]: base_action[1]+=1 # Action 'down' elif action == 1: if state[1]>1: base_action[1]-=1 # Action right elif action == 2: if state[0]<self.obs_boundary[1][2]: base_action[0]+=1 # Action left elif action == 3: if state[0]>1: base_action[0]-=1 # Moving the agent according to the action self.coords=(self.coords[0]+base_action[0],self.coords[1]+base_action[1]) # Writing in the dictionary coordinates of found route self.d[self.i] = self.coords # Updating next state next_state = self.d[self.i] # Updating key for the dictionary self.i += 1 # Calculating the reward for the agent if next_state == self.goal: reward = 100 done = True next_state = 'goal' # Filling the dictionary first time if self.c == True: for j in range(len(self.d)): self.f[j] = self.d[j] self.c = False self.longest = len(self.d) self.shortest = len(self.d) # Checking if the currently found route is shorter if len(self.d) < len(self.f): # Saving the number of steps for the shortest route self.shortest = len(self.d) # Clearing the dictionary for the final route self.f = {} # Reassigning the dictionary for j in range(len(self.d)): self.f[j] = self.d[j] # Saving the number of steps for the longest route if len(self.d) > self.longest: self.longest = len(self.d) elif self.is_collision(next_state): reward = -100 done = True next_state = 'obstacle' # Clearing the dictionary and the i self.d = {} self.i = 0 else: reward = -1 done = False return next_state, reward, done # Function to refresh the environment def render(self): #time.sleep(0.03) self.update() # Function to show the found route def final(self): # Deleting the agent at the end # Showing the number of steps print('The shortest route:', self.shortest) print('The longest route:', self.longest) # Filling the route for j in range(len(self.f)): # Showing the coordinates of the final route print(self.f[j]) a[j] = self.f[j] def is_collision(self,state): delta = 0.5 #判断是否在圆形障碍物中 for (x, y, r) in self.obs_circle: if math.hypot(state[0] - x, state[1] - y) <= r + delta: return True #判断是否在矩形障碍物中 for (x, y, w, h) in self.obs_rectangle: if 0 <= state[0] - (x - delta) <= w + 2 * delta \ and 0 <= state[1] - (y - delta) <= h + 2 * delta: return True #判断是否在边界中 for (x, y, w, h) in self.obs_boundary: if 0 <= state[0] - (x - delta) <= w + 2 * delta \ and 0 <= state[1] - (y - delta) <= h + 2 * delta: return True return False # Returning the final dictionary with route coordinates # Then it will be used in agent_brain.py def final_states(): return a # This we need to debug the environment # If we want to run and see the environment without running full algorithm if __name__ == '__main__': env = Environment() env.mainloop()
源码Github:Reinforcement-Learning_Path-Planning
基本思想与Q_Learning完全一致,大家都是维护一个全局的Q_table并不断学习更新Q_table,区别在与更新Q_table的策略稍有不同,两种方法的TD目标存在差异。
在此处不再详述,源码也放在:Reinforcement-Learning_Path-Planning
可以对比Q_learning和Sarsa中的update()发现两者之间的差别。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。