赞
踩
// Entry point of the RouDi daemon as quoted here: parse the command line,
// parse the RouDi configuration via the TOML config file provider, then
// construct the IceOryxRouDiApp and run it. The process exit code is whatever
// roudi.run() returns.
int main(int argc, char* argv[]) noexcept
{
using iox::roudi::IceOryxRouDiApp;
iox::config::CmdLineParserConfigFileOption cmdLineParser;
auto cmdLineArgs = cmdLineParser.parse(argc, argv);
// NOTE(review): cmdLineArgs.value() is used below without a visible check
// that parsing succeeded - confirm whether the excerpt omitted that check.
iox::config::TomlRouDiConfigFileProvider configFileProvider(cmdLineArgs.value());
auto roudiConfig = configFileProvider.parse();
// NOTE(review): same for roudiConfig.value() - no error check is visible here.
IceOryxRouDiApp roudi(cmdLineArgs.value(), roudiConfig.value());
return roudi.run();
}
iox::config::TomlRouDiConfigFileProvider configFileProvider(cmdLineArgs.value());
传入的参数为命令行解析结果 cmdLineArgs.value()(其中包含命令行指定的 cfg 文件信息),真正解析 cfg 文件是在随后的 configFileProvider.parse() 中完成的。
IceOryxRouDiApp roudi(cmdLineArgs.value(), roudiConfig.value());
构造函数里仅仅 又 构造了一个 RouDiApp
IceOryxRouDiApp::IceOryxRouDiApp(const config::CmdLineArgs_t& cmdLineArgs, const RouDiConfig_t& roudiConfig) noexcept
: RouDiApp(cmdLineArgs, roudiConfig)
给一些主要成员赋值,设置Log等级。。。
m_logLevel(cmdLineArgs.logLevel)
m_config(config)
uint8_t IceOryxRouDiApp::run() noexcept { if (m_run) { static cxx::optional<IceOryxRouDiComponents> m_rouDiComponents; //<1> auto componentsScopeGuard = cxx::makeScopedStatic(m_rouDiComponents, m_config); //<2> static cxx::optional<RouDi> roudi; auto roudiScopeGuard = cxx::makeScopedStatic(roudi, //<3> m_rouDiComponents.value().rouDiMemoryManager, m_rouDiComponents.value().portManager, RouDi::RoudiStartupParameters{m_monitoringMode, true, RouDi::RuntimeMessagesThreadStart::IMMEDIATE, m_compatibilityCheckLevel, m_processKillDelay}); waitForSignal(); } return EXIT_SUCCESS; }
// Default constructor of optional<T>: delegates to the constructor taking a
// nullopt_t (presumably leaving the optional without a value - confirm against
// that constructor, which is not part of this excerpt).
template <typename T>
inline optional<T>::optional() noexcept
: optional(nullopt_t())
{
}
// Emplaces a value into `memory` from the forwarded constructor arguments and
// returns a GenericRAII guard built from two callables: a no-op and a lambda
// that calls memory.reset().
//
// memory:   object providing emplace() and reset() (a cxx::optional at the
//           call sites shown in this file).
// ctorArgs: arguments perfectly forwarded to memory.emplace().
//
// The reset lambda captures `memory` by reference, so `memory` must outlive
// the returned guard.
// NOTE(review): presumably GenericRAII runs the second callable when the guard
// is destroyed - confirm against GenericRAII.
template <typename T, typename... CTorArgs>
inline GenericRAII makeScopedStatic(T& memory, CTorArgs&&... ctorArgs) noexcept
{
memory.emplace(std::forward<CTorArgs>(ctorArgs)...);
return GenericRAII([] {}, [&memory] { memory.reset(); });
}
emplace 最终会走到下面的 construct_value:optional 内部自带缓冲区 m_data,这里并不另外分配内存,而是把 m_data 的地址转换为 IceOryxRouDiComponents*,再用 placement new 以 m_config 为构造参数,在这块缓冲区上原地构造 IceOryxRouDiComponents 对象:
// Constructs the contained T in place: m_data is reinterpreted (via void*) as
// a T* and used as the target of a placement new with the forwarded arguments;
// afterwards the optional is marked as holding a value.
// (The template parameter lists of this definition are not part of the excerpt.)
inline void optional<T>::construct_value(Targs&&... args) noexcept
{
// Placement new: no memory is obtained here, the object is built inside m_data.
new (static_cast<T*>(static_cast<void*>(m_data))) T(std::forward<Targs>(args)...);
m_hasValue = true;
}
此处会调用 IceOryxRouDiComponents 的构造,这个构造至关重要:
// Builds the RouDi components from the RouDi configuration.
//
// rouDiMemoryManager is initialised from roudiConfig. portManager is then
// initialised from the pointer returned by an immediately-invoked lambda which
// first cleans up an outdated RouDi IPC channel and then asks the memory
// manager to create and announce its memory.
IceOryxRouDiComponents::IceOryxRouDiComponents(const RouDiConfig_t& roudiConfig) noexcept
: rouDiMemoryManager(roudiConfig)
// The lambda uses rouDiMemoryManager, so that member has to be initialised
// before portManager.
// NOTE(review): this relies on the member declaration order in the class,
// which is not visible in this excerpt - confirm.
, portManager([&]() -> IceOryxRouDiMemoryManager* {
runtime::IpcInterfaceBase::cleanupOutdatedIpcChannel(roudi::IPC_CHANNEL_ROUDI_NAME); //<1>
// On failure the error is logged and reported to the error handler as FATAL.
rouDiMemoryManager.createAndAnnounceMemory().or_else([](RouDiMemoryManagerError error) { //<2>
LogFatal() << "Could not create SharedMemory! Error: " << error;
errorHandler(PoshError::ROUDI_COMPONENTS__SHARED_MEMORY_UNAVAILABLE, iox::ErrorLevel::FATAL);
});
return &rouDiMemoryManager;
}())
{
}
<1>处清理 过期的 IPC channel 。
<2>处创建内存池
先看一下 IceOryxRouDiMemoryManager 的构造
// Sets up the memory manager from the RouDi configuration.
//
// m_defaultMemory is constructed from roudiConfig. The port pool block is then
// added to the management shared memory provider, and that provider is
// registered with m_memoryManager. A failure of either step is reported to the
// error handler with ErrorLevel::FATAL.
IceOryxRouDiMemoryManager::IceOryxRouDiMemoryManager(const RouDiConfig_t& roudiConfig) noexcept
: m_defaultMemory(roudiConfig)
{
// Adds m_portPoolBlock as a further memory block of m_managementShm.
m_defaultMemory.m_managementShm.addMemoryBlock(&m_portPoolBlock).or_else([](auto) {
errorHandler(PoshError::ICEORYX_ROUDI_MEMORY_MANAGER__FAILED_TO_ADD_PORTPOOL_MEMORY_BLOCK, ErrorLevel::FATAL);
});
// Registers the management shared memory as a provider of m_memoryManager.
m_memoryManager.addMemoryProvider(&m_defaultMemory.m_managementShm).or_else([](auto) { //<3>
errorHandler(PoshError::ICEORYX_ROUDI_MEMORY_MANAGER__FAILED_TO_ADD_MANAGEMENT_MEMORY_BLOCK, ErrorLevel::FATAL);
});
}
初始化了成员 m_defaultMemory(DefaultRouDiMemory) ,很重要 :
// Sets up RouDi's default memory.
//
// The introspection mempool block is built from introspectionMemPoolConfig(),
// the segment manager block from roudiConfig, and the management shared memory
// provider is configured with SHM_NAME, read/write access and the
// PURGE_AND_CREATE open mode. Both blocks are then added to the provider; a
// failure of either addition is reported to the error handler as FATAL.
// Only the provider is configured and the blocks registered in this constructor.
DefaultRouDiMemory::DefaultRouDiMemory(const RouDiConfig_t& roudiConfig) noexcept
: m_introspectionMemPoolBlock(introspectionMemPoolConfig())
, m_segmentManagerBlock(roudiConfig)
, m_managementShm(SHM_NAME, posix::AccessMode::READ_WRITE, posix::OpenMode::PURGE_AND_CREATE) //<4>
{
m_managementShm.addMemoryBlock(&m_introspectionMemPoolBlock).or_else([](auto) { //<7>
errorHandler(PoshError::ROUDI__DEFAULT_ROUDI_MEMORY_FAILED_TO_ADD_INTROSPECTION_MEMORY_BLOCK,
ErrorLevel::FATAL);
});
m_managementShm.addMemoryBlock(&m_segmentManagerBlock).or_else([](auto) { //<8>
errorHandler(PoshError::ROUDI__DEFAULT_ROUDI_MEMORY_FAILED_TO_ADD_SEGMENT_MANAGER_MEMORY_BLOCK,
ErrorLevel::FATAL);
});
}
主要看<4> m_defaultMemory.m_shmName 为 SHM_NAME “iceoryx_mgmt”,会在 /dev/shm/下看到。
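<7>、<8>处 m_managementShm.m_memoryBlocks.push_back(memoryBlock) 了两次:一块是 m_introspectionMemPoolBlock(由 introspectionMemPoolConfig() 构造,供 introspection 使用的内存池),一块是 m_segmentManagerBlock(由 roudiConfig 构造,对应 cfg.toml 文件中的内存池配置)。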
<3>处将 m_managementShm push 进 IceOryxRouDiMemoryManager::m_memoryManager 。
// Creates and announces the memory through m_memoryManager and, on success,
// stores the port pool.
//
// Returns the result of m_memoryManager.createAndAnnounceMemory() unchanged.
// m_portPool is only emplaced when that result carries no error and the port
// pool block reports a port pool.
cxx::expected<RouDiMemoryManagerError> IceOryxRouDiMemoryManager::createAndAnnounceMemory() noexcept
{
auto result = m_memoryManager.createAndAnnounceMemory();
auto portPool = m_portPoolBlock.portPool();
if (!result.has_error() && portPool.has_value())
{
// portPool.value() is dereferenced, i.e. the block hands out a pointer-like handle.
m_portPool.emplace(*portPool.value());
}
return result;
}
调用:
// Creates the memory of every registered memory provider and then announces
// it to the providers' memory blocks.
//
// Returns an error as soon as one provider fails to create its memory, so that
// callers (e.g. the or_else handler in the IceOryxRouDiComponents constructor)
// are actually notified; previously the result of create() was stored but
// never inspected and success was returned unconditionally.
// Nothing is announced unless every provider was created successfully.
cxx::expected<RouDiMemoryManagerError> RouDiMemoryManager::createAndAnnounceMemory() noexcept
{
    for (auto memoryProvider : m_memoryProvider)
    {
        auto result = memoryProvider->create(); //<5>
        if (result.has_error())
        {
            return cxx::error<RouDiMemoryManagerError>(RouDiMemoryManagerError::MEMORY_CREATION_FAILED);
        }
    }

    for (auto memoryProvider : m_memoryProvider)
    {
        memoryProvider->announceMemoryAvailable(); //<6>
    }

    return cxx::success<>();
}
uint64_t totalSize = 0u;
uint64_t maxAlignment = 1;
for (auto memoryBlock : m_memoryBlocks)
{
auto alignment = memoryBlock->alignment();
if (alignment > maxAlignment)
{
maxAlignment = alignment;
}
// just in case the memory block doesn't calculate its size as multiple of the alignment
// this shouldn't be necessary, but also doesn't harm
auto size = cxx::align(memoryBlock->size(), alignment);
totalSize = cxx::align(totalSize, alignment) + size;
}
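这里遍历之前 push 进 m_memoryBlocks 的所有 memoryBlock(<7>、<8> 处的两块,以及 IceOryxRouDiMemoryManager 构造函数中加入的 m_portPoolBlock):每一块的大小按其自身的 alignment 向上对齐后累加到 totalSize,同时记录其中最大的对齐值 maxAlignment。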
auto memoryResult = createMemory(totalSize, maxAlignment);
实际调用:
// Creates the shared memory object backing this provider and returns its base
// address.
//
// size:      requested size of the shared memory in bytes.
// alignment: not used by this implementation.
//
// Returns the base address of the created shared memory on success, or
// MemoryProviderError::MEMORY_CREATION_FAILED when the builder chain fails.
cxx::expected<void*, MemoryProviderError> PosixShmMemoryProvider::createMemory(const uint64_t size,
                                                                              const uint64_t alignment) noexcept
{
    // On success the created object is finalized and moved into m_shmObject.
    const bool shmCreated = static_cast<bool>(posix::SharedMemoryObjectBuilder()
                                                  .name(m_shmName)
                                                  .memorySizeInBytes(size)
                                                  .accessMode(m_accessMode)
                                                  .openMode(m_openMode)
                                                  .permissions(SHM_MEMORY_PERMISSIONS)
                                                  .create()
                                                  .and_then([this](auto& createdShm) {
                                                      createdShm.finalizeAllocation();
                                                      m_shmObject.emplace(std::move(createdShm));
                                                  }));
    if (!shmCreated)
    {
        return cxx::error<MemoryProviderError>(MemoryProviderError::MEMORY_CREATION_FAILED);
    }

    return cxx::success<void*>(m_shmObject->getBaseAddress());
}
关键方法:
SharedMemoryObjectBuilder::create()
依次调用:
auto sharedMemory = SharedMemoryBuilder()
.name(m_name)
.accessMode(m_accessMode)
.openMode(m_openMode)
.filePermissions(m_permissions)
.size(m_memorySizeInBytes)
.create();
实质是调用 shm_open(name, oflag, mode) 创建共享内存对象(在 Linux 上表现为文件 /dev/shm/iceoryx_mgmt),并返回其文件描述符 sharedMemoryFileHandle
然后:
auto memoryMap = MemoryMapBuilder()
.baseAddressHint((m_baseAddressHint) ? *m_baseAddressHint : nullptr)
.length(m_memorySizeInBytes)
.fileDescriptor(sharedMemory->getHandle())
.accessMode(m_accessMode)
.flags(MemoryMapFlags::SHARE_CHANGES)
.offset(0)
.create();
调用mmap(void* addr, size_t length, int prot, int flags, int fd, off_t offset) 将文件映射成一个虚拟地址,并返回一个MemoryMap实例,包含这块内存的baseAddress、length。
return cxx::success<MemoryMap>(MemoryMap(result.value().value, m_length));
MemoryMap::MemoryMap(void* const baseAddress, const uint64_t length) noexcept
: m_baseAddress(baseAddress)
, m_length(length)
接下来构造一个 Allocator ,用于计算地址偏移:
Allocator allocator(memoryMap->getBaseAddress(), m_memorySizeInBytes);
返回:
return cxx::success<SharedMemoryObject>(
SharedMemoryObject(std::move(*sharedMemory), std::move(*memoryMap), std::move(allocator), m_memorySizeInBytes));
之后 获取每个 memoryBlock 的 m_memory
iox::posix::Allocator allocator(m_memory, m_size);
for (auto memoryBlock : m_memoryBlocks)
{
memoryBlock->m_memory = allocator.allocate(memoryBlock->size(), memoryBlock->alignment());
}
(void *)m_memory = m_startAddress + alignedPosition;
至此 <5> 讲完。
void MemoryProvider::announceMemoryAvailable() noexcept
{
if (!m_memoryAvailableAnnounced)
{
for (auto memoryBlock : m_memoryBlocks)
{
memoryBlock->onMemoryAvailable(memoryBlock->m_memory);
}
m_memoryAvailableAnnounced = true;
}
}
关注一下 MemPoolSegmentManagerMemoryBlock::onMemoryAvailable,即 m_defaultMemory 中的 m_segmentManagerBlock 收到内存可用通知时的处理:
// Called once the memory of this block is available: constructs the
// SegmentManager inside that memory.
//
// memory: start of the memory assigned to this block (never null).
void MemPoolSegmentManagerMemoryBlock::onMemoryAvailable(cxx::not_null<void*> memory) noexcept
{
// Allocator handing out pieces of this block's memory, limited to size() bytes.
posix::Allocator allocator(memory, size());
// Reserve suitably aligned storage for the SegmentManager object itself ...
auto segmentManager = allocator.allocate(sizeof(mepoo::SegmentManager<>), alignof(mepoo::SegmentManager<>));
// ... and construct it there with placement new. The same allocator is passed
// on, so whatever the SegmentManager allocates comes from the rest of the block.
// NOTE(review): &allocator points to a local object, and the SegmentManager
// constructor stores this pointer in m_managementAllocator; it must not be
// used after this function returns - confirm it is only used during construction.
m_segmentManager = new (segmentManager) mepoo::SegmentManager<>(m_segmentConfig, &allocator);
}
在这里实例化了一个 SegmentManager 类型的 m_segmentManager ,传入的 m_segmentConfig 即 xxx.toml里的配置,注意 这里的 allocator 的 m_currentPosition 向后偏移了 sizeof(mepoo::SegmentManager<>)
// Creates one segment per entry of the segment configuration.
//
// segmentConfig:       configuration whose m_sharedMemorySegments entries are
//                      turned into segments via createSegment().
// managementAllocator: allocator pointer stored in m_managementAllocator.
// (The template parameter list of this definition is not part of the excerpt.)
inline SegmentManager<SegmentType>::SegmentManager(const SegmentConfig& segmentConfig,
posix::Allocator* managementAllocator) noexcept
: m_managementAllocator(managementAllocator)
{
// The configured segments must fit into the segment container.
cxx::Expects(segmentConfig.m_sharedMemorySegments.capacity() <= m_segmentContainer.capacity());
for (const auto& segmentEntry : segmentConfig.m_sharedMemorySegments) //<1>
{
createSegment(segmentEntry);
}
}
要说清楚 <1> 需要 看一下 解析cfg.toml的过程,在 roudi_config_toml_file_provider.cpp文件的 TomlRouDiConfigFileProvider::parse():
[general]
version = 1
[[segment]]
[[segment.mempool]]
size = 128
count = 10000
[[segment.mempool]]
size = 1024
count = 5000
// Excerpt from the config parsing: converts every [[segment]] table into one
// entry of parsedConfig.m_sharedMemorySegments.
// The statements are placed on separate lines so that the trailing "// <2>"
// marker no longer comments out the closing braces and the push_back.
for (auto segment : *segments)
{
    // "writer"/"reader" fall back to groupOfCurrentProcess when not configured.
    auto writer = segment->get_as<std::string>("writer").value_or(groupOfCurrentProcess);
    auto reader = segment->get_as<std::string>("reader").value_or(groupOfCurrentProcess);

    iox::mepoo::MePooConfig mempoolConfig;
    auto mempools = segment->get_table_array("mempool");
    for (auto mempool : *mempools)
    {
        auto chunkSize = mempool->get_as<uint32_t>("size");
        auto chunkCount = mempool->get_as<uint32_t>("count");
        // NOTE(review): both values are dereferenced without a visible check
        // that the keys exist - confirm whether the excerpt omitted validation.
        mempoolConfig.addMemPool({*chunkSize, *chunkCount}); // <2>
    }

    // Entry order: reader group, writer group, mempool configuration.
    parsedConfig.m_sharedMemorySegments.push_back(
        {iox::posix::PosixGroup::string_t(iox::cxx::TruncateToCapacity, reader),
         iox::posix::PosixGroup::string_t(iox::cxx::TruncateToCapacity, writer),
         mempoolConfig});
}
配置文件中只有一个 segment , 该 segment 有多个 mempool ,获取 mempool 的 chunkSize 、chunkCount 填充 mempoolConfig.m_mempoolConfig ,最后填充 cxx::vector<SegmentEntry, MAX_SHM_SEGMENTS> m_sharedMemorySegments; 这个vector 即 <1> 中的。
因此 <1>中取出的 segmentEntry 即 struct SegmentEntry 类型,构造:
// Describes one configured shared memory segment.
//
// readerGroup:   name of the group stored in m_readerGroup.
// writerGroup:   name of the group stored in m_writerGroup.
// memPoolConfig: mempool configuration of the segment.
// memoryInfo:    memory information; a default-constructed MemoryInfo when omitted.
SegmentEntry(const posix::PosixGroup::string_t& readerGroup,
const posix::PosixGroup::string_t& writerGroup,
const MePooConfig& memPoolConfig,
iox::mepoo::MemoryInfo memoryInfo = iox::mepoo::MemoryInfo()) noexcept
: m_readerGroup(readerGroup)
, m_writerGroup(writerGroup)
, m_mempoolConfig(memPoolConfig)
, m_memoryInfo(memoryInfo)
{
}
下面有个 填充 m_segmentContainer ,其类型为:
cxx::vector<SegmentType, MAX_SHM_SEGMENTS> m_segmentContainer;
SegmentType实际的类型为:
template <typename SegmentType = MePooSegment<>>
因此 以下操作会调用 MePooSegment 的构造 :
m_segmentContainer.emplace_back(
segmentEntry.m_mempoolConfig, *m_managementAllocator, readerGroup, writerGroup, segmentEntry.m_memoryInfo);
inline MePooSegment<SharedMemoryObjectType, MemoryManagerType>::MePooSegment(
const MePooConfig& mempoolConfig,
posix::Allocator& managementAllocator,
const posix::PosixGroup& readerGroup,
const posix::PosixGroup& writerGroup,
const iox::mepoo::MemoryInfo& memoryInfo) noexcept
: m_sharedMemoryObject(std::move(createSharedMemoryObject(mempoolConfig, writerGroup))) //<3>
, m_readerGroup(readerGroup)
, m_writerGroup(writerGroup)
, m_memoryInfo(memoryInfo)
<3>处最重要 创建了 /dev/shm/whoami ,并地址映射:
// Creates the shared memory object that holds the chunk memory of this segment.
//
// mempoolConfig: mempool configuration used to compute the required size.
// writerGroup:   group whose name is used as the name of the shared memory.
//
// On success the base address and size of the object are registered with
// BaseRelativePointer and the returned id is stored via setSegmentId(); on
// failure the error handler is invoked.
// The statements are placed on separate lines so that the trailing //<4> and
// //<5> markers no longer comment out the rest of the builder chain.
// (The template parameter list of this definition is not part of the excerpt.)
inline SharedMemoryObjectType MePooSegment<SharedMemoryObjectType, MemoryManagerType>::createSharedMemoryObject(
    const MePooConfig& mempoolConfig, const posix::PosixGroup& writerGroup) noexcept
{
    std::cout<<"MePooSegment:"<<writerGroup.getName()<<std::endl;
    return std::move(
        typename SharedMemoryObjectType::Builder()
            .name(writerGroup.getName()) //<4>
            .memorySizeInBytes(MemoryManager::requiredChunkMemorySize(mempoolConfig)) //<5>
            .accessMode(posix::AccessMode::READ_WRITE)
            .openMode(posix::OpenMode::PURGE_AND_CREATE)
            .permissions(SEGMENT_PERMISSIONS)
            .create()
            .and_then([this](auto& sharedMemoryObject) {
                this->setSegmentId(iox::rp::BaseRelativePointer::registerPtr(sharedMemoryObject.getBaseAddress(),
                                                                             sharedMemoryObject.getSizeInBytes()));

                LogDebug() << "Roudi registered payload data segment "
                           << iox::log::HexFormat(reinterpret_cast<uint64_t>(sharedMemoryObject.getBaseAddress()))
                           << " with size " << sharedMemoryObject.getSizeInBytes() << " to id " << m_segmentId;
            })
            .or_else([](auto&) { errorHandler(PoshError::MEPOO__SEGMENT_UNABLE_TO_CREATE_SHARED_MEMORY_OBJECT); })
            .value());
}
<4>处获取用户名 whoami ;<5> 处 计算 memorySize :
// Computes the number of bytes of chunk memory needed for a mempool
// configuration.
//
// For every configured mempool the chunk count is multiplied by
// sizeWithChunkHeaderStruct(size), the product is aligned to
// MemPool::CHUNK_MEMORY_ALIGNMENT, and the aligned sizes are summed up.
uint64_t MemoryManager::requiredChunkMemorySize(const MePooConfig& mePooConfig) noexcept
{
    uint64_t totalBytes{0};
    for (const auto& pool : mePooConfig.m_mempoolConfig) //<6>
    {
        // Widen the count first so the multiplication is carried out in 64 bit.
        const auto chunkCount = static_cast<uint64_t>(pool.m_chunkCount);
        const auto poolBytes = chunkCount * MemoryManager::sizeWithChunkHeaderStruct(pool.m_size);
        totalBytes += cxx::align(poolBytes, MemPool::CHUNK_MEMORY_ALIGNMENT); //<7>
    }
    return totalBytes;
}
<6> 取出 [[segment.mempool]] size = 128 count = 10000 [[segment.mempool]] size = 1024 count = 5000
<7> 对第一个 mempool 即 10000 * [128 + sizeof(ChunkHeader)],再按 CHUNK_MEMORY_ALIGNMENT 对齐;第二个 mempool 同理,为 5000 * [1024 + sizeof(ChunkHeader)]。各个 mempool 对齐后的大小累加,最终得到总的 size。
创建好 /dev/shm/whoami 并 mmap 后 ,返回一个 sharedMemoryObject 进行 registerPtr()注册获得 SegmentId。
m_memoryManager.configureMemoryManager(mempoolConfig, managementAllocator, m_sharedMemoryObject.getAllocator());
m_sharedMemoryObject.finalizeAllocation();
MemoryManager 类型的 m_memoryManager
// Configures the memory manager from a mempool configuration.
//
// mePooConfig:          mempool configuration; one addMemPool() call is made
//                       per entry of m_mempoolConfig.
// managementAllocator:  allocator passed to addMemPool() and to
//                       generateChunkManagementPool().
// chunkMemoryAllocator: allocator passed to addMemPool() together with the
//                       entry's size and chunk count.
void MemoryManager::configureMemoryManager(const MePooConfig& mePooConfig,
posix::Allocator& managementAllocator,
posix::Allocator& chunkMemoryAllocator) noexcept
{
// Note: `auto entry` takes each configuration entry by value (a copy per iteration).
for (auto entry : mePooConfig.m_mempoolConfig)
{
addMemPool(managementAllocator, chunkMemoryAllocator, entry.m_size, entry.m_chunkCount);
}
// Runs after all mempools were added.
generateChunkManagementPool(managementAllocator); //<8>
}
addMemPool 最终调用了 MemPool 的构造 ,填充 cxx::vector<MemPool, 32> m_memPoolVector;
看一下 MemPool :
// Sets up one mempool: reserves the raw chunk memory and the index memory of
// the free list, then initialises the free list.
//
// chunkSize:            size of one chunk; at least CHUNK_MEMORY_ALIGNMENT.
// numberOfChunks:       number of chunks in the pool; at least 1.
// managementAllocator:  allocator the free-list index memory is taken from.
// chunkMemoryAllocator: allocator the raw chunk memory is taken from.
//
// Nothing is allocated when chunkSize is not a multiple of the alignment.
MemPool::MemPool(const cxx::greater_or_equal<uint32_t, CHUNK_MEMORY_ALIGNMENT> chunkSize,
                 const cxx::greater_or_equal<uint32_t, 1> numberOfChunks,
                 posix::Allocator& managementAllocator,
                 posix::Allocator& chunkMemoryAllocator) noexcept
    : m_chunkSize(chunkSize)
    , m_numberOfChunks(numberOfChunks)
    , m_minFree(numberOfChunks)
{
    if (!isMultipleOfAlignment(chunkSize))
    {
        return;
    }

    // Raw memory for all chunks of this pool; the count is widened to 64 bit
    // before the multiplication.
    const uint64_t chunkMemoryBytes = static_cast<uint64_t>(m_numberOfChunks) * m_chunkSize;
    m_rawMemory = static_cast<uint8_t*>(chunkMemoryAllocator.allocate(chunkMemoryBytes, CHUNK_MEMORY_ALIGNMENT));

    // Index memory of the free list, taken from the management allocator.
    auto freeListMemory =
        managementAllocator.allocate(freeList_t::requiredIndexMemorySize(m_numberOfChunks), CHUNK_MEMORY_ALIGNMENT);
    m_freeIndices.init(static_cast<concurrent::LoFFLi::Index_t*>(freeListMemory), m_numberOfChunks);
}
传入 4个参数 : [[segment.mempool]] size (已经+ChunkHeader)、count 、managementAllocator(管理模块的指针,在/dev/shm/iceoryx_mgmt )、chunk池的指针(chunkMemoryAllocator 在 /dev/shm/whoami )
MemPool.m_rawMemory 指向 该 [[segment.mempool]] 的起始address。
MemPool.m_freeIndices 指向 (m_numberOfChunks * uint32_t )的首地址 ,实际是一个freelist
m_nextFreeIndex[i] = i + 1;
相当于 每个uint32_t 代表一个chunk,起始 填入 1,2,3… m_numberOfChunks
<8>处如下:
// Creates the pool holding the ChunkManagement entries.
//
// managementAllocator: allocator passed for both allocator arguments of the
//                      new pool entry.
void MemoryManager::generateChunkManagementPool(posix::Allocator& managementAllocator) noexcept
{
// Set before the pool is created; presumably blocks further addMemPool()
// calls - confirm against addMemPool().
m_denyAddMemPool = true;
// One pool element per ChunkManagement object, m_totalNumberOfChunks in total.
uint32_t chunkSize = sizeof(ChunkManagement);
m_chunkManagementPool.emplace_back(chunkSize, m_totalNumberOfChunks, managementAllocator, managementAllocator);
}
相当于在 /dev/shm/iceoryx_mgmt 上划分了 m_totalNumberOfChunks 个 ChunkManagement ,并做了 freelist。
补充: MemoryManager->size = sizeof(mepoo::SegmentManager<>) + m_totalNumberOfChunks * u32 + m_totalNumberOfChunks * sizeof(ChunkManagement) + m_totalNumberOfChunks * u32
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。