当前位置:   article > 正文

Ros2中Action两个节点主动双向通信_rclpy.action.server.servergoalhandle

rclpy.action.server.servergoalhandle

计划实现:

过程一:机器人给服务器发一段音频

过程二:服务器处理音频后,再主动给机器人发送播放请求

过程三:机器人播放完成后,给服务器一个响应

过程四:然后服务器给机器人返回结束指令

整个过程有两个action,过程一和过程四是一个action,过程二和过程三是一个action,过程二和过程三是过程一和过程四的一部分。 

注意的点:

  1. 在robot中有一个主线程,一个定时任务子线程,在server中只有一个主线程(可以查看之前的文章导入一下ros环境,然后直接在ros源代码中打印数据)
  2. 在robot中的子线程中,不能使用rclpy.spin_until_future_complete(self, _send_goal_future),不然会提示ValueError: generator already executing。 在server中的主线程中可以使用这个方法。 我理解的是,该方法会阻塞主线程,直接让ros不再处理任何事情,只等待这个返回,但是这个时候,服务端还会主动给机器人发送信息,所以程序就死了
  3. 在server中,我们使用client发送请求的时候,必须要新创建一个node,因为是在主线程中了,所以可以使用rclpy.spin_until_future_complete进行阻塞

上面的原理都不是很清楚,是自己猜的

代码如下:

文件一:server.py

  1. import threading
  2. import time
  3. import rclpy
  4. from rclpy.action import ActionClient
  5. from rclpy.action import ActionServer
  6. from rclpy.node import Node
  7. from rclpy.action.server import ServerGoalHandle
  8. from bothway_interfaces.action import AskNbReplyRobot
  9. from bothway_interfaces.action import AskRobotReplyNb
  10. # 导入Action接口
  11. # 两个线程,一个Main线程,一个定时任务线程
  12. class Robot(Node):
  13. """Action客户端"""
  14. def __init__(self, name):
  15. super().__init__(name)
  16. current_thread = threading.current_thread()
  17. self.get_logger().info(
  18. f"Main thread: {current_thread.name}, ID: {current_thread.ident}")
  19. self.send_index = 0
  20. self.get_logger().info(f"节点已启动:{name}!")
  21. self.action_server_reply_robot = ActionServer(
  22. self,
  23. AskRobotReplyNb,
  24. "AskRobotReplyNb",
  25. execute_callback=self.callback_execute_callback,# 主线程运行
  26. )
  27. self.timer = threading.Timer(2, self.send_request)
  28. self.timer.start()
  29. def callback_execute_callback(self,goal_handle: ServerGoalHandle):
  30. current_thread = threading.current_thread()
  31. self.get_logger().info(
  32. f"callback_execute_callback thread: {current_thread.name}, ID: {current_thread.ident}")
  33. playwave = goal_handle.request.base64wav
  34. self.get_logger().info(f"成功拿到播放参数:{playwave}")
  35. self.get_logger().info("播放成功")
  36. #给反馈
  37. goal_handle.succeed()
  38. result = AskRobotReplyNb.Result()
  39. result.code = 0
  40. result.msg = '播放成功'
  41. result.t = time.time()
  42. return result
  43. def send_request(self):
  44. #因为启动了定时器,所以是在子线程中运行
  45. current_thread = threading.current_thread()
  46. self.get_logger().info(
  47. f"request thread: {current_thread.name}, ID: {current_thread.ident}")
  48. # 每次创建一个新的client
  49. # test = rclpy.create_node("test")
  50. action_client_ask_nb = ActionClient(
  51. self, AskNbReplyRobot, "ask_nb_reply_robot")
  52. while not action_client_ask_nb.wait_for_server(timeout_sec=1.0):
  53. self.get_logger().info('service not available, waiting again...')
  54. goal_msg = AskNbReplyRobot.Goal()
  55. goal_msg.base64wav = f"base64wav--aaaaa:{self.send_request}"
  56. goal_msg.index = self.send_index
  57. self.send_index = self.send_index + 1
  58. goal_msg.t = time.time()
  59. self.get_logger().info(f"goal_msg封装完成:{goal_msg.t},准备发送请求")
  60. _send_goal_future = action_client_ask_nb.send_goal_async(
  61. goal_msg, feedback_callback=self.feedback_callback)
  62. self.get_logger().info("等待 send_goal_future返回中.....")
  63. # 阻塞当前线程,直到 _send_goal_future 完成。这意味着ROS节点的事件循环将被阻塞,不允许节点继续执行其他任务,直到 _send_goal_future 完成或超时
  64. # rclpy.spin_until_future_complete(self, _send_goal_future)
  65. while not _send_goal_future.done(): # 用于阻塞当前线程,当前线程中其他代码都不能执行,但是ros事件循环能执行
  66. time.sleep(0.2)
  67. self.get_logger().info("等待 send_goal_future返回中.....")
  68. self.get_logger().info("请求成功返回")
  69. goal_handle = _send_goal_future.result()
  70. if goal_handle.accepted:
  71. result_future = goal_handle.get_result_async()
  72. self.get_logger().info('等待result_future返回中......')
  73. # rclpy.spin_until_future_complete(self, result_future)
  74. while not result_future.done():
  75. time.sleep(0.2)
  76. self.get_logger().info("等待 result_future.....")
  77. self.get_logger().info('result_future成功')
  78. if result_future == None or result_future.result() == None or result_future.result().result == None:
  79. self.get_logger().info(
  80. "result_future == None or result_future.result() == None or result_future.result().result == None")
  81. return None
  82. self.get_logger().info(f"result_future:{result_future}")
  83. self.get_logger().info(
  84. f"result_future.result():{result_future.result()}")
  85. # self.timer = threading.Timer(0.1, self.send_request)
  86. # self.timer.start()
  87. def feedback_callback(self, feedback_msg): # 此处是第一个话题,过程反馈话题,客户端订阅
  88. """获取回调反馈"""
  89. feedback = feedback_msg.feedback
  90. # self.get_logger().info(f"Received feedback: {feedback.state}, {feedback.process}")
  91. def main(args=None):
  92. """主函数"""
  93. rclpy.init(args=args)
  94. robot = Robot("robot")
  95. rclpy.spin(robot)
  96. rclpy.shutdown()

