当前位置:   article > 正文

死磕Elasticsearch(四)写入过程_elasticsearch 主分片迁移同时写入

elasticsearch 主分片迁移同时写入

参考来源

  1. 《Elasticsearch源码解析与优化实战》
  2. https://blog.csdn.net/jiaojiao521765146514/article/details/83753215
  3. https://blog.csdn.net/jiankunking/article/details/89172081

将上述几篇博客的内容整理了一下,补充部分概念,但是自己理解的还是不够深入。

1 基础数据模型

每个Index由多个Shard组成(默认是5个),每个Shard有一个主节点和多个副本节点,副本个数可配。

在这里插入图片描述

2 粗粒度流程

1、客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)。
2、coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)。
索引请求中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出出该Shard的Primary节点。
3、实际的node上的primary shard处理请求,然后将数据同步到replica node,primary shard会将结果返回到coordinating node。
4、coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端。

3 细粒度流程

该图来自《Elasticsearch源码解析与优化实战》

3.1 协调节点流程:

3.1.1 参数检查

    收到用户请求后首先检测请求的合法性,把检查操作放在处理流程的第一步,有问题就直接拒绝,对异常请求的处理代价是最小的。
检查操作进行以下参数检查,如下表所示:
该图来自《Elasticsearch源码解析与优化实战》

每项检查遇到异常都会拒绝当前请求。

3.1.2 处理pipeline请求

    数据预处理(ingest)工作通过定义pipeline和processors实现。pipeline是一系列processors的定义,processors按照声明的顺序执行。如果Index或Bulk请求中指定了pipeline参数,则先使用相应的pipeline进行处理。

这里需要详细说明一下什么是pipline和processors:
    Elasticsearch引入管道(Pipeline)这一个概念,是为了让那些有过工作经验的人来说更直白,更轻松的理解这一概念。这里就以Pipeline和java中的Stream进行类比,两者从功能和概念上很类似,我们经常会对Stream中的数据进行处理,比如map操作,peek操作,reduce操作,count操作等,这些操作从行为上说,就是对数据的加工,而Pipeline也是如此,Pipeline也会对通过该Pipeline的数据(一般来说是文档)进行加工,比如上面说到的,修改文档的某个字段值,修改文档某个字段的类型等等.而Elasticsearch对该加工行为进行抽象包装,并称之为Processors。Elasticsearch命名了多种类型的Processors来规范对文档的操作,比如set,append,date,join,json,kv等等。简单的来说,多个对ducument进行存储之前的处理(Processors)组成Pipeline,比如修改字段名,修改字段名的值等操作。

3.1.3 自动创建索引

    如果配置为允许自动创建索引(默认允许),则计算请求中涉及的索引,可能有多个,其中有哪些索引是不存在的,然后创建它。如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败。其他索引正常执行写流程。

    创建索引请求被发送到Master节点,待收到全部创建请求的Response(无论成功还是失败的)之后,才进入下一个流程。Master节点什么时候返回Response?在Master节点执行完创建索引流程,将新的clusterState发布完毕才会返回。那什么才算发布完毕呢?默认情况下,Master发布clusterState的Request收到半数以上的节点Response,认为发
布成功。负责写数据的节点会先执行一遍内容路由的过程以处理没有收到最新clusterState的情况。

3.1.4 对请求的预先处理

    这里不同于对数据的预处理,对请求的预先处理只是检查参数、自动生成id、处理routing等。

    由于上一步可能有创建索引操作,所以在此先获取最新集群状态信息。然后遍历所有请求,从集群状态中获取对应索引的元信息,检查mapping、routing、id等信息。如果id不存在,则生成一个UUID作为文档id。

3.1.5 检测集群状态

    协调节点在开始处理时会先检测集群状态,若集群异常则取消写入。例如,Master节点不存在,会阻塞等待Master节点直至超时。

    因此索引为Red时,如果Master节点存在,则数据可以写到正常shard,Master节点不存在,协调节点会阻塞等待或取消写入。

3.1.6 内容路由,构建基于shard的请求

    将用户的 bulkRequest 重新组织为基于 shard 的请求列表。例如,原始用户请求可能有10个写操作,如果这些文档的主分片都属于同一个,则写请求被合并为1个。所以这里本质上是合并请求的过程。此处尚未确定主分片节点。

