当前位置:   article > 正文

【 iceoryx 零拷贝】 iox_subscriber_helloworld 2_iceoryx memorymanager: unable to acquire a chunk w

iceoryx memorymanager: unable to acquire a chunk with a chunk-payload size o

subscriber 的 receiver过程

    // run until interrupted by Ctrl-C
    while (!iox::posix::hasTerminationRequested())
    {
        //! [receive]
        auto takeResult = subscriber.take();								//<1>
        if (!takeResult.has_error())
        {
            std::cout << APP_NAME << " got value: " << takeResult.value()->x << std::endl;
        }
        //! [receive]
        else
        {
            //! [error]
            if (takeResult.get_error() == iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE)
            {
                std::cout << "No chunk available." << std::endl;
            }
		...
        }

        //! [wait]
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        //! [wait]
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

<1> 获取 publisher push的trunk。

template <typename T, typename H = mepoo::NoUserHeader>
class Subscriber : public SubscriberImpl<T, H>
{
    using Impl = SubscriberImpl<T, H>;
  public:
    using SubscriberImpl<T, H>::SubscriberImpl;

    virtual ~Subscriber() noexcept
    {
        Impl::m_trigger.reset();
    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里 T = RadarObject ,这个不重要。H 为空。
实际调用的是

inline cxx::expected<Sample<const T, const H>, ChunkReceiveResult>
SubscriberImpl<T, H, BaseSubscriberType>::take() noexcept
{
    auto result = BaseSubscriberType::takeChunk();
    if (result.has_error())
    {
        return cxx::error<ChunkReceiveResult>(result.get_error());
    }
    auto userPayloadPtr = static_cast<const T*>(result.value()->userPayload());
    auto samplePtr = cxx::unique_ptr<const T>(userPayloadPtr, [this](auto* userPayload) {
        auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
        this->port().releaseChunk(chunkHeader);
    });
    return cxx::success<Sample<const T, const H>>(std::move(samplePtr));
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

追踪代码,最终调用了

inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> BaseSubscriber<port_t>::takeChunk() noexcept
{
    return m_port.tryGetChunk();
}
  • 1
  • 2
  • 3
  • 4

m_port :iox::popo::SubscriberPortUser;

cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult> SubscriberPortUser::tryGetChunk() noexcept
{
    return m_chunkReceiver.tryGet();
}
  • 1
  • 2
  • 3
  • 4

ChunkReceiver < SubscriberPortData::ChunkReceiverData_t > m_chunkReceiver

inline cxx::expected<const mepoo::ChunkHeader*, ChunkReceiveResult>
ChunkReceiver<ChunkReceiverDataType>::tryGet() noexcept
{
    auto popRet = this->tryPop();

    if (popRet.has_value())
    {
        auto sharedChunk = *popRet;

        // if the application holds too many chunks, don't provide more
        if (getMembers()->m_chunksInUse.insert(sharedChunk))
        {
            return cxx::success<const mepoo::ChunkHeader*>(
                const_cast<const mepoo::ChunkHeader*>(sharedChunk.getChunkHeader()));
        }
        else
        {
            // release the chunk
            sharedChunk = nullptr;
            return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::TOO_MANY_CHUNKS_HELD_IN_PARALLEL);
        }
    }
    return cxx::error<ChunkReceiveResult>(ChunkReceiveResult::NO_CHUNK_AVAILABLE);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

this指向了ChunkReceiver的父类 :ChunkQueuePopper

template <typename ChunkQueueDataType>
inline cxx::optional<mepoo::SharedChunk> ChunkQueuePopper<ChunkQueueDataType>::tryPop() noexcept
{
    auto retVal = getMembers()->m_queue.pop();     //<5>

    // check if queue had an element that was poped and return if so
    if (retVal.has_value())
    {
        auto chunk = retVal.value().releaseToSharedChunk();

        auto receivedChunkHeaderVersion = chunk.getChunkHeader()->chunkHeaderVersion();
        if (receivedChunkHeaderVersion != mepoo::ChunkHeader::CHUNK_HEADER_VERSION)
        {
            return cxx::nullopt_t();
        }
        return cxx::make_optional<mepoo::SharedChunk>(chunk);
    }
    else
    {
        return cxx::nullopt_t();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

<5> 翻译一下:m_chunkQueueDataPtr->m_queue.pop()
m_chunkQueueDataPtr : ChunkQueueData
cxx::VariantQueue<mepoo::ShmSafeUnmanagedChunk, MAX_CAPACITY> m_queue;

看一下整个数据结构之间的关系:

在这里插入图片描述

接下来是 VariantQueue 的 pop:

   switch (m_type)
    {
	    case VariantQueueTypes::FiFo_SingleProducerSingleConsumer:
	    {       
	        return m_fifo.template get_at_index<static_cast<uint64_t>(VariantQueueTypes::FiFo_SingleProducerSingleConsumer)>()->pop();     //<6>                  
	    }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

<6>处 选取一个模板(SoFi_SingleProducerSingleConsumer :1) ,即 FiFo<ValueType, Capacity>

先看一下push:

template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::push(const ValueType& f_param_r) noexcept
{
    if (is_full())
    {
        return false;
    }
    else
    {
        auto currentWritePos = m_write_pos.load(std::memory_order_relaxed);
        m_data[currentWritePos % Capacity] = f_param_r;

        // m_write_pos must be increased after writing the new value otherwise
        // it is possible that the value is read by pop while it is written.
        // this fifo is a single producer, single consumer fifo therefore
        // store is allowed.
        m_write_pos.store(currentWritePos + 1, std::memory_order_release);
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

ValueType m_data[Capacity];
m_data为ValueType类型的数组,大小为 Capacity 。
经过查询,Capacity 为 MAX_RESPONSE_QUEUE_CAPACITY = 16
f_param_r 是一个 ValueType 类型的引用。

再看pop:

template <class ValueType, uint64_t Capacity>
inline cxx::optional<ValueType> FiFo<ValueType, Capacity>::pop() noexcept
{
    auto currentReadPos = m_read_pos.load(std::memory_order_relaxed);
    bool isEmpty = (currentReadPos ==
                    // we are not allowed to use the empty method since we have to sync with
                    // the producer pop - this is done here
                    m_write_pos.load(std::memory_order_acquire)); //<7>
    if (isEmpty)
    {
        return cxx::nullopt_t();
    }
    else
    {
        ValueType out = m_data[currentReadPos % Capacity]; //<8>

        // m_read_pos must be increased after reading the pop'ed value otherwise
        // it is possible that the pop'ed value is overwritten by push while it is read.
        // Implementing a single consumer fifo here allows us to use store.
        m_read_pos.store(currentReadPos + 1, std::memory_order_relaxed);
        return out;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

<7>处先判断 m_read_pos =?m_write_pos ,即队列是否为空?
然后<8>取出数据。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/705936
推荐阅读
相关标签
  

闽ICP备14008679号