赞
踩
笔者跟着鱼香ROS的ROS2学习之旅
学习参考:
【ROS2机器人入门到实战】
笔者的学习目录
在插件栏中搜索PlatformIO IDE,如下图安装蚂蚁头插件
安装后,左侧栏中就出现了蚂蚁头
打开VSCode,点击左边的蚂蚁头图标,然后在PROJECT TASKS类目中,点击Create New Project,再点击Quick Access上的New Project,在Project Wizard中,填写规则如下:
Borad中搜索ESP32 DEV,然后选择Espressif ESP32 Dev Module,这是我们ESP32开发板对应的Board类型,一定要选择正确
Framework选择Arduino,表示我们是基于Arduino框架进行开发
Location可以勾选Use default location,表示将项目存方法在默认目录(鼠标悬浮在旁边的问号,可看到项目默认存放路径);或者不勾选,然后自己决定项目存放目录即可
在开始之前我们需要正确读取IMU数据才能在ROS2上进行发布
笔者以 Yesense元生创新的YIS506为例
参考笔者的博文链接:MOMO的鱼香ROS2(七)ROS2进阶篇——从驱动IMU开始前篇:串口数据读取
笔者整理了ROS2编写节点7步程序,如下:
- 创建工作空间
mkdir -p ROS_WS/colcon_ws/src
cd ROS_WS/colcon_ws/src- 创建功能包
ros2 pkg create imu_py[功能包名称] --build-type ament_python --dependencies rclpy- 创建节点
touch imu_py/imu_py /node_imu.py- 编写节点脚本程序如下:
(1) 导入库文件
(2) 初始化客户端库
(3) 新建节点
(4) spin循环节点
(5) 关闭客户端库shutdown- 修改 setup.py
node_imu = imu_py.node_imu:main
node_imu:节点名称
imu_py.node_imu:main:【软件包】.【执行文件.py】:【执行函数mian】- 编译运行节点
cd ROS_WS/colcon_ws
colcon build --packages-select imu_py --symlink-install
source install/setup.bash- 运行节点
ros2 run imu_py node_imu
其中节点脚本如下:
# -*- coding: utf-8 -*- # 1. 导入库文件 import rclpy from rclpy.node import Node # imu接收数据类型 class Node_imu(Node): def __init__(self,name): super().__init__(name) # 继承父类,初始化名称 self.get_logger().info("大家好,我是%s!" % name) def main(args=None): """ ros2运行该节点的入口函数 编写ROS2节点的一般步骤 1. 导入库文件 2. 初始化客户端库 3. 新建节点对象 4. spin循环节点 5. 关闭客户端库 """ rclpy.init(args=args) # 2. 初始化客户端库 node = IMUPublisher("imu_publisher") # 3. 新建节点对象 rclpy.spin(node) # 4. spin循环节点,保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 5. 关闭客户端库
下面正式开始编写程序!
步骤1-2如上
步骤3:创建IMU发布者和订阅者节点
touch imu_py /imu_publisher.py
touch imu_py /imu_subscribe.py
# -*- coding: utf-8 -*- """ 2024.03.15 author:alian function:ROS2发布IMU 1.查看映射端口 ls /dev/ttyUSB* 2.更改端口的权限 sudo chmod 777 /dev/ttyUSB0 sudo usermod -aG dialout username[修改] """ import rclpy from rclpy.node import Node # 话题接口 from sensor_msgs.msg import Imu # imu接口 from sensor_msgs.msg import LaserScan # 激光雷达接口 from sensor_msgs.msg import Image # 相机接口 # Usart Library import serial import struct import codecs # imu接收数据类型 class IMUPublisher(Node): def __init__(self,name): super().__init__(name) # 继承父类,初始化名称 self.get_logger().info("大家好,我是%s!" % name) # 初始化发布者,传入参数:消息类型、话题名称、缓冲区大小 self.imu_publisher = self.create_publisher(Imu, 'alian_IMU', 10) # 创建发布imu数据的发布者到话题:imu_data上 # 串口初始化 self.IMU_Usart = serial.Serial( port='/dev/ttyUSB0', # 串口 baudrate=460800 ) # 接收数据初始化---------------------------------------------------------- # 加速度 self.accel_x = 0.0 self.accel_y = 0.0 self.accel_z = 0.0 # 角速度 self.angle_x = 0.0 self.angle_y = 0.0 self.angle_z = 0.0 # 欧拉角 self.pitch = 0.0 self.roll = 0.0 self.yaw = 0.0 # 四元数 self.quaternion_data0 = 0.0 self.quaternion_data1 = 0.0 self.quaternion_data2 = 0.0 self.quaternion_data3 = 0.0 # 采样时间戳 self.sample_timestamp = 0 # 同步输出时间戳 self.data_ready_timestamp = 0 # 判断串口是否打开成功 if self.IMU_Usart.isOpen(): print("open success1111") else: print("open failed") # 回调函数返回周期 # 定时器:一定的时间间隔(time_period)触发回调函数(self.timer_callback) time_period = 0.001 self.timer = self.create_timer(time_period, self.timer_callback) def timer_callback(self): """ 定时器回调函数 """ # ----读取IMU的内部数据----------------------------------- try: self.Read_data() # 发布sensor_msgs/Imu 数据类型 imu_data = Imu() imu_data.header.frame_id = "IMU" imu_data.header.stamp = self.get_clock().now().to_msg() imu_data.linear_acceleration.x = self.accel_x imu_data.linear_acceleration.y = self.accel_y imu_data.linear_acceleration.z = self.accel_z imu_data.angular_velocity.x = self.angle_x # unit transfer to rad/s imu_data.angular_velocity.y = self.angle_y imu_data.angular_velocity.z = self.angle_z imu_data.orientation.x = self.quaternion_data0 imu_data.orientation.y = self.quaternion_data1 imu_data.orientation.z = self.quaternion_data2 imu_data.orientation.w = self.quaternion_data3 self.imu_publisher.publish(imu_data) # 发布imu的数据 self.get_logger().info(f'发布了指令:{imu_data.header.frame_id}') #打印一下发布的数据 # -------------------------------------------------------- # print('加速度:',self.accel_x,self.accel_y,self.accel_z) # print('角速度:',self.angle_x,self.angle_y,self.angle_z) # print('欧拉角:',self.pitch,self.roll,self.yaw) # print('时间戳:',self.sample_timestamp,self.data_ready_timestamp) except KeyboardInterrupt: if serial != None: print("close serial port") self.IMU_Usart.close() def Send_ReadCommand(self): ''' Author: alian Time: 2024.03.15 description: 发送读取IMU内部数据指令 IMU型号:YIS506, 波特率为:460800 协议长度:93 ''' # 静态零偏校准指令发送 self.IMU_Usart.write(codecs.decode('59534d12005001b06a','hex')) #发送 def Read_data(self): # 定义解析头 PROTOCOL_FIRST_BYTE = 0x59 PROTOCOL_SECOND_BYTE = 0x53 # 定义数据因子 fac1 = 0.000001 count = self.IMU_Usart.inWaiting() if count > 0: # 发送静态零偏校准指令 self.IMU_Usart.write(codecs.decode('59534d12005001b06a','hex')) data = self.IMU_Usart.read(79) print('开始解析数据') if data[0]==0x59 and data[1]==0x53: print('解析头校验:',data[0] == PROTOCOL_FIRST_BYTE and data[1] == PROTOCOL_SECOND_BYTE) header = struct.unpack('HHB', data[:5]) # 解包协议头部分:帧头、帧序号、数据长度 payload_len = header[2] # 返回数据长度 # 1.校验和:帧号到数据域的最后一个字节(有问题,后续再修改) # ck_1,ck_2 = calc_checksum(data,payload_len+3) # 计算校验和 # if ck_1!=data[-2] or ck_2!=data[-1]: # print('crc_err') # 2.分析数据域 if payload_len>0: # 解析加速度 if data[5]==0x10 and data[6]==12: # 每四个字节组成一个int型(有符号)数据,小端模式(最低有效字节排在最前面),然后乘以系数 self.accel_x = int.from_bytes(data[7:11],byteorder='little',signed = True)*fac1 self.accel_y = int.from_bytes(data[11:15],byteorder='little',signed = True)*fac1 self.accel_z = int.from_bytes(data[15:19],byteorder='little',signed = True)*fac1 # 解析角速度 if data[19]==0x20 and data[20]==12: self.angle_x = int.from_bytes(data[21:25],byteorder='little',signed = True)*fac1 self.angle_y = int.from_bytes(data[25:29],byteorder='little',signed = True)*fac1 self.angle_z = int.from_bytes(data[29:33],byteorder='little',signed = True)*fac1 # 解析欧拉角 if data[33]==0x40 and data[34]==12: self.pitch = int.from_bytes(data[35:39],byteorder='little',signed = True)*fac1 self.roll = int.from_bytes(data[39:43],byteorder='little',signed = True)*fac1 self.yaw = int.from_bytes(data[43:47],byteorder='little',signed = True)*fac1 # 解析四元数 if data[47]==0x41 and data[48]==16: self.quaternion_data0 = int.from_bytes(data[49:53],byteorder='little',signed = True)*fac1 self.quaternion_data1 = int.from_bytes(data[53:57],byteorder='little',signed = True)*fac1 self.quaternion_data2 = int.from_bytes(data[57:61],byteorder='little',signed = True)*fac1 self.quaternion_data3 = int.from_bytes(data[61:65],byteorder='little',signed = True)*fac1 # 解析采样时间戳 if data[65]==0x51 and data[66]==4: self.sample_timestamp = int.from_bytes(data[67:71],byteorder='little',signed = False) # 解析采样时间戳 if data[71]==0x52 and data[72]==4: self.data_ready_timestamp = int.from_bytes(data[73:77],byteorder='little',signed = False) # 重新打开串口 else: self.IMU_Usart = serial.Serial('/dev/ttyUSB0', 460800) def main(args=None): """ ros2运行该节点的入口函数 编写ROS2节点的一般步骤 1. 导入库文件 2. 初始化客户端库 3. 新建节点对象 4. spin循环节点 5. 关闭客户端库 """ rclpy.init(args=args) # 初始化rclpy node = IMUPublisher("imu_publisher") # 新建一个节点 rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy
# -*- coding: utf-8 -*- import rclpy from rclpy.node import Node # 话题接口 from sensor_msgs.msg import Imu # imu接口 class IMUSubscribe(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("大家好,我是%s!" % name) # 初始化订阅者,传入参数:消息类型、话题名称、回调函数(用于处理接收的消息),缓冲区大小 self.imu_subscribe_node = self.create_subscription(Imu, 'alian_IMU',self.imu_callback, 10) # 创建发布imu数据的发布者到话题:imu_data上 def imu_callback(self, imu_data): self.get_logger().info(f'收到[{imu_data.header.frame_id}]命令') print('加速度:',imu_data.linear_acceleration.x,imu_data.linear_acceleration.y,imu_data.linear_acceleration.z) print('角速度:',imu_data.angular_velocity.x,imu_data.angular_velocity.y,imu_data.angular_velocity.z) def main(args=None): rclpy.init(args=args) # 初始化rclpy node = IMUSubscribe("imu_subscribe") # 新建一个节点 rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy
from setuptools import setup import os import glob package_name = 'imu_py' setup( name=package_name, version='0.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='ll', maintainer_email='ll@todo.todo', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ "node_imu=imu_py.node_imu:main", "imu_publisher=imu_py.imu_publisher:main", # 在这修改哦!可以发布多个节点 "imu_subscribe=imu_py.imu_subscribe:main" ], }, )
终端1:
# 编译运行节点
cd ROS_WS/colcon_ws
colcon build --packages-select imu_py --symlink-install
source install/setup.bash
# 运行节点
ros2 run imu_py imu_publisher
终端2:
# 编译运行节点
cd ROS_WS/colcon_ws
colcon build --packages-select imu_py --symlink-install
source install/setup.bash
# 运行节点
ros2 run imu_py imu_subscribe
打开试小工具-RQT查看话题发布情况
终端3:
rqt
使用launch文件批量启动多个节点文件
参考链接:ROS2——教你写新版Launch文件
cd ROS_WS/colcon_ws/src # 在该目录下打开终端
mkdir imu_py/launch
touch imu_py/launch /alian_imu_launch.py
alian_imu_launch.py
# 导入库 from launch import LaunchDescription from launch_ros.actions import Node from setuptools import setup from glob import glob import os def generate_launch_description(): """launch内容描述函数,由ros2 launch 扫描调用 example-node = Node( package='package-name', #节点所在的功能包 namespace='package-namespace', #命名空间。如果存在同名节点,这一选项会有用 node_executable ='execute-name/script-name.py', #表示要运行的可执行文件名或脚本名字.py parameters=[{'parameter-name': parameter-value}], #参数 arguments=['-xxx', xxx, '-xxx', xxx ], #启动参数 output='screen', #用于将话题信息打印到屏幕 name='node-name' #表示启动后的节点名,可以没有 remappings=[ #重映射 ('/xxx/xxx-new', '/xxx/xxx-old'), ] ) """ imu_publisher = Node( package="imu_py", # 节点所在功能包 node_executable="imu_publisher", output='screen' ) imu_subscribe = Node( package="imu_py", node_executable="imu_subscribe", output='screen' ) # 创建LaunchDescription对象launch_description,用于描述launch文件 launch_description = LaunchDescription( [imu_publisher, imu_subscribe]) # 返回让ROS2根据launch描述执行节点 return launch_description
重要!!!修改setup.py文件
from setuptools import setup import os import glob package_name = 'imu_py' setup( name=package_name, version='0.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), # 添加launch配置 (os.path.join('share', package_name, 'launch'), glob.glob('launch/*.launch.py')) ], install_requires=['setuptools'], zip_safe=True, maintainer='ll', maintainer_email='ll@todo.todo', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ "node_imu=imu_py.node_imu:main", "imu_publisher=imu_py.imu_publisher:main", # 可以发布多个节点 "imu_subscribe=imu_py.imu_subscribe:main" ], }, )
同样的编译、配置、运行三步走
终端1:
# 编译+配置
cd ROS_WS/colcon_ws
colcon build --packages-select imu_py --symlink-install
source install/setup.bash
# 运行节点
ros2 launch imu_py alian_imu.launch.py
终端2:RQT查看话题发布情况
rqt
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。