赞
踩
最近学了一点点ROS2,为了方便自己以后查看,胡乱总结了一点自认为的干货,欢迎佬们批评指正!
ros2 run <package_name> <executable_name>
例如:ros2 run turtlesim turtlesim_node
CLI:命令行界面
GUI:图形交互界面
运行节点
ros2 run <packagea> <executable_name>
查看节点列表
ros2 node list
查看节点信息
ros2 node info <node_name>
重映射节点名称
ros2 run turtlesim turtlesim_node --ros-args __node:=my_turtle
一个工作空间下有多个功能包,一个功能包下有多个节点
mkdir -p <工作空间名>/src
根据编译方式的不同分为三类
ament_python,适用于python程序
cmake,适用于C++
ament_cmake,适用于C++
(1)安装获取
sudo apt install ros-<version>-package_name
自动放置系统目录不用再次手动source
(2)手动编译获取
需要手动source工作空间的install目录
ros2 pkg create <package-name> --build_type {cmake.ament_cmake,ament_python} --dependencies<依赖名字>
列出所有
ros2 pkg executables
列出某个功能包
ros2 pkg executables turtlesim
ros2 pkg list
ros2 pkg prefix <package-name>
每一个包都有一个标配的manifest.xml文件。用于记录这个包的名字,构建工具,编译信息,拥有者,干什么用的信息
ros2 pkg xml turtlesim
sudo apt-get install python3-colcon-common-extensions
colcon test --package-select YOUR_PKG_NAME
colcon test --package-select YOUR_PKG_NAME --cmake-args -DBUTLD_TESTING=0
colcon test
colcon build --symlink-install
mkdir -p 工作空间名字/src
cd 工作空间名字/src
ros2 pkg create <功能包名字> --build-type ament_python --dependencies rclpy
PS:若build-type什么都不写,ROS2默认ament_cmake
功能包文件夹下创建 xxx.py
import rclpy
from rclpy.node import Node
def main(args=None)
rclpy.init(args=args) #初始化rclpy
node = Node("节点名") #新建节点
rclpy.spin(node) #保持节点运行
rclpy.shutdown #关闭rclpy
并且需要配置setup.py
在console_scripts中 “节点名_node=功能包名.xxx.py”
import rclpy
from rclpy.node import Node
class WriterNode(Node)
def __init__(self.name)
self.get_logger().info("xxxxx")
def main(args=None)
rclpy.init(args=args) #初始化rclpy
node=WriteNode("节点名")#新建节点
rclpy.spin(node) #保持节点运行
rclpy.shutdown
ros2 pkg create village_wang --build-type ament_cmake --dependencies rclcpp
创建节点 :
src目录下创建文件xxx.cpp
#include "rclcpp/rclcpp.hpp"
int main(int argc,char,**argv)
{
rclcpp::init(argc,argv);
auto node = std:;make_shared<rclcpp::Node>("节点名"); #日志打印
RLCPP_INFO(node->get_logger(),"大家好");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
#include "rclcpp/rclcpp.hpp"
class 类名 :public rclcpp::Node(name)
{
public:
构造函数名字(std::make_shared<类型名>):Node(name)
{
RCLCPP_INFO(this->get_logger(),"大家好");
}
private:
}
int main(int argc,char,**argv)
{
rclcpp::init(argc,argv);
auto node = std::make_shared<类型名>("对象名");
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
DDS发布订阅模型,
订阅者和发布者都不唯一,
异步通信机制:适用于周期发布的数据(例如:传感器)
.msg文件定义通信的消息结构
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class PublisherNode(Node)
def _init_(self,name): #ROS2节点父类初始化
super()._init_(name) #创建发布者对象(消息类型,话题名,队列长度)
self.pub=self.create_publisher(String,"chatter",10)
#创建定时器(单位为秒的周期,定时执行的回调函数)
self.timer=self.create_timer(0.5,self,timer_callback)
def timer_callback(self): #创建定时器正周期执行的回调函数
msg=String() #创建一个String类型的消息对象
msg.data='HELLO' #填充消息对象中的消息数据
self.pub.pulish(msg) #发布话题消息
self.get_logger().info('Publishing:"%s"'% msg.data) #输出日志信息
def main (args=None)
rcloy.init(args=args) #接口初始化
node=PublisherNode("topic_helloworld_pub") #创建节点对象并初始化
rclpy.spin(node) #循环节点
rclpy.shutdown
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class PublisherNode(Node)
def _init_(self,name): #ROS2节点父类初始化
super()._init_(name) #创建发布者对象(消息类型,话题名,队列长度)
def listener_callback(self,msg): #创建回调函数,收到话题消息后对数据处理
self.get_logger().info('I heard :"%s"'% msg.data) #输出日志信息
def main (args=None)
rcloy.init(args=args) #接口初始化
node=PublisherNode("topic_helloworld_sub") #创建节点对象并初始化
rclpy.spin(node) #循环节点
rclpy.shutdown
在console_scripts中写 ' 节点名 = 功能包名 . 节点名 :main '
放到工作空间的Install文件夹下
import rclpy
from rclpy.node import Node
from snsor_msgs.msg import String
from cv_bridge import CvBridge #ROS与OpenCV图像类型的转换
import cv2
class ImagePublisher(Node)
def _init_(self,name): #ROS2节点父类初始化
super()._init_(name) #创建发布者对象(消息类型,话题名,队列长度)
self.pub=self.create_publisher(Image,'image_raw',10)
#创建定时器(单位为秒的周期,定时执行的回调函数)
self.timer=self.create_timer(0.1,self,timer_callback)
#创建一个视频采集对象,驱动相机采集图像(相机设备号)
self.cap = cv2.VideoCapture(0)
#创建一个图像转换对象,用于稍后将OpenCV的图像转换成ROS的图像消息
self.cv_bridge = CvBridge()
def timer_callback(self): #一帧一帧读取图像
ret, frame = self.cap.read()
if ret = =True: #若读取图像成功,则发布图像消息
self.publishe(
self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8'))
self.get_logger().info(' Publishing video frame ') #输出日志信息
def main (args=None)
rcloy.init(args=args) #接口初始化
node=PublisherNode("topic_helloworld_pub") #创建节点对象并初始化
rclpy.spin(node) #循环节点
node.desteroy_node() #销毁节点对象
rclpy.shutdown
(遗留问题:什么时候需要销毁节点)
import rclpy
from rclpy.node import Node
from snsor_msgs.msg import String
from cv_bridge import CvBridge #ROS与OpenCV图像类型的转换
import cv2
import numpy as np #python数值计算库
lower_red = np.array([0,90,128]) #红色HSV阈值下限
upper_red = np.array([180,255,255]) #红色HSV阈值上限
class ImageSubscriber(Node):
def _init_(self,name):
super()._init_(name) #ROS父类节点初始化
#创建订阅者对象(消息类型,话题名,订阅者回调函数,队列长度)
self.sub = self.create_subscription(
Image, 'image_raw', self.listener_callback,10)
#创建一个图像转换对象
self.cv_brige = CvBridge()
def object_detect(self, image):
hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) #RGB转HSV
mask_red = cv2.inRange(hsv_img,lower_red,upper_red)
contours, hierarchy = cv2.findContours(
mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) #轮廓检测
for cnt in contours :
if cnt . shape[0] < 150:
continue
(x, y, w, h) = cv2.boundingRect(cnt)
cv2.drawCounter(image, [cnt], -1, (0,255,0),2)
cv2.circle(image, (int(x+w/2), int(y+w/2), 5, (0,255,0), -1)
cv2.imshow("object", image)
cv.waitkey(10)
def listener_callback(self, data):
self.get_ogger().info('Receiving video frame')
image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')
self .object_detect(image)
def main (args=None)
rcloy.init(args=args) #接口初始化
node= ImageSubscriber ("topic_helloworld_pub") #创建节点对象并初始化
rclpy.spin(node) #循环节点
node.desteroy_node() #销毁节点对象
rclpy.shutdown
PS:记得配置setup.py
sudo apt install ros-humble-usb-cam
ros2 run usb_cam usb_cam_node_exe
ros2 run learing_topic topic_webcam_sub
(话题通讯延时较大)
模型:客户端/服务器(C/S)模型 同步通信机制
服务器端唯一,客户端可以不唯一
.srv文件定义请求和应答数据结构
编程接口初始化
创建节点并初始化
创建客户端对象
创建并发送请求数据
等待服务器端应答数据
销毁节点并关闭接口
编程接口初始化
创建节点并初始化
创建服务器端对象
通过回调函数处进行服务
向客户端反馈应答结果
销毁节点并关闭窗口
import rclpy
from rclpy.node import Node
from learning_interface.srv import AddTwoInts #自定义服务接口
class adderServer(Node):
def _init_(self,name):
super()._init_(name) #ROS父类节点初始化
#创建服务对象(接口类型,服务名 ,服务器回调函数)
self.srv = self.create_service(AddTwoInts, 'add_two_ints' ,self.adder_callback)
def adder_callback(self, request, response): #创建回调函数
response.sum = request.a + request.b
self.get_logger.info(' Incoming requesi \na : %d b:%d ' % (request.a ,request.b) )
return response #反馈应答信息
def main( args=None ) :
rcloy.init(args=args) #接口初始化
node= ImageSubscriber (" sevice_adder_server ") #创建节点对象并初始化
rclpy.spin(node) #循环节点
node.desteroy_node() #销毁节点对象
rclpy.shutdown
import rclpy
from rclpy.node import Node
from learning_interface.srv import AddTwoInts #自定义服务接口
class adderClient(Node) :
def _init_(self , name):
#ROS2节点父类初始化
super()._init_(name)
#创建客户端服务对象(服务接口类型, 服务名)
self.client = self.create_client(AddTwoInts, ' add_two_ints ' )
#循环等待服务器端成功启动
while not self . client .wait_for_service(timeout_sec=1.0) :
self.set_logger().info(' service not available, waiting again...')
#创建服务请求的数据对象
self . request = AddTwoInts . Request ()
def send_request(self) :
self . request.a = int(sys.argv[1])
self . request.a = int(sys.argv[2])
#异步方式发送服务请求
self.future = self.client.call_async(self.request)
def main(args=None) :
rclpy.init(args=args)
node = adderClient(" service_adder_client")
node . send_request() #发送服务请求
while rclpy . ok() :
rclpy .spin_once(node) #循环执行一次节点
if node . future . done() #数据处理是否完成
try :
response = node .future . result () #接受服务器端的反馈数据
except Exception as e :
node . get_logger() . info(
' Service call failed %r ' % (e, ))
else :
node . get_logger(). info(
' Result of add_two_ints : for %d + %d = %d ' %
(node . request . a, node . request . b , responce . sum) )
break
node . destroy_node()
rclpy. shutdown()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。