当前位置:   article > 正文

ROS2动作通信(Python)_ros2 python 创建 action

ros2 python 创建 action

1.动作服务端实现

功能包py03_action的py03_action目录下,新建Python文件demo01_action_server_py.py,并编辑文件,输入如下内容:

  1. """
  2. 需求:编写动作服务端实习,可以提取客户端请求提交的整型数据,并累加从1到该数据之间的所有整数以求和,
  3. 每累加一次都计算当前运算进度并连续反馈回客户端,最后,在将求和结果返回给客户端。
  4. 步骤:
  5. 1.导包;
  6. 2.初始化 ROS2 客户端;
  7. 3.定义节点类;
  8. 3-1.创建动作服务端;
  9. 3-2.生成连续反馈;
  10. 3-3.生成最终响应。
  11. 4.调用spin函数,并传入节点对象;
  12. 5.释放资源。
  13. """
  14. # 1.导包;
  15. import time
  16. import rclpy
  17. from rclpy.action import ActionServer
  18. from rclpy.node import Node
  19. from base_interfaces_demo.action import Progress
  20. # 3.定义节点类;
  21. class ProgressActionServer(Node):
  22. def __init__(self):
  23. super().__init__('progress_action_server')
  24. # 3-1.创建动作服务端;
  25. self._action_server = ActionServer(
  26. self,
  27. Progress,
  28. 'get_sum',
  29. self.execute_callback)
  30. self.get_logger().info('动作服务已经启动!')
  31. def execute_callback(self, goal_handle):
  32. self.get_logger().info('开始执行任务....')
  33. # 3-2.生成连续反馈;
  34. feedback_msg = Progress.Feedback()
  35. sum = 0
  36. for i in range(1, goal_handle.request.num + 1):
  37. sum += i
  38. feedback_msg.progress = i / goal_handle.request.num
  39. self.get_logger().info('连续反馈: %.2f' % feedback_msg.progress)
  40. goal_handle.publish_feedback(feedback_msg)
  41. time.sleep(1)
  42. # 3-3.生成最终响应。
  43. goal_handle.succeed()
  44. result = Progress.Result()
  45. result.sum = sum
  46. self.get_logger().info('任务完成!')
  47. return result
  48. def main(args=None):
  49. # 2.初始化 ROS2 客户端;
  50. rclpy.init(args=args)
  51. # 4.调用spin函数,并传入节点对象;
  52. Progress_action_server = ProgressActionServer()
  53. rclpy.spin(Progress_action_server)
  54. # 5.释放资源。
  55. rclpy.shutdown()
  56. if __name__ == '__main__':
  57. main()
2.动作客户端实现

功能包py03_action的py03_action目录下,新建Python文件demo02_action_client_py.py,并编辑文件,输入如下内容:

  1. """
  2. 需求:编写动作客户端实现,可以提交一个整型数据到服务端,并处理服务端的连续反馈以及最终返回结果。
  3. 步骤:
  4. 1.导包;
  5. 2.初始化 ROS2 客户端;
  6. 3.定义节点类;
  7. 3-1.创建动作客户端;
  8. 3-2.发送请求;
  9. 3-3.处理目标发送后的反馈;
  10. 3-4.处理连续反馈;
  11. 3-5.处理最终响应。
  12. 4.调用spin函数,并传入节点对象;
  13. 5.释放资源。
  14. """
  15. # 1.导包;
  16. import rclpy
  17. from rclpy.action import ActionClient
  18. from rclpy.node import Node
  19. from base_interfaces_demo.action import Progress
  20. # 3.定义节点类;
  21. class ProgressActionClient(Node):
  22. def __init__(self):
  23. super().__init__('progress_action_client')
  24. # 3-1.创建动作客户端;
  25. self._action_client = ActionClient(self, Progress, 'get_sum')
  26. def send_goal(self, num):
  27. # 3-2.发送请求;
  28. goal_msg = Progress.Goal()
  29. goal_msg.num = num
  30. self._action_client.wait_for_server()
  31. self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
  32. self._send_goal_future.add_done_callback(self.goal_response_callback)
  33. def goal_response_callback(self, future):
  34. # 3-3.处理目标发送后的反馈;
  35. goal_handle = future.result()
  36. if not goal_handle.accepted:
  37. self.get_logger().info('请求被拒绝')
  38. return
  39. self.get_logger().info('请求被接收,开始执行任务!')
  40. self._get_result_future = goal_handle.get_result_async()
  41. self._get_result_future.add_done_callback(self.get_result_callback)
  42. # 3-5.处理最终响应。
  43. def get_result_callback(self, future):
  44. result = future.result().result
  45. self.get_logger().info('最终计算结果:sum = %d' % result.sum)
  46. # 5.释放资源。
  47. rclpy.shutdown()
  48. # 3-4.处理连续反馈;
  49. def feedback_callback(self, feedback_msg):
  50. feedback = (int)(feedback_msg.feedback.progress * 100)
  51. self.get_logger().info('当前进度: %d%%' % feedback)
  52. def main(args=None):
  53. # 2.初始化 ROS2 客户端;
  54. rclpy.init(args=args)
  55. # 4.调用spin函数,并传入节点对象;
  56. action_client = ProgressActionClient()
  57. action_client.send_goal(10)
  58. rclpy.spin(action_client)
  59. # rclpy.shutdown()
  60. if __name__ == '__main__':
  61. main()
3.编辑配置文件
1.package.xml

在创建功能包时,所依赖的功能包已经自动配置了,配置内容如下:

  1. <depend>rclpy</depend>
  2. <depend>base_interfaces_demo</depend>
2.setup.py

entry_points字段的console_scripts中添加如下内容:

  1. entry_points={
  2. 'console_scripts': [
  3. 'demo01_action_server_py = py03_action.demo01_action_server_py:main',
  4. 'demo02_action_client_py = py03_action.demo02_action_client_py:main'
  5. ],
  6. },
4.编译

终端中进入当前工作空间,编译功能包:

colcon build --packages-select py03_action
5.执行

当前工作空间下,启动两个终端,终端1执行动作服务端程序,终端2执行动作客户端程序。

终端1输入如下指令:

  1. . install/setup.bash
  2. ros2 run py03_action demo01_action_server_py

终端2输入如下指令:

  1. . install/setup.bash
  2. ros2 run py03_action demo02_action_client_py

最终运行结果与案例类似。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/982376
推荐阅读
相关标签
  

闽ICP备14008679号