当前位置:   article > 正文

【ros实验三】使用 ROS 动作(Action)机制实现目标请求——实现零件完成度检测(python)_ros action python

ros action python

目录

前言

一,创建工作空间(略)

二,创建功能包

三,配置“环境”

(1)在功能包文件夹下创建action文件夹,定义action文件

(3)在CMakeLists.txt添加编译选项

(1)在对应功能包的src下创建python文件(略)

(2)服务端代码

(3)客户端代码

五,运行


前言

action可以简单理解成service的升级版

一般适用于耗时的请求响应场景,以获取连续的状态反馈

                                                                action结构图解

一,创建工作空间(略)

二,创建功能包

catkin_create_pkg task_service roscpp rospy std_msgs actionlib actionlib_msgs

三,配置“环境”

(1)在功能包文件夹下创建action文件夹,定义action文件

action 文件内容组成分为三部分:请求目标值、最终响应结果、连续反馈,三者之间使用---分割示例内容如下:

  1. int32 num
  2. ---
  3. int32 result
  4. ---
  5. float64 progress_percentage

(3)在CMakeLists.txt添加编译选项

 

四,服务端和客户端代码编写

(1)在对应功能包的src下创建python文件(略)

(2)服务端代码

  1. #!/usr/bin/env python3
  2. import rospy
  3. import actionlib
  4. from task_action.msg import CheckAction, CheckFeedback, CheckResult
  5. # 这里的import可以用*,也可以明确指明导入的具体类型,可读性高
  6. class CheckServer:
  7. def __init__(self):
  8. # 初始化一个Action服务器
  9. # 创建一个SimpleActionServer
  10. # 参数1: action名字,用于匹配客户端和服务器
  11. # 参数2: Action类型,定义goal, result, feedback的消息类型
  12. # 参数3: 回调函数,当接收到一个新的goal时会被调用
  13. # 参数4: auto_start参数,这里设置为False意味着需要手动调用start()
  14. self.server = actionlib.SimpleActionServer("part_check", CheckAction, self.cb, False)
  15. self.server.start()
  16. rospy.loginfo("服务端启动")
  17. def cb(self, goal):
  18. # 总零件数
  19. total_parts = goal.num
  20. # 已检测的零件数
  21. checked_parts = 0
  22. # 初始化Feedback和Result消息
  23. feedback = CheckFeedback()
  24. result = CheckResult()
  25. rate = rospy.Rate(1) # 设置频率为每秒执行一次
  26. # 循环,直到所有零件都被检查
  27. while checked_parts < total_parts:
  28. checked_parts += 1
  29. # 计算并更新进度百分比
  30. feedback.progress_percentage = (checked_parts / float(total_parts)) * 100.0
  31. rospy.loginfo("检测 %d 个零件", checked_parts)
  32. # 向客户端发布进度反馈
  33. self.server.publish_feedback(feedback)
  34. rate.sleep()
  35. # 设置最终的检测结果
  36. result.result = checked_parts
  37. self.server.set_succeeded(result)
  38. rospy.loginfo("检测完成")
  39. if __name__ == "__main__":
  40. rospy.init_node("task_action_server")
  41. server = CheckServer()
  42. rospy.spin()
'
运行

(3)客户端代码

  1. #!/usr/bin/env python3
  2. import rospy
  3. import actionlib
  4. from task_action.msg import CheckAction, CheckGoal, CheckFeedback
  5. def done_cb(state, result):
  6. # 当目标完成时的回调
  7. if state == actionlib.GoalStatus.SUCCEEDED:
  8. rospy.loginfo("检测完成")
  9. def active_cb():
  10. # 当目标激活时的回调
  11. rospy.loginfo("开始检测...")
  12. def fb_cb(feedback):
  13. # 进度反馈的回调
  14. rospy.loginfo("%.2f%%", feedback.progress_percentage)
  15. if __name__ == "__main__":
  16. rospy.init_node("task_action_client")
  17. # 创建一个Action客户端
  18. client = actionlib.SimpleActionClient("part_check", CheckAction)
  19. # 等待服务器准备好
  20. client.wait_for_server()
  21. # 设置并发送目标给服务器
  22. goal = CheckGoal()
  23. goal.num = 40
  24. client.send_goal(goal, done_cb, active_cb, fb_cb)
  25. rospy.spin()

五,运行

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号