当前位置:   article > 正文

强化学习系列(二):Q learning算法简介及python实现Q learning求解TSP问题_强化学习 tsp

强化学习 tsp

目录

一、什么是Q learning算法?

1.Q table

2.Q-learning算法伪代码

二、Q-Learning求解TSP的python实现

1)问题定义     

2)创建TSP环境

3)定义 DeliveryQAgent类

4)定义每个episode下agent学习的过程

5)   定义训练的主函数

6)实验结果

 1. 环境创建

 2.实例化agent类

 3.agent训练学习


一、什么是Q learning算法?

         Q-learning算法非常适合新手入门理解强化学习,它是最容易编码和理解的。 Q-learning算法是一种model-free、off-policy/value_based的强化学习算法,即不考虑环境的特征,通过Q函数寻找最优的动作选择策略。Q值(action value function)计算的是当前状态下采取该动作的未来奖励期望,公式推导如下:

 

更多强化学习基本原理和概念见强化学习系列(一):基本原理和概念

1.Q table

        Q代表quality,即动作的质量。创建一个表格Q,将state-action-Q估计值存储进去,通过检索Q表,就能获取在当前state下选取各个action能够获得的未来奖励期望的估计值,Q-learning中最核心的就是不停更新Q表给出越来越好的近似。

2.Q-learning算法伪代码

步骤一:创建并初始化一个action-space*state space大小的Q表,一般初始化设置所有值为0;

步骤二:进入循环,直到达到迭代条件:

        步骤三:检索Q表,在当前状态 s下根据Q的估计值和Policy选择一个action a;

        步骤四:执行action a,检索Q表,转移到的状态s{}'对应的Q最大值加上该动作得到的实时奖励reward是状态 s价值的真实值;

        步骤五:根据贝尔曼方程更新Q表。

那么,开始时Q值都为0,我们该怎么选择下一个动作呢?这个时候就体现Policy的重要性了,常见做法是引入一个参数\varepsilon,取值在0-1之间,\varepsilon体现了探索/利用(exploration/exploitation)的权衡。\varepsilon越大,随机性/探索性越强,通常初始情况下\varepsilon接近或等于1随机选择下一个动作进行大量的探索;随着agent的不断学习,对Q的估计越来越准确,我们将逐渐减小\varepsilon的值,更多依赖利用当前的Q值。

二、Q-Learning求解TSP的python实现

1)问题定义     

        旅行商问题( TSP)是一个典型的优化问题,目的是找到访问各个城市的最短路线。要使用RL的方法解决TSP问题,就需要把TSP问题转化为RL问题,定义RL的各要素:

        agent:送货人;

        environment:要交付的商品和要访问的城市节点位置;

        state:当前送货员所在的城市节点;

        action:在每个节点要做出的决策,下一步去哪一个节点;

        reward:实时的奖励,两个节点之间的距离多长。

RL的目标goal是使得reward的求和最大,即访问路线的距离最短。