3.2 主分片写入流程

3.2.1 预处理:

3.2.1.1 检查请求

    主分片所在节点收到协调节点发来的请求后也是先做了校验工作,主要检测要写的是否是主分片,AllocationId是否符合预期,索引是否处于关闭状态等。
    补充: AllocationId是区分不同分片的唯一标识,集群级的元数据中记录了一个被认为是最新shard的AllocationIID集合,称之为in-sync AllocationI IDs,也就是说,这里的检查请求,1是通过AllocationId检查该shard是否为指定主分片,二是检查该主分片是否在in-sync AllocationI IDs集合中。

3.2.1.2 是否延迟执行

    判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继续下面的流程。

3.2.1.3 判断主分片是否已经发生迁移

    如果已经发生迁移,则转发请求到迁移的节点。
补充:在高并发下,有概率在该主分片接收到请求时发生故障,已经发生角色迁移。

3.2.1.4 检测写一致性

    在开始写之前,检测本次写操作涉及的shard,活跃shard数量是否足够,不足则不执行写入。默认为1,只要主分片可用就执行写入。

3.2.2 元数据处理:

3.2.2.1 Index or Update or Delete

    循环执行每个Single Write Request,对于每个Request,根据操作类型(CREATE/INDEX/UPDATE/DELETE)选择不同的处理逻辑。

    其中,Create/Index是直接新增Doc,Delete是直接根据_id删除Doc,Update会稍微复杂些,我们下面就以Update为例来介绍。

3.2.2.2 Translate Update To Index or Delete

    这一步是Update操作的特有步骤,在这里,会将Update请求转换为Index或者Delete请求。首先,会通过GetRequest查询到已经存在的同_id Doc(如果有)的完整字段和值(依赖_source字段),然后和请求中的Doc合并。同时,这里会获取到读到的Doc版本号,记做V1。

3.2.2.3 Parse Doc

    这里会解析Doc中各个字段。生成ParsedDocument对象,同时会生成uid Term。在Elasticsearch中,_uid = type # _id,对用户,_Id可见,而Elasticsearch中存储的是_uid。这一部分生成的ParsedDocument中也有Elasticsearch的系统字段,大部分会根据当前内容填充,部分未知的会在后面继续填充ParsedDocument。

3.2.2.4 Update Mapping

    Elasticsearch中有个自动更新Mapping的功能,就在这一步生效。会先挑选出Mapping中未包含的新Field,然后判断是否运行自动更新Mapping,如果允许,则更新Mapping。

3.2.2.5 Get Sequence Id and Version

    由于当前是Primary Shard,则会从SequenceNumber Service获取一个sequenceID和Version。SequenceID在Shard级别每次递增1,SequenceID在写入Doc成功后,会用来初始化LocalCheckpoint。Version则是根据当前Doc的最大Version递增1。

3.2.3 Lucene和translog处理:

3.2.3.1 写Lucene和事务日志

    遍历请求,处理动态更新字段映射,然后调用InternalEngine#index逐条对doc进行索引。Engine封装了Lucene和translog的调用,对外提供读写接口。
    在写入Lucene之前,先生成Sequence Number和Version,Sequence Number每次递增1,Version根据当前doc的最大版本加1。

    索引过程为先写Lucene,后写translog。因为Lucene写入时对数据有检查,写操作可能会失败。如果先写translog,写入Lucene 时失败,则还需要对translog进行回滚处理。

3.2.3.2 flush translog

    根据配置的translog flush策略进行刷盘控制,定时或立即刷盘。

3.2.3.3 写副分片

    现在已经为要写的副本shard准备了一个列表,循环处理每个shard,跳过unassigned状态的shard,向目标节点发送请求,等待响应。这个过程是异步并行的。
转发请求时会将SequenceID、PrimaryTerm、GlobalCheckPoint、version等传递给副分片。
(这些概念会在数据模型中详细说明,虽然这篇介绍帖还没写出来,哈哈)

    在等待Response的过程中,本节点发出了多少个Request,就要等待多少个Response。无论这些Response是成功的还是失败的,直到超时。

    收集到全部的Response后,执行finish()。给协调节点返回消息,告知其哪些成功、哪些失败了。

3.2.3.4 处理副分片写失败情况

    主分片所在节点将发送一个shardFailed请求给Master。

