
Practical Application of Dynamic Programming to a Cleaning Robot (Policy Iteration Based on Action-Value Functions, with Python Source Code)

If you need the full source code or find this post helpful, please like, follow and bookmark, then leave your QQ email in the comments or send me a private message.

Unlike policy iteration based on the state-value function, policy iteration based on the action-value function evaluates the current policy with a different expression: for every state it computes the value of each action directly, and the improvement step then makes the policy greedy with respect to these action values.
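In the deterministic environment used here, taking action a in state s leads to a single successor state s', so the evaluation expression and the greedy improvement step can be written as

\[
q_\pi(s,a) = r(s') + \gamma\, v_\pi(s'), \qquad \pi'(s) = \arg\max_a q_\pi(s,a),
\]

with the discount factor \(\gamma = 0.8\) in this task (the absorbing trap state is handled as a special case in the code below).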

For the task description and how the environment is set up, see my earlier post 扫地机器人简介 (an introduction to the cleaning-robot environment).
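For quick reference, the listing below numbers the 25 grid cells row by row (moving 'n' subtracts world_w = 5 and 's' adds it), and its reward function marks three special, absorbing cells. A sketch of that numbering, read directly off the code, is:

# 5 x 5 grid, states numbered row-major
#    0   1   2   3   4
#    5   6   7   8   9    <- state 9:  garbage, reward +3
#   10  11  12  13  14    <- state 12: trap, reward -10
#   15  16  17  18  19
#   20  21  22  23  24    <- state 20: charging station, reward +1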

The algorithm proceeds in the following steps (as implemented in the code below):

1. Initialize the policy to a uniform distribution over the feasible actions of each state.
2. Policy evaluation: sweep over the states and iterate the Bellman expectation equation under the current policy until the state values converge.
3. Policy improvement: for every state, compute the action values q(s, a) from the converged state values and make the policy greedy with respect to them.
4. If the improvement changed the policy in any state, return to step 2; otherwise the policy is stable and therefore optimal, and the algorithm stops.

Applying this policy iteration algorithm based on action-value functions to the cleaning-robot task in a deterministic environment and running it for several rounds yields the update process of the action-value function and of the policy shown in the figure below.

 

The output of running the code is shown below; the values converge gradually after five iterations.

Part of the code is listed below.

# Code 11 / Example 4.7: policy iteration based on action-value functions
# for the cleaning-robot task in a deterministic environment
import numpy as np

world_h = 5
world_w = 5
length = world_h * world_w
gamma = 0.8
state = [i for i in range(length)]          # state numbers 0..24
action = ['n', 's', 'w', 'e']               # action names
ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}  # state offset of each action
value = [0 for i in range(length)]          # state-value function, initialized to all zeros
policy = np.zeros([length, len(action)])    # policy[s][a]: probability of taking action a in state s
suqe = [20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14,
        5, 6, 7, 8, 9, 0, 1, 2, 3, 4]       # sweep order over the states (bottom row of the grid first)

# reward received on entering state s
def reward(s):
    if s == 20:        # charging station
        return 1
    elif s == 12:      # trap
        return -10
    elif s == 9:       # garbage
        return 3
    else:
        return 0       # everywhere else
# map an action name to its index in the policy / Q tables
def getAction(a):
    if a == 'n':
        return 0
    elif a == 'e':
        return 3
    elif a == 's':
        return 1
    elif a == 'w':
        return 2

# execute action a in state s and return the next state (number)
def next_states(s, a):
    # a move that would cross the border leaves the state unchanged
    if (s < world_w and a == 'n') \
            or (s % world_w == 0 and a == 'w') \
            or (s > length - world_w - 1 and a == 's') \
            or ((s + 1) % world_w == 0 and a == 'e'):
        next_state = s                      # the robot stays where it is
    else:
        next_state = s + ds_action[a]       # move to the neighbouring state
    return next_state

# return the list of all states reachable from s in one step
def getsuccessor(s):
    successor = []
    for a in action:                        # try the four actions
        if s == next_states(s, a):
            continue                        # this action hits a wall
        else:
            next = next_states(s, a)        # next state (number)
            successor.append(next)          # collect the successors of s
    return successor
# initialize the policy: uniform over the actions that do not hit a wall
def initPolicy():
    for s in range(length):
        for a in action:
            if next_states(s, a) == s:
                continue
            newAction = getAction(a)
            policy[s][newAction] = 1 / len(getsuccessor(s))

def Caculate_Q(s, V, num, discount_factor=0.8):
    """Return a [length x 4] table whose row s holds the action values of state s."""
    Q = np.zeros((length, 4))
    Q[:, :] = -100                          # mark wall-hitting actions as invalid
    for a in action:                        # loop over the feasible actions
        next_state = next_states(s, a)
        if next_state == s:                 # this action hits a wall
            continue
        rewards = reward(next_state)
        numberA = getAction(a)
        if s == 12:                         # the trap is absorbing: bootstrap on V[s]
            Q[s][numberA] = rewards + discount_factor * V[s]
        else:
            Q[s][numberA] = rewards + discount_factor * V[next_state]
        action_name1 = ""
        if numberA == 0:
            action_name1 = "UP"
        if numberA == 1:
            action_name1 = "DOWN"
        if numberA == 2:
            action_name1 = "LEFT"
        if numberA == 3:
            action_name1 = "RIGHT"
        print("Q[%s][%s] %f = %s + 0.8 * %.2f" % (num, action_name1, Q[s][numberA], rewards, V[next_state]))
    return Q
def max_index(lst_int):                     # indices of the maximum element
    index = []
    max_n = max(lst_int)
    for i in range(len(lst_int)):
        if lst_int[i] == max_n:
            index.append(i)
    return index                            # returns a list

def list_not_equal(old, new):               # True if the two length-4 lists differ
    listequal = False
    for i in range(0, 4):
        if old[i] != new[i]:
            listequal = True
            break
    return listequal

def policy_eval(theta=0.0001):
    V = np.zeros(length)                    # state-value function, initialized to zeros
    iter = 0
    while True:
        k = -1
        delta = 0                           # largest change in this sweep
        for s in suqe:                      # sweep over all 25 states
            k += 1
            if s in [9, 20, 12]:            # absorbing states are skipped
                continue
            v = 0                           # value of the current state under the policy
            for a in action:
                newAction = getAction(a)
                next_state = next_states(s, a)
                rewards = reward(next_state)
                if next_state == 12:        # the trap is absorbing: bootstrap on V[s]
                    v += policy[s][newAction] * (rewards + gamma * V[s])
                else:
                    v += policy[s][newAction] * (rewards + gamma * V[next_state])
            delta = max(delta, np.abs(v - V[s]))   # track the largest update
            V[s] = v                        # v <- V(s) in the pseudocode
        value = np.array(V).reshape(world_h, world_w)  # grid view of the values (for inspection)
        iter += 1
        if delta < theta:                   # evaluation has converged for the current policy
            break
    return V                                # the state values stay fixed for the improvement step
def policy_improvement(V, policy):          # policy improvement
    """Greedily improve `policy` with respect to V; return V and whether the policy is stable."""
    k = -1
    policy_stable = True                    # set to False if any state changes its action
    for s in suqe:
        if s in [9, 20, 12]:
            k += 1
            continue
        k += 1
        # greedy update of the policy in state s, using the helpers defined above
        old_action = list(policy[s])                # current action distribution in state s
        Q = Caculate_Q(s, V, k)                     # action values of s under the fixed V
        best_a = max_index(list(Q[s]))              # indices of the greedy action(s)
        new_action = [0.0] * len(action)
        for i in best_a:
            new_action[i] = 1 / len(best_a)         # split the probability over ties
        if list_not_equal(old_action, new_action):
            policy_stable = False                   # the policy changed in this state
        policy[s] = new_action
    return V, policy_stable
def Policy_Iterration_byQ():
    initPolicy()
    k = 1
    while True:
        print("iteration", k)
        V = policy_eval()                   # evaluate the current policy
        print(V)
        V, policy_stable = policy_improvement(V, policy)
        print("*" * 100)
        k += 1
        if policy_stable:                   # a stable policy is optimal: return it
            return policy, V

Policy_Iterration_byQ()
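To inspect the result, the greedy policy held in the global policy array can be printed as one arrow per grid cell. The snippet below is a minimal sketch of such a check and is not part of the original source: it assumes it is appended after the listing above and reuses its globals (policy, world_h, world_w, np) and the action indexing of getAction(); the three absorbing cells 9, 12 and 20 are marked with '*'.

arrows = {0: '↑', 1: '↓', 2: '←', 3: '→'}   # action indices n=0, s=1, w=2, e=3 (see getAction)

def print_policy(policy):
    # one line per grid row; states 0-4 form the first printed row
    for row in range(world_h):
        cells = []
        for col in range(world_w):
            s = row * world_w + col
            if s in (9, 12, 20):             # garbage, trap, charging station
                cells.append('*')
            else:
                # ties are broken towards the lowest action index
                cells.append(arrows[int(np.argmax(policy[s]))])
        print(' '.join(cells))

print_policy(policy)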
