当前位置:   article > 正文

冰羚 example-icedelivery-README.md翻译_iceoryx 发布订阅

iceoryx 发布订阅

icedelivery - 在Posix应用间传输数据

介绍

这个例子展示了基于冰羚系统的进程间通信(IPC),零拷贝完成数据单向传输的例子
它提供了发布者和订阅者应用,这些应用包含了两种风格(Bare-metal版本 以及simplified版本)

RouDi, 守护进程

RouDi是 Routing and Discovery的别名,这个名称很好的描述了RouDi的任务内容。RouDi负责建立通信,但是不实际参与发布者和订阅者间的通信,可以将RouDi想象为冰羚系统中的交换机。

RouDi所负责的另一个主要任务是建立共享存储,这个共享存储会被不同的应用互相通信使用,我们目前使用了内存池,其中有不同大小的内存块,这些内存块根据其大小分别放在互相隔离的空闲内存块列表中。RouDi运行时会使编译进去的默认内存配置,用户可以通过使用自己配置好的TOML配置文件替换该默认的配置文件,作为RouDi运行时的配置文件。

使用自定义的TOML配置文件时,可以运行使用 RouDi --help.查看RouDi的命令选项。

运行 icedelivery

创建三个终端,并且在每一个终端中运行一个命令,可以运行icedelivery的simple版本或者normal版本。

# If installed and available in PATH environment variable
RouDi
# If build from scratch with script in tools
$ICEORYX_ROOT/build/posh/RouDi


./build/iceoryx_examples/icedelivery/ice-publisher-bare-metal
# The simplified publisher is an alternative
./build/iceoryx_examples/icedelivery/ice-publisher-simple


./build/iceoryx_examples/icedelivery/ice-subscriber-bare-metal
# The simplified subscriber is an alternative
./build/iceoryx_examples/icedelivery/ice-subscriber-simple
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

期望的输出

具体的数值可能和实际运行的存在不同。

RouDi application

Reserving 99683360 bytes in the shared memory [/iceoryx_mgmt]
[ Reserving shared memory successful ]
Reserving 410709312 bytes in the shared memory [/username]
[ Reserving shared memory successful ]
  • 1
  • 2
  • 3
  • 4

Publisher application

Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Sending: 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Subscriber application (bare-metal)

Not subscribed
Receiving: 3
Receiving: 4
Receiving: 5
  • 1
  • 2
  • 3
  • 4

Subscriber application (simple)

Callback: 4
Callback: 5
Callback: 6
Callback: 7
  • 1
  • 2
  • 3
  • 4

Code walkthrough

这个例子使用了两种不同风格的API,使用 bare-metal 风格的API 可以获得最大的灵活性,可以让用户站在在更高级别的API层级君临冰羚系统,例如,the ara::com API of AUTOSAR Adaptive或者ROS2 API。
对于日常开发者,使用bare-metal 风格的API 会缺少意义,我们假定会有一个更高级别的抽象。使用simplified 模式的例子中就显示除了这种更高级别的抽象。

Publisher application (bare-metal风格)

首先,让我们将publisher以及runtime的头文件包含进来:

#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
  • 1
  • 2

你可能会想知道publisher应用发布了什么?发布的数据结果如下:

struct CounterTopic
{
    uint32_t counter;
};
  • 1
  • 2
  • 3
  • 4

这个结构包含在topic_data.hpp头文件中

#include "topic_data.hpp"
  • 1

runtime对象必须创建用于和RouDi进行通信, getInstance() 方法的参数包含了一个针对publisher发布者唯一的描述字符串:

iox::runtime::PoshRuntime::getInstance("/publisher-bare-metal");
  • 1

现在RouDi知道了我们的Publisher发布者应用已经存在,那就可以创建发布者实例并且发布者发布给所有人

iox::popo::Publisher myPublisher({"Radar", "FrontLeft", "Counter"});
myPublisher.offer();
  • 1
  • 2

iox::popo::Publisher 构造函数中的第一个字符串参数是capro::ServiceDescription类型的, capro 代表的是canionical protocol, capro 被用于抽象不同版本的SOA协议。Radar 是服务名称,FrontLeft 代表 Radar服务的实例的名称,第三个参数event 定义了FrontLeft 服务实例上的一个事件 Counter
这个服务模型的来源来自AUTOSAR,他可能不是最适合发布订阅模式API,但该模式可以让我们适配不同的技术方案。这个事件和发布/订阅方式下的Topic(主题)在概念上是相通的。服务也不是简单的请求/应答模式,而是采取将事件和/或者方法进行分组并且可以被外部当做服务发现的这种模式。服务和实例和C++中类与实例的关系是类似的。因此你总是可以在应用运行的时候拥有服务的某个特定实例。在冰羚系统中,一个发布者和订阅者只有在服务 实例 事件 三个服务要素上都相同的情况下进行匹配。
现在来看下例子的运行模式。数据需要被创建出来,但是等等…我们首先需要存储!让我们预定一块共享内存块来使用:

auto sample = static_cast<CounterTopic*>(myPublisher.allocateChunk(sizeof(CounterTopic)));
  • 1

好的,这种是bare-metal风格! allocateChunk() 返回了一个 void* 指针,这个指针需要被转换为 CounterTopic类型。
然后我们就可以设置 ct 的值为我们目前的计数值并且将这个内存块发布给所有的订阅者。

sample->counter = ct;
myPublisher.sendChunk(sample);
  • 1
  • 2

计数的递增和数据的发送在每秒循环进行直到用户按下 Ctrl-C。这个操作会被信号处理所捕获并且终止循环,最后

myPublisher.stopOffer();
  • 1

被调用用于向所有的订阅者说goodbye。

