当前位置:   article > 正文

【 编写简单的服务和客户端 (Python)——ROS2_ubuntu22.04使用方法】_ros2 python 客户端程序

ros2 python 客户端程序

您正在阅读较旧但仍受支持的 ROS 2 版本的文档。 有关最新版本的信息,请查看 Iron。

参考原文链接:
http://fishros.org/doc/ros2/humble/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Py-Service-And-Client.html

编写简单的服务和客户端 (Python)

**目标:**使用 Python 创建和运行服务和客户机节点。

**教程级别:**初学者

**时间:**20分钟

内容

背景

当节点使用服务进行通信时,发送数据请求的节点称为客户端节点,响应请求的节点称为服务节点。 请求和响应的结构由文件确定。.srv

这里使用的示例是一个简单的整数加法系统;一个节点请求两个整数的总和,另一个节点响应结果。

先决条件

在前面的教程中,你学习了如何创建工作区创建包

任务

1 创建包

打开一个新终端并获取您的 ROS 2 安装,以便命令正常工作。ros2

导航到在上一教程中创建的目录。ros2_ws

回想一下,包应该在目录中创建,而不是在工作区的根目录中创建。 导航到并创建一个新包:src``ros2_ws/src

ros2 pkg create --build-type ament_python --license Apache-2.0 py_srvcli --dependencies rclpy example_interfaces
  • 1

您的终端将返回一条消息,验证您的包及其所有必要文件和文件夹的创建。py_srvcli

该参数会自动将必要的依赖项行添加到 。 是包含构建请求和响应所需的 .srv 文件的包:--dependencies``package.xml``example_interfaces

int64 a
int64 b
---
int64 sum
  • 1
  • 2
  • 3
  • 4

前两行是请求的参数,破折号下方是响应。

1.1 更新package.xml

由于您在包创建过程中使用了该选项,因此不必手动向 .--dependencies``package.xml

但是,与往常一样,请确保将描述、维护者电子邮件和名称以及许可证信息添加到 .package.xml

<description>Python client server tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache License 2.0</license>
  • 1
  • 2
  • 3
1.2 更新setup.py

将相同的信息添加到文件中的 、 和 字段:setup.py``maintainer``maintainer_email``description``license

maintainer='Your Name',
maintainer_email='you@email.com',
description='Python client server tutorial',
license='Apache License 2.0',
  • 1
  • 2
  • 3
  • 4

2 编写服务节点

在目录中,创建一个名为的新文件,并将以下代码粘贴到其中:ros2_ws/src/py_srvcli/py_srvcli``service_member_function.py

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))

        return response


def main():
    rclpy.init()

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
2.1 检查代码

第一个语句从包中导入服务类型。 以下语句导入 ROS 2 Python 客户端库,特别是类。import``AddTwoInts``example_interfaces``import``Node

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.node import Node
  • 1
  • 2
  • 3
  • 4

类构造函数使用名称 初始化节点。 然后,它创建一个服务并定义类型、名称和回调。MinimalService``minimal_service

def __init__(self):
    super().__init__('minimal_service')
    self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
  • 1
  • 2
  • 3

服务回调的定义接收请求数据,对数据求和,并将求和作为响应返回。

def add_two_ints_callback(self, request, response):
    response.sum = request.a + request.b
    self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))

    return response
  • 1
  • 2
  • 3
  • 4
  • 5

最后,主类初始化 ROS 2 Python 客户端库,实例化该类以创建服务节点,并旋转该节点以处理回调。MinimalService

2.2 添加入口点

要允许该命令运行节点,您必须将入口点添加到(位于目录中)。ros2 run``setup.py``ros2_ws/src/py_srvcli

在括号之间添加以下行:'console_scripts':

'service = py_srvcli.service_member_function:main',
  • 1

3 编写客户端节点

在目录中,创建一个名为的新文件,并将以下代码粘贴到其中:ros2_ws/src/py_srvcli/py_srvcli``client_member_function.py