2)创建TSP环境

      在python中创建一个简单的TSP环境非常简单,指定城市节点数量,随机生成城市节点坐标;并计算不同城市间的距离作为reward值。python具体实现代码如下,创建了一个DeliveryEnvironment类,默认的城市节点数是10个,随机选择一个节点作为出发点,定义了一个画图的函数展示TSP的Environment。

  1. #导入需要的包
  2. import pandas as pd
  3. import numpy as np
  4. import matplotlib.pyplot as plt
  5. import os
  6. import time
  7. from tqdm import tqdm_notebook
  8. from scipy.spatial.distance import cdist
  9. import imageio
  10. from matplotlib.patches import Rectangle
  11. from matplotlib.collections import PatchCollection
  12. plt.style.use("seaborn-dark")
  13. import sys
  14. sys.path.append("../")
  15. from rl.agents.q_agent import QAgent
  16. class DeliveryEnvironment(object): #初始化环境
  17. def __init__(self,n_stops = 10,max_box = 10,method = "distance",**kwargs):
  18. print(f"Initialized Delivery Environment with {n_stops} random stops")
  19. print(f"Target metric for optimization is {method}")
  20. # 参数初始化
  21. self.n_stops = n_stops
  22. self.action_space = self.n_stops
  23. self.observation_space = self.n_stops
  24. self.max_box = max_box
  25. self.stops = []
  26. self.method = method
  27. # 产生城市节点
  28. self._generate_constraints(**kwargs)
  29. self._generate_stops()
  30. self._generate_q_values()
  31. self.render()
  32. # 初始化环境
  33. self.reset()
  34. def _generate_stops(self): #产生城市节点
  35. # Generate geographical coordinates
  36. xy = np.random.rand(self.n_stops,2)*self.max_box #产生客户点坐标
  37. self.x = xy[:,0]
  38. self.y = xy[:,1]
  39. def _generate_q_values(self,box_size = 0.2): #计算不同节点之间的距离充当reward
  40. # Generate actual Q Values corresponding to time elapsed between two points
  41. if self.method in ["distance"]:
  42. xy = np.column_stack([self.x,self.y])
  43. self.q_stops = cdist(xy,xy) #计算距离矩阵充当reward
  44. else:
  45. raise Exception("Method not recognized")
  46. #画图的函数
  47. def render(self,return_img = False):
  48. fig = plt.figure(figsize=(7,7))
  49. ax = fig.add_subplot(111)
  50. ax.set_title("Delivery Stops")
  51. # Show stops
  52. ax.scatter(self.x,self.y,c = "red",s = 50)
  53. # Show START
  54. if len(self.stops)>0:
  55. xy = self._get_xy(initial = True) #生成的第一个点作为start点,文本位置在xy[1]-0.05
  56. xytext = xy[0]+0.1,xy[1]-0.05
  57. ax.annotate("START",xy=xy,xytext=xytext,weight = "bold")
  58. # Show itinerary
  59. if len(self.stops) > 1:
  60. ax.plot(self.x[self.stops],self.y[self.stops],c = "blue",linewidth=1,linestyle="--")
  61. # 路径结尾要回到出发点
  62. xy = self._get_xy(initial = False)
  63. xytext = xy[0]+0.1,xy[1]-0.05
  64. ax.annotate("END",xy=xy,xytext=xytext,weight = "bold")
  65. if hasattr(self,"box"):
  66. left,bottom = self.box[0],self.box[2]
  67. width = self.box[1] - self.box[0]
  68. height = self.box[3] - self.box[2]
  69. rect = Rectangle((left,bottom), width, height)
  70. collection = PatchCollection([rect],facecolor = "red",alpha = 0.2)
  71. ax.add_collection(collection)
  72. plt.xticks([])
  73. plt.yticks([])
  74. if return_img:
  75. # From https://ndres.me/post/matplotlib-animated-gifs-easily/
  76. fig.canvas.draw_idle()
  77. image = np.frombuffer(fig.canvas.tostring_rgb(), dtype='uint8')
  78. image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
  79. plt.close()
  80. return image
  81. else:
  82. plt.show()
  83. #重置进入下一轮迭代
  84. def reset(self):
  85. # Stops placeholder
  86. self.stops = []
  87. # Random first stop
  88. first_stop = np.random.randint(self.n_stops) #随机生成第一个初始节点
  89. self.stops.append(first_stop)
  90. return first_stop
  91. #根据reward选择下一个动作
  92. def step(self,destination):
  93. # Get current state 得到当前的state
  94. state = self._get_state()
  95. new_state = destination
  96. # Get reward for such a move 每个action得到reward
  97. reward = self._get_reward(state,new_state)
  98. # Append new_state to stops 进入下一个state
  99. self.stops.append(destination)
  100. done = len(self.stops) == self.n_stops
  101. return new_state,reward,done
  102. #得到当前状态 即当前到达的节点位置
  103. def _get_state(self):
  104. return self.stops[-1]
  105. #得到每个坐标的X和Y值
  106. def _get_xy(self,initial = False):
  107. state = self.stops[0] if initial else self._get_state()
  108. x = self.x[state]
  109. y = self.y[state]
  110. return x,y
  111. #定义reward函数
  112. def _get_reward(self,state,new_state): #
  113. base_reward = self.q_stops[state,new_state] #base_reward是两个节点之间的距离
  114. if self.method == "distance":
  115. return base_reward
  116. @staticmethod
  117. def _calculate_point(x1,x2,y1,y2,x = None,y = None):
  118. if y1 == y2:
  119. return y1
  120. elif x1 == x2:
  121. return x1
  122. else:
  123. a = (y2-y1)/(x2-x1)
  124. b = y2 - a * x2
  125. if x is None:
  126. x = (y-b)/a
  127. return x
  128. elif y is None:
  129. y = a*x+b
  130. return y
  131. else:
  132. raise Exception("Provide x or y")

3)定义 DeliveryQAgent类

        决定选择下一个节点的Policy,随机产生一个0-1之间的值,如果比\varepsilon大,选择Q值最大的action,否则随机选择一个未访问过的节点去访问。

  1. class DeliveryQAgent(QAgent):
  2. def __init__(self,*args,**kwargs):
  3. super().__init__(*args,**kwargs)
  4. self.reset_memory()
  5. def act(self,s):
  6. # Get Q Vector copyq表
  7. q = np.copy(self.Q[s,:])
  8. # Avoid already visited states 屏蔽已经走过的节点 这个地方应该可以加约束条件起到mask一些节点的作用
  9. q[self.states_memory] = -np.inf
  10. if np.random.rand() > self.epsilon:
  11. a = np.argmax(q)
  12. else:
  13. a = np.random.choice([x for x in range(self.actions_size) if x not in self.states_memory])
  14. return a
  15. def remember_state(self,s):
  16. self.states_memory.append(s)
  17. def reset_memory(self):
  18. self.states_memory = []

