赞
踩
实验程序:
talker.cc
- #include <std_msgs/Int32.h>
- #include <ros/ros.h>
-
- int main(int argc, char **argv) {
- ros::init(argc, argv, "talker");
- ros::NodeHandle nh;
- std_msgs::Int32 data;
- data.data = 0;
-
- ros::Publisher pub = nh.advertise<std_msgs::Int32>("alan_topic", 1);
- ros::Rate loop_rate(1.0); // 1hz
- while (ros::ok()) {
- data.data++;
- pub.publish(data);
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] publish " << data.data << std::endl;
- loop_rate.sleep();
- }
- return 0;
- }

listener.cc
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
- return;
- }
-
- int main(int argc, char ** argv) {
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
- ros::spin();
- return 0;
- }

实时
listener.cc
- #include <unistd.h>
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
- sleep(5); // 5秒
- return;
- }
-
- int main(int argc, char ** argv) {
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
- ros::spin();
- return 0;
- }

回调函数处理时间长导致数据丢失(队列长度为1,处理回调函数时间过长时时,队列里旧数据不断被新数据顶掉,处理完第2帧时队列里数据为7,丢失数据的帧数跟回调处理时间有关,是动态变化的)
listener.cc
- #include <unistd.h>
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
- sleep(5); // 5秒
- return;
- }
-
- int main(int argc, char ** argv) {
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
- ros::spin();
- return 0;
- }

只扩大回调队列长度;处理有延时,且有数据丢失
处理第3帧是因为一开始队列为空,第2帧处理完时队列刚满(3,4,5,6,7),队首为3,还没被顶掉
处理第14,15,16,17帧是因为talker在publish第17帧后终止,此时队列为(13,14,15,16,17),listener会依次处理直到队列为空
listener.cc
- #include <unistd.h>
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- if (data.data % 5 == 0) {
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
- sleep(5); // 5秒
- }
- return;
- }
-
- int main(int argc, char ** argv) {
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
- ros::spin();
- return 0;
- }

扩大回调队列长度(队列长度至少设为回调函数最长处理时间 / 订阅的topic的频率),并且做跳帧处理;无数据丢失,且在跳帧处实时
另起一个后端处理线程
listener.cc
- #include <unistd.h>
- #include <queue>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- std::queue<int> q;
- std::mutex m;
- std::condition_variable cv;
- bool data_ready = false;
-
- void Process() {
- while(ros::ok()) {
- std::unique_lock<std::mutex> lk(m);
- cv.wait(lk, []{return data_ready;});
-
- std::cout << "queue size: " << q.size() << std::endl;
- if (q.empty()) {
- continue;
- }
-
- int data = q.front();
- data_ready = false;
- q.pop();
- lk.unlock();
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
- sleep(5); // 5秒
- }
- }
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- {
- std::lock_guard<std::mutex> lk(m);
- q.push(data.data);
- data_ready = true;
- }
- cv.notify_one();
- return;
- }
-
- int main(int argc, char ** argv) {
- std::thread t(Process);
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
- ros::spin();
- return 0;
- }

前端callback实时预处理,后端处理线程延时处理,问题是队列q会不断加长,后端延迟越来越大
listener.cc
- #include <unistd.h>
- #include <queue>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- std::queue<int> q;
- std::mutex m;
- std::condition_variable cv;
- bool data_ready = false;
-
- void Process() {
- int cnt = 0;
- std::queue<int> q2;
- while(ros::ok()) {
- std::unique_lock<std::mutex> lk(m);
- cv.wait(lk, []{return data_ready;});
-
- std::cout << "queue size: " << q.size() << std::endl;
- q2 = q;
- std::queue<int> empty_q;
- std::swap(empty_q, q); // 清空队列
- data_ready = false;
- lk.unlock();
-
- while (!q2.empty()) {
- int data = q2.front();
- cnt++;
- q2.pop();
- if (cnt % 5 == 0) {
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
- sleep(5); // 5秒
- }
- }
- }
- }
-
- void alan_callback(const std_msgs::Int32& data){
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
- {
- std::lock_guard<std::mutex> lk(m);
- q.push(data.data);
- data_ready = true;
- }
- cv.notify_one();
- return;
- }
-
- int main(int argc, char ** argv) {
- std::thread t(Process);
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
- ros::spin();
- return 0;
- }