sendShardAction(SHARD_FAILED_ACTION_NAME, currentState,shardEntry, listener);
  • 1

然后Master会更新集群状态,在新的集群状态中,这个shard将:

  1. 从in_sync_allocations列表中删除;
  2. 在routing_table的shard列表中将state由STARTED更改为UNASSIGNED;
  3. 添加到routingNodes的unassignedShards列表。

3.3 副分片写入流程

3.3.1 Index or Delete

    根据请求类型是Index还是Delete,选择不同的执行逻辑。这里没有Update,是因为在Primary Node中已经将Update转换成了Index或Delete请求了。

3.3.2 Parse Doc

同主分片。

3.3.3 Update Mapping

同主分片。

3.3.4 Get Sequence Id and Version

    Primary Node中会生成Sequence ID和Version,然后放入ReplicaRequest中,这里只需要从Request中获取到就行。

3.3.5 Add Doc To Lucene

    由于已经在Primary Node中将部分Update请求转换成了Index或Delete请求,这里只需要处理Index和Delete两种请求,不再需要处理Update请求了。比Primary Node会更简单一些。

3.3.6 Write Translog

    同主分片。

3.3.7 Flush Translog

    同主分片。

3.4 document写入原理

在这里插入图片描述

3.4.1 关键词概念

  1. segment file: 存储逆向索引的文件,每个segment本质上就是一个逆向索引,每秒都会生成一个segment文件,当文件过多时es会自动进行segment merge(合并文件),合并时会同时将已经标注删除的文档物理删除。
  2. commit point(重点理解): 记录当前所有可用的segment,每个commit point都会维护一个.del文件(es删除数据本质是不属于物理删除),当es做删改操作时首先会在.del文件中声明某个document已经被删除,文件内记录了在某个segment内某个文档已经被删除,当查询请求过来时在segment中被删除的文件是能够查出来的,但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉。
  3. translog日志文件: 为了防止elasticsearch宕机造成数据丢失保证可靠存储,es会将每次写入数据同时写到translog日志中(图中会有详解)。

3.4.2 关键流程

  1. 数据写入buffer缓冲和translog日志文件
  2. 每隔一秒钟,buffer中的数据被写入新的segment file,并进入os cache,此时segment被打开并供search使用。
  3. buffer被清空。
  4. 重复1~3,新的segment不断添加,buffer不断被清空,而translog中的数据不断累加
  5. 当translog长度达到一定程度的时候,commit操作发生。
    1. buffer中的所有数据写入一个新的segment,并写入os cache,打开供使用
    2. buffer被清空
    3. 一个commit ponit被写入磁盘,标明了所有的index segment
    4. filesystem cache中的所有index segment file缓存数据,被fsync强行刷到磁盘上
    5. 现有的translog被清空,创建一个新的translog

3.4 写入流程主要特性

  1. 可靠性,通过Replica和TransLog两套机制保证数据的可靠性。
  2. 一致性,Lucene中的Flush锁只保证Update接口里面Delete和Add中间不会Flush,但无法保证主分片与副本分片一致。因为如果add之后立即flush,这个时候segment是主分片可见的,但副本分片要落后于主分片。不过最终都会一致。
  3. 原子性,Add和Delete具有原子性。当部分更新时,使用Version和锁保证更新是原子的。
  4. 实时性,Flush之后的segment对用户可见,最快可配置100ms,可实现near-real-time。特定的查询,直接查TransLog,可实现real-time。
  5. 隔离性:仍然采用Version和局部锁来保证更新的是特定版本的数据。
  6. 性能,在很多地方的设计都考虑到了性能
    1. 不需要所有Replica都返回后才能返回给用户,只需要返回特定数目的就行。
    2. 生成的Segment现在内存中提供服务,等一段时间后才刷新到磁盘,Segment在内存这段时间的可靠性由TransLog保证.
    3. TransLog可以配置为周期性的Flush,但这个会给可靠性带来伤害。
    4. 每个线程持有一个Segment,多线程时相互不影响,相互独立,性能更好。
    5. 系统的写入流程对版本依赖较重,读取频率较高,因此采用了versionMap,减少热点数据的多次磁盘IO开销。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/939135
推荐阅读
相关标签
  

闽ICP备14008679号