Subscriber application (bare-metal风格)

订阅者该如何获取到发布者传输的数据?和发布者发布数据的代码类似,我们首先需要包含runtime以及subscriber的头文件以及topic data的头文件。

#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "topic_data.hpp"
  • 1
  • 2
  • 3

为了让RouDi知道有订阅者存在,订阅者所在进程也必须使用一个唯一的描述字符串来创建posh runtime对象:

iox::runtime::PoshRuntime::getInstance("/subscriber-bare-metal");
  • 1

下一步时创建一个订阅者对象,创建订阅者对象时,其三个字符串参数(服务 实例 事件 )需要和发布锁订阅数据的发布者创建时使用的参数一致::

iox::popo::Subscriber mySubscriber({"Radar", "FrontLeft", "Counter"});
  • 1

当创建完成订阅者对象后,需要设定订阅者的缓存大小参数,这个参数的意义在于订阅者内部的FIFO对象可以容纳多少数据量,如果FIFO发生溢出,那么最老的数据的存储会被释放用于最新的数据。

mySubscriber.subscribe(10);
  • 1

接着,在一个while循环之前,首先检查我们的订阅者对象是否已经处于订阅状态了:

if (iox::popo::SubscriptionState::SUBSCRIBED == mySubscriber.getSubscriptionState())
{
  • 1
  • 2

如果订阅者没有处于订阅状态,那么就会进入到else的代码段中,那么会有下面的信息打印在终端上:

else
{
    std::cout << "Not subscribed" << std::endl;
}
  • 1
  • 2
  • 3
  • 4

如果订阅者对象已经处于订阅中的状态,那么先定义一个void*指针:

const void * chunk = nullptr;
  • 1

然后,在while循环中将订阅者对象内的FIFO队列中的订阅数据所在的共享内存块一块一块的pop up出来:

while (mySubscriber.getChunk(&chunk))
{
    // we know what we expect for the CaPro ID we provided with the subscriber c'tor. So we do a cast here
    auto sample = static_cast<const CounterTopic*>(chunk);

    std::cout << "Receiving: " << sample->counter << std::endl;

    // signal the middleware that this chunk was processed and in no more accesssed by the user side
    mySubscriber.releaseChunk(chunk);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

当订阅者内部的FIFO中的内存块都pop up完成后,订阅者应用会sleep一秒。
一旦受到 Ctrl-C 的信号,循环就会退出并且订阅者对象和RouDi之间需要通过下面的代码断开连接:

mySubscriber.unsubscribe();
  • 1

Publisher application (simple模式)

简单模式下,发布者应用使用了高等级的API,但是完成的功能和之前模式下的发布者时一样的。在这里总结一下和之前发布者应用不同的地方:
首先包含一个之前模式没有的头文件:

#include "a_typed_api.hpp"
  • 1

类型 TypedPublisherTypedSubscriber 被定义在上面这个头文件中,在这个章节我们需要关心 TypedPublisher这个类型。

offer()stopOffer() 被称为 RAII风格的构造和析构:

TypedPublisher(const iox::capro::ServiceDescription& id)
    : m_publisher(id)
{
    m_publisher.offer();
}

~TypedPublisher()
{
    m_publisher.stopOffer();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

通过在函数栈创建一个TypedPublisher模板类型的对象来代替初始化一个iox::popo::Publisher类型的对象。

TypedPublisher<CounterTopic> myTypedPublisher({"Radar", "FrontRight", "Counter"});
  • 1

CounterTopic 结构体作为TypePublisher模板的参数。

和之前的发布者应用不同的另一处地方是使用了更为简单的allocate()函数,TypedPublisher类在该函数内部已经完成了指针类型转换(相对于之前发布者应用中的 static_cast<CounterTopic*>),预定内存的操作变得更为简单了:

// allocate a sample
auto sample = myTypedPublisher.allocate();
// write the data
sample->counter = ct;

std::cout<< "Sending: " << ct << std::endl;

// pass the ownership to the middleware for sending the sample
myTypedPublisher.publish(std::move(sample));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

现在 allocate() 返回了一个std::unique_ptr<TopicType, SampleDeleter<TopicType>>对象而不是 void*指针,这种类型的对象拥有自动内存管理功能,可以在离开作用域的时候释放对内存的占用。发布者发布的数据的内存使用权必须被转移给中间件。

Subscriber application (simple风格)

和simple风格的发布者应用相同的是,首先需要包含下面这个头文件:

#include "a_typed_api.hpp"
  • 1

An instance of TypedSubscriber is created:

TypedSubscriber<CounterTopic> myTypedSubscriber({"Radar", "FrontRight", "Counter"}, myCallback);
  • 1

在创建订阅者对象的时候,需要额外提供一个回调方法作为参数,用于在收到订阅数据时进行回调通知。
在这个例子中,当收到数据时会打印计数到终端上:

// the callback for processing the samples
void myCallback(const CounterTopic& sample)
{
    std::cout << "Callback: " << sample.counter << std::endl;
}
  • 1
  • 2
  • 3
  • 4
  • 5

构造和析构函数会分别自动调用subscribe()unsubscribe()函数:

TypedSubscriber(const iox::capro::ServiceDescription& id, OnReceiveCallback<TopicType> callback)
    : m_subscriber(id)
    , m_callback(callback)
{
    m_subscriber.setReceiveHandler(std::bind(&TypedSubscriber::receiveHandler, this));
    m_subscriber.subscribe();
}

~TypedSubscriber()
{
    m_subscriber.unsubscribe();
    m_subscriber.unsetReceiveHandler();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/705939
推荐阅读
相关标签
  

闽ICP备14008679号