当前位置:   article > 正文

ROS2编写动作服务端案例(Python)_ros2 python 例子

ros2 python 例子


注:这里我的ROS2版本为foxy,代码基于官方的案例代码

官网教程:Writing an Action Server (Python)

了解动作

动作主要用于需要运行一段时间的事件。动作并不是一个全新的机制,而是由底层的一个话题和两个服务组成:一个任务目标(Goal,服务),一个执行结果(Result,服务),周期数据反馈(Feedback,话题)。

动作是可抢占式的,动作的执行需要一定的时间。可以随时发送取消指令,令动作终止;若程序执行过程中发送一个新的action目标,则会直接终止上一个目标开始执行最新的任务目标。

总体上来讲,action是一个客户端 / 服务器的通信模型。客户发送一个任务目标,服务端根据收到的目标执行并周期反馈状态,执行完成之后反馈一个执行结果。

(大体过程:客户端Goal发送请求,服务端Goal收到之后应答;客户端Result发送请求,服务端Result接收到之后,服务端Feedback周期性发布消息以反馈状态;目标完成之后Result应答。)

动作结构示例

当我们运行海龟模拟器

ros2 run turtlesim turtlesim_node

之后,再运行:

ros2 interface show turtlesim/action/RotateAbsolute.action

可以看到终端显示如下:
动作结构
这个就是action的结构案例。

action的结构用“—” 分成了三段,第一段描述客户端发送的请求目标(Goal),第二段描述action执行完成的反馈结果(Result),第三段描述的是action执行过程中的周期反馈(Feedback)。

创建自己的动作

创建功能包

这里我的案例功能包是基于官方的教程:Creating custom ROS2 msg and srv files

该教程的内容大致为将消息和服务放在一个功能包内,便于维护;同理,也可以将描述动作的文件.action放入其中。

在工作空间下的src目录下创建功能包
创建功能包注意,这里我们创建的是一个C++功能包,是因为我们没有办法在一个纯Python包中生成.msg、.srv或者.action文件。我们可在一个CMake包中创建这些文件并定义接口,然后在一个Python节点中使用它们。

同时在该功能包下创建三个名为action、msg、srv的文件夹,分别用于存放.action、.msg、.srv文件。(这里我们只使用.action文件)

编写动作结构

此次案例的内容为构建斐波那契数列:动作客户端发送需构建的阶数,动作服务端返回构建成功的数列。

这里创建一个 Fibonacci.action文件放在tutorial_interfaces的action文件夹下:

int32 order
---
int32[] sequence
---
int32[] partial_sequence
  • 1
  • 2
  • 3
  • 4
  • 5

这里的请求是我们想要构建斐波那契数列的阶数(order),最终返回的结果是构建完成的数列(sequence),构建过程中的反馈是目前已经构建的部分数列(partial_sequence)。

在CmakeLists.txt文件对应位置中添加如下内容:

rosidl_generate_interfaces(${PROJECT_NAME}
	"action/Fibonacci.action"
	)
  • 1
  • 2
  • 3

同时在package.xml文件中添加依赖:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>tutorial_interfaces</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="wmiii@todo.todo">wmiii</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>rosidl_default_generators</buildtool_depend>
  <depend>action_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>
</package>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

然后编译该功能包:(记得 . install/setup.bash

colcon build --packages-select tutorial_interfaces

编译
(我也搞不清楚为什么这里编译了二十多秒…)

运行如下指令查看该动作:

ros2 interface show tutorial_interfaces/action/Fibonacci

看到如下内容则说明动作构建成功:
测试

编写动作服务端

没有反馈(Feedback)版本

在工作空间的src目录下新建一个名为action_tutorials的功能包(也可以不创建,在现有功能包下放置节点文件):
创建功能包
在功能包同名目录下创建一个名为fibonacci_action_server.py的文件:

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

 	def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result
 
 def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

动作的服务端需要四个参数(第11行):
1.ROS节点;这里是自己
2.动作的类型;这里是Fibonacci
3.动作的名称;这里我们将动作命名为fibonacci
4.回调函数,用于接收并处理目标,传入的参数为节点和action结构。

注意第24行的代码:goal_handle.succeed()

如果我们注释掉这行代码并运行这个节点时,我们先是会看到我们的日志消息“执行目标......”,然后会跟着一个目标状态未设置的警告。在默认情况下,如果在执行回调函数中没有设置目标句柄(goal_handle)状态,那么它会假定为中止状态。
我们可以使用目标句柄的方法succeed()来表示目标成功了(如上)。

回调函数内部的for循环语句的作用就是计算斐波那契数列。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

有反馈版本

我们可以调用目标句柄的publish_feedback()方法,让动作的服务端为动作的客户端发布反馈。

接下来我们将替换sequence变量,并且使用反馈(这里是partial_sequence数组)来保存数列。每次for循环更新了反馈的信息之后,我们都会发布反馈信息,然后人为的延时一秒(以体现出周期反馈效果):

import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from tutorial_interfaces.action import Fibonacci

class FibonacciActionServer(Node):
	
	def __init__(self):
		super().__init__('fibonacci_action_server')
		self._action_server = ActionServer(
			self,
			Fibonacci,
			'fibonacci',
			self.execute_callback
		)
		
	def execute_callback(self, goal_handle):
		self.get_logger().info('Executing goal..')
		
		feedback_msg = Fibonacci.Feedback()
		feedback_msg.partial_sequence = [0, 1]
		
		for i in range(1, goal_handle.request.order):
			feedback_msg.partial_sequence.append(
				feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
			self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
			goal_handle.publish_feedback(feedback_msg)
			time.sleep(1)
			
		goal_handle.succeed()
		result = Fibonacci.Result()
		result.sequence = feedback_msg.partial_sequence
		return result
		
def main(args = None):
	rclpy.init(args = args)
	fibonacci_action_server = FibonacciActionServer()
	rclpy.spin(fibonacci_action_server)
	
if __name__ == '__main__':
	main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

修改package.xml与setup.py文件:

注:此步骤在官网教程中没有,可以参考官网初级教程:创建简单的发布/订阅者等。
在package.xml文件对应位置中添加如下依赖:

<export>
    <build_type>ament_python</build_type>
    <exec_depend>action_msgs</exec_depend>
    <exec_depend>rclpy</exec_depend>
    <exec_depend>tutorial_interfaces</exec_depend>
  </export>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在setup.py文件对应位置中添加程序入口:

entry_points={
        'console_scripts': [
        	'action_server = action_tutorials.fibonacci_action_server:main',
        ],
    },
  • 1
  • 2
  • 3
  • 4
  • 5
'
运行

测试动作服务端

编译等过程省略
运行如下命令,打开动作服务端:

ros2 run action_tutorials action_server

此时由于没有请求传入,不会进入回调函数,所以终端没有反应。

新开一个窗口,运行如下指令:

ros2 action send_goal --feedback fibonacci tutorial_interfaces/action/Fibonacci “{order: 5}”

此时可以观察到:

服务端终端:
在这里插入图片描述
发送请求终端:
在这里插入图片描述
注意,终端显示的内容在实际运行过程中是每过一秒输出一段的,因为我们在程序中设置了time.sleep(1)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/982391
推荐阅读
相关标签
  

闽ICP备14008679号