Ceph BlueStore write path: source code analysis


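BlueStore writes are divided into big writes and small writes. After passing through several layers, a write operation eventually reaches BlueStore::queue_transactions, as follows: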

ReplicatedBackend::submit_transaction
    parent->queue_transactions(tls, op.op); // calls PrimaryLogPG::queue_transactions
        osd->store->queue_transactions(ch, tls, op, NULL); // calls BlueStore::queue_transactions

Inside queue_transactions, a further chain of calls ends in _do_write_data, which handles big writes and small writes separately:

_txc_add_transaction(txc, &(*p));
    _write(txc, c, o, off, len, bl, fadvise_flags);
        _do_write(txc, c, o, offset, length, bl, fadvise_flags);
            _do_write_data(txc, c, o, offset, length, bl, &wctx);

Here offset and length are the logical offset and length, within the object, of the data to be written.
_do_write_data is implemented as follows:

if (offset / min_alloc_size == (end - 1) / min_alloc_size && (length != min_alloc_size))
    _do_write_small(txc, c, o, offset, length, p, wctx);
else
    head_offset = offset;  // offset is the offset within the object
    head_length = p2nphase(offset, min_alloc_size);      // head bytes: the distance from offset up to the next min_alloc_size boundary
    tail_offset = p2align(end, min_alloc_size);
    tail_length = p2phase(end, min_alloc_size);
    middle_offset = head_offset + head_length;
    middle_length = length - head_length - tail_length;
    if (head_length)
        _do_write_small(txc, c, o, head_offset, head_length, p, wctx);
    if (middle_length) 
        _do_write_big(txc, c, o, middle_offset, middle_length, p, wctx);
    if (tail_length)
        _do_write_small(txc, c, o, tail_offset, tail_length, p, wctx);

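_do_write_data checks whether the data to be written falls inside a single min_alloc_size unit (min_alloc_size is normally an integer multiple of the block size). If it does, and the write is not exactly min_alloc_size long, it is a small write and is handed to _do_write_small. If the data crosses a min_alloc_size boundary, it is split along min_alloc_size boundaries into an unaligned head and tail (small writes) and an aligned middle (big write), as shown in the figure below.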
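To make the split concrete, here is a minimal standalone sketch of the same arithmetic. The three helpers are stand-ins written for this example (they mirror the names used in the excerpt but are not Ceph's own definitions), and WriteSplit and split_write are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins for the power-of-two helpers used in the excerpt.
// They assume align is a power of two.
constexpr uint64_t p2align(uint64_t x, uint64_t align) { return x & ~(align - 1); }        // round down
constexpr uint64_t p2phase(uint64_t x, uint64_t align) { return x & (align - 1); }         // x mod align
constexpr uint64_t p2nphase(uint64_t x, uint64_t align) { return (0 - x) & (align - 1); }  // bytes up to next boundary

// Hypothetical result type: which byte ranges go to which handler.
struct WriteSplit {
  bool single_small;  // true: the whole write is one small write (stored in head_*)
  uint64_t head_offset, head_length;      // small write
  uint64_t middle_offset, middle_length;  // big write
  uint64_t tail_offset, tail_length;      // small write
};

WriteSplit split_write(uint64_t offset, uint64_t length, uint64_t min_alloc_size) {
  assert(length > 0);
  assert(min_alloc_size > 0 && (min_alloc_size & (min_alloc_size - 1)) == 0);
  const uint64_t end = offset + length;
  WriteSplit s{};
  if (offset / min_alloc_size == (end - 1) / min_alloc_size && length != min_alloc_size) {
    // Entirely inside one allocation unit and not a full unit: small write.
    s.single_small = true;
    s.head_offset = offset;
    s.head_length = length;
    return s;
  }
  s.head_offset = offset;
  s.head_length = p2nphase(offset, min_alloc_size);
  s.tail_offset = p2align(end, min_alloc_size);
  s.tail_length = p2phase(end, min_alloc_size);
  s.middle_offset = s.head_offset + s.head_length;
  s.middle_length = length - s.head_length - s.tail_length;
  return s;
}
```

With min_alloc_size = 0x10000, a write at offset 0xF000 of length 0x12000 gives a 0x1000-byte head, a 0x10000-byte middle starting at 0x10000, and a 0x1000-byte tail starting at 0x20000. A write of exactly one aligned unit (offset 0, length 0x10000) takes the else branch and becomes a pure middle, that is, a big write.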

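Tuning BlueStore for small writes with bluestore_min_alloc_size

In BlueStore, the raw partition is allocated and managed in chunks of bluestore_min_alloc_size. By default, bluestore_min_alloc_size is 64 KB for HDDs and 16 KB for SSDs. When a chunk is written to the raw partition, any unwritten area in it is filled with zeros. If the value is not sized properly for the workload (for example, when writing small objects), this wastes space. The best practice is to set bluestore_min_alloc_size to match the smallest write, which avoids the write amplification penalty.

For example, if your clients frequently write 4 KB objects, use ceph-ansible to configure the following setting on the OSD nodes:

bluestore_min_alloc_size = 4096

Note: bluestore_min_alloc_size_ssd and bluestore_min_alloc_size_hdd are specific to SSDs and HDDs respectively, but they do not need to be set, because setting bluestore_min_alloc_size overrides them.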
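The space cost described above is simple round-up arithmetic. The sketch below is a deliberate simplification: it only rounds an object's size up to whole allocation units and ignores compression, metadata and any other BlueStore behaviour. The function name allocated_bytes is hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Bytes occupied on the device when object_size bytes are stored in whole
// units of min_alloc_size (simplified model, see the assumptions above).
uint64_t allocated_bytes(uint64_t object_size, uint64_t min_alloc_size) {
  assert(min_alloc_size > 0);
  const uint64_t units = (object_size + min_alloc_size - 1) / min_alloc_size;  // round up
  return units * min_alloc_size;
}
```

Under this model a 4 KB object occupies 65536 bytes when min_alloc_size is 64 KB, sixteen times its size, and exactly 4096 bytes when min_alloc_size is 4096.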

_do_write_small handles several small-write scenarios, as follows:

uint64_t end_offs = offset + length; // logical end offset within the object
auto ep = o->extent_map.seek_lextent(offset); // returns the lextent that contains offset; if every lextent lies below offset it returns extent_map's end iterator, if every lextent lies above it returns begin
auto max_bsize = std::max(wctx->target_blob_size, min_alloc_size);
uint32_t alloc_len = min_alloc_size;
auto offset0 = p2align(offset, alloc_len); // offset rounded down to the nearest alloc_len boundary
BlobRef b = ep->blob;
auto bstart = ep->blob_start(); // return logical_offset - blob_offset; i.e. the logical offset within the object that the start of this blob corresponds to
uint64_t chunk_size = b->get_blob().get_chunk_size(block_size); // block_size
head_pad = p2phase(offset, chunk_size);  // number of padding bytes needed at the head
tail_pad = p2nphase(end_offs, chunk_size); // number of padding bytes needed at the tail
if (head_pad && o->extent_map.has_any_lextents(offset - head_pad, chunk_size)) 
    // check whether any lextent overlaps [offset-head_pad, offset-head_pad+chunk_size); if so, the head cannot be zero-padded
    head_pad = 0;
if (tail_pad && o->extent_map.has_any_lextents(end_offs, tail_pad)) 
    // check whether any lextent overlaps [end_offs, end_offs+tail_pad); if so, the tail cannot be zero-padded
    tail_pad = 0;
    
uint64_t b_off = offset - head_pad - bstart; // offset of (offset - head_pad) within the blob
uint64_t b_len = length + head_pad + tail_pad; 

if ((b_off % chunk_size == 0 && b_len % chunk_size == 0)&&
b->get_blob().get_ondisk_length() >= b_off + b_len &&  // the allocated pextents cover [b_off, b_off+b_len)
b->get_blob().is_unused(b_off, b_len) && // [b_off, b_off+b_len) in the blob is unused
b->get_blob().is_allocated(b_off, b_len)) { // the pextents backing [b_off, b_off+b_len) in the blob are allocated
    _apply_padding(head_pad, tail_pad, bl);  // extend the head and tail with zeros
    // map() finds the pextents backing the blob range and calls the lambda passed to it for each one
    if (b_len <= prefer_deferred_size)
        bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
        op->op = bluestore_deferred_op_t::OP_WRITE;
        b->get_blob().map(b_off, b_len, [&](uint64_t offset, uint64_t length) {op->extents.emplace_back(bluestore_pextent_t(offset, length));return 0;});
        op->data = bl;
    else
        // find the pextents backing the blob and write the data in bl, starting at b_off, to them
        b->get_blob().map_bl(b_off, bl, [&](uint64_t offset, bufferlist& t) {bdev->aio_write(offset, t, &txc->ioc, wctx->buffered); });
    
    Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length, b, &wctx->old_extents); // b_off+head_pad is the offset within the blob where the original (unpadded) data starts
        // inside set_lextent: if an lextent contains [logical_offset, logical_offset+length), that range must be cut out of it,
        // which leaves the original lextent as one or two lextents
        punch_hole(c, logical_offset, length, old_extents); 

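The code first checks whether the head and tail can be zero-padded. It does so with has_any_lextents, which tests whether any lextent overlaps a given range. If one does, the range to be padded (or part of it) already belongs to an lextent, so that data has to be read from disk instead of being filled with zeros. What puzzles me is that the head check calls o->extent_map.has_any_lextents(offset - head_pad, chunk_size) rather than
o->extent_map.has_any_lextents(offset - head_pad, head_pad).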
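The padding decision can be tried out in isolation. The sketch below is an assumption-laden model, not Ceph code: the extent map is reduced to a plain vector of hypothetical LExtent records, has_any_lextents is reimplemented as a linear overlap scan, and compute_padding is a name invented here. It keeps the ranges exactly as quoted in the excerpt, including the chunk_size-long range for the head check.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct LExtent { uint64_t logical_offset; uint64_t length; };  // hypothetical, simplified lextent

// True if any extent overlaps the half-open range [offset, offset + length).
bool has_any_lextents(const std::vector<LExtent>& map, uint64_t offset, uint64_t length) {
  for (const auto& e : map)
    if (e.logical_offset < offset + length && offset < e.logical_offset + e.length)
      return true;
  return false;
}

struct Padding { uint64_t head_pad; uint64_t tail_pad; };

Padding compute_padding(const std::vector<LExtent>& map, uint64_t offset,
                        uint64_t length, uint64_t chunk_size) {
  assert(chunk_size > 0 && (chunk_size & (chunk_size - 1)) == 0);
  const uint64_t end_offs = offset + length;
  uint64_t head_pad = offset & (chunk_size - 1);          // p2phase(offset, chunk_size)
  uint64_t tail_pad = (0 - end_offs) & (chunk_size - 1);  // p2nphase(end_offs, chunk_size)
  // Same ranges as in the excerpt: the head check covers a whole chunk.
  if (head_pad && has_any_lextents(map, offset - head_pad, chunk_size))
    head_pad = 0;
  if (tail_pad && has_any_lextents(map, end_offs, tail_pad))
    tail_pad = 0;
  return {head_pad, tail_pad};
}
```

With a 0x1000 chunk and a write of 0x400 bytes at 0x1200, an empty map gives head_pad = 0x200 and tail_pad = 0xA00. An existing extent at [0x1000, 0x1100) clears only head_pad, while one at [0x1800, 0x1900) clears both, because the head check spans the whole chunk.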
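Next, if all of the following hold:
(1) the offset b_off and length b_len of the data within the blob are both aligned to chunk_size
(2) the pextents already allocated to the blob cover at least b_off+b_len bytes
(3) the range [b_off, b_off+b_len) in the blob is unused
(4) the pextents backing [b_off, b_off+b_len) in the blob are allocated
then the unused range of the blob can hold the data. Note that b_off and b_len here are the offset and length after padding.
If b_len, the length of the data within the blob, is no larger than prefer_deferred_size, a deferred write is issued; otherwise the data is written directly with an asynchronous aio_write.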
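The four conditions and the deferred-versus-direct choice can be summarised as one small decision function. This is only a sketch: BlobView is a hypothetical flattened stand-in for what the blob reports about the range, and WritePath and choose_path are names invented for this example.

```cpp
#include <cassert>
#include <cstdint>

enum class WritePath { NotEligible, Deferred, Direct };

// Hypothetical flattened view of the blob for the range [b_off, b_off + b_len).
struct BlobView {
  uint64_t ondisk_length;  // total length of the pextents allocated to the blob
  bool range_unused;       // stands in for is_unused(b_off, b_len)
  bool range_allocated;    // stands in for is_allocated(b_off, b_len)
};

WritePath choose_path(const BlobView& b, uint64_t b_off, uint64_t b_len,
                      uint64_t chunk_size, uint64_t prefer_deferred_size) {
  assert(chunk_size > 0);
  const bool aligned = b_off % chunk_size == 0 && b_len % chunk_size == 0;  // condition (1)
  const bool fits = b.ondisk_length >= b_off + b_len;                       // condition (2)
  if (!(aligned && fits && b.range_unused && b.range_allocated))            // (3) and (4)
    return WritePath::NotEligible;  // a later branch of the small-write path must handle it
  return b_len <= prefer_deferred_size ? WritePath::Deferred : WritePath::Direct;
}
```

For a blob with 0x10000 bytes allocated and the range unused, a 0x1000-byte aligned write with prefer_deferred_size = 0x8000 is deferred, while a 0x10000-byte write goes out directly.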
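Finally, the call to set_lextent does two things:
(1) if an existing lextent contains [logical_offset, logical_offset+length), that range is cut out of it (the removed part is recorded as an old_extent), leaving the original lextent as one or two lextents
(2) it creates a new lextent and inserts it into extent_map
In the small-write case above, (1) never happens. In addition, if extent_map holds many small lextents, adjacent lextents are merged. The case handled here is shown in the figure below.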
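The cutting step in (1) is ordinary interval arithmetic. The sketch below applies it to a single extent; it is a simplified illustration with a hypothetical Extent type and a free function named after the operation, not the extent_map method itself, and it ignores blobs and reference counting.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Extent { uint64_t offset; uint64_t length; };  // hypothetical, simplified lextent

// Cut [hole_off, hole_off + hole_len) out of e. The removed part is appended
// to old_extents; the pieces of e that survive (0, 1 or 2) are returned.
std::vector<Extent> punch_hole(const Extent& e, uint64_t hole_off, uint64_t hole_len,
                               std::vector<Extent>* old_extents) {
  assert(old_extents != nullptr);
  const uint64_t e_end = e.offset + e.length;
  const uint64_t h_end = hole_off + hole_len;
  std::vector<Extent> remaining;
  if (h_end <= e.offset || hole_off >= e_end) {  // no overlap: extent is untouched
    remaining.push_back(e);
    return remaining;
  }
  const uint64_t cut_begin = std::max(e.offset, hole_off);
  const uint64_t cut_end = std::min(e_end, h_end);
  old_extents->push_back({cut_begin, cut_end - cut_begin});
  if (e.offset < cut_begin) remaining.push_back({e.offset, cut_begin - e.offset});  // left piece
  if (cut_end < e_end) remaining.push_back({cut_end, e_end - cut_end});             // right piece
  return remaining;
}
```

Punching [40, 60) out of an extent covering [0, 100) leaves two pieces, [0, 40) and [60, 100); punching [0, 50) leaves one piece, [50, 100).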


Continuing further down _do_write_small:

uint64_t head_read = p2phase(b_off, chunk_size);
uint64_t tail_read = p2nphase(b_off + b_len, chunk_size);
if (b->get_blob().get_ondisk_length() >= b_off + b_len && b_off % chunk_size == 0 && b_len % chunk_size == 0 && b->get_blob().is_allocated(b_off, b_len)) // the backing pextents are already allocated
    _apply_padding(head_pad, tail_pad, bl); // zero-fill both ends of the data
    if (head_read)
        _do_read(c.get(), o, offset - head_pad - head_read, head_read, head_bl, 0);
        head_bl.claim_append(bl);
        bl.swap(head_bl);
    if (tail_read) 
        _do_read(c.get(), o, offset + length + tail_pad, tail_read, tail_bl, 0);
        bl.claim_append(tail_bl);
    bluestore_deferred_op_t *op = _get_deferred_op(txc, o);     
    op->op = bluestore_deferred_op_t::OP_WRITE;
    _buffer_cache_write(txc, b, b_off, bl, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
    b->get_blob().map(b_off, b_len, [&](uint64_t offset, uint64_t length) {op->extents.emplace_back(bluestore_pextent_t(offset, length));return 0;});
    op->data.claim(bl);
    Extent *le = o->extent_map.set_lextent(c, offset, offset - bstart, length, b, &wctx->old_extents);
    b->dirty_blob().mark_used(le->blob_offset, le->length);

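This part handles overwrites. BlueStore writes whole block_size blocks, so the data has to be extended to a multiple of block_size before it goes to disk. The earlier case filled the gap with zero padding; here the existing data around the write is read back and used as the fill. The following conditions must hold:
(1) the pextents already allocated to the blob cover at least b_off+b_len bytes
(2) b_off and b_len are both aligned to chunk_size
(3) the pextents backing [b_off, b_off+b_len) in the blob are allocated
Note that b_off and b_len here are the offset and length after both padding and read-fill.
The code then reads the head_read and tail_read data and writes the merged buffer as a deferred write. As before, it finishes by calling set_lextent, which punches a hole in the existing lextents and inserts the new one. Unlike the previous case, the new lextent may overlap existing lextents here, so lextents may be split.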
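The read-fill step is a classic read-modify-write. The following sketch models it on an in-memory byte string standing in for the device; it assumes no zero padding is involved (head_pad and tail_pad are both 0) and uses hypothetical names (AlignedWrite, merge_for_overwrite) that do not appear in Ceph.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

struct AlignedWrite { uint64_t offset; std::string data; };  // hypothetical result type

// Extend an unaligned overwrite of disk[offset, offset + data.size()) to chunk
// boundaries by reading the existing bytes in front of and behind it.
AlignedWrite merge_for_overwrite(const std::string& disk, uint64_t offset,
                                 const std::string& data, uint64_t chunk_size) {
  assert(chunk_size > 0 && (chunk_size & (chunk_size - 1)) == 0);
  const uint64_t end = offset + data.size();
  const uint64_t head_read = offset & (chunk_size - 1);     // existing bytes before the write
  const uint64_t tail_read = (0 - end) & (chunk_size - 1);  // existing bytes after the write
  assert(end + tail_read <= disk.size());
  std::string bl = disk.substr(offset - head_read, head_read);  // head read
  bl += data;                                                   // new data
  bl += disk.substr(end, tail_read);                            // tail read
  return {offset - head_read, bl};
}
```

With a 4-byte chunk, overwriting two bytes at offset 5 of "AAAABBBBCCCC" with "xy" produces the aligned buffer "BxyB" to be written at offset 4.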
The case handled here is shown in the figure below.


_do_write_small then checks again whether an existing blob can be reused, as follows:

if (b->can_reuse_blob(min_alloc_size, max_bsize, offset0 - bstart, &alloc_len))
    if (!wctx->has_conflict(b, offset0, offset0 + alloc_len, min_alloc_size)) 
        o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
        wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length, false, false);
        
if (prev_ep != end && prev_ep->logical_offset >= min_off)
    if (b->can_reuse_blob(min_alloc_size, max_bsize, offset0 - bstart, &alloc_len))
        if (!wctx->has_conflict(b, offset0, offset0 + alloc_len, min_alloc_size))
            uint64_t chunk_size = b->get_blob().get_chunk_size(block_size);
            _pad_zeros(&bl, &b_off0, chunk_size);
            o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
            wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length, false, false);

Finally, if none of the scenarios above applies, the data goes into a new blob. The last part of _do_write_small handles the new blob:

BlobRef b = c->new_blob();
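uint64_t b_off = p2phase(offset, alloc_len);  // how many bytes of zeros would be needed in front of the data
uint64_t b_off0 = b_off;
 // b_off is the number of bytes needed to extend back to an alloc_len boundary, but _pad_zeros only pads out to block_size,
 // so b_off0 is reduced only by the part that is not block_size-aligned
_pad_zeros(&bl, &b_off0, block_size);  
o->extent_map.punch_hole(c, offset, length, &wctx->old_extents); 
wctx->write(offset, b, alloc_len, b_off0, bl, b_off, length, true, true);  // queued here; the actual write happens elsewhere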

The big-write path is much simpler than the small-write path, so it is not described in detail.
Once _do_write_data returns, _do_alloc_write runs. It processes the operations queued by wctx->write:

for (auto& wi : wctx->writes)
    // first pass: add up in need the disk space that every write item requires
alloc->reserve(need);
PExtentVector prealloc;
prealloc.reserve(2 * wctx->writes.size());
prealloc_left = alloc->allocate(need, min_alloc_size, need, 0, &prealloc);
auto prealloc_pos = prealloc.begin();
for (auto& wi : wctx->writes)
    PExtentVector extents;
    int64_t left = final_length; // final_length of this wi after compression
    while (left > 0) 
        if (prealloc_pos->length <= left)
            prealloc_left -= prealloc_pos->length;  // prealloc_left starts as the total space allocated; here it tracks how much of it remains
            left -= prealloc_pos->length; // how much of this wi still has no space assigned
            extents.push_back(*prealloc_pos); // append this bluestore_pextent_t to extents
            ++prealloc_pos;
        else
            extents.emplace_back(prealloc_pos->offset, left);
            prealloc_pos->offset += left;
            prealloc_pos->length -= left;
            prealloc_left -= left;
            left = 0;
            break;
    dblob.allocated(p2align(b_off, min_alloc_size), final_length, extents);
    Extent *le = o->extent_map.set_lextent(coll, wi.logical_offset, b_off + (wi.b_off0 - wi.b_off), wi.length0,  wi.b,  nullptr);   
    wi.b->dirty_blob().mark_used(le->blob_offset, le->length);
     // map() finds the pextents backing the blob range and calls the lambda passed to it for each one
    if (b_len <= prefer_deferred_size)
        bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
        op->op = bluestore_deferred_op_t::OP_WRITE;
        b->get_blob().map(b_off, b_len, [&](uint64_t offset, uint64_t length) {op->extents.emplace_back(bluestore_pextent_t(offset, length));return 0;});
        op->data = bl;
    else
        // find the pextents backing the blob and write the data in bl, starting at b_off, to them
        b->get_blob().map_bl(b_off, bl, [&](uint64_t offset, bufferlist& t) {bdev->aio_write(offset, t, &txc->ioc, wctx->buffered); });

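Every operation queued in wctx->writes targets blob space that has not been allocated yet (a new blob, or a not yet allocated part of a reused one), so disk space is first allocated into prealloc. dblob.allocated(p2align(b_off, min_alloc_size), final_length, extents); then associates the allocated pextents with the corresponding blob. Finally, the length of the data decides between a deferred write and an asynchronous write.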
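The inner while loop hands each write item its share of one shared preallocation. A minimal sketch of that carving logic, using a hypothetical PExtent struct and an index instead of the iterator, and assuming the preallocation is large enough for every request:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct PExtent { uint64_t offset; uint64_t length; };  // hypothetical physical extent

// Take `want` bytes from prealloc, starting at index pos. Whole extents are
// consumed first; the last one is split when only part of it is needed.
std::vector<PExtent> carve(std::vector<PExtent>& prealloc, std::size_t& pos, uint64_t want) {
  std::vector<PExtent> extents;
  uint64_t left = want;
  while (left > 0) {
    assert(pos < prealloc.size());  // assumption: enough space was preallocated
    PExtent& cur = prealloc[pos];
    if (cur.length <= left) {
      left -= cur.length;
      extents.push_back(cur);       // use the whole extent
      ++pos;
    } else {
      extents.push_back({cur.offset, left});  // use only the front part
      cur.offset += left;                     // the rest stays for the next write item
      cur.length -= left;
      left = 0;
    }
  }
  return extents;
}
```

Starting from the extents [0x0, +0x10000) and [0x40000, +0x20000), carving 0x18000 bytes yields the whole first extent plus [0x40000, +0x8000), and leaves [0x48000, +0x18000) for the next write item.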

Tag: ceph 12.2.4

[BlueStore.cc]
int BlueStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls, TrackedOpRef op, ThreadPool::TPHandle *handle)
>> TransContext *txc = _txc_create(osr);
   Create the transaction context and attach the callbacks
>> for (vector<Transaction>): _txc_add_transaction(txc, &(*p));
   Walk the transaction list, dispatch on the opcode, and add the modifications to the transaction context
>> _txc_write_nodes(txc, txc->t);
   Update the metadata
>> _txc_finalize_kv(txc, txc->t);
>> _txc_state_proc(txc);
   Run the state machine
[Note]
Sequencer: keeps requests in order
vector<Transaction>: the transaction list; several transactions can be submitted at once

[BlueStore.cc]
void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t)
>> case Transaction::OP_WRITE: _write(txc, c, o, off, len, bl, fadvise_flags);
[Note]
Transaction: a transaction at the OSD level
TransContext: the transaction context at the BlueStore level

[BlueStore.cc]
int BlueStore::_write(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset,
    size_t length, bufferlist& bl, uint32_t fadvise_flags)
>> _do_write(txc, c, o, offset, length, bl, fadvise_flags);
   Write the data
>> txc->write_onode(o);
   Update the metadata

[BlueStore.cc]
int BlueStore::_do_write(TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset,
    uint64_t length, bufferlist& bl, uint32_t fadvise_flags)
>> _do_write_data(txc, c, o, offset, length, bl, &wctx);
>> _do_alloc_write(txc, c, o, &wctx);
>> _wctx_finish(txc, c, o, &wctx);

[BlueStore.cc]
void BlueStore::_do_write_data(TransContext *txc, CollectionRef& c, OnodeRef o,
    uint64_t offset, uint64_t length, bufferlist& bl, WriteContext *wctx)
>> _do_write_big(txc, c, o, middle_offset, middle_length, p, wctx);
>> _do_write_small(txc, c, o, head_offset, head_length, p, wctx);
[Note] offset, length and min_alloc_size decide whether the big-write or the small-write path runs

[BlueStore.cc]
void BlueStore::_do_write_big(TransContext *txc, CollectionRef &c, OnodeRef o,
    uint64_t offset, uint64_t length, bufferlist::iterator& blp, WriteContext *wctx)
>> o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
   Every part of an extent_map lextent that overlaps the target write range is split off into old_extents; the head and tail of the original lextent are adjusted accordingly
   Reuse an existing blob if one qualifies, otherwise create a new blob
>> wctx->write(offset, b, l, b_off, t, b_off, l, false, new_blob);
   The write is stored in writes

[BlueStore.h]
void write(uint64_t loffs, BlobRef b, uint64_t blob_len, uint64_t o, bufferlist& bl,
    uint64_t o0, uint64_t len0, bool _mark_unused, bool _new_blob)
>> writes.emplace_back(loffs, b, blob_len, o, bl, o0, len0, _mark_unused, _new_blob);

[BlueStore.cc]
void BlueStore::_do_write_small(TransContext *txc, CollectionRef &c, OnodeRef o,
    uint64_t offset, uint64_t length, bufferlist::iterator& blp, WriteContext *wctx)
>> Check whether zero padding is needed: _apply_padding
>> Check whether the IO falls into unused space of the blob
>> if b_len <= prefer_deferred_size:
   bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
   op->op = bluestore_deferred_op_t::OP_WRITE;
>> else:
   bdev->aio_write(offset, t, &txc->ioc, wctx->buffered);
>> Overwrite case
>> if head_read:
   _do_read(c.get(), o, offset - head_pad - head_read, head_read, head_bl, 0);
>> if tail_read:
   _do_read(c.get(), o, offset + length + tail_pad, tail_read, tail_bl, 0);
>> bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
>> op->op = bluestore_deferred_op_t::OP_WRITE;
>> can_reuse_blob, punch_hole, wctx->write

[BlueStore.cc]
int BlueStore::_do_alloc_write(TransContext *txc, CollectionRef coll, OnodeRef o,
    WriteContext *wctx)
>> for (wctx->writes): compute the disk space each write_item needs
>> alloc->reserve(need); reserve the space
>> alloc->allocate(need, min_alloc_size, need, 0, &prealloc); preallocate the space
>> for (wctx->writes):
>> wi.new_blob: initialize the blob;
>> wi.bl length is less than or equal to prefer_deferred_size:
   bluestore_deferred_op_t *op = _get_deferred_op(txc, o); added to txc->deferred_txn->ops
   op->op = bluestore_deferred_op_t::OP_WRITE;
>> wi.bl length is greater than prefer_deferred_size:
   bdev->aio_write(offset, t, &txc->ioc, false); added to pending_aios

[BlueStore.cc][state machine: STATE_PREPARE]
void BlueStore::_txc_state_proc(TransContext *txc)
>> txc->state = TransContext::STATE_AIO_WAIT;
>> _txc_aio_submit(txc);

[BlueStore.cc]
void BlueStore::_txc_aio_submit(TransContext *txc)
>> bdev->aio_submit(&txc->ioc);

[KernelDevice.cc]
void KernelDevice::aio_submit(IOContext *ioc)
>> aio_queue.submit_batch(ioc->running_aios.begin(), e, ioc->num_running.load(), priv,
    &retries);

[KernelDevice.h]
struct AioCompletionThread : public Thread {
  KernelDevice *bdev;
  explicit AioCompletionThread(KernelDevice *b) : bdev(b) {}
  void *entry() override {
    bdev->_aio_thread();
    return NULL;
  }
} aio_thread;

[KernelDevice.cc][thread: aio_thread]
void KernelDevice::_aio_thread()
>> aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms, aio, max);
>> aio_callback(aio_callback_priv, ioc->priv);
[Note] aio_callback: KernelDevice aio_cb

[BlueStore.cc][thread: aio_thread]
static void aio_cb(void *priv, void *priv2)
>> BlueStore::AioContext *c = static_cast<BlueStore::AioContext*>(priv2);
>> c->aio_finish(store);

[BlueStore.h][thread: aio_thread]
void aio_finish(BlueStore *store) override
>> store->txc_aio_finish(this);

[BlueStore.h][thread: aio_thread]
void txc_aio_finish(void *p)
>> _txc_state_proc(static_cast<TransContext*>(p));

[BlueStore.cc][thread: aio_thread] [state machine: STATE_AIO_WAIT]
void BlueStore::_txc_state_proc(TransContext *txc)
>> _txc_finish_io(txc);

[BlueStore.cc][thread: aio_thread]
void BlueStore::_txc_finish_io(TransContext *txc)
>> txc->state = TransContext::STATE_IO_DONE;
>> osr->q.iterator_to(*txc);
>> _txc_state_proc(&*p++);

[BlueStore.cc][thread: aio_thread] [state machine: STATE_IO_DONE]
void BlueStore::_txc_state_proc(TransContext *txc)
>> txc->state = TransContext::STATE_KV_QUEUED;
>> if cct->_conf->bluestore_sync_submit_transaction:
>> txc->state = TransContext::STATE_KV_SUBMITTED;
>> db->submit_transaction(txc->t);
>> _txc_applied_kv(txc);
>> kv_queue.push_back(txc);
>> kv_cond.notify_one(); # notify the kv_sync_thread thread
>> kv_queue_unsubmitted.push_back(txc);

[kv_sync_thread] [BlueStore.h]
struct KVSyncThread : public Thread {
  BlueStore *store;
  explicit KVSyncThread(BlueStore *s) : store(s) {}
  void *entry() override {
    store->_kv_sync_thread();
    return NULL;
  }
};

[BlueStore.cc][thread: kv_sync_thread]
void BlueStore::_kv_sync_thread()
[Note] The non-deferred data writes are complete; the kv metadata is written next
>> kv_cond.wait(l);
>> kv_committing.swap(kv_queue);
>> for (kv_committing):
>> if txc->state == TransContext::STATE_KV_QUEUED:
>> db->submit_transaction(txc->t);
>> _txc_applied_kv(txc);
>> txc->state = TransContext::STATE_KV_SUBMITTED;
>> db->submit_transaction_sync(synct); flush the transaction to disk synchronously
>> kv_committing_to_finalize.swap(kv_committing);
>> kv_finalize_cond.notify_one();

[BlueStore.h]
struct KVFinalizeThread : public Thread {
  BlueStore *store;
  explicit KVFinalizeThread(BlueStore *s) : store(s) {}
  void *entry() {
    store->_kv_finalize_thread();
    return NULL;
  }
};

[BlueStore.cc][thread: kv_finalize_thread]
void BlueStore::_kv_finalize_thread()
>> kv_finalize_cond.wait(l);
>> kv_committed.swap(kv_committing_to_finalize);
>> while(!kv_committed.empty()): _txc_state_proc(txc);

[BlueStore.cc][thread: kv_finalize_thread] [state machine: STATE_KV_SUBMITTED]
void BlueStore::_txc_state_proc(TransContext *txc)
>> txc->state = TransContext::STATE_KV_DONE;
>> _txc_committed_kv(txc);
>> the state machine moves on to the next state
>> case TransContext::STATE_KV_DONE:
>> if txc->deferred_txn: there is a deferred (WAL) transaction to process
   txc->state = TransContext::STATE_DEFERRED_QUEUED;
   _deferred_queue(txc);
>> else:
   txc->state = TransContext::STATE_FINISHING;

[BlueStore.cc][thread: kv_finalize_thread]
void BlueStore::_txc_committed_kv(TransContext *txc)
>> if (txc->oncommit): finishers[n]->queue(txc->oncommit);
>> if (txc->onreadable): finishers[n]->queue(txc->onreadable);

[Finisher.h][Finisher]
void queue(Context *c, int r = 0) {
  finisher_lock.Lock();
  if (finisher_queue.empty()) {
    finisher_cond.Signal(); # notify the finisher_thread thread
  }
  if (r) {
    finisher_queue_rval.push_back(pair<Context*, int>(c, r));
    finisher_queue.push_back(NULL);
  } else
    finisher_queue.push_back(c);
  finisher_lock.Unlock();
}

[Finisher.h]
struct FinisherThread : public Thread {
  Finisher *fin;
  explicit FinisherThread(Finisher *f) : fin(f) {}
  void* entry() override { return (void*)fin->finisher_thread_entry(); }
} finisher_thread;

[Finisher.cc][thread: finisher_thread]
void *Finisher::finisher_thread_entry()
>> finisher_cond.Wait(finisher_lock);
>> vector<Context*> ls;
>> ls.swap(finisher_queue);
>> for (vector<Context*>::iterator p = ls.begin(); p != ls.end(); ++p):
>> (*p)->complete(0);
   Run the callbacks, returning to the OSD layer

[BlueStore.cc][thread: kv_finalize_thread]
void BlueStore::_deferred_queue(TransContext *txc)
>> _deferred_submit_unlock(txc->osr.get());

[BlueStore.cc][thread: kv_finalize_thread]
void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
>> bdev->aio_write(start, bl, &b->ioc, false);

[BlueStore.cc][thread: aio_thread]
static void aio_cb(void *priv, void *priv2)
>> BlueStore::AioContext *c = static_cast<BlueStore::AioContext*>(priv2);
>> c->aio_finish(store);

[BlueStore.cc][thread: aio_thread]
void aio_finish(BlueStore *store) override
>> store->_deferred_aio_finish(osr);

[BlueStore.cc][thread: aio_thread]
void BlueStore::_deferred_aio_finish(OpSequencer *osr)
>> DeferredBatch *b = osr->deferred_running;
>> for (auto& i : b->txcs): txc->state = TransContext::STATE_DEFERRED_CLEANUP;
>> deferred_done_queue.emplace_back(b);
>> kv_cond.notify_one(); # notify the kv_sync_thread thread

[BlueStore.cc][thread: kv_sync_thread]
void BlueStore::_kv_sync_thread()
>> As above: it notifies the kv_finalize_thread thread, which in turn calls _txc_state_proc

[BlueStore.cc][thread: kv_finalize_thread] [state machine: STATE_DEFERRED_CLEANUP]
void BlueStore::_txc_state_proc(TransContext *txc)
>> txc->state = TransContext::STATE_FINISHING;
>> the state machine moves on to the next state: _txc_finish(txc);

[BlueStore.cc][thread: kv_finalize_thread]
void BlueStore::_txc_finish(TransContext *txc)
>> txc->state = TransContext::STATE_DONE;
>> while (!osr->q.empty()): releasing_txc.push_back(*txc);
>> while (!releasing_txc.empty()):
>> _txc_release_alloc(txc);
>> delete txc;
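The state transitions scattered through the trace above can be collected into one small table. The sketch below is a simplified model, not BlueStore code: it keeps only the state names that appear in the trace, ignores threads, queues and error handling, and does not model the bluestore_sync_submit_transaction shortcut. TxcState, next_state and lifecycle are names chosen for this example.

```cpp
#include <cassert>
#include <vector>

enum class TxcState {
  PREPARE, AIO_WAIT, IO_DONE, KV_QUEUED, KV_SUBMITTED, KV_DONE,
  DEFERRED_QUEUED, DEFERRED_CLEANUP, FINISHING, DONE
};

// Next state of a transaction context, as read off the trace. Only the
// KV_DONE step branches: deferred work goes through the DEFERRED_* states.
TxcState next_state(TxcState s, bool has_deferred_txn) {
  assert(s != TxcState::DONE);  // DONE is terminal
  switch (s) {
    case TxcState::PREPARE:          return TxcState::AIO_WAIT;      // aio submitted
    case TxcState::AIO_WAIT:         return TxcState::IO_DONE;       // aio_thread callback
    case TxcState::IO_DONE:          return TxcState::KV_QUEUED;     // handed to kv_sync_thread
    case TxcState::KV_QUEUED:        return TxcState::KV_SUBMITTED;  // kv transaction submitted
    case TxcState::KV_SUBMITTED:     return TxcState::KV_DONE;       // kv_finalize_thread
    case TxcState::KV_DONE:
      return has_deferred_txn ? TxcState::DEFERRED_QUEUED : TxcState::FINISHING;
    case TxcState::DEFERRED_QUEUED:  return TxcState::DEFERRED_CLEANUP;  // deferred aio finished
    case TxcState::DEFERRED_CLEANUP: return TxcState::FINISHING;
    case TxcState::FINISHING:        return TxcState::DONE;
    case TxcState::DONE:             break;
  }
  return TxcState::DONE;
}

// Full sequence of states a transaction passes through.
std::vector<TxcState> lifecycle(bool has_deferred_txn) {
  std::vector<TxcState> path{TxcState::PREPARE};
  while (path.back() != TxcState::DONE)
    path.push_back(next_state(path.back(), has_deferred_txn));
  return path;
}
```

In this model a transaction without deferred writes passes through 8 states, and one with deferred writes passes through 10, the extra two being DEFERRED_QUEUED and DEFERRED_CLEANUP.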