文件二:

robot.py

  1. import threading
  2. import time
  3. import rclpy
  4. from rclpy.action import ActionClient
  5. from rclpy.action import ActionServer
  6. from rclpy.node import Node
  7. from rclpy.action.server import ServerGoalHandle
  8. from bothway_interfaces.action import AskNbReplyRobot
  9. from bothway_interfaces.action import AskRobotReplyNb
  10. # 导入Action接口
  11. # 两个线程,一个Main线程,一个定时任务线程
  12. class Robot(Node):
  13. """Action客户端"""
  14. def __init__(self, name):
  15. super().__init__(name)
  16. current_thread = threading.current_thread()
  17. self.get_logger().info(
  18. f"Main thread: {current_thread.name}, ID: {current_thread.ident}")
  19. self.send_index = 0
  20. self.get_logger().info(f"节点已启动:{name}!")
  21. self.action_server_reply_robot = ActionServer(
  22. self,
  23. AskRobotReplyNb,
  24. "AskRobotReplyNb",
  25. execute_callback=self.callback_execute_callback,# 主线程运行
  26. )
  27. self.timer = threading.Timer(2, self.send_request)
  28. self.timer.start()
  29. def callback_execute_callback(self,goal_handle: ServerGoalHandle):
  30. current_thread = threading.current_thread()
  31. self.get_logger().info(
  32. f"callback_execute_callback thread: {current_thread.name}, ID: {current_thread.ident}")
  33. playwave = goal_handle.request.base64wav
  34. self.get_logger().info(f"成功拿到播放参数:{playwave}")
  35. self.get_logger().info("播放成功")
  36. #给反馈
  37. goal_handle.succeed()
  38. result = AskRobotReplyNb.Result()
  39. result.code = 0
  40. result.msg = '播放成功'
  41. result.t = time.time()
  42. return result
  43. def send_request(self):
  44. #因为启动了定时器,所以是在子线程中运行
  45. current_thread = threading.current_thread()
  46. self.get_logger().info(
  47. f"request thread: {current_thread.name}, ID: {current_thread.ident}")
  48. # 每次创建一个新的client
  49. # test = rclpy.create_node("test")
  50. action_client_ask_nb = ActionClient(
  51. self, AskNbReplyRobot, "ask_nb_reply_robot")
  52. while not action_client_ask_nb.wait_for_server(timeout_sec=1.0):
  53. self.get_logger().info('service not available, waiting again...')
  54. goal_msg = AskNbReplyRobot.Goal()
  55. goal_msg.base64wav = f"base64wav--aaaaa:{self.send_request}"
  56. goal_msg.index = self.send_index
  57. self.send_index = self.send_index + 1
  58. goal_msg.t = time.time()
  59. self.get_logger().info(f"goal_msg封装完成:{goal_msg.t},准备发送请求")
  60. _send_goal_future = action_client_ask_nb.send_goal_async(
  61. goal_msg, feedback_callback=self.feedback_callback)
  62. self.get_logger().info("等待 send_goal_future返回中.....")
  63. # 阻塞当前线程,直到 _send_goal_future 完成。这意味着ROS节点的事件循环将被阻塞,不允许节点继续执行其他任务,直到 _send_goal_future 完成或超时
  64. # rclpy.spin_until_future_complete(self, _send_goal_future)
  65. while not _send_goal_future.done(): # 用于阻塞当前线程,当前线程中其他代码都不能执行,但是ros事件循环能执行
  66. time.sleep(0.2)
  67. self.get_logger().info("等待 send_goal_future返回中.....")
  68. self.get_logger().info("请求成功返回")
  69. goal_handle = _send_goal_future.result()
  70. if goal_handle.accepted:
  71. result_future = goal_handle.get_result_async()
  72. self.get_logger().info('等待result_future返回中......')
  73. # rclpy.spin_until_future_complete(self, result_future)
  74. while not result_future.done():
  75. time.sleep(0.2)
  76. self.get_logger().info("等待 result_future.....")
  77. self.get_logger().info('result_future成功')
  78. if result_future == None or result_future.result() == None or result_future.result().result == None:
  79. self.get_logger().info(
  80. "result_future == None or result_future.result() == None or result_future.result().result == None")
  81. return None
  82. self.get_logger().info(f"result_future:{result_future}")
  83. self.get_logger().info(
  84. f"result_future.result():{result_future.result()}")
  85. # self.timer = threading.Timer(0.1, self.send_request)
  86. # self.timer.start()
  87. def feedback_callback(self, feedback_msg): # 此处是第一个话题,过程反馈话题,客户端订阅
  88. """获取回调反馈"""
  89. feedback = feedback_msg.feedback
  90. # self.get_logger().info(f"Received feedback: {feedback.state}, {feedback.process}")
  91. def main(args=None):
  92. """主函数"""
  93. rclpy.init(args=args)
  94. robot = Robot("robot")
  95. rclpy.spin(robot)
  96. rclpy.shutdown()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/982454
推荐阅读
相关标签
  

闽ICP备14008679号