赞
踩
记录iceoryx学习过程,欢迎交流
RouDi 管理共享内存并负责服务发现,使订阅者能够找到发布者提供的主题。
介绍中有一句话:“当应用程序崩溃时,RouDi 会清理所有资源。由于我们大多是无锁的进程间机制(只有最后一个锁;我们正在努力移除它),与使用锁定的传统机制相比,基于 iceoryx 的通信更加可靠。” 这个可以再看一下,我们平常遇到的锁的问题,iceoryx是怎么处理的?
- iceoryx hoofs:Handy Objects Optimized For Safety (hoofs) 是一个库,包含各种构建块,例如固定大小的容器、并发类和来自即将发布的 C++ 标准版本的现代下一代 C++ 构造。
-
- iceoryx posh:该包iceoryx_posh(POSIX Shared memory)包含与共享内存进程间通信相关的所有内容。
-
- Core library:本节描述核心库的命名空间。
-
- popo: 命名空间posh端口包含用于传输数据的用户 API 类。
-
- capro:命名空间规范协议实现了规范协议模式 协议,用于iceoryx_posh连接和popo::Publisher发现popo::Server。
-
- mepoo:命名空间内存池包含所有与内存相关的类。例如MemoryManager或SharedPointer。
-
- version:命名空间版本包含 ABI 兼容性检查。
-
- build:命名空间构建包含某些可以在编译前更改的最大值。
-
- Gatway library:网关库及其命名空间gw包含用于创建网关的通用抽象。它们被iceoryx_dds.
-
- RouDi library:库 RouDi 及其命名空间roudi包含 RouDi 中间件守护程序使用的类。
-
- iceoryx C binding:该模块iceoryx_binding_c使 C 中的进程间通信功能iceoryx_posh可用。
-
- iceoryx DDS:该软件包提供了一个使用Eclipse Cyclone DDSiceoryx_dds的双向 DDS 网关。网关可用于通过网络发送数据,例如通过以太网。
-
- iceoryx introspection:自省客户端可用于实时调试并提供当前系统的信息,如内存使用情况和已建立的连接。
典型IPC中间件解决方案:
Iceoryx 的一个重要方面是发布者可以在订阅者仍在阅读时再次写入,因为没有订阅者的干扰。如果前一个内存块仍在使用中,则只需为发布者分配一个新的内存块。(需要看代码理解清楚这一句话)
由于消息有效负载未序列化,因此消息对于发布者和订阅者必须具有相同的内存布局。对于特定处理器上的 IPC,这可以通过使用具有相同设置的相同编译器来确保。(iceoryx 的共享内存回收是怎么做的? )
创建发布者:
- //创建一个具有唯一名称的进程,使其与RouDi通信
- iox::runtime::PoshRuntime::initRuntime("some_unique_name");
-
- struct CounterTopic
- {
- uint32_t counter;
- };
-
- //创建发布者,括号中
- iox::popo::Publisher<CounterTopic> publisher({"Group", "Instance", "CounterTopic"});
-
- //使用发布者发送数据
- auto result = publisher.loan();
- if(!result.has_error())
- {
- auto& sample = result.value();
- sample->counter = 30;
- sample.publish();
- }
- else
- {
- // handle the error
- }
创建订阅者:
问题:这是每一个订阅者一个线程吗?
- iox::popo::Subscriber<CounterTopic> subscriber({"Group", "Instance", "CounterTopic"});
-
- while (keepRunning)
- {
- // wait for new data (either sleep and wake up periodically or by notification from the waitset)
-
- auto result = subscriber.take();
-
- if(!result.has_error())
- {
- auto& sample = result.value();
- uint32_t counter = sample->counter;
- //process the data
- }
- else
- {
- //handle the error
- }
-
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
数据被唤醒的方式:定期检查, WaitSet 和 Listener 显示等待数据;
WaitSet 可用于放弃控制(使线程进入睡眠状态)并等待用户定义的事件发生。通常,这些事件对应于特定订阅者的数据可用性。这样我们可以在数据可用时立即唤醒,并在没有数据可用时避免不必要的唤醒。
一个典型的用例是创建一个 WaitSet,附加多个订阅者和用户触发器,然后等待一个或多个附加对象发出事件信号。如果发生这种情况,则会收到所有已发生事件的列表。这样就可以在订阅者向 WaitSet 发出新数据可用的信号时直接从订阅者那里收集数据。
WaitSet 使用反应器模式,并通过推送策略通知用户发生了一个附加事件。
- #include "iceoryx_hoofs/cxx/optional.hpp"
- #include "iceoryx_hoofs/posix_wrapper/signal_handler.hpp"
- #include "iceoryx_posh/popo/subscriber.hpp"
- #include "iceoryx_posh/popo/user_trigger.hpp"
- #include "iceoryx_posh/popo/wait_set.hpp"
- #include "iceoryx_posh/runtime/posh_runtime.hpp"
- #include "topic_data.hpp"
-
- #include <atomic>
- #include <iostream>
-
- //! [sig handler]
- std::atomic_bool keepRunning{true};
- iox::cxx::optional<iox::popo::WaitSet<>> waitset;
-
- static void sigHandler(int sig IOX_MAYBE_UNUSED)
- {
- keepRunning = false;
- if (waitset)
- {
- waitset->markForDestruction();
- }
- }
- //! [sig handler]
-
- int main()
- {
- // initialize runtime
- iox::runtime::PoshRuntime::initRuntime("iox-cpp-waitset-basic");
-
- // create waitset inside of the optional
- //! [create waitset]
- waitset.emplace();
-
- // register signal handler to handle termination of the loop
- auto signalGuard = iox::posix::registerSignalHandler(iox::posix::Signal::INT, sigHandler);
- auto signalTermGuard = iox::posix::registerSignalHandler(iox::posix::Signal::TERM, sigHandler);
-
- // create subscriber
- iox::popo::Subscriber<CounterTopic> subscriber({"Radar", "FrontLeft", "Counter"});
-
- // attach subscriber to waitset
- waitset->attachState(subscriber, iox::popo::SubscriberState::HAS_DATA).or_else([](auto) {
- std::cerr << "failed to attach subscriber" << std::endl;
- std::exit(EXIT_FAILURE);
- });
- //! [create waitset]
-
- //! [mainloop]
- while (keepRunning)
- {
- // We block and wait for samples to arrive.
- auto notificationVector = waitset->wait();
-
- for (auto& notification : notificationVector)
- {
- // We woke up and hence there must be at least one sample. When the sigHandler has called
- // markForDestruction the notificationVector is empty otherwise we know which subscriber received samples
- // since we only attached one.
- // Best practice is to always acquire the notificationVector and iterate over all elements and then react
- // accordingly. When this is not done and more elements are attached to the WaitSet it can cause
- // problems since we either miss events or handle events for objects which never occurred.
- if (notification->doesOriginateFrom(&subscriber))
- {
- // Consume a sample
- subscriber.take()
- .and_then([](auto& sample) { std::cout << " got value: " << sample->counter << std::endl; })
- .or_else([](auto& reason) {
- std::cout << "got no data, return code: " << static_cast<uint64_t>(reason) << std::endl;
- });
- // We could consume all samples but do not need to.
- // If there is more than one sample we will wake up again since the state of the subscriber is still
- // iox::popo::SubscriberState::HAS_DATA in this case.
- }
- }
- }
- //! [mainloop]
-
- std::cout << "shutting down" << std::endl;
-
- waitset.reset();
- return (EXIT_SUCCESS);
- }
Listener 是实现推送方法以检测和响应某些事件的构建块之一。与 WaitSet 的两个主要区别是 Listener 是事件驱动的,而不是 WaitSet 的事件和状态驱动的,并且 Listener 创建了一个单独的后台线程,在该线程中执行事件回调,这与 WaitSet 的不同之处在于用户必须显式调用事件回调。
iceoryx 系统使用一个“管理”段来进行管理,并使用任意数量的“用户”段来进行服务之间的事件通信。
这些段在逻辑上被划分为“内存池”。内存池包含许多大小相等的“内存块”。
内存块是 iceoryx 系统中用于共享内存访问的基本单元。
- a.由于 RouDi 守护进程导致的单点故障
- b.Roudi 守护进程需要比其他进程先启动
- c.固定虚拟地址依赖
- d.编译后不可配置的内存池
- e.不支持请求/响应过程调用
- f.只支持单发布
- g.不支持锁存数据传输
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。