赞
踩
这个例子展示了基于冰羚系统的进程间通信(IPC),零拷贝完成数据单向传输的例子
它提供了发布者和订阅者应用,这些应用包含了两种风格(Bare-metal版本 以及simplified版本)
RouDi是 Routing and Discovery的别名,这个名称很好的描述了RouDi的任务内容。RouDi负责建立通信,但是不实际参与发布者和订阅者间的通信,可以将RouDi想象为冰羚系统中的交换机。
RouDi所负责的另一个主要任务是建立共享存储,这个共享存储会被不同的应用互相通信使用,我们目前使用了内存池,其中有不同大小的内存块,这些内存块根据其大小分别放在互相隔离的空闲内存块列表中。RouDi运行时会使编译进去的默认内存配置,用户可以通过使用自己配置好的TOML配置文件替换该默认的配置文件,作为RouDi运行时的配置文件。
使用自定义的TOML配置文件时,可以运行使用 RouDi --help
.查看RouDi的命令选项。
创建三个终端,并且在每一个终端中运行一个命令,可以运行icedelivery的simple版本或者normal版本。
# If installed and available in PATH environment variable
RouDi
# If build from scratch with script in tools
$ICEORYX_ROOT/build/posh/RouDi
./build/iceoryx_examples/icedelivery/ice-publisher-bare-metal
# The simplified publisher is an alternative
./build/iceoryx_examples/icedelivery/ice-publisher-simple
./build/iceoryx_examples/icedelivery/ice-subscriber-bare-metal
# The simplified subscriber is an alternative
./build/iceoryx_examples/icedelivery/ice-subscriber-simple
具体的数值可能和实际运行的存在不同。
Reserving 99683360 bytes in the shared memory [/iceoryx_mgmt]
[ Reserving shared memory successful ]
Reserving 410709312 bytes in the shared memory [/username]
[ Reserving shared memory successful ]
Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Sending: 5
Not subscribed
Receiving: 3
Receiving: 4
Receiving: 5
Callback: 4
Callback: 5
Callback: 6
Callback: 7
这个例子使用了两种不同风格的API,使用 bare-metal 风格的API 可以获得最大的灵活性,可以让用户站在在更高级别的API层级君临冰羚系统,例如,the ara::com API of AUTOSAR Adaptive或者ROS2 API。
对于日常开发者,使用bare-metal 风格的API 会缺少意义,我们假定会有一个更高级别的抽象。使用simplified 模式的例子中就显示除了这种更高级别的抽象。
首先,让我们将publisher以及runtime的头文件包含进来:
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
你可能会想知道publisher应用发布了什么?发布的数据结果如下:
struct CounterTopic
{
uint32_t counter;
};
这个结构包含在topic_data.hpp头文件中
#include "topic_data.hpp"
runtime对象必须创建用于和RouDi进行通信, getInstance()
方法的参数包含了一个针对publisher发布者唯一的描述字符串:
iox::runtime::PoshRuntime::getInstance("/publisher-bare-metal");
现在RouDi知道了我们的Publisher发布者应用已经存在,那就可以创建发布者实例并且发布者发布给所有人
iox::popo::Publisher myPublisher({"Radar", "FrontLeft", "Counter"});
myPublisher.offer();
iox::popo::Publisher
构造函数中的第一个字符串参数是capro::ServiceDescription
类型的, capro
代表的是canionical protocol, capro
被用于抽象不同版本的SOA协议。Radar
是服务名称,FrontLeft
代表 Radar
服务的实例的名称,第三个参数event 定义了FrontLeft
服务实例上的一个事件 Counter
。
这个服务模型的来源来自AUTOSAR,他可能不是最适合发布订阅模式API,但该模式可以让我们适配不同的技术方案。这个事件和发布/订阅方式下的Topic(主题)在概念上是相通的。服务也不是简单的请求/应答模式,而是采取将事件和/或者方法进行分组并且可以被外部当做服务发现的这种模式。服务和实例和C++中类与实例的关系是类似的。因此你总是可以在应用运行的时候拥有服务的某个特定实例。在冰羚系统中,一个发布者和订阅者只有在服务
实例
事件
三个服务要素上都相同的情况下进行匹配。
现在来看下例子的运行模式。数据需要被创建出来,但是等等…我们首先需要存储!让我们预定一块共享内存块来使用:
auto sample = static_cast<CounterTopic*>(myPublisher.allocateChunk(sizeof(CounterTopic)));
好的,这种是bare-metal风格! allocateChunk()
返回了一个 void*
指针,这个指针需要被转换为 CounterTopic
类型。
然后我们就可以设置 ct
的值为我们目前的计数值并且将这个内存块发布给所有的订阅者。
sample->counter = ct;
myPublisher.sendChunk(sample);
计数的递增和数据的发送在每秒循环进行直到用户按下 Ctrl-C
。这个操作会被信号处理所捕获并且终止循环,最后
myPublisher.stopOffer();
被调用用于向所有的订阅者说goodbye。
订阅者该如何获取到发布者传输的数据?和发布者发布数据的代码类似,我们首先需要包含runtime以及subscriber的头文件以及topic data的头文件。
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "topic_data.hpp"
为了让RouDi知道有订阅者存在,订阅者所在进程也必须使用一个唯一的描述字符串来创建posh runtime对象:
iox::runtime::PoshRuntime::getInstance("/subscriber-bare-metal");
下一步时创建一个订阅者对象,创建订阅者对象时,其三个字符串参数(服务
实例
事件
)需要和发布锁订阅数据的发布者创建时使用的参数一致::
iox::popo::Subscriber mySubscriber({"Radar", "FrontLeft", "Counter"});
当创建完成订阅者对象后,需要设定订阅者的缓存大小参数,这个参数的意义在于订阅者内部的FIFO对象可以容纳多少数据量,如果FIFO发生溢出,那么最老的数据的存储会被释放用于最新的数据。
mySubscriber.subscribe(10);
接着,在一个while循环之前,首先检查我们的订阅者对象是否已经处于订阅状态了:
if (iox::popo::SubscriptionState::SUBSCRIBED == mySubscriber.getSubscriptionState())
{
如果订阅者没有处于订阅状态,那么就会进入到else的代码段中,那么会有下面的信息打印在终端上:
else
{
std::cout << "Not subscribed" << std::endl;
}
如果订阅者对象已经处于订阅中的状态,那么先定义一个void*
指针:
const void * chunk = nullptr;
然后,在while循环中将订阅者对象内的FIFO队列中的订阅数据所在的共享内存块一块一块的pop up出来:
while (mySubscriber.getChunk(&chunk))
{
// we know what we expect for the CaPro ID we provided with the subscriber c'tor. So we do a cast here
auto sample = static_cast<const CounterTopic*>(chunk);
std::cout << "Receiving: " << sample->counter << std::endl;
// signal the middleware that this chunk was processed and in no more accesssed by the user side
mySubscriber.releaseChunk(chunk);
}
当订阅者内部的FIFO中的内存块都pop up完成后,订阅者应用会sleep一秒。
一旦受到 Ctrl-C
的信号,循环就会退出并且订阅者对象和RouDi之间需要通过下面的代码断开连接:
mySubscriber.unsubscribe();
简单模式下,发布者应用使用了高等级的API,但是完成的功能和之前模式下的发布者时一样的。在这里总结一下和之前发布者应用不同的地方:
首先包含一个之前模式没有的头文件:
#include "a_typed_api.hpp"
类型 TypedPublisher
和 TypedSubscriber
被定义在上面这个头文件中,在这个章节我们需要关心 TypedPublisher
这个类型。
offer()
和 stopOffer()
被称为 RAII风格的构造和析构:
TypedPublisher(const iox::capro::ServiceDescription& id)
: m_publisher(id)
{
m_publisher.offer();
}
~TypedPublisher()
{
m_publisher.stopOffer();
}
通过在函数栈创建一个TypedPublisher
模板类型的对象来代替初始化一个iox::popo::Publisher
类型的对象。
TypedPublisher<CounterTopic> myTypedPublisher({"Radar", "FrontRight", "Counter"});
CounterTopic
结构体作为TypePublisher模板的参数。
和之前的发布者应用不同的另一处地方是使用了更为简单的allocate()
函数,TypedPublisher
类在该函数内部已经完成了指针类型转换(相对于之前发布者应用中的 static_cast<CounterTopic*>),预定内存的操作变得更为简单了:
// allocate a sample
auto sample = myTypedPublisher.allocate();
// write the data
sample->counter = ct;
std::cout<< "Sending: " << ct << std::endl;
// pass the ownership to the middleware for sending the sample
myTypedPublisher.publish(std::move(sample));
现在 allocate()
返回了一个std::unique_ptr<TopicType, SampleDeleter<TopicType>>
对象而不是 void*
指针,这种类型的对象拥有自动内存管理功能,可以在离开作用域的时候释放对内存的占用。发布者发布的数据的内存使用权必须被转移给中间件。
和simple风格的发布者应用相同的是,首先需要包含下面这个头文件:
#include "a_typed_api.hpp"
An instance of TypedSubscriber
is created:
TypedSubscriber<CounterTopic> myTypedSubscriber({"Radar", "FrontRight", "Counter"}, myCallback);
在创建订阅者对象的时候,需要额外提供一个回调方法作为参数,用于在收到订阅数据时进行回调通知。
在这个例子中,当收到数据时会打印计数到终端上:
// the callback for processing the samples
void myCallback(const CounterTopic& sample)
{
std::cout << "Callback: " << sample.counter << std::endl;
}
构造和析构函数会分别自动调用subscribe()
和 unsubscribe()
函数:
TypedSubscriber(const iox::capro::ServiceDescription& id, OnReceiveCallback<TopicType> callback)
: m_subscriber(id)
, m_callback(callback)
{
m_subscriber.setReceiveHandler(std::bind(&TypedSubscriber::receiveHandler, this));
m_subscriber.subscribe();
}
~TypedSubscriber()
{
m_subscriber.unsubscribe();
m_subscriber.unsetReceiveHandler();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。