前端callback实时预处理,后端处理线程跳帧处理,队列q保持一定长度,后端在跳帧处实时处理
CMakeLists.txt
- cmake_minimum_required(VERSION 2.8.3)
- project(ros_test)
-
- set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
-
- find_package(catkin REQUIRED COMPONENTS
- roscpp
- std_msgs
- )
-
- include_directories(
- ${catkin_INCLUDE_DIRS}
- )
-
- add_executable(talker src/talker.cc)
- target_link_libraries(talker ${catkin_LIBRARIES})
- add_executable(listener src/listener.cc)
- target_link_libraries(listener ${catkin_LIBRARIES})

note:
回调函数的处理时间不应太长,可只做一些简单的数据处理,然后把数据存到待处理队列,另外起一个后端线程独立处理队列中的数据;callback函数只负责push数据到队列,后端线程pop出数据并进行完整处理(在多线程中访问同一队列记得加锁mutex, condition_variable)
spinOnce原理
调用spinOnce时,首先检查当前队列(如queue1长度为4,里面有2个元素x1,x2,queue2长度为2,里面有1个元素y1),那么将会从queue1队首弹出2个元素,并执行对应的callback函数;从queue2队首弹出1个元素,并执行对应的callback函数;至于顺序,可能queue1,queue2,queue1,
或queue2,queue1,queue1等不固定。如果执行某个回调函数时间过长,队列里的元素可能被顶掉,如第一次弹出queue2的队首y1并处理y1的回调,如果y1回调处理时间较长,在处理这个回调期间,queue1来了3个数据x3,x4,x5,那么此时queue1的元素为x2,x3,x4,x5,x1被顶掉,y1的回调处理完后,弹出queue1的队首元素x2,并处理x2的回调,如果x2回调函数执行时间短,下一次弹出queue1的队首x3;如果x2回调函数执行时间长,下一次queue1弹出的队首就不一定是x3,可能是x4,x5,x6等,依次类推。
总之,调用spinOnce时,每个队列有几个元素就执行几次回调,每次都取队首的元素处理。先处理哪个队列顺序不一定。在这期间,队列中的元素还有可能会被刷新。
测试代码:
talker.cc
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- int main(int argc, char **argv) {
- ros::init(argc, argv, "talker");
- ros::NodeHandle nh;
- std_msgs::Int32 data;
- data.data = 0;
-
- ros::Publisher odd_pub = nh.advertise<std_msgs::Int32>("odd_topic", 1);
- ros::Publisher even_pub = nh.advertise<std_msgs::Int32>("even_topic", 1);
- ros::Rate loop_rate(1.0); // 1hz
- while (ros::ok()) {
- data.data++;
- if (data.data % 2 == 0) {
- even_pub.publish(data);
- } else {
- odd_pub.publish(data);
- }
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
- << "] publish " << data.data << std::endl;
- loop_rate.sleep();
- }
- return 0;
- }

listener.cc
- #include <ros/ros.h>
- #include <std_msgs/Int32.h>
-
- void odd_callback(const std_msgs::Int32& data) {
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
- << "] process " << data.data << std::endl;
- return;
- }
-
- void even_callback(const std_msgs::Int32& data) {
- std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
- << "] process " << data.data << std::endl;
- ros::Duration(4).sleep();
- return;
- }
-
- int main(int argc, char** argv) {
- ros::init(argc, argv, "listener");
- ros::NodeHandle nh;
- ros::Subscriber odd_sub =
- nh.subscribe("odd_topic", 4, odd_callback); // 队列长度为4
- ros::Subscriber even_sub =
- nh.subscribe("even_topic", 2, even_callback); // 队列长度为2
- ros::Duration(10).sleep(); // 保证队列全满,6个元素
- ros::spinOnce();
- std::cout << "--------------" << std::endl;
- ros::spinOnce();
- std::cout << "--------------" << std::endl;
- ros::spinOnce();
-
- return 0;
- }

输出:
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。