当前位置:   article > 正文

ROS2(四)- MultiThreadedExecutor多线程调用callback funcs_rclcpp::executors::multithreadedexecutor

rclcpp::executors::multithreadedexecutor


本想使用多线程executor执行node,没想到简单的创建Subscription之后callback funcs竟然是顺序执行的,所以仔细看了一下,原来node中还有callback_group这个东西。结论如下:

1.callback_group

为所有的callback分了组别

类型有

  • MutuallyExclusive;互斥,即这个组别中每时刻只允许1个线程,一个callback在执行时,其他只能等待
  • Reentrant;可重入,这个组别中每时刻允许多个线程,一个Callback在执行时,其他callback可开启新的线程

2.node默认的组别

  • 每个node中有一个默认的callback_group,默认是MutuallyExclusive类型,即便使用了multiThreadedExecutor,也依然默认MutuallyExclusive类型
  • 当node创建一个subscription时,可以设置选项
Node::create_subscription(
  const std::string & topic_name,  //topic名称
  const rclcpp::QoS & qos,
  CallbackT && callback,
  const SubscriptionOptionsWithAllocator<AllocatorT> & options,  //选项
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
    typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
  msg_mem_strat)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
this->create_subscription<std_msgs::msg::Int32>("int1",qos,
			std::bind(&PathSearcherNode::int1Sub, this,
	        std::placeholders::_1),
			rclcpp::SubscriptionOptions());
  • 1
  • 2
  • 3
  • 4

这里的SubscriptionOptions()可以设置callback_group组别;如果不设置,默认为node初始化时的callback_group

3.使用MultiThreadedExecutor,想要多线程调用callback

3.1 可设置不同的MutuallyExclusive callback_group
#include <rclcpp/rclcpp.hpp> 
#include <std_msgs/msg/int32.hpp>
using namespace std::chrono_literals;
std::string getTime(){
    time_t tick = (time_t)(rclcpp::Clock().now().seconds());
    struct tm tm;
    char s[100];
    tm = *localtime(&tick);
    strftime(s, sizeof(s), "%Y-%m-%d %H:%M:%S", &tm);
    return std::string(s);
}
std::string string_thread_id()
{
  auto hashed = std::hash<std::thread::id>()(std::this_thread::get_id());
  return std::to_string(hashed);
}
class MyNode: public rclcpp::Node{
public:
    MyNode(const rclcpp::NodeOptions & options);
    ~MyNode(){}
    void int1Sub(std_msgs::msg::Int32::SharedPtr msg);
    void int2Sub(std_msgs::msg::Int32::SharedPtr msg);

private:
    rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub1_;
    rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr int_sub2_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub1_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub2_;
}
MyNode::MyNode(const rclcpp::NodeOptions & options):
    rclcpp::Node("my_node",options){
//两个组别,设置组别类型为MutuallyExclusive
    callback_group_sub1_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
    callback_group_sub2_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);

    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_sub1_;
    auto sub2_opt = rclcpp::SubscriptionOptions();
    sub2_opt.callback_group = callback_group_sub2_;
    
    int_sub1_ = nh_->create_subscription<std_msgs::msg::Int32>("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub1_opt);
    int_sub2_ = nh_->create_subscription<std_msgs::msg::Int32>("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub2_opt);
}
void MyNode::int1Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int1 sub time:%s\n",getTime().c_str());
    printf("thread1 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int1 sub time:%s\n",getTime().c_str());
}
void MyNode::int2Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int2 sub time:%s\n",getTime().c_str());
    printf("thread2 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int2 sub time:%s\n",getTime().c_str());
}
int main(int argc, char **argv) {
    rclcpp::init(argc, argv);
    rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::ExecutorArgs(),5,true);

    auto node = std::make_shared<MyNode>(rclcpp::NodeOptions());
    executor.add_node(node);
    printf("threads   %d\n",executor.get_number_of_threads());
    executor.spin();
    rclcpp::shutdown();
    return EXIT_SUCCESS;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

这时,如果同时收到了/int1和/int2 的消息,两个callback都能执行,但如果接连收到了两个/int1,只能调1个int1Sub;

3.2 设置同一个Reentrant callback_group
//同一个组别,类型设置为Reentrant
callback_group_sub_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_sub_;
    
    
int_sub1_ = nh_->create_subscription<std_msgs::msg::Int32>("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub_opt);
int_sub2_ = nh_->create_subscription<std_msgs::msg::Int32>("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub_opt);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这时,如果同时收到多个/int1和/int2,开启的线程数会受电脑内核和初始化MultiThreadedExecutor时设置的线程数决

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读