赞
踩
接着上篇文章
,我记录一下ROS2多线程Python版的问题。
提出这个话题的原因是上文中,当需要小车旋转90°时。我最开始的想法是用while循环来实现:
while abs(self.first_yaw-self.yaw)>0.05:
cmd_vel = Twist()
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = angle
self.cmd_vel_pub.publish(cmd_vel)
而不是用if :
if abs(self.first_yaw-self.yaw)>0.05:
cmd_vel = Twist()
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = angle
self.cmd_vel_pub.publish(cmd_vel)
显然,使用while会该回调函数 odom_callback 堵塞,导致无法正常获得 self.yaw ,于是小车一直打转 。 如上文,使用if 可以正常运行。
知晓while 出问题的原因,我尝试将整个移动函数与odom_callback分开,另写一个回调函数执行移动函数, 然而这是没用的, while依旧会堵塞主线程
最终通过多线程的方式解决问题 :
#! /usr/bin/env python3 import rclpy import math import numpy as np from rclpy.node import Node from geometry_msgs.msg import Twist from nav_msgs.msg import Odometry import time from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor def euler_from_quaternion(quaternion): """ Converts quaternion (w in last place) to euler roll, pitch, yaw quaternion = [x, y, z, w] """ x = quaternion.x y = quaternion.y z = quaternion.z w = quaternion.w sinr_cosp = 2 * (w * x + y * z) cosr_cosp = 1 - 2 * (x * x + y * y) roll = np.arctan2(sinr_cosp, cosr_cosp) sinp = 2 * (w * y - z * x) pitch = np.arcsin(sinp) siny_cosp = 2 * (w * z + x * y) cosy_cosp = 1 - 2 * (y * y + z * z) yaw = np.arctan2(siny_cosp, cosy_cosp) return roll, pitch, yaw class CarController(Node): def __init__(self): super().__init__('car_controller') # 目标位置 self.target_x = 1.0 # 目标 x 坐标 self.target_y = 1.0 # 目标 y 坐标 self.current_x = 0.0 # 当前 x 坐标 self.current_y = 0.0 # 当前 y 坐标 self.roll=0.0 self.pitch=0.0 self.yaw=0.0 self.group1 = MutuallyExclusiveCallbackGroup() self.group2 = MutuallyExclusiveCallbackGroup() # 订阅当前位置 self.odom_sub = self.create_subscription( Odometry, '/odom', self.odom_callback, 10, callback_group=self.group1 ) self.odom_sub_1 = self.create_subscription( Odometry, '/odom', self.odom_callback_1, 10, callback_group=self.group2 ) # 发布控制指令 self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10) # 标记当前步骤 self.current_step = 1 # 角速度 self.angular_velocity = math.pi / 2 # 90 度/秒 self.timer=None self.first_yaw =None self.timer_count=0 def odom_callback(self, odom_msg): self.current_x = odom_msg.pose.pose.position.x self.current_y = odom_msg.pose.pose.position.y orientation=odom_msg.pose.pose.orientation self.roll, self.pitch, self.yaw = euler_from_quaternion(orientation) self.get_logger().info('Current Position: x={:.2f}, y={:.2f}, roll={:.2f}, pitch={:.2f}, yaw={:.2f}'.format(self.current_x, self.current_y, self.roll, self.pitch, self.yaw)) def odom_callback_1(self,odom_msg): if self.current_step == 1: # 走完 x 轴 if abs(self.current_x - self.target_x) < 0.05: self.get_logger().info('X direction target position reached!') self.current_step = 2 # 进入第二步:旋转 self.stop_car() return cmd_vel = Twist() if self.current_x < self.target_x: cmd_vel.linear.x = 0.5 # 正向线速度,可根据需要进行调整 else: cmd_vel.linear.x = -0.5 # 反向线速度,可根据需要进行调整 self.cmd_vel_pub.publish(cmd_vel) elif self.current_step == 2 and abs(self.current_y - self.target_y) > 0.05: # 根据 y 轴坐标差选择旋转方向 dy = self.target_y - self.current_y if dy > 0: angle = math.pi / 2 # 向左旋转 90 度 else : angle = -math.pi / 2 # 向右旋转 90 度 if not self.first_yaw: self.first_yaw=self.yaw+angle while abs(self.first_yaw-self.yaw)>0.05: cmd_vel = Twist() cmd_vel.linear.x = 0.0 cmd_vel.angular.z = angle self.cmd_vel_pub.publish(cmd_vel) self.stop_car() self.current_step = 3 elif self.current_step == 3: # 走完 y 轴 if abs(self.current_y - self.target_y) < 0.05: self.get_logger().info('Target position reached!') self.stop_car() return cmd_vel = Twist() cmd_vel.linear.x = 0.5 # 前进线速度,可根据需要进行调整 self.cmd_vel_pub.publish(cmd_vel) def stop_car(self): cmd_vel = Twist() self.cmd_vel_pub.publish(cmd_vel) def main(args=None): rclpy.init(args=args) car_controller = CarController() # executor = MultiThreadedExecutor(num_threads=2) executor = SingleThreadedExecutor() # Add the node to the executor executor.add_node(car_controller) try: executor.spin() finally: # Shutdown the executor executor.shutdown() car_controller.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
记录一下多线程吧,主要是回调函数的多线程,ROS2中的多线程主要由Executors和Callback Groups组成
ROS2中的执行器分为两类SingleThreadedExecutor 和 MultiThreadedExecutor
SingleThreadedExecutor:同一时刻只能执行一个线程的任务,其他任务需要排队,这个是ROS2默认的执行器,其实当你使用rclpy.spin()时,使用的就是默认的Executor(你可以开两个线程同时执行rclpy.spin()试试 )
这也是导致为什么while 执行时,其他回调任务也阻塞了。
MultiThreadedExecutor: 允许同时执行多个线程的任务 。当然,直接开启多个线程执行回调函数是没有用的,为什么,就要引出Callback Groups了
ROS2中的执行器分为两类ReentrantCallbackGroup 和 MutuallyExclusiveCallbackGroup。
两者的区别就是ReentrantCallbackGroup允许,同一时刻,一个Group中多个线程同时执行 ?
而MutuallyExclusiveCallbackGroup,同一时刻,一个Group中只能有一个线程执行。而 ROS2中,默认的就是这个回调组,而且不在订阅时声明callback_group=self.group,那么所有
的订阅都在同一个回调组中进行 。因此想要真正使用多线程,需要:
self.group1 = MutuallyExclusiveCallbackGroup()
self.group2 = MutuallyExclusiveCallbackGroup()
self.odom_sub = self.create_subscription(
Odometry, 'odom', self.odom_callback, 10, callback_group=self.group1)
executor = MultiThreadedExecutor(num_threads=3)
或者将group声明为ReentrantCallbackGroup,那么只用这样一个组就够了。
想要尝试不同Executor 和 Callback Group的效果,可以执行下面的代码:
自己给参数运行 ,类似: ros2 run xxx_pkg xxx -service_wait_time 5.0 -timer_period 1.0 -callback_group_type reentrant -threads 2
from pickle import TRUE from std_srvs.srv import SetBool import rclpy from rclpy.node import Node import time from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor from rclpy.qos import ReliabilityPolicy, QoSProfile import argparse class DummyServer(Node): def __init__(self, args, callback_group_type="reentrant"): self.timer_flag = True super().__init__('service_start_turn') # More info here: https://docs.python.org/3/library/argparse.html parser = argparse.ArgumentParser( description='Dummy Server to Learn about Callback Groups and Threads') # Add an argument for setting the service waiting time parser.add_argument('-service_wait_time', type=float, help='Time the service will be waiting', required=True) # Add an argument for setting por of the timer callback parser.add_argument('-timer_period', type=float, nargs=1, metavar='TIMEOUT', default=1.0, help="Time period of the Callback for the timer") # Add an argument for setting the callback group type parser.add_argument('-callback_group_type', type=str, default="reentrant", help="Type of Callback Groups REENTRANT of EXCLUSIVE") # Add an argument for setting the number of threads parser.add_argument('-threads', type=int, default=1, help="Number of threads to use in the executor") self.args = parser.parse_args(args[1:]) parser.print_help() print("## DEFAULT Node Callback Group=" + str(self.default_callback_group)) self.get_logger().warning("Setting "+self.args.callback_group_type+" Groups") if self.args.callback_group_type == "reentrant": # If you set the group reentrant, any Callback inside will be executed in parallel # If there are enough threads self.group1 = ReentrantCallbackGroup() self.get_logger().warning("ReentrantCallbackGroup Set") # Both the service and the timer use the same callback group self.srv = self.create_service( SetBool, '/dummy_server_srv', self.SetBool_callback, callback_group=self.group1) self.timer = self.create_timer( self.args.timer_period[0], self.timer_callback, callback_group=self.group1) elif self.args.callback_group_type == "exclusive": self.group1 = MutuallyExclusiveCallbackGroup() self.group2 = MutuallyExclusiveCallbackGroup() self.get_logger().warning("MutuallyExclusiveCallbackGroup Set") # Set one group for the service and another one for the timer self.srv = self.create_service( SetBool, '/dummy_server_srv', self.SetBool_callback, callback_group=self.group1) self.timer = self.create_timer( self.args.timer_period[0], self.timer_callback, callback_group=self.group2) else: # You do not set groups. Therefore, they will get the default group for the Node self.get_logger().error("NO GROUPS SET Set") self.srv = self.create_service( SetBool, '/dummy_server_srv', self.SetBool_callback) self.timer = self.create_timer( self.args.timer_period[0], self.timer_callback) def get_threads(self): return self.args.threads def SetBool_callback(self, request, response): self.get_logger().warning("Processing Server Message...") self.wait_for_sec(self.args.service_wait_time) self.get_logger().warning("Processing Server Message...DONE") response.message = 'TURNING' # return the response parameters return response def wait_for_sec(self, wait_sec, delta=1.0): i = 0 while i < wait_sec: self.get_logger().info("..."+str(i)+"[WAITING...]") time.sleep(delta) i += delta def timer_callback(self): self.print_dummy_msgs() def print_dummy_msgs(self): if self.timer_flag: self.get_logger().info("TICK") self.timer_flag = False else: self.get_logger().info("TACK") self.timer_flag = True def main(args=None): rclpy.init(args=args) print("args==="+str(args)) args_without_ros = rclpy.utilities.remove_ros_args(args) print("clean ROS args==="+str(args_without_ros)) start_stop_service_node = DummyServer(args_without_ros) num_threads = start_stop_service_node.get_threads() start_stop_service_node.get_logger().info( 'DummyServer Started with threads='+str(num_threads)) executor = MultiThreadedExecutor(num_threads=num_threads) executor.add_node(start_stop_service_node) try: executor.spin() finally: executor.shutdown() start_stop_service_node.destroy_node() # shutdown the ROS communication rclpy.shutdown() if __name__ == '__main__': main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。