本文主要提供了Python版本的A*路径规划算法实现,并在此基础上提供了命令行和基于Matplotlib的GUI用户界面(User Interface)
其实动态路径规划更进一步,在探索的构建出来全局地图,那就成了SLAM问题了,即同时定位与建图(Simultaneous Localization And Mapping,SLAM)。关于SLAM本文就不再深入了。
因此在基础的求解迷宫问题的要求上,我利用Matplotlib中的Interactive Plotting Method将路径规划的过程以GUI的形式表达出来,并在此基础上提供地图绘制、地图保存等一系列功能,最后完成一个不错的程序。
From Wikipedia:
A* (pronounced “A-star”) is a graph traversal and path search algorithm, which is often used in many fields of computer science due to its completeness, optimality, and optimal efficiency. One major practical drawback is its O ( b d O(b^{d} O(bd) space complexity, as it stores all generated nodes in memory. Thus, in practical travel-routing systems, it is generally outperformed by algorithms which can pre-process the graph to attain better performance, as well as memory-bounded approaches; however, A* is still the best solution in many cases.
From BaiduBaike:
- 对于矢量图,其内部的图形是由基本的几何图形构成的,因此在矢量图文件(例如.dot)文件中,只需要按照指定的格式描述图形的属性即可。例如Windows的.dot文件,我们按照其要求的语法对图形进行描述,然后对应的解析该语法,利用图形学的算法即可渲染出来图形,我们常见的PDF、SVG等等文件都是矢量图。例如下面的例子
- 而对于光栅图,其基本构成单位是像素,每个像素大小只有零点几个毫米,而每平方厘米都会有几百几千个像素。而每个像素都会有自己的数值,表现出来自己的颜色。因此我们通过几万个极小的、在人眼看来是点的像素组合起来就成了我们所看到的图像。因此这种图像又称为位图、点阵图、像素图。我们常见的JPG、PNG、BMP等等都是光栅图,例如下面的例子
算法实现层负责实现A*算法。A*算法中使用使用OpenList以及CloseList维护已探索区域和边缘区域。此外为实现算法实现层与核心层解耦合,而二维地图的状 态的维护由核心层提供,因此在算法实现层只需要调用核心层提供的接口进行地图状态修改即可
用户界面层负责GUI的显示以及命令行用户界面。GUI部分通过Matplotlib内嵌的信号与槽机制实现用户鼠标、键盘事件的监听以及回调函数的绑定,进而响应用户事件,完成功能,例如:保存地图 。命令行用户界面负责根绝接收到不同的用户参数,运行不同的功能,例如绘制地图、开始路径规划……
import time
import queue
import pprint
import argparse
from typing import *
from pathlib import Path
from collections import namedtuple
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.backend_bases as bbase
from colorama import Fore, Style
class _MapBase(object): """ Notes: MapBase 用于可视化的地图创建 Args: mapsize (tuple[int, int]): size of map figsize (tuple[int, int]): size of figure """ def __init__(self, mapsize: Tuple[int, int], figsize: Tuple[int, int] = (12, 12)): super(_MapBase, self).__init__() # meta information self._mapsize: Tuple[int, int] = mapsize self._figsize: Tuple[int, int] = figsize # map, inf obstacle, 0 accessible, 1 in path self.map: np.ndarray = np.zeros(shape=mapsize) self.map_temp: np.ndarray = None # visualize core object self._figure, self._axes = plt.subplots(figsize=figsize) self._canvas = self._figure.canvas self._title: plt.Text = None self._go_on: bool = False # gui interaction, finite state machine to control plot self._on_press: bool = False self._current_state = "add_obstacle" self.btn_pressed_cid = self._canvas.mpl_connect("button_press_event", self._btn_press_cb) self.btn_released_cid = self._canvas.mpl_connect("button_release_event", self._btn_release_cb) self.mouse_move_cid = self._canvas.mpl_connect("motion_notify_event", self._move_cb) self.keyboard_cid = self._canvas.mpl_connect("key_press_event", self._keyboard_cb) self.new() def __del__(self): plt.close(self._figure) def start(self): assert False, f"This method should be overwritten by subclass" def update(self): self._canvas.draw_idle() def new(self) -> bool: """ Notes: new is used to clear the map. In practice, generate a new axes Returns: if_clear (bool): true if successfully clear the map """ try: self.map = np.zeros(shape=self._mapsize) self._axes.cla() self._axes.grid(True) self._axes.set_xticks(range(0, self._mapsize[0] + 1)) self._axes.set_yticks(range(0, self._mapsize[1] + 1)) self._axes.set_xlim(0, self._mapsize[0]) self._axes.set_ylim(0, self._mapsize[1]) self._title = self._axes.set_title(f"Current State: {self._current_state}") self._axes.scatter([], [], marker="s", c="red", label="goal") self._axes.scatter([], [], marker="s", c="black", label="obstacle") self._axes.scatter([], [], marker="s", c="green", label="start") self._axes.scatter([], [], marker="s", c="blue", label="final path") self._axes.scatter([], [], marker="s", c="gray", label="explored") self._axes.scatter([], [], marker="s", c="cyan", label="frontier") self._axes.legend(loc=(1.01, 0.85)) self.update() return True except KeyboardInterrupt as e: print("Keyboard interrupted") return False def add_patch(self, x: Union[int, np.ndarray, list], y: Union[int, np.ndarray, list], patches: Union[str, None] = None) -> bool: prompt = (x, y) if isinstance(x, int) else list(zip(x, y)) if self._current_state == "add_obstacle": try: self.map[x, y] = np.inf except IndexError: pass self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="black") print(f"Add obstacle: {prompt}") elif self._current_state == "add_goal": if (self.map == -1).any(): print(f"{Fore.YELLOW}Already have one goal!{Style.RESET_ALL}") return False self.map[x, y] = -1 self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="red") print(f"Add goal: {prompt}") elif self._current_state == "add_start": if (self.map == -2).any(): print(f"{Fore.YELLOW}Already have one start!{Style.RESET_ALL}") return False self.map[x, y] = -2 self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="green") print(f"Add start: {prompt}") elif self._current_state == "going_back": self.map[x, y] = -3 self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="blue") elif self._current_state == "clear_patch": if self.map[x, y] == 0: print(f"{Fore.YELLOW}Not in use!{Style.RESET_ALL}") return False self.map[x, y] = 0 self._axes.scatter(x + 0.5, y + 0.5, s=250, marker="s", c="white") print(f"Cleared: {prompt}") elif self._current_state == "searching": if patches == "explored": # assert (0 <= self.map[x, y] <= 2).all(), f"{Fore.RED}Illegal Operation on map{Fore.RESET}" # self.map[x, y] = 1 self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="gray", alpha=1) elif patches == "frontier": # assert (0 <= self.map[x, y] <= 2).all(), f"{Fore.RED}Illegal Operation on map{Fore.RESET}" # self.map[x, y] = 2 self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="cyan", alpha=1) else: assert False, F"{Fore.RED}Invalid State: {self._current_state}" self.update() return True def pandasfy(self) -> pd.DataFrame: """ Notes: pandasfy will serialize current map from matplotlib to cvs file Returns: serialized_map (pd.Dataframe) """ pp: pd.DataFrame = pd.DataFrame(self.map.copy().T[::-1, :], dtype=str) pp.replace("-1.0", "GOAL", inplace=True) pp.replace("-2.0", "START", inplace=True) return pp def depandasfy(self, csv_path: Path): """ Notes: depandasfy will deserialized csv file map to matplotlib Args: csv_path (Path): path of csv file Returns: None """ df: pd.DataFrame = pd.read_csv(filepath_or_buffer=csv_path, index_col=0, header=0) df.replace("START", "-2.0", inplace=True) df.replace("GOAL", "-1.0", inplace=True) self.new() t = df.to_numpy(dtype=np.float64)[::-1, :].T # self.map = df.to_numpy() xs, ys = np.where(t == np.inf) self._current_state = "add_obstacle" self.add_patch(xs, ys) xs, ys = np.where(t == -1) self._current_state = "add_goal" self.add_patch(xs, ys) xs, ys = np.where(t == -2) self._current_state = "add_start" self.add_patch(xs, ys) def _update_title(self, s: str = None): if s is None: self._title.set_text(f"Current State: {self._current_state}") else: self._title.set_text(s) self.update() def _btn_press_cb(self, event: bbase.MouseEvent) -> None: self._on_press = True if event.inaxes: point = (int(event.xdata), int(event.ydata)) self.add_patch(*point) def _btn_release_cb(self, event: bbase.MouseEvent) -> None: self._on_press = False def _move_cb(self, event: bbase.MouseEvent) -> None: if self._on_press: if event.inaxes: point = (int(event.xdata), int(event.ydata)) self.add_patch(*point) def _keyboard_cb(self, event: bbase.KeyEvent): if event.key == "c": self._current_state = "add_obstacle" print(f"{Fore.YELLOW}Clear all!{Style.RESET_ALL}") self.new() elif event.key == "1": self._current_state = "add_obstacle" print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}") elif event.key == "2": self._current_state = "add_goal" print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}") elif event.key == "3": self._current_state = "add_start" print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}") elif event.key == "0" or event.key == "d": self._current_state = "clear_patch" print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}") elif event.key == "r": p = self._current_state self._current_state = "add_obstacle" xs = np.random.randint(0, self._mapsize[0], size=(n := self._mapsize[0] * self._mapsize[1] // 5)) ys = np.random.randint(0, self._mapsize[1], size=n) self.add_patch(xs, ys) self._current_state = p elif event.key == "w": files = [i.stem for i in Path(__file__).resolve().absolute().parent.glob("map_*.csv")] if len(files) > 0: name = f"map_{int(max([f.split('_')[1] for f in files])) + 1}" else: name = "map_0" self.pandasfy().to_csv(p := Path(__file__).resolve().parent.joinpath(f"{name}.csv")) print(f"{Fore.GREEN}Save current Map to {p}{Style.RESET_ALL}") elif event.key == "v": print(f"{Fore.GREEN}Start finding goal{self._current_state}{Style.RESET_ALL}") self._go_on = True self.start() self._update_title()
class Astar(_MapBase): def __init__(self, load_path: Union[Path, None], mapsize: Tuple[int, int] = (30, 30)): super(Astar, self).__init__(mapsize=mapsize) if load_path is not None: print(f"{Fore.GREEN}Load from {load_path}{Style.RESET_ALL}") self.depandasfy(load_path) self.update() t = np.where(self.map == -2), np.where(self.map == -1) self.start_point: str = f"{t[0][0][0]},{t[0][1][0]}" self.goal_point: str = f"{t[1][0][0]},{t[1][1][0]}" # open list, but I think frontier is a better name # no need for close list for which has been represented as items of came_from or keys of cost_so_far self.frontier = queue.PriorityQueue() self.frontier.put((0, self.start_point)) self.came_from: Dict[str, Union[str, None]] = {self.start_point: None} self.cost_so_far: Dict[str, float] = {self.start_point: 0} self._last: np.ndarray = None plt.show() else: print(f"{Fore.GREEN}Start drawing map{Style.RESET_ALL}") plt.show() def start(self): """ Notes: A* algorithm implementation Returns: None """ if_can_find_goal: bool = False # get the closest point self._current_state = "searching" self._update_title() while not self.frontier.empty(): current_p: str = self.frontier.get()[1] # find the goal if current_p == self.goal_point: if_can_find_goal = True break next_p: str for next_p in (n := self._get_neighbor(current_p)): new_cost = self.cost_so_far[current_p] + self._get_cost(next_p, current_p) # if next point has not been considered / next point in in undiscovered area # or there exists a nearer way to get to next point by passing current point # this also can help preventing going back if next_p not in self.cost_so_far.keys() or new_cost < self.cost_so_far[next_p]: self.cost_so_far[next_p] = new_cost priority = new_cost + self._get_cost(next_p, self.goal_point) self.frontier.put((priority, next_p)) current_pp = [int(i) for i in current_p.split(",")] self.came_from[next_p] = f"{current_pp[0]},{current_pp[1]}" self._plot4searching() plt.pause(0.0001) self.update() if not if_can_find_goal: print(f"{Fore.RED}Cannot find the goal in current map{Style.RESET_ALL}") self._update_title("STOP: cannot find the path!") return else: # go back to find the path self._current_state = "add_goal" goal = (int(i) for i in self.goal_point.split(",")) self.add_patch(*goal) self._current_state = "going_back" self._update_title() path = [] current_p = self.goal_point while current_p != self.start_point: current_p = self.came_from[current_p] p = tuple(int(i) for i in current_p.split(",")) path.append(p) self.add_patch(*p) self._current_state = "add_start" start = (int(i) for i in self.start_point.split(",")) self.add_patch(*start) print(f"{Fore.BLUE}{Style.BRIGHT}Success!{Style.RESET_ALL}") print("Path:") pprint.pprint(path) return path def _get_neighbor(self, point: str): from itertools import product x, y = [int(i) for i in point.split(",")] neighbor = [] for i, j in product([-1, 0, 1], [-1, 0, 1]): if abs(i) + abs(j) == 0: continue if 0 <= (dx := x + i) <= self._mapsize[0] and 0 <= (dy := y + j) <= self._mapsize[1] and \ self.map[dx, dy] != np.inf: neighbor.append(f"{dx},{dy}") return neighbor def _get_cost(self, point1: str, point2: str): # x, y = [int(i) for i in point.split(",")] # return self.manhattan_distance(*[int(i) for i in point1.split(",")], # *[int(i) for i in point2.split(",")]) return self.euler_distance(*[int(i) for i in point1.split(",")], *[int(i) for i in point2.split(",")]) def _plot4searching(self): """Codes for plotting""" # update current map # explore area explored_ps = ([], []) for p, parent in self.came_from.items(): pp = [int(i) for i in p.split(",")] if parent is not None: explored_ps[0].append(pp[0]) explored_ps[1].append(pp[1]) self.map[np.array(explored_ps[0]), np.array((explored_ps[1]))] = 1 # frontier frontier_ps = ([], []) t = [] while not self.frontier.empty(): t.append(self.frontier.get()) for i in t: self.frontier.put(i) for tt in t: # frontier_ps.append(t.get()[1]) p = [int(i) for i in tt[1].split(",")] frontier_ps[0].append(p[0]) frontier_ps[1].append(p[1]) # simply deepcopy locked object will raise an error # t = copy.deepcopy(self.frontier) self.map[np.array(frontier_ps[0]), np.array(frontier_ps[1])] = 2 # find difference if self._last is None: update_explored = list(zip(*np.where(self.map == 1))) update_frontier = list(zip(*np.where(self.map == 2))) # update_frontier = [(i,j) for i, j in zip(*update_frontier)] else: last_explored = list(zip(*np.where(self._last == 1))) last_frontier = list(zip(*np.where(self._last == 2))) current_explored = list(zip(*np.where(self.map == 1))) current_frontier = list(zip(*np.where(self.map == 2))) update_explored = list(set(current_explored) - set(last_explored)) update_frontier = list(set(current_frontier) - set(last_frontier)) self._last = self.map.copy() # draw explored area if len(update_explored) > 0: self.add_patch(np.array([i[0] for i in update_explored]), np.array([i[1] for i in update_explored]), patches="explored") # draw frontier if len(update_frontier) > 0: self.add_patch(np.array([i[0] for i in update_frontier]), np.array([i[1] for i in update_frontier]), patches="frontier") @staticmethod def manhattan_distance(x1, y1, x2, y2): return abs(x2 - x1) + abs(y2 - y1) @staticmethod def euler_distance(x1, y1, x2, y2): if (d := abs(x2 - x1) + abs(y2 - y1)) == 1: return 1 elif d == 2: return 1.4 else: return round(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2), ndigits=2)
def parse_arg(): parer = argparse.ArgumentParser( description=f"{Fore.GREEN}{Style.BRIGHT}A* Algorithm, animated with matplotlib{Style.RESET_ALL}. " f"Author: {Fore.YELLOW}Jack Wang{Fore.RESET}. " f"Date: {Fore.YELLOW}2021-12-15{Style.RESET_ALL}." "This script provides A* algorithm illustration with the help of matplotlib\n" f"You can use this script either by {Fore.GREEN}command-line interface{Fore.RESET} " f"or {Fore.GREEN}python code interface{Fore.RESET}." "To use this script with command-line interface, call this script with -h option. " "To use this script in python code, " "simply calling Astar class to create an Astar object, then everything will be done. " "If load_path argument of Astar class are not specified (i.e. load_path=None), " "this program will create a blank" "map for you, else loading an existing map." "\n\n" "In the matplotlib GUI, press 1 to add obstacle, 2 to add goal, 3 to add start, w to save map, " "r to random generate obstacles, d to delete an grid and v to start path planning. " f"{Fore.YELLOW}Note: when call with -c, you can not press v to start path planning, -c is only " f"for drawing map. {Style.RESET_ALL}") parer.add_argument("-o", dest="ok", action="store_true", help="must be added when calling the scripts to let me " "know you understand how to use this program") parer.add_argument("-c", dest="gen_map", action="store_true", help="create a new map") parer.add_argument("-l", dest="load_num", type=int, default=-1, help="load a existing map") return parer.parse_args()
def main(): args = parse_arg() if not args.ok: print(f"{Fore.RED}Please read the help info with -h option the rerun the scripts with -o option{Fore.RESET}") return if args.gen_map: Astar(load_path=None) return if args.load_num != -1: Astar(load_path=Path(__file__).resolve().parent.joinpath(f"map_{args.load_num}.csv")) if __name__ == "__main__": # a = Astar(load_path=Path("./map_4.csv")) # a = Astar(load_path=Path(__file__).resolve().parent.joinpath("./map_1.csv")) main()
