A publisher or subscriber registers a Process with RouDi. The main members of Process are:
class Process
{
  private:
    const uint32_t m_pid{0U};               // process PID
    runtime::IpcInterfaceUser m_ipcChannel; // IPC interface
    mepoo::TimePointNs_t m_timestamp;
    posix::PosixUser m_user;                // host user name (cf. whoami)
    bool m_isMonitored{true};
    std::atomic<uint64_t> m_sessionId{0U};
};
RouDi has an m_prcMgr member of type ProcessManager and calls its registerProcess method; the core line is:
returnValue = this->addProcess(name, pid, user, isMonitored, transmissionTimestamp, sessionId, versionInfo);
registerProcess then calls addProcess, which:
1. Instantiates a Process and appends it to the process list:
m_processList.emplace_back(name, pid, user, isMonitored, sessionId);
Note that this opens the process's own m_ipcChannel, a CLIENT-side IPC channel used to send data to the publisher:
openIpcChannel(posix::IpcChannelSide::CLIENT);
2. Computes the offset of m_segmentManager (the green part in the figure). Virtual addresses cannot be shared across processes, so only an offset is transmitted:
auto offset = rp::BaseRelativePointer::getOffset(m_mgmtSegmentId, m_segmentManager);
3. Sends the reply buffer to the registrant:
sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::REG_ACK)
<< m_roudiMemoryInterface.mgmtMemoryProvider()->size() << offset << transmissionTimestamp
<< m_mgmtSegmentId;
m_processList.back().sendViaIpcChannel(sendBuffer);
In its PoshRuntimeImpl constructor, the publisher uses this offset to locate the management segment and the payload data segment.
Later, when a publisher port is created, a SegmentUserInformation is obtained:
struct SegmentUserInformation
{
cxx::optional<std::reference_wrapper<MemoryManager>> m_memoryManager;
uint64_t m_segmentID;
};
auto segmentInfo = m_segmentManager->getSegmentInformationWithWriteAccessForUser(process->getUser());
The core lookup:
for (const auto& groupID : groupContainer)
{
for (auto& segment : m_segmentContainer)
{
if (segment.getWriterGroup() == groupID)
{
segmentInfo.m_memoryManager = segment.getMemoryManager();//<1>
segmentInfo.m_segmentID = segment.getSegmentId(); //<2>
return segmentInfo;
}
}
}
m_segmentContainer actually holds a single MePooSegment<>; each cfg.toml produces one MePooSegment<>.
At <1>, the MePooSegment's m_memoryManager (of type MemoryManager) is obtained.
At <2>, the MePooSegment's m_segmentId is obtained.
auto maybePublisher = m_portManager.acquirePublisherPortData(
service, publisherOptions, name, &segmentInfo.m_memoryManager.value().get(), portConfigInfo);
auto maybePublisherPortData = m_portPool->addPublisherPort(
service, payloadDataSegmentMemoryManager, runtimeName, publisherOptions, portConfigInfo.memoryInfo);
This goes through m_portPoolData (of type PortPoolData) and its member:
FixedPositionContainer<iox::popo::PublisherPortData, 512> m_publisherPortMembers;
auto publisherPortData = m_portPoolData->m_publisherPortMembers.insert(
serviceDescription, runtimeName, memoryManager, publisherOptions, memoryInfo);
return cxx::success<PublisherPortRouDiType::MemberType_t*>(publisherPortData);
insert invokes the PublisherPortData constructor; its first member, m_chunkSenderData, is the main thing initialized here.
The first member of m_chunkSenderData is RelativePointer<mepoo::MemoryManager> m_memoryMgr,
i.e. it refers to the MePooSegment's m_memoryManager.
Next, the offset of this new port within /dev/shm/iceoryx_mgmt is computed:
auto offset = rp::BaseRelativePointer::getOffset(m_mgmtSegmentId, maybePublisher.value());
The reply is packed and sent to the registering publisher:
runtime::IpcMessage sendBuffer;
sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::CREATE_PUBLISHER_ACK)
<< cxx::convert::toString(offset) << cxx::convert::toString(m_mgmtSegmentId);
inline PublisherImpl<T, H, BasePublisherType>::PublisherImpl(const capro::ServiceDescription& service,
const PublisherOptions& publisherOptions)
: BasePublisherType(service, publisherOptions)
{
}
BasePublisherType = BasePublisher<>
Therefore:
inline BasePublisher<port_t>::BasePublisher(const capro::ServiceDescription& service,
const PublisherOptions& publisherOptions)
: m_port(iox::runtime::PoshRuntime::getInstance().getMiddlewarePublisher(service, publisherOptions))
{
}
m_port is of type PublisherPortUser.
getMiddlewarePublisher mainly does the following:
1. Pack and send the request:
IpcMessage sendBuffer;
sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_PUBLISHER) << m_appName
<< static_cast<cxx::Serialization>(service).toString() << publisherOptions.serialize().toString()
<< static_cast<cxx::Serialization>(portConfigInfo).toString();
2. Receive RouDi's reply (analyzed above):
cxx::expected<PublisherPortUserType::MemberType_t*, IpcMessageErrorType>
PoshRuntimeImpl::requestPublisherFromRoudi(const IpcMessage& sendBuffer) noexcept
{
...
auto ptr = rp::BaseRelativePointer::getPtr(segmentId, offset);
return cxx::success<PublisherPortUserType::MemberType_t*>(
reinterpret_cast<PublisherPortUserType::MemberType_t*>(ptr));
...
}
The offset returned by RouDi is converted into a PublisherPortData pointer.
This pointer then initializes m_port:
PublisherPortUser::PublisherPortUser(cxx::not_null<MemberType_t* const> publisherPortDataPtr) noexcept
: BasePort(publisherPortDataPtr)
, m_chunkSender(&getMembers()->m_chunkSenderData)
{
}
This ptr (of type PublisherPortData) initializes m_basePortDataPtr in the base class BasePort.
Since m_chunkSenderData is the first member, its address coincides with ptr; it initializes PublisherPortUser::m_chunkSender.
Note: everything reachable through this ptr lives in /dev/shm/iceoryx_mgmt.
The publisher can then use this m_port to perform operations such as:
tryAllocateChunk
sendChunk