当前位置:   article > 正文

动态规划法在扫地机器人中的实战应用(基于动作值函数的策略迭代 python 附源码)_自制扫地机器人编程和代码








 代码运行结果如下 经过五次迭代逐渐收敛


  1. # 代11-例4.7-基于动作值函数的确定环境扫地机器人任务策略迭代
  2. import numpy as np
  3. world_h = 5
  4. world_w = 5
  5. length = world_h * world_w
  6. gamma = 0.8
  7. state = [i for i in range(length)] # 状态(编号)
  8. action = ['n', 's', 'w', 'e'] # 动作名称
  9. ds_action = {'n': -world_w, 'e': 1, 's': world_w, 'w': -1}
  10. value = [0 for i in range(length)] # 初始化状态值函数,均为0. [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  11. policy = np.zeros([length, len(action)])
  12. suqe=[20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3,4]
  13. # 定义奖励
  14. def reward(s):
  15. if s == 20: # 到充电站
  16. return 1
  17. elif s == 12: # 到陷阱中
  18. return -10
  19. elif s == 9: # 到垃圾处
  20. return 3
  21. else:
  22. return 0 # 其他
  23. # in表示0是[***]中的一个
  24. def getAction(a):
  25. if a == 'n':
  26. return 0
  27. elif a == 'e':
  28. return 3
  29. elif a == 's':
  30. return 1
  31. elif a == 'w':
  32. return 2
  33. # 在s状态下执行动作a,返回下一状态(编号)
  34. def next_states(s, a):
  35. # 越过边界时pass
  36. if (s < world_w and a == 'n') \
  37. or (s % world_w == 0 and a == 'w') \
  38. or (s > length - world_w - 1 and a == 's') \
  39. or ((s + 1) % world_w == 0 and a == 'e'): # (s % (world_w - 1) == 0 and a == 'e' and s != 0)
  40. next_state = s # 表现为next_state不变
  41. else:
  42. next_state = s + ds_action[a] # 进入下一个状态
  43. return next_state
  44. # 在s状态下执行动作,返回所有可能的下一状态(编号)list
  45. def getsuccessor(s):
  46. successor = []
  47. for a in action: # 遍历四个动作
  48. if s == next_states(s, a):
  49. continue
  50. else:
  51. # print("状态s=%s,动作a=%s"%(s,a))
  52. next = next_states(s, a) # 得到下一个状态(编号)
  53. successor.append(next) # 以list保存当前状态s下执行四个动作的下一状态
  54. # print(len(successor))
  55. return successor
  56. def initPolicy():
  57. for s in range(length):
  58. for a in action:
  59. if next_states(s, a) == s:
  60. continue
  61. newAction = getAction(a)
  62. policy[s][newAction] = 1 / len(getsuccessor(s))
  63. # print(policy)
  64. def Caculate_Q(s, V, num, discount_factor=0.8):
  65. """
  66. Returns:
  67. A vector of length env.nA containing the expected value of each action.
  68. """
  69. Q = np.zeros((length, 4))
  70. Q[:][:] = -100
  71. for a in action: # 遍历能走的动作
  72. # for prob, next_state, reward, done in env.P[s][a]:
  73. # Q[s][a] += prob * (reward + discount_factor * V[next_state]) # 计算当前状态s下的动作值函数列表 [q1,q2,q3,q4]
  74. next_state = next_states(s, a)
  75. if next_state == s: # 碰壁
  76. continue
  77. rewards = reward(next_state)
  78. numberA = getAction(a)
  79. if s == 12:
  80. Q[s][numberA] = rewards + discount_factor * V[s]
  81. else:
  82. Q[s][numberA] = rewards + discount_factor * V[next_state]
  83. action_name1 = ""
  84. if (numberA == 0):
  85. action_name1 = "UP"
  86. if (numberA == 1):
  87. action_name1 = "DOWN"
  88. if (numberA == 2):
  89. action_name1 = "LEFT"
  90. if (numberA == 3):
  91. action_name1 = "RIGHT"
  92. print("Q[%s][%s] %f = %s + 0.8 * %.2f:" % (num, action_name1, Q[s][numberA], rewards, V[next_state]))
  93. return Q
  94. def max_index(lst_int):#最大值索引函数
  95. index = []
  96. max_n = max(lst_int)
  97. for i in range(len(lst_int)):
  98. if lst_int[i] == max_n:
  99. index.append(i)
  100. return index #返回一个列表
  101. def list_not_equal(old,new):#判断list是否相等
  102. listequal=False
  103. for i in range(0,4):
  104. if old[i]!=new[i]:
  105. listequal=True
  106. break
  107. return listequal
  108. def policy_eval(theta=0.0001):
  109. V = np.zeros(length) # 初始化状态值函数列表
  110. iter = 0
  111. while True:
  112. k = -1
  113. delta = 0 # 定义最大差值,判断是否有进行更新
  114. for s in suqe: # 遍历所有状态 [0~25]
  115. k += 1
  116. if s in [9, 20, 12]: # 若当前状态为吸入状态,则直接pass不做操作
  117. continue
  118. v = 0 # 针对每个状态值函数进行计算
  119. # print("第%d 的状态" % (k), end="")
  120. for a in action:
  121. newAction = getAction(a)
  122. next_state = next_states(s, a)
  123. rewards = reward(next_state)
  124. if next_state == 12:
  125. v += policy[s][newAction] * (rewards + gamma * V[s])
  126. else:
  127. v += policy[s][newAction] * (rewards + gamma * V[next_state])
  128. delta = max(delta, np.abs(v - V[s])) # 更新差值
  129. V[s] = v # 存储(更新)每个状态下的状态值函数,即伪代码中的 v <- V(s)
  130. value = np.array(V).reshape(world_h, world_w)
  131. iter += 1
  132. if delta < theta: # 策略评估的迭代次数不能太多,否则状态值函数的数值会越来越大(即使算法仍然在收敛)
  133. break
  134. return V # 一轮迭代结束后,状态值函数暂时固定
  135. def policy_improvement(V, policy): # 策略改进
  136. """
  137. Returns:
  138. policy: the optimal policy, a matrix of shape [S, A] where each state s contains a valid probability distribution over actions.
  139. V: the value function for the optimal policy.
  140. """
  141. k = -1
  142. policy_stable = True # Will be set to false if we make any changes to the policy
  143. for s in [20, 21, 22, 23, 24, 15, 16, 17, 18, 19, 10, 11, 12, 13, 14, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4]:
  144. if s in [9, 20, 12]:
  145. k += 1
  146. continue;
  147. k += 1
  148. [i]
  149. return V, policy_stable
  150. def Policy_Iterration_byQ():
  151. initPolicy()
  152. k = 1
  153. while True: # Evaluate the current policy
  154. print("迭代的次数",k)
  155. Q = policy_eval()
  156. print(Q)
  157. Q,policy_stable = policy_improvement(Q,policy)
  158. print("*" * 100)
  159. k+=1
  160. if policy_stable: # # If the policy is stable we've found an optimal policy. Return it
  161. return policy, Q
  162. Policy_Iterration_byQ()

