赞
踩
从文件名看,这是一个publish和subscribe的demo。
publish<—>subscribe模式是一种常见的设计模式,比较有名的应用场景是mqtt。
废话不多说,直接上代码。
constexpr char APP_NAME[] = "iox-cpp-publisher-helloworld";
iox::runtime::PoshRuntime::initRuntime(APP_NAME);
这是main的第一段代码,这段代码至关重要,做了很多事情。
这行代码一直追踪下去,是这个类成员方法:
PoshRuntime& PoshRuntime::defaultRuntimeFactory(cxx::optional<const RuntimeName_t*> name) noexcept
{
static PoshRuntimeImpl instance(name);
return instance;
}
不难看出,调用 类 PoshRuntimeImpl 的构造,生成了一个静态实例。来看一下这个类的构造:
PoshRuntimeImpl(cxx::optional<const RuntimeName_t*> name, const RuntimeLocation location):PoshRuntime(name)
PoshRuntime(name)没什么可说的,初始化了父类的m_appName成员。
m_ipcChannelInterface(roudi::IPC_CHANNEL_ROUDI_NAME, *name.value(), runtime::PROCESS_WAITING_FOR_ROUDI_TIMEOUT)
这段代码做了很多事情,总结来说就是:
1、建立两个socket,UDS类型,UDP协议
2、与RouDi建立连接,注册,获得返回参数
m_runtimeName(runtimeName)不用多说,也是初始化名称。注意这里传入的是 APP_NAME,
m_AppIpcInterface(runtimeName) 创建一个SERVER类型的socket,看名字可知是APP的IPC接口,重点代码如下:
openIpcChannel(posix::IpcChannelSide::SERVER);
bool IpcInterfaceBase::openIpcChannel(const posix::IpcChannelSide channelSide) noexcept
{
m_ipcChannel.destroy().or_else(
[this](auto) { LogWarn() << "unable to destroy previous ipc channel " << m_runtimeName; });
m_channelSide = channelSide;
platform::IoxIpcChannelType::create(m_runtimeName, m_channelSide, m_maxMessageSize, m_maxMessages)
.and_then([this](auto& ipcChannel) { this->m_ipcChannel = std::move(ipcChannel); });
return m_ipcChannel.isInitialized();
}
直接看IoxIpcChannelType类的create,由using IoxIpcChannelType = iox::posix::UnixDomainSocket可知
template <typename DerivedClass, typename ErrorType>
template <typename... Targs>
inline typename Creation<DerivedClass, ErrorType>::result_t
Creation<DerivedClass, ErrorType>::create(Targs&&... args) noexcept
{
return verify(DerivedClass(std::forward<Targs>(args)...));
}
看起来比较复杂,分析一下:
DerivedClass 是 UnixDomainSocket,DerivedClass(std::forward(args)…)其实是调了UnixDomainSocket的构造。UnixDomainSocket有两个构造,第一个构造增加了个参数 NoPathPrefix,并将UdsName_t初始化为/tmp/APP_NAME ,然后调用第二个构造。该构造最重要的操作为:
initalizeSocket()
这个函数也比较简单,调了posix接口,创建了一个与 /tmp/APP_NAME 绑定的m_sockfd,SERVER类型,构造的UnixDomainSocket在create().and_then里赋给了m_ipcChannel 。
m_RoudiIpcInterface(roudiName)
这个看名字可知是RouDi的IPC接口,重点代码如下:
openIpcChannel(posix::IpcChannelSide::CLIENT);
跟上面openIpcChannel(posix::IpcChannelSide::SERVER)类似,不过换了个参数。这里创建了一个m_sockfd去connect /tmp/roudi,即与RouDi建立连接。
该操作也是在IpcRuntimeInterface的构造函数里。
m_RoudiIpcInterface.timedSend(sendBuffer, 100_ms);
sendBuffer的内容:REG << APP_NAME << pid << getUserOfCurrentProcess().getID() << transmissionTimestamp << :getCurrentVersion()
sendBuffer :1,iox-cpp-publisher-helloworld,5246,1000,1656049274105816,
1:22:901:01:020:2022-06-24T05:37:47Z12:c8f2f47ac19a
最终调用了
m_ipcChannel.timedSend(msg.getMessage(), timeout)即 UnixDomainSocket::timedSend(const std::string& msg,
const units::Duration& timeout)
{
auto sendCall = posixCall(iox_sendto)(m_sockfd, msg.c_str(), msg.size() + NULL_TERMINATOR_SIZE, 0, nullptr, 0)
}
调用udp的send_to,发给 /tmp/roudi 的server即 RouDi,server端完成register,并返回ACK。
waitForRegAck(transmissionTimestamp) == RegAckResult::SUCCESS
if (m_AppIpcInterface.timedReceive(1_s, receiveBuffer))
receiveBuffer填充了m_shmTopicSize、m_segmentManagerAddressOffset、m_segmentId,这些是RouDi创建的shared memory base address。publisher将根据这几个参数映射到shared memory上。
SharedMemoryUser(const size_t topicSize, const uint64_t segmentId, const rp::BaseRelativePointer::offset_t segmentManagerAddressOffset) { posix::SharedMemoryObjectBuilder() .name(roudi::SHM_NAME) .memorySizeInBytes(topicSize) .accessMode(posix::AccessMode::READ_WRITE) .openMode(posix::OpenMode::OPEN_EXISTING) .permissions(SHM_SEGMENT_PERMISSIONS) .create() .and_then([this, segmentId, segmentManagerAddressOffset](auto& sharedMemoryObject) { rp::BaseRelativePointer::registerPtr( <1> segmentId, sharedMemoryObject.getBaseAddress(), sharedMemoryObject.getSizeInBytes()); LogDebug() << "Application registered management segment " << iox::log::HexFormat(reinterpret_cast<uint64_t>(sharedMemoryObject.getBaseAddress())) << " with size " << sharedMemoryObject.getSizeInBytes() << " to id " << segmentId; this->openDataSegments(segmentId, segmentManagerAddressOffset); <2> m_shmObject.emplace(std::move(sharedMemoryObject)); }) .or_else([](auto&) { errorHandler(PoshError::POSH__SHM_APP_MAPP_ERR); }); }
所有的操作都在上面代码段,做了如下事情:
1、 registered management segment /dev/shm/iceoryx_mgmt ,用作内存管理
2、 registered management segment /dev/shm/whoami ,作为mempool
SharedMemoryObjectBuilder::create()里有两个create():
SharedMemoryBuilder::create()负责shm_open /dev/shm/iceoryx_mgmt ,并返回文件描述符 m_sharedMemory->m_handle
MemoryMapBuilder::create()调用mmap将m_handle和m_memorySizeInBytes映射到虚拟地址空间,返回一个起始地址m_memoryMap->m_baseAddress
Allocator allocator(memoryMap->getBaseAddress(), m_memorySizeInBytes);
这个操作是创建了一个内存分配器,赋值给m_allocator后面会用到。
<1>处创建了一个static 类型的PointerRepository,并将m_info[segmentId].basePtr 赋值为 memoryMap- >getBaseAddress() ,并根据size计算出 m_info[segmentId].endPtr。
<2>处跟上面操作类似,将**/dev/shm/whoami** 映射进了虚拟地址空间。
注意传入的两个参数,一个是**/dev/shm/iceoryx_mgmt** 的起始地址(虚拟地址),一个是segmentManager的偏移。之后从segmentManager->m_segmentContainer取出MePooSegment,获取其成员属性构造SegmentMapping类,并填入SegmentMappingContainer类型的vector中。
mappingContainer.emplace_back(segment.getWriterGroup().getName(),
segment.getSharedMemoryObject().getBaseAddress(),
segment.getSharedMemoryObject().getSizeInBytes(),
true,
segment.getSegmentId());
然后注册 payload data segment
for (const auto& segment : segmentMapping) { auto accessMode = segment.m_isWritable ? posix::AccessMode::READ_WRITE : posix::AccessMode::READ_ONLY; posix::SharedMemoryObjectBuilder() .name(segment.m_sharedMemoryName) .memorySizeInBytes(segment.m_size) .accessMode(accessMode) .openMode(posix::OpenMode::OPEN_EXISTING) .permissions(SHM_SEGMENT_PERMISSIONS) .create() .and_then([this, &segment](auto& sharedMemoryObject) { if (static_cast<uint32_t>(m_dataShmObjects.size()) >= MAX_SHM_SEGMENTS) { errorHandler(PoshError::POSH__SHM_APP_SEGMENT_COUNT_OVERFLOW); } rp::BaseRelativePointer::registerPtr( segment.m_segmentId, sharedMemoryObject.getBaseAddress(), sharedMemoryObject.getSizeInBytes()); LogDebug() << "Application registered payload data segment " << iox::log::HexFormat(reinterpret_cast<uint64_t>(sharedMemoryObject.getBaseAddress())) << " with size " << sharedMemoryObject.getSizeInBytes() << " to id " << segment.m_segmentId; m_dataShmObjects.emplace_back(std::move(sharedMemoryObject)); }) .or_else([](auto&) { errorHandler(PoshError::POSH__SHM_APP_SEGMENT_MAPP_ERR); }); }
生成的sharedMemoryObject放入m_dataShmObjects中。
1) 生成的sharedMemoryObject初始化m_shmObject。
initRuntime(APP_NAME)做了如下事情:
1 创建两个socket,一个 /tmp/APP_NAME 作为server接收RouDi的消息,一个 /tmp/roudi 为client,负责给RouDi发消息。
2 注册到RouDi。
3 根据返回信息,注册两个segment,一个 /dev/shm/iceoryx_mgmt 负责内存管理,一个 /dev/shm/whoami 作为内存池。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。