赞
踩
您正在阅读较旧但仍受支持的 ROS 2 版本的文档。 有关最新版本的信息,请查看 Iron。
参考原文链接:
http://fishros.org/doc/ros2/humble/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Py-Service-And-Client.html
**目标:**使用 Python 创建和运行服务和客户机节点。
**教程级别:**初学者
**时间:**20分钟
内容
当节点使用服务进行通信时,发送数据请求的节点称为客户端节点,响应请求的节点称为服务节点。 请求和响应的结构由文件确定。.srv
这里使用的示例是一个简单的整数加法系统;一个节点请求两个整数的总和,另一个节点响应结果。
打开一个新终端并获取您的 ROS 2 安装,以便命令正常工作。ros2
导航到在上一教程中创建的目录。ros2_ws
回想一下,包应该在目录中创建,而不是在工作区的根目录中创建。 导航到并创建一个新包:src``ros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 py_srvcli --dependencies rclpy example_interfaces
您的终端将返回一条消息,验证您的包及其所有必要文件和文件夹的创建。py_srvcli
该参数会自动将必要的依赖项行添加到 。 是包含构建请求和响应所需的 .srv 文件的包:--dependencies``package.xml``example_interfaces
int64 a
int64 b
---
int64 sum
前两行是请求的参数,破折号下方是响应。
package.xml
由于您在包创建过程中使用了该选项,因此不必手动向 .--dependencies``package.xml
但是,与往常一样,请确保将描述、维护者电子邮件和名称以及许可证信息添加到 .package.xml
<description>Python client server tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>
setup.py
将相同的信息添加到文件中的 、 和 字段:setup.py``maintainer``maintainer_email``description``license
maintainer='Your Name',
maintainer_email='you@email.com',
description='Python client server tutorial',
license='Apache License 2.0',
在目录中,创建一个名为的新文件,并将以下代码粘贴到其中:ros2_ws/src/py_srvcli/py_srvcli``service_member_function.py
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
def main():
rclpy.init()
minimal_service = MinimalService()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()
第一个语句从包中导入服务类型。 以下语句导入 ROS 2 Python 客户端库,特别是类。import``AddTwoInts``example_interfaces``import``Node
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
类构造函数使用名称 初始化节点。 然后,它创建一个服务并定义类型、名称和回调。MinimalService``minimal_service
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
服务回调的定义接收请求数据,对数据求和,并将求和作为响应返回。
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
最后,主类初始化 ROS 2 Python 客户端库,实例化该类以创建服务节点,并旋转该节点以处理回调。MinimalService
要允许该命令运行节点,您必须将入口点添加到(位于目录中)。ros2 run``setup.py``ros2_ws/src/py_srvcli
在括号之间添加以下行:'console_scripts':
'service = py_srvcli.service_member_function:main',
在目录中,创建一个名为的新文件,并将以下代码粘贴到其中:ros2_ws/src/py_srvcli/py_srvcli``client_member_function.py
import sys
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
def main():
rclpy.init()
minimal_client = MinimalClientAsync()
response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), response.sum))
minimal_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
与服务代码一样,我们首先使用必要的库。import
import sys
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
类构造函数使用名称 初始化节点。 构造函数定义创建与服务节点具有相同类型和名称的客户端。 类型和名称必须匹配,客户端和服务才能进行通信。 构造函数中的循环每秒检查一次与客户端类型和名称匹配的服务是否可用。 最后,它创建一个新的请求对象。MinimalClientAsync``minimal_client_async``while``AddTwoInts
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
构造函数下方是方法,该方法将发送请求并旋转,直到收到响应或失败。send_request
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
最后,我们有了该方法,该方法构造一个对象,使用传入的命令行参数发送请求,并记录结果。main``MinimalClientAsync
def main():
rclpy.init()
minimal_client = MinimalClientAsync()
response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), response.sum))
minimal_client.destroy_node()
rclpy.shutdown()
与服务节点一样,您还必须添加一个入口点才能运行客户机节点。
文件的字段应如下所示:entry_points``setup.py
entry_points={
'console_scripts': [
'service = py_srvcli.service_member_function:main',
'client = py_srvcli.client_member_function:main',
],
},
在构建之前,最好在工作区 () 的根目录中运行以检查缺少的依赖项:rosdep``ros2_ws
Linux操作系统macOS 操作系统窗户
rosdep install -i --from-path src --rosdistro humble -y
导航回工作区的根目录,并生成新包:Navigate back to the root of your workspace, and build your new package:ros2_ws
colcon build --packages-select py_srvcli
打开一个新终端,导航到 ,并获取安装文件:ros2_ws
Linux操作系统macOS 操作系统窗户
source install/setup.bash
现在运行服务节点:
ros2 run py_srvcli service
节点将等待客户端的请求。
打开另一个终端并再次从内部获取安装文件。 启动客户机节点,后跟用空格分隔的任意两个整数:ros2_ws
ros2 run py_srvcli client 2 3
例如,如果选择 和 ,客户端将收到如下响应:2``3
[INFO] [minimal_client_async]: Result of add_two_ints: for 2 + 3 = 5
返回到运行服务节点的终端。 您将看到它在收到请求时发布了日志消息:
[INFO] [minimal_service]: Incoming request
a: 2 b: 3
进入服务器终端,停止节点旋转。Ctrl+C
您创建了两个节点来通过服务请求和响应数据。 您将它们的依赖项和可执行文件添加到包配置文件中,以便您可以构建和运行它们,从而允许您查看正在运行的服务/客户端系统。
在最近的几个教程中,您一直在利用接口跨主题和服务传递数据。 接下来,您将学习如何创建自定义界面。
有几种方法可以在 Python 中编写服务和客户端;查看 ROS2/Examples 存储库中的 AND 包。minimal_client``minimal_service
在本教程中,您使用了客户端节点中的 API 来调用服务。 还有另一个可用于 Python 的服务调用 API,称为同步调用。 我们不建议使用同步调用,但如果您想了解有关它们的更多信息,请阅读同步与异步客户端指南。call_async()
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/249b376ca72a4562b52e49856e4b564d.png)
文件夹名称如下:
其中的代码如下:
setup.py
from setuptools import find_packages, setup
package_name = 'py_srvcli'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='a',
maintainer_email='a@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'service = py_srvcli.service_member_function:main',
'client = py_srvcli.client_member_function:main',
],
},
)
setup.cfg
[develop]
script_dir=$base/lib/py_srvcli
[install]
install_scripts=$base/lib/py_srvcli
package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>py_srvcli</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="a@todo.todo">a</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>example_interfaces</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
主要文件在这里:
初始化可以为空不用管
init.py
客户端
client_member_function.py
import sys
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
def main():
rclpy.init()
minimal_client = MinimalClientAsync()
response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), response.sum))
minimal_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
服务端
service_member_function.py
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
def main():
rclpy.init()
minimal_service = MinimalService()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()
运行代码 :
构建
rosdep install -i --from-path src --rosdistro humble -y
编译
colcon build --packages-select py_srvcli
最后的运行跟之前的一样,对于这个包使用:
source install/setup.bash
现在运行服务节点:
ros2 run py_srvcli service
节点将等待客户端的请求。
打开另一个终端并再次从内部获取安装文件。 启动客户机节点,后跟用空格分隔的任意两个整数:ros2_ws
ros2 run py_srvcli client 2 3
完成。。。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。