import sys

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddTwoInts.Request()

    def send_request(self, a, b):
        self.req.a = a
        self.req.b = b
        self.future = self.cli.call_async(self.req)
        rclpy.spin_until_future_complete(self, self.future)
        return self.future.result()


def main():
    rclpy.init()

    minimal_client = MinimalClientAsync()
    response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
    minimal_client.get_logger().info(
        'Result of add_two_ints: for %d + %d = %d' %
        (int(sys.argv[1]), int(sys.argv[2]), response.sum))

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
3.1 检查代码

与服务代码一样,我们首先使用必要的库。import

import sys

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
  • 1
  • 2
  • 3
  • 4
  • 5

类构造函数使用名称 初始化节点。 构造函数定义创建与服务节点具有相同类型和名称的客户端。 类型和名称必须匹配,客户端和服务才能进行通信。 构造函数中的循环每秒检查一次与客户端类型和名称匹配的服务是否可用。 最后,它创建一个新的请求对象。MinimalClientAsync``minimal_client_async``while``AddTwoInts

def __init__(self):
    super().__init__('minimal_client_async')
    self.cli = self.create_client(AddTwoInts, 'add_two_ints')
    while not self.cli.wait_for_service(timeout_sec=1.0):
        self.get_logger().info('service not available, waiting again...')
    self.req = AddTwoInts.Request()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

构造函数下方是方法,该方法将发送请求并旋转,直到收到响应或失败。send_request

def send_request(self, a, b):
    self.req.a = a
    self.req.b = b
    self.future = self.cli.call_async(self.req)
    rclpy.spin_until_future_complete(self, self.future)
    return self.future.result()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

最后,我们有了该方法,该方法构造一个对象,使用传入的命令行参数发送请求,并记录结果。main``MinimalClientAsync

def main():
    rclpy.init()

    minimal_client = MinimalClientAsync()
    response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
    minimal_client.get_logger().info(
        'Result of add_two_ints: for %d + %d = %d' %
        (int(sys.argv[1]), int(sys.argv[2]), response.sum))

    minimal_client.destroy_node()
    rclpy.shutdown()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
3.2 添加入口点

与服务节点一样,您还必须添加一个入口点才能运行客户机节点。

文件的字段应如下所示:entry_points``setup.py

entry_points={
    'console_scripts': [
        'service = py_srvcli.service_member_function:main',
        'client = py_srvcli.client_member_function:main',
    ],
},
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4 构建和运行

在构建之前,最好在工作区 () 的根目录中运行以检查缺少的依赖项:rosdep``ros2_ws

Linux操作系统macOS 操作系统窗户

rosdep install -i --from-path src --rosdistro humble -y
  • 1

导航回工作区的根目录,并生成新包:Navigate back to the root of your workspace, and build your new package:ros2_ws

colcon build --packages-select py_srvcli
  • 1

打开一个新终端,导航到 ,并获取安装文件:ros2_ws

Linux操作系统macOS 操作系统窗户

source install/setup.bash
  • 1

现在运行服务节点:

ros2 run py_srvcli service
  • 1

节点将等待客户端的请求。

打开另一个终端并再次从内部获取安装文件。 启动客户机节点,后跟用空格分隔的任意两个整数:ros2_ws

ros2 run py_srvcli client 2 3
  • 1

例如,如果选择 和 ,客户端将收到如下响应:2``3

[INFO] [minimal_client_async]: Result of add_two_ints: for 2 + 3 = 5
  • 1

返回到运行服务节点的终端。 您将看到它在收到请求时发布了日志消息:

[INFO] [minimal_service]: Incoming request
a: 2 b: 3
  • 1
  • 2

进入服务器终端,停止节点旋转。Ctrl+C

总结

您创建了两个节点来通过服务请求和响应数据。 您将它们的依赖项和可执行文件添加到包配置文件中,以便您可以构建和运行它们,从而允许您查看正在运行的服务/客户端系统。

后续步骤

在最近的几个教程中,您一直在利用接口跨主题和服务传递数据。 接下来,您将学习如何创建自定义界面

