赞
踩
目录
(1)在功能包文件夹下创建action文件夹,定义action文件
action可以简单理解成service的升级版
一般适用于耗时的请求响应场景,以获取连续的状态反馈
action结构图解
catkin_create_pkg task_service roscpp rospy std_msgs actionlib actionlib_msgs
action 文件内容组成分为三部分:请求目标值、最终响应结果、连续反馈,三者之间使用---
分割示例内容如下:
- int32 num
- ---
- int32 result
- ---
- float64 progress_percentage
四,服务端和客户端代码编写
- #!/usr/bin/env python3
- import rospy
- import actionlib
- from task_action.msg import CheckAction, CheckFeedback, CheckResult
- # 这里的import可以用*,也可以明确指明导入的具体类型,可读性高
-
- class CheckServer:
- def __init__(self):
- # 初始化一个Action服务器
- # 创建一个SimpleActionServer
- # 参数1: action名字,用于匹配客户端和服务器
- # 参数2: Action类型,定义goal, result, feedback的消息类型
- # 参数3: 回调函数,当接收到一个新的goal时会被调用
- # 参数4: auto_start参数,这里设置为False意味着需要手动调用start()
- self.server = actionlib.SimpleActionServer("part_check", CheckAction, self.cb, False)
- self.server.start()
- rospy.loginfo("服务端启动")
-
- def cb(self, goal):
- # 总零件数
- total_parts = goal.num
- # 已检测的零件数
- checked_parts = 0
-
- # 初始化Feedback和Result消息
- feedback = CheckFeedback()
- result = CheckResult()
-
- rate = rospy.Rate(1) # 设置频率为每秒执行一次
-
- # 循环,直到所有零件都被检查
- while checked_parts < total_parts:
- checked_parts += 1
- # 计算并更新进度百分比
- feedback.progress_percentage = (checked_parts / float(total_parts)) * 100.0
-
- rospy.loginfo("检测 %d 个零件", checked_parts)
- # 向客户端发布进度反馈
- self.server.publish_feedback(feedback)
-
- rate.sleep()
-
- # 设置最终的检测结果
- result.result = checked_parts
- self.server.set_succeeded(result)
- rospy.loginfo("检测完成")
-
- if __name__ == "__main__":
- rospy.init_node("task_action_server")
- server = CheckServer()
- rospy.spin()
'运行
- #!/usr/bin/env python3
-
- import rospy
- import actionlib
- from task_action.msg import CheckAction, CheckGoal, CheckFeedback
-
- def done_cb(state, result):
- # 当目标完成时的回调
- if state == actionlib.GoalStatus.SUCCEEDED:
- rospy.loginfo("检测完成")
-
- def active_cb():
- # 当目标激活时的回调
- rospy.loginfo("开始检测...")
-
- def fb_cb(feedback):
- # 进度反馈的回调
- rospy.loginfo("%.2f%%", feedback.progress_percentage)
-
- if __name__ == "__main__":
- rospy.init_node("task_action_client")
-
- # 创建一个Action客户端
- client = actionlib.SimpleActionClient("part_check", CheckAction)
-
- # 等待服务器准备好
- client.wait_for_server()
-
- # 设置并发送目标给服务器
- goal = CheckGoal()
- goal.num = 40
-
- client.send_goal(goal, done_cb, active_cb, fb_cb)
-
- rospy.spin()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。