当前位置:   article > 正文

ros callback 回调机制 & 订阅队列长度_ros callback回调函数

ros callback回调函数

实验程序:

talker.cc 

  1. #include <std_msgs/Int32.h>
  2. #include <ros/ros.h>
  3. int main(int argc, char **argv) {
  4. ros::init(argc, argv, "talker");
  5. ros::NodeHandle nh;
  6. std_msgs::Int32 data;
  7. data.data = 0;
  8. ros::Publisher pub = nh.advertise<std_msgs::Int32>("alan_topic", 1);
  9. ros::Rate loop_rate(1.0); // 1hz
  10. while (ros::ok()) {
  11. data.data++;
  12. pub.publish(data);
  13. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] publish " << data.data << std::endl;
  14. loop_rate.sleep();
  15. }
  16. return 0;
  17. }

listener.cc

  1. #include <ros/ros.h>
  2. #include <std_msgs/Int32.h>
  3. void alan_callback(const std_msgs::Int32& data){
  4. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  5. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
  6. return;
  7. }
  8. int main(int argc, char ** argv) {
  9. ros::init(argc, argv, "listener");
  10. ros::NodeHandle nh;
  11. ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
  12. ros::spin();
  13. return 0;
  14. }

实时 


listener.cc

  1. #include <unistd.h>
  2. #include <ros/ros.h>
  3. #include <std_msgs/Int32.h>
  4. void alan_callback(const std_msgs::Int32& data){
  5. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  6. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
  7. sleep(5); // 5秒
  8. return;
  9. }
  10. int main(int argc, char ** argv) {
  11. ros::init(argc, argv, "listener");
  12. ros::NodeHandle nh;
  13. ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
  14. ros::spin();
  15. return 0;
  16. }

回调函数处理时间长导致数据丢失(队列长度为1,处理回调函数时间过长时时,队列里旧数据不断被新数据顶掉,处理完第2帧时队列里数据为7,丢失数据的帧数跟回调处理时间有关,是动态变化的)


listener.cc

  1. #include <unistd.h>
  2. #include <ros/ros.h>
  3. #include <std_msgs/Int32.h>
  4. void alan_callback(const std_msgs::Int32& data){
  5. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  6. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
  7. sleep(5); // 5秒
  8. return;
  9. }
  10. int main(int argc, char ** argv) {
  11. ros::init(argc, argv, "listener");
  12. ros::NodeHandle nh;
  13. ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
  14. ros::spin();
  15. return 0;
  16. }

只扩大回调队列长度;处理有延时,且有数据丢失

处理第3帧是因为一开始队列为空,第2帧处理完时队列刚满(3,4,5,6,7),队首为3,还没被顶掉

处理第14,15,16,17帧是因为talker在publish第17帧后终止,此时队列为(13,14,15,16,17),listener会依次处理直到队列为空


listener.cc

  1. #include <unistd.h>
  2. #include <ros/ros.h>
  3. #include <std_msgs/Int32.h>
  4. void alan_callback(const std_msgs::Int32& data){
  5. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  6. if (data.data % 5 == 0) {
  7. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
  8. sleep(5); // 5秒
  9. }
  10. return;
  11. }
  12. int main(int argc, char ** argv) {
  13. ros::init(argc, argv, "listener");
  14. ros::NodeHandle nh;
  15. ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
  16. ros::spin();
  17. return 0;
  18. }

扩大回调队列长度(队列长度至少设为回调函数最长处理时间 / 订阅的topic的频率),并且做跳帧处理;无数据丢失,且在跳帧处实时


另起一个后端处理线程

listener.cc

  1. #include <unistd.h>
  2. #include <queue>
  3. #include <thread>
  4. #include <mutex>
  5. #include <condition_variable>
  6. #include <ros/ros.h>
  7. #include <std_msgs/Int32.h>
  8. std::queue<int> q;
  9. std::mutex m;
  10. std::condition_variable cv;
  11. bool data_ready = false;
  12. void Process() {
  13. while(ros::ok()) {
  14. std::unique_lock<std::mutex> lk(m);
  15. cv.wait(lk, []{return data_ready;});
  16. std::cout << "queue size: " << q.size() << std::endl;
  17. if (q.empty()) {
  18. continue;
  19. }
  20. int data = q.front();
  21. data_ready = false;
  22. q.pop();
  23. lk.unlock();
  24. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
  25. sleep(5); // 5秒
  26. }
  27. }
  28. void alan_callback(const std_msgs::Int32& data){
  29. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  30. {
  31. std::lock_guard<std::mutex> lk(m);
  32. q.push(data.data);
  33. data_ready = true;
  34. }
  35. cv.notify_one();
  36. return;
  37. }
  38. int main(int argc, char ** argv) {
  39. std::thread t(Process);
  40. ros::init(argc, argv, "listener");
  41. ros::NodeHandle nh;
  42. ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
  43. ros::spin();
  44. return 0;
  45. }

前端callback实时预处理,后端处理线程延时处理,问题是队列q会不断加长,后端延迟越来越大


