赞
踩
// run until interrupted by Ctrl-C while (!iox::posix::hasTerminationRequested()) { //! [receive] auto takeResult = subscriber.take(); //<1> if (!takeResult.has_error()) { std::cout << APP_NAME << " got value: " << takeResult.value()->x << std::endl; } //! [receive] else { //! [error] if (takeResult.get_error() == iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE) { std::cout << "No chunk available." << std::endl; } ... } //! [wait] std::this_thread::sleep_for(std::chrono::milliseconds(100)); //! [wait] }
<1> 获取 publisher 推送(push)的 chunk。
/// @brief Typed subscriber: a thin wrapper that takes its whole interface and
///        all constructors from SubscriberImpl<T, H>.
/// @tparam T type the received user payload is interpreted as
/// @tparam H user-header type; defaults to mepoo::NoUserHeader
template <typename T, typename H = mepoo::NoUserHeader>
class Subscriber : public SubscriberImpl<T, H>
{
    using Impl = SubscriberImpl<T, H>;

  public:
    // Inherit every constructor of the implementation class.
    using Impl::Impl;

    /// @brief Resets the inherited trigger in the derived destructor body,
    ///        i.e. before any base-class destructor runs.
    virtual ~Subscriber() noexcept
    {
        this->m_trigger.reset();
    }
};
在这里 T = RadarObject(具体类型在此并不重要);H 取默认值 mepoo::NoUserHeader,即不带用户自定义头。
实际调用的是
/// @brief Takes the next chunk from the base subscriber and wraps its user
///        payload in a Sample.
/// @return On success a Sample<const T, const H> owning the payload; its
///         deleter hands the chunk back to the port via releaseChunk().
///         On failure the ChunkReceiveResult reported by takeChunk().
inline cxx::expected<Sample<const T, const H>, ChunkReceiveResult>
SubscriberImpl<T, H, BaseSubscriberType>::take() noexcept
{
    auto chunk = BaseSubscriberType::takeChunk();
    if (chunk.has_error())
    {
        // forward the error unchanged
        return cxx::error<ChunkReceiveResult>(chunk.get_error());
    }

    auto payload = static_cast<const T*>(chunk.value()->userPayload());

    // The deleter recovers the chunk header from the payload pointer and
    // releases the chunk once the sample goes out of scope.
    auto sample = cxx::unique_ptr<const T>(payload, [this](auto* userPayload) {
        auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
        this->port().releaseChunk(chunkHeader);
    });

    return cxx::success<Sample<const T, const H>>(std::move(sample));
}
追踪代码,最终调用了
/// @brief Takes the next chunk by delegating to the subscriber port.
/// @return the result of m_port.tryGetChunk(): a pointer to the chunk header
///         on success, a ChunkReceiveResult otherwise
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
BaseSubscriber<port_t>::takeChunk() noexcept
{
    return this->m_port.tryGetChunk();
}
m_port :iox::popo::SubscriberPortUser;
/// @brief Tries to fetch the next chunk by delegating to the chunk receiver.
/// @return the result of m_chunkReceiver.tryGet(): a pointer to the chunk
///         header on success, a ChunkReceiveResult otherwise
cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
SubscriberPortUser::tryGetChunk() noexcept
{
    return this->m_chunkReceiver.tryGet();
}
ChunkReceiver < SubscriberPortData::ChunkReceiverData_t > m_chunkReceiver ;
/// @brief Pops the next chunk from the queue and registers it as "in use".
/// @return - the chunk header on success
///         - NO_CHUNK_AVAILABLE if tryPop() delivered nothing
///         - TOO_MANY_CHUNKS_HELD_IN_PARALLEL if the chunk could not be added
///           to m_chunksInUse; the popped chunk is released in that case
inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
ChunkReceiver<ChunkReceiverDataType>::tryGet() noexcept
{
    auto popRet = this->tryPop();
    if (!popRet.has_value())
    {
        return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::NO_CHUNK_AVAILABLE);
    }

    auto sharedChunk = *popRet;

    // if the application holds too many chunks, don't provide more
    if (getMembers()->m_chunksInUse.insert(sharedChunk))
    {
        return cxx::success<const mepoo::ChunkHeader*>(
            const_cast<const mepoo::ChunkHeader*>(sharedChunk.getChunkHeader()));
    }

    // release the chunk
    sharedChunk = nullptr;
    return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL);
}
这里的 this->tryPop() 调用的是 ChunkReceiver 从父类 ChunkQueuePopper 继承来的方法:
/// @brief Pops one element from the underlying queue and converts it to a
///        SharedChunk.
/// @return the popped chunk, or an empty optional if the queue delivered
///         nothing or the chunk header version of the popped chunk differs
///         from mepoo::ChunkHeader::CHUNK_HEADER_VERSION
template <typename ChunkQueueDataType>
inline cxx::optional<mepoo::SharedChunk> ChunkQueuePopper<ChunkQueueDataType>::tryPop() noexcept
{
    auto retVal = getMembers()->m_queue.pop(); //<5>

    // check if queue had an element that was popped and return if so
    if (retVal.has_value())
    {
        auto chunk = retVal.value().releaseToSharedChunk();

        // reject chunks whose header version does not match this build
        auto receivedChunkHeaderVersion = chunk.getChunkHeader()->chunkHeaderVersion();
        if (receivedChunkHeaderVersion != mepoo::ChunkHeader::CHUNK_HEADER_VERSION)
        {
            return cxx::nullopt_t();
        }
        return cxx::make_optional<mepoo::SharedChunk>(chunk);
    }
    else
    {
        return cxx::nullopt_t();
    }
}
<5> 翻译一下:m_chunkQueueDataPtr->m_queue.pop()
m_chunkQueueDataPtr : ChunkQueueData
cxx::VariantQueue<mepoo::ShmSafeUnmanagedChunk, MAX_CAPACITY> m_queue;
接下来是 VariantQueue 的 pop:
// Excerpt from the pop of VariantQueue: dispatch on the stored queue type
// (m_type). Only the single-producer/single-consumer FiFo case is shown here.
switch (m_type)
{
case VariantQueueTypes::FiFo_SingleProducerSingleConsumer:
{
// The numeric value of the enum is used as the index into m_fifo;
// pop() is then forwarded to the queue stored at that index.
return m_fifo.template get_at_index<static_cast<uint64_t>(VariantQueueTypes::FiFo_SingleProducerSingleConsumer)>()->pop(); //<6>
}
}
<6> 处按下标选取对应的队列类型(FiFo_SingleProducerSingleConsumer,下标为 0),即 FiFo<ValueType, Capacity>
/// @brief Appends a copy of f_param_r to the fifo.
/// @param[in] f_param_r value to store
/// @return false if the fifo is full (nothing is written), true otherwise
template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::push(const ValueType& f_param_r) noexcept
{
    if (is_full())
    {
        return false;
    }
    else
    {
        auto currentWritePos = m_write_pos.load(std::memory_order_relaxed);
        m_data[currentWritePos % Capacity] = f_param_r;

        // m_write_pos must be increased after writing the new value otherwise
        // it is possible that the value is read by pop while it is written.
        // this fifo is a single producer, single consumer fifo therefore
        // store is allowed.
        m_write_pos.store(currentWritePos + 1, std::memory_order_release);
        return true;
    }
}
ValueType m_data[Capacity];
m_data为ValueType类型的数组,大小为 Capacity 。
经过查询,Capacity 为 MAX_RESPONSE_QUEUE_CAPACITY = 16
f_param_r 是一个 ValueType 类型的引用。
/// @brief Removes and returns the oldest element of the fifo.
/// @return an empty optional if the read position equals the write position
///         (fifo empty), otherwise a copy of the element at the read position
template <class ValueType, uint64_t Capacity>
inline cxx::optional<ValueType> FiFo<ValueType, Capacity>::pop() noexcept
{
    auto currentReadPos = m_read_pos.load(std::memory_order_relaxed);

    // we are not allowed to use the empty method since we have to sync with
    // the producer - this is done here: the acquire load pairs with the
    // release store of m_write_pos in push
    bool isEmpty = (currentReadPos == m_write_pos.load(std::memory_order_acquire)); //<7>

    if (isEmpty)
    {
        return cxx::nullopt_t();
    }
    else
    {
        ValueType out = m_data[currentReadPos % Capacity]; //<8>

        // m_read_pos must be increased after reading the pop'ed value otherwise
        // it is possible that the pop'ed value is overwritten by push while it is read.
        // Implementing a single consumer fifo here allows us to use store.
        m_read_pos.store(currentReadPos + 1, std::memory_order_relaxed);
        return out;
    }
}
<7> 处先判断 m_read_pos 是否等于 m_write_pos,即队列是否为空;
然后<8>取出数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。