4)定义每个episode下agent学习的过程

       每一次迭代需要将环境reset到初始状态,随机选择一个节点作为初始节点,然后根据Policy不断去选择下一个节点并更新Q值。

  1. #每一个episode学习的函数
  2. def run_episode(env,agent,verbose = 1):
  3. s = env.reset()
  4. agent.reset_memory()
  5. max_step = env.n_stops
  6. episode_reward = 0
  7. i = 0
  8. while i < max_step: #节点个数
  9. # Remember the states 存储已经走过的点
  10. agent.remember_state(s)
  11. # Choose an action 选择一个action
  12. a = agent.act(s)
  13. # Take the action, and get the reward from environment 得到一个reward
  14. s_next,r,done = env.step(a)
  15. # Tweak the reward 加负号最小化问题变成最大化问题
  16. r = -1 * r
  17. if verbose: print(s_next,r,done)
  18. # Update our knowledge in the Q-table 更新reward在Q表中
  19. agent.train(s,a,r,s_next)
  20. # Update the caches 累加reward
  21. episode_reward += r
  22. s = s_next
  23. # If the episode is terminated
  24. i += 1
  25. if done:
  26. break
  27. return env,agent,episode_reward

 在导入的QAgent.py里定义了更新Q值的函数如下,也就是贝尔曼公式,通常epsilon_decay小于1,也就是随着学习的不断进行,\varepsilon的值在不断减小,探索性降低。

  1. def train(self,s,a,r,s_next):
  2. self.Q[s,a] = self.Q[s,a] + self.lr * (r + self.gamma*np.max(self.Q[s_next,a]) - self.Q[s,a])
  3. if self.epsilon > self.epsilon_min:
  4. self.epsilon *= self.epsilon_decay

5)定义训练的主函数

      并输出训练过程的动图及迭代训练过程中reward值得变化趋势。

  1. #模型训练的函数
  2. def run_n_episodes(env,agent,name="training.gif",n_episodes=1000,render_each=10,fps=10): #训练1000次,10次画图一次
  3. # Store the rewards 存储下reward和图形
  4. rewards = []
  5. imgs = []
  6. # Experience replay
  7. for i in tqdm_notebook(range(n_episodes)):
  8. # Run the episode 迭代学习
  9. env,agent,episode_reward = run_episode(env,agent,verbose = 0)
  10. rewards.append(episode_reward)
  11. if i % render_each == 0:
  12. img = env.render(return_img = True)
  13. imgs.append(img)
  14. # Show rewards 画出reward的变化趋势
  15. plt.figure(figsize = (15,3))
  16. plt.title("Rewards over training")
  17. plt.plot(rewards)
  18. plt.show()
  19. # Save imgs as gif
  20. imageio.mimsave(name,imgs,fps = fps) #输出动图,fps是帧率(每秒播放的帧数)
  21. return env,agent

 6)实验结果

        调用以上定义的类和函数,现简单实现一个规模为500个旅行商的TSP问题。

       1. 环境创建:

          指定规模为500,选定衡量reward的方法是distance;

env = DeliveryEnvironment(n_stops = 500,method = "distance")  #随机生成500个节点

        输出环境如下:

 还可以通过在各个走过的节点之间画线可视化路径,距离画出路径的前几个点:

  1. for i in [0,1,2,3]: #画出接下来几步要走的路径
  2. env.step(i)
  3. env.render()

        2.实例化agent类

agent = DeliveryQAgent(env.observation_space,env.action_space)  #env.observation_space和action_space都是节点数

        3.agent训练学习

        默认迭代学习1000次,可以根据需求更改;记录训练的时间。

  1. start=time.time()
  2. run_n_episodes(env,agent,"training_500_stops.gif") #训练1000次reward的变化趋势 前400次基本上是在随机选择一个节点行走,后面就用到了之前行走的经验,Q表中没有的就设置reward,有的就更新
  3. end=time.time()
  4. print('运行时间',end-start)

        模型输出的reward变化趋势及运行时间如下:

         前400次迭代,基本都在探索新的不同的路线,随机性很强;400次迭代往后,agent开始利用自己所学到的东西,越来越少的采取随机行动而是倾向于选择Q值最大的行动;800次左右开始基本收敛到一个可接受的路线。

           迭代训练过程的动图也能反映出开始杂乱无章不断探索然后趋于收敛到一个可接受的路线。

由于找我要代码的人太多,不一一回复了,全部代码已上传至我的资源——q-learning for tsp,需要自取。

更新github地址如下:

https://github.com/TheoLvs/reinforcement-learning/tree/master/5.%20Delivery%20Optimization

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/647600
推荐阅读
相关标签
  

闽ICP备14008679号