当前位置:   article > 正文

【 iceoryx 零拷贝】RouDi注册publisher的过程

roudi

REG过程

publisher、subscriber会向RouDi注册一个process,process主要成员如下:

class Process
{
	private:
    const uint32_t m_pid{0U};		//进程pid
    runtime::IpcInterfaceUser m_ipcChannel; // ipc的interface
    mepoo::TimePointNs_t m_timestamp;
    posix::PosixUser m_user;      //主机用户名 whoami
    bool m_isMonitored{true};
    std::atomic<uint64_t> m_sessionId{0U};
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

RouDi有个 m_prcMgr :ProcessManager 类型,调用其registerProcess方法,核心代码如下:

returnValue = this->addProcess(name, pid, user, isMonitored, transmissionTimestamp, sessionId, versionInfo);
  • 1

调用addProcess:
1、实例化一个process,放入list,

 m_processList.emplace_back(name, pid, user, isMonitored, sessionId);
  • 1

注意这里会创建自己的 m_ipcChannel ,是一个CLIENT类型的socket,用于向publisher发送数据

 openIpcChannel(posix::IpcChannelSide::CLIENT);
  • 1

2、计算m_segmentManager(图中绿色部分)的偏移,进程间无法传虚拟地址,只能传offset:
在这里插入图片描述

 auto offset = rp::BaseRelativePointer::getOffset(m_mgmtSegmentId, m_segmentManager);
  • 1

3、给注册者发送sendbuffer

sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::REG_ACK)
             << m_roudiMemoryInterface.mgmtMemoryProvider()->size() << offset << transmissionTimestamp
             << m_mgmtSegmentId;
m_processList.back().sendViaIpcChannel(sendBuffer);
  • 1
  • 2
  • 3
  • 4

publisher 会在 PoshRuntimeImpl 构造中,使用这个offset 获取到 management segmentpayload data segment

create a PublisherPort

获取 一个 SegmentUserInformation:

  struct SegmentUserInformation
    {
        cxx::optional<std::reference_wrapper<MemoryManager>> m_memoryManager;
        uint64_t m_segmentID;
    };
  • 1
  • 2
  • 3
  • 4
  • 5
 auto segmentInfo = m_segmentManager->getSegmentInformationWithWriteAccessForUser(process->getUser());
  • 1

核心部分:

    for (const auto& groupID : groupContainer)
    {
        for (auto& segment : m_segmentContainer)
        {
            if (segment.getWriterGroup() == groupID)
            {
                segmentInfo.m_memoryManager = segment.getMemoryManager();//<1>
                segmentInfo.m_segmentID = segment.getSegmentId();        //<2>
                return segmentInfo;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

m_segmentContainer实际只有一个MePooSegment<>,一个cfg.toml生成一个MePooSegment<>。
<1>处获取MePooSegment的m_memoryManager : MemoryManager 类型。
<2>处获取MePooSegment 的 m_segmentId。

acquire一个PortManager

   auto maybePublisher = m_portManager.acquirePublisherPortData(
                service, publisherOptions, name, &segmentInfo.m_memoryManager.value().get(), portConfigInfo);
  • 1
  • 2
auto maybePublisherPortData = m_portPool->addPublisherPort(
        service, payloadDataSegmentMemoryManager, runtimeName, publisherOptions, portConfigInfo.memoryInfo);
  • 1
  • 2

调用 m_portPoolData:PortPoolData的
FixedPositionContainer< iox::popo::PublisherPortData,512 >m_publisherPortMembers;

auto publisherPortData = m_portPoolData->m_publisherPortMembers.insert(
            serviceDescription, runtimeName, memoryManager, publisherOptions, memoryInfo);
        return cxx::success<PublisherPortRouDiType::MemberType_t*>(publisherPortData);
  • 1
  • 2
  • 3

insert会调用 PublisherPortData 的构造,其第一个成员为 m_chunkSenderData,主要初始化该成员。

m_chunkSenderData的第一个成员为RelativePointer<mepoo::MemoryManager >m_memoryMgr;
即 MePooSegment的 m_memoryManager
接下来获取这个成员在 /dev/shm/iceoryx_mgmt 上的偏移:

  auto offset = rp::BaseRelativePointer::getOffset(m_mgmtSegmentId, maybePublisher.value());
  • 1

组包,发送给注册者publisher:

 runtime::IpcMessage sendBuffer;
                sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::CREATE_PUBLISHER_ACK)
                           << cxx::convert::toString(offset) << cxx::convert::toString(m_mgmtSegmentId);
  • 1
  • 2
  • 3

分析一下publisher 如何接收处理:

inline PublisherImpl<T, H, BasePublisherType>::PublisherImpl(const capro::ServiceDescription& service,
                                                             const PublisherOptions& publisherOptions)
    : BasePublisherType(service, publisherOptions)
{
}
  • 1
  • 2
  • 3
  • 4
  • 5

BasePublisherType = BasePublisher<>
因此:

inline BasePublisher<port_t>::BasePublisher(const capro::ServiceDescription& service,
                                            const PublisherOptions& publisherOptions)
    : m_port(iox::runtime::PoshRuntime::getInstance().getMiddlewarePublisher(service, publisherOptions))
{
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

m_port 是 PublisherPortUser类型。
getMiddlewarePublisher主要操作如下:
1、组包发送

IpcMessage sendBuffer;
    sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_PUBLISHER) << m_appName
               << static_cast<cxx::Serialization>(service).toString() << publisherOptions.serialize().toString()
               << static_cast<cxx::Serialization>(portConfigInfo).toString();
  • 1
  • 2
  • 3
  • 4

2、获取RouDi的返回数据(上文已分析):

cxx::expected<PublisherPortUserType::MemberType_t*, IpcMessageErrorType>
PoshRuntimeImpl::requestPublisherFromRoudi(const IpcMessage& sendBuffer) noexcept
{
...
  auto ptr = rp::BaseRelativePointer::getPtr(segmentId, offset);     
  return cxx::success<PublisherPortUserType::MemberType_t*>(
                reinterpret_cast<PublisherPortUserType::MemberType_t*>(ptr));
...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

获取到RouDi返回的offset,转化为 PublisherPortData 型指针。

用该指针初始化m_port:

PublisherPortUser::PublisherPortUser(cxx::not_null<MemberType_t* const> publisherPortDataPtr) noexcept
    : BasePort(publisherPortDataPtr)
    , m_chunkSender(&getMembers()->m_chunkSenderData)

{
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

该ptr(PublisherPortData 类型) 初始化了 父类BasePort 的 m_basePortDataPtr。
用ptr 的 m_chunkSenderData 也是首地址,初始化PublisherPortUser :: m_chunkSender 。
注意:该ptr的成员都在 /dev/shm/iceoryx_mgmt 上。
在这里插入图片描述

然后publisher可用这个m_port 进行一系列:
tryAllocateChunk
sendChunk
操作。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/705926
推荐阅读
相关标签
  

闽ICP备14008679号