listener.cc

  1. #include <unistd.h>
  2. #include <queue>
  3. #include <thread>
  4. #include <mutex>
  5. #include <condition_variable>
  6. #include <ros/ros.h>
  7. #include <std_msgs/Int32.h>
  8. std::queue<int> q;
  9. std::mutex m;
  10. std::condition_variable cv;
  11. bool data_ready = false;
  12. void Process() {
  13. int cnt = 0;
  14. std::queue<int> q2;
  15. while(ros::ok()) {
  16. std::unique_lock<std::mutex> lk(m);
  17. cv.wait(lk, []{return data_ready;});
  18. std::cout << "queue size: " << q.size() << std::endl;
  19. q2 = q;
  20. std::queue<int> empty_q;
  21. std::swap(empty_q, q); // 清空队列
  22. data_ready = false;
  23. lk.unlock();
  24. while (!q2.empty()) {
  25. int data = q2.front();
  26. cnt++;
  27. q2.pop();
  28. if (cnt % 5 == 0) {
  29. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
  30. sleep(5); // 5秒
  31. }
  32. }
  33. }
  34. }
  35. void alan_callback(const std_msgs::Int32& data){
  36. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
  37. {
  38. std::lock_guard<std::mutex> lk(m);
  39. q.push(data.data);
  40. data_ready = true;
  41. }
  42. cv.notify_one();
  43. return;
  44. }
  45. int main(int argc, char ** argv) {
  46. std::thread t(Process);
  47. ros::init(argc, argv, "listener");
  48. ros::NodeHandle nh;
  49. ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
  50. ros::spin();
  51. return 0;
  52. }

前端callback实时预处理,后端处理线程跳帧处理,队列q保持一定长度,后端在跳帧处实时处理


CMakeLists.txt

  1. cmake_minimum_required(VERSION 2.8.3)
  2. project(ros_test)
  3. set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
  4. set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
  5. find_package(catkin REQUIRED COMPONENTS
  6. roscpp
  7. std_msgs
  8. )
  9. include_directories(
  10. ${catkin_INCLUDE_DIRS}
  11. )
  12. add_executable(talker src/talker.cc)
  13. target_link_libraries(talker ${catkin_LIBRARIES})
  14. add_executable(listener src/listener.cc)
  15. target_link_libraries(listener ${catkin_LIBRARIES})

note:

回调函数的处理时间不应太长,可只做一些简单的数据处理,然后把数据存到待处理队列,另外起一个后端线程独立处理队列中的数据;callback函数只负责push数据到队列,后端线程pop出数据并进行完整处理(在多线程中访问同一队列记得加锁mutex, condition_variable)


spinOnce原理

调用spinOnce时,首先检查当前队列(如queue1长度为4,里面有2个元素x1,x2,queue2长度为2,里面有1个元素y1),那么将会从queue1队首弹出2个元素,并执行对应的callback函数;从queue2队首弹出1个元素,并执行对应的callback函数;至于顺序,可能queue1,queue2,queue1,

或queue2,queue1,queue1等不固定。如果执行某个回调函数时间过长,队列里的元素可能被顶掉,如第一次弹出queue2的队首y1并处理y1的回调,如果y1回调处理时间较长,在处理这个回调期间,queue1来了3个数据x3,x4,x5,那么此时queue1的元素为x2,x3,x4,x5,x1被顶掉,y1的回调处理完后,弹出queue1的队首元素x2,并处理x2的回调,如果x2回调函数执行时间短,下一次弹出queue1的队首x3;如果x2回调函数执行时间长,下一次queue1弹出的队首就不一定是x3,可能是x4,x5,x6等,依次类推。

总之,调用spinOnce时,每个队列有几个元素就执行几次回调,每次都取队首的元素处理。先处理哪个队列顺序不一定。在这期间,队列中的元素还有可能会被刷新。

测试代码:

talker.cc

  1. #include <ros/ros.h>
  2. #include <std_msgs/Int32.h>
  3. int main(int argc, char **argv) {
  4. ros::init(argc, argv, "talker");
  5. ros::NodeHandle nh;
  6. std_msgs::Int32 data;
  7. data.data = 0;
  8. ros::Publisher odd_pub = nh.advertise<std_msgs::Int32>("odd_topic", 1);
  9. ros::Publisher even_pub = nh.advertise<std_msgs::Int32>("even_topic", 1);
  10. ros::Rate loop_rate(1.0); // 1hz
  11. while (ros::ok()) {
  12. data.data++;
  13. if (data.data % 2 == 0) {
  14. even_pub.publish(data);
  15. } else {
  16. odd_pub.publish(data);
  17. }
  18. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
  19. << "] publish " << data.data << std::endl;
  20. loop_rate.sleep();
  21. }
  22. return 0;
  23. }

listener.cc

  1. #include <ros/ros.h>
  2. #include <std_msgs/Int32.h>
  3. void odd_callback(const std_msgs::Int32& data) {
  4. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
  5. << "] process " << data.data << std::endl;
  6. return;
  7. }
  8. void even_callback(const std_msgs::Int32& data) {
  9. std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
  10. << "] process " << data.data << std::endl;
  11. ros::Duration(4).sleep();
  12. return;
  13. }
  14. int main(int argc, char** argv) {
  15. ros::init(argc, argv, "listener");
  16. ros::NodeHandle nh;
  17. ros::Subscriber odd_sub =
  18. nh.subscribe("odd_topic", 4, odd_callback); // 队列长度为4
  19. ros::Subscriber even_sub =
  20. nh.subscribe("even_topic", 2, even_callback); // 队列长度为2
  21. ros::Duration(10).sleep(); // 保证队列全满,6个元素
  22. ros::spinOnce();
  23. std::cout << "--------------" << std::endl;
  24. ros::spinOnce();
  25. std::cout << "--------------" << std::endl;
  26. ros::spinOnce();
  27. return 0;
  28. }

输出:

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号