ros2时间同步有两种策略,ExactTime Policy和ApproximateTime Policy,前者需要严格的时间匹配,而ApproximateTime Policy则提供了时间窗口的匹配逻辑。WIKI上描述如下:
令 S 为最后发布的集合,T 为下一个正在创建的集合。其工作原理如下
说实话,WIKI的描述有点云里雾里,不结合代码的话很难理解,Talk is cheap, show me the code
SimpleFilter <- TimeSynchronizer <- ApproximateTimeSynchronizer
通过TimeSynchronizer.connectInput初始化队列,每个ros topic初始化一个队列,注册add回调函数, 添加到input_connections列表。队列实际上是字典的形式。消息的毫秒时间戳作为key,消息体为value,消息先进先出
def __init__(self, fs, queue_size):
self.queue_size = queue_size
self.lock = threading.Lock()
def connectInput(self, fs):
self.queues = [{} for f in fs]
self.input_connections = [
f.registerCallback(self.add, q, i_q)
for i_q, (f, q) in enumerate(zip(fs, self.queues))]
## 一般需要在消息体里加上带有时间戳的header if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'): if not self.allow_headerless: msg_filters_logger = rclpy.logging.get_logger('message_filters_approx') msg_filters_logger.set_level(LoggingSeverity.INFO) msg_filters_logger.warn("can not use message filters for " "messages without timestamp infomation when " "'allow_headerless' is disabled. auto assign " "ROSTIME to headerless messages once enabling " "constructor option of 'allow_headerless'.") return stamp = ROSClock().now() else: stamp = msg.header.stamp if not hasattr(stamp, 'nanoseconds'): stamp = Time.from_msg(stamp) # print(stamp) self.lock.acquire() my_queue[stamp.nanoseconds] = msg while len(my_queue) > self.queue_size: del my_queue[min(my_queue)] # self.queues = [topic_0 {stamp: msg}, topic_1 {stamp: msg}, ...] if my_queue_index is None: search_queues = self.queues else: search_queues = self.queues[:my_queue_index] + \ self.queues[my_queue_index+1:] # sort and leave only reasonable stamps for synchronization stamps = [] for queue in search_queues: topic_stamps = [] for s in queue: stamp_delta = Duration(nanoseconds=abs(s - stamp.nanoseconds)) if stamp_delta > self.slop: continue # far over the slop topic_stamps.append(((Time(nanoseconds=s, clock_type=stamp.clock_type)), stamp_delta)) if not topic_stamps: self.lock.release() return topic_stamps = sorted(topic_stamps, key=lambda x: x[1]) stamps.append(topic_stamps)
('t11', 't21', 't31')
('t11', 't21', 't32')
('t11', 't22', 't31')
('t11', 't22', 't32')
('t11', 't23', 't31')
('t11', 't23', 't32')
('t12', 't21', 't31')
('t12', 't21', 't32')
('t12', 't22', 't31')
('t12', 't22', 't32')
('t12', 't23', 't31')
('t12', 't23', 't32')
for vv in itertools.product(*[list(zip(*s))[0] for s in stamps]):
# vv表示每一个时间戳对
vv = list(vv)
# insert the new message
if my_queue_index is not None:
vv.insert(my_queue_index, stamp)
qt = list(zip(self.queues, vv))
if ( ((max(vv) - min(vv)) < self.slop) and
(len([1 for q,t in qt if t.nanoseconds not in q]) == 0) ):
msgs = [q[t.nanoseconds] for q,t in qt]
for q,t in qt:
del q[t.nanoseconds]
break # fast finish after the synchronizatio
通过ros2 bag record命令录取的ros bag,其时间戳对应着ros bag的记录时间,也就是消息记录时间,部分场景下,而每个消息一般带有消息的产生时间,记录在std_msgs/msg/Header接口中
# Standard metadata for higher-level stamped data types.
# This is generally used to communicate timestamped data
# in a particular coordinate frame.
# Two-integer timestamp that is expressed as seconds and nanoseconds.
builtin_interfaces/Time stamp
int32 sec
uint32 nanosec
# Transform frame with which this data is associated.
string frame_id
为了能够在在基于消息产生时间的时间轴来可视化分析不同ros topic消息,需要替换离线录制的ros2 bag中的时间戳。 ros1提供了此类样例和api:http://wiki.ros.org/rosbag/Cookbook
但是目前还没发现类似的ros2官方文档。ros2 humble版本中,ros2 bag只提供了如下功能
usage: ros2 bag [-h] Call `ros2 bag <command> -h` for more detailed usage. ... Various rosbag related sub-commands optional arguments: -h, --help show this help message and exit Commands: convert Given an input bag, write out a new bag with different settings info Print information about a bag to the screen list Print information about available plugins to the screen play Play back ROS data from a bag record Record ROS data to a bag reindex Reconstruct metadata file for a bag Call `ros2 bag <command> -h` for more detailed usage.
但是可以通过第三方库ros2bag_tools来扩展ros2 bag的功能, 经过扩展后的功能如下表,其中restamp就是进行时间戳替换的模块,既可以将消息记录时间戳替换为header stamp,也能反向替换。
command | description |
add | add new topic, with messages aligned to existing topic |
cut | cut time slice by wall time or duration offset |
drop | drop X out of every Y messages of a topic |
export | export data to other formats, see export |
extract | extract topics by name |
plot | plot message data to a new window, see plot |
process | chain multiple filters, see chaining |
prune | remove topics without messages |
reframe | change frame_id on messages with headers |
rename | change name of a topic |
replace | replace messages of a specific topic with message data specified in a yaml file |
restamp | for all messages with headers, change the bag timestamp to their header stamp |
summary | print summary on data to stdout |
sync | output synchronized bundles of messages using the ApproximateTimeSynchronizer |
video | show or write video of image data |
Failed to load entry point 'plot': cannot import name 'get_reader_options' from 'ros2bag.verb' (/home/daili/my_ros2_humble_ws/install/ros2bag/lib/python3.8/site-packages/ros2bag/verb/__init__.py) usage: ros2 bag [-h] Call `ros2 bag <command> -h` for more detailed usage. ... Various rosbag related sub-commands optional arguments: -h, --help show this help message and exit Commands: add Add new topic, with messages aligned to existing topic convert Given an input bag, write out a new bag with different settings cut Cut timespan from a bag and write to new bag drop Drop X out of every Y messages echo Print message data as yaml export Export bag data to other formats extract Extract specific topics from a bag and write to new bag info Print information about a bag to the screen list Print information about available plugins to the screen play Play back ROS data from a bag process Run a set of filters on input bags and write to new bag prune Remove empty topics and write rest to new bag record Record ROS data to a bag reframe Change header.frame_id of some messages, and write to a new bag reindex Reconstruct metadata file for a bag rename Rename specific topics in a bag, and write to new bag replace Replace content of messages in a bag with a constant, and write to new bag restamp Set bag timestamps to message header timestamps and write to new bag summary Print a summary of the contents of a bag sync Synchronize topics dropping unsynchronized messages video Display or write a video of image data in a bag Call `ros2 bag <command> -h` for more detailed usage.
执行以下命令,转换成功的ros2 bag文件夹名默认以时间戳结尾
ros2 bag restamp <your ros2 bag path> -s sqlite3 --out-storage sqlite3
除了在代码中调试时间同步之外,也可以利用上述ros2bag_tools工具基于录制好的ros bag来分析和测试时间同步效果
ros2 bag sync <rosbag-path> -s sqlite3 --out-storage sqlite3 --slop 0.05 --progress -o <rosbag-path>-sync -t /multi_stream_detections_0 /multi_stream_detections_1 /multi_stream_detections_2 /multi_stream_detections_3
同时对四路ros topic数据进行时间同步,上下图分别表示同步前后的数据。对50ms(slop=0.05)内的topic数据,近似认为是时间同步的数据,并输出,可以看到,超过时间窗口的数据,并没有被推送出来,也就是说,对于n路ros topic,要么输出近似同步的n路数据,要么都不输出,不存在只输出小于n路数据的情况。
部分场景下,这显然不是一个合理的做法,比如部分传感器失效的情况下,经过ros TimeSynchronizer之后,所有结果都不会被发布出来,即使有正常工作的传感器。一种可能的解决方案是,部分传感器失效的情况下,按既定频率往所属的ros topic发送空的消息,防止所有的topic数据都被过滤掉