相关内容

  • 有几种方法可以在 Python 中编写服务和客户端;查看 ROS2/Examples 存储库中的 AND 包。minimal_client``minimal_service

  • 在本教程中,您使用了客户端节点中的 API 来调用服务。 还有另一个可用于 Python 的服务调用 API,称为同步调用。 我们不建议使用同步调用,但如果您想了解有关它们的更多信息,请阅读同步与异步客户端指南。call_async()

  • 在这里插入图片描述

  • 在这里插入图片描述
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/249b376ca72a4562b52e49856e4b564d.png)

源码分析

文件夹名称如下:
在这里插入图片描述

其中的代码如下:

setup.py

from setuptools import find_packages, setup

package_name = 'py_srvcli' 

setup( 
    name=package_name, 
    version='0.0.0', 
    packages=find_packages(exclude=['test']), 
    data_files=[ 
        ('share/ament_index/resource_index/packages', 
            ['resource/' + package_name]), 
        ('share/' + package_name, ['package.xml']), 
    ],
    install_requires=['setuptools'], 
    zip_safe=True, 
    maintainer='a', 
    maintainer_email='a@todo.todo', 
    description='TODO: Package description', 
    license='Apache-2.0', 
    tests_require=['pytest'], 
    entry_points={ 
        'console_scripts': [         
                'service = py_srvcli.service_member_function:main',

        'client = py_srvcli.client_member_function:main', 
        ],
    },
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

setup.cfg

[develop]
script_dir=$base/lib/py_srvcli
[install]
install_scripts=$base/lib/py_srvcli
  • 1
  • 2
  • 3
  • 4

package.xml

<?xml version="1.0"?> 
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>

<package format="3"> 
  <name>py_srvcli</name> 
  <version>0.0.0</version> 
  <description>TODO: Package description</description>

  <maintainer email="a@todo.todo">a</maintainer> 
  <license>Apache-2.0</license> 

  <depend>rclpy</depend> 
  <depend>example_interfaces</depend> 

  <test_depend>ament_copyright</test_depend> 
  <test_depend>ament_flake8</test_depend> 
  <test_depend>ament_pep257</test_depend> 
  <test_depend>python3-pytest</test_depend> 

  <export> 
    <build_type>ament_python</build_type> 
  </export> 
</package> 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

主要文件在这里:
在这里插入图片描述
初始化可以为空不用管
init.py

客户端
client_member_function.py

import sys

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddTwoInts.Request()

    def send_request(self, a, b):
        self.req.a = a
        self.req.b = b
        self.future = self.cli.call_async(self.req)
        rclpy.spin_until_future_complete(self, self.future)
        return self.future.result()


def main():
    rclpy.init()

    minimal_client = MinimalClientAsync()
    response = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
    minimal_client.get_logger().info(
        'Result of add_two_ints: for %d + %d = %d' %
        (int(sys.argv[1]), int(sys.argv[2]), response.sum))

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

服务端
service_member_function.py

from example_interfaces.srv import AddTwoInts

import rclpy
from rclpy.node import Node


class MinimalService(Node):  

    def __init__(self):  
        super().__init__('minimal_service')  
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)



    def add_two_ints_callback(self, request, response):  
        response.sum = request.a + request.b   
        self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))




        return response   


def main():   
    rclpy.init()   

    minimal_service = MinimalService()   

    rclpy.spin(minimal_service)   

    rclpy.shutdown()   


if __name__ == '__main__':   
    main()   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

运行代码 :
构建
rosdep install -i --from-path src --rosdistro humble -y

编译
colcon build --packages-select py_srvcli

最后的运行跟之前的一样,对于这个包使用:
source install/setup.bash
现在运行服务节点:

ros2 run py_srvcli service
节点将等待客户端的请求。
打开另一个终端并再次从内部获取安装文件。 启动客户机节点,后跟用空格分隔的任意两个整数:ros2_ws
ros2 run py_srvcli client 2 3
完成。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/797845
推荐阅读
相关标签
  

闽ICP备14008679号