赞
踩
将上述几篇博客的内容整理了一下,补充部分概念,但是自己理解的还是不够深入。
每个Index由多个Shard组成(默认是5个),每个Shard有一个主节点和多个副本节点,副本个数可配。
1、客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)。
2、coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)。
索引请求中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出出该Shard的Primary节点。
3、实际的node上的primary shard处理请求,然后将数据同步到replica node,primary shard会将结果返回到coordinating node。
4、coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端。
收到用户请求后首先检测请求的合法性,把检查操作放在处理流程的第一步,有问题就直接拒绝,对异常请求的处理代价是最小的。
检查操作进行以下参数检查,如下表所示:
每项检查遇到异常都会拒绝当前请求。
数据预处理(ingest)工作通过定义pipeline和processors实现。pipeline是一系列processors的定义,processors按照声明的顺序执行。如果Index或Bulk请求中指定了pipeline参数,则先使用相应的pipeline进行处理。
这里需要详细说明一下什么是pipline和processors:
Elasticsearch引入管道(Pipeline)这一个概念,是为了让那些有过工作经验的人来说更直白,更轻松的理解这一概念。这里就以Pipeline和java中的Stream进行类比,两者从功能和概念上很类似,我们经常会对Stream中的数据进行处理,比如map操作,peek操作,reduce操作,count操作等,这些操作从行为上说,就是对数据的加工,而Pipeline也是如此,Pipeline也会对通过该Pipeline的数据(一般来说是文档)进行加工,比如上面说到的,修改文档的某个字段值,修改文档某个字段的类型等等.而Elasticsearch对该加工行为进行抽象包装,并称之为Processors。Elasticsearch命名了多种类型的Processors来规范对文档的操作,比如set,append,date,join,json,kv等等。简单的来说,多个对ducument进行存储之前的处理(Processors)组成Pipeline,比如修改字段名,修改字段名的值等操作。
如果配置为允许自动创建索引(默认允许),则计算请求中涉及的索引,可能有多个,其中有哪些索引是不存在的,然后创建它。如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败。其他索引正常执行写流程。
创建索引请求被发送到Master节点,待收到全部创建请求的Response(无论成功还是失败的)之后,才进入下一个流程。Master节点什么时候返回Response?在Master节点执行完创建索引流程,将新的clusterState发布完毕才会返回。那什么才算发布完毕呢?默认情况下,Master发布clusterState的Request收到半数以上的节点Response,认为发
布成功。负责写数据的节点会先执行一遍内容路由的过程以处理没有收到最新clusterState的情况。
这里不同于对数据的预处理,对请求的预先处理只是检查参数、自动生成id、处理routing等。
由于上一步可能有创建索引操作,所以在此先获取最新集群状态信息。然后遍历所有请求,从集群状态中获取对应索引的元信息,检查mapping、routing、id等信息。如果id不存在,则生成一个UUID作为文档id。
协调节点在开始处理时会先检测集群状态,若集群异常则取消写入。例如,Master节点不存在,会阻塞等待Master节点直至超时。
因此索引为Red时,如果Master节点存在,则数据可以写到正常shard,Master节点不存在,协调节点会阻塞等待或取消写入。
将用户的 bulkRequest 重新组织为基于 shard 的请求列表。例如,原始用户请求可能有10个写操作,如果这些文档的主分片都属于同一个,则写请求被合并为1个。所以这里本质上是合并请求的过程。此处尚未确定主分片节点。
主分片所在节点收到协调节点发来的请求后也是先做了校验工作,主要检测要写的是否是主分片,AllocationId是否符合预期,索引是否处于关闭状态等。
补充: AllocationId是区分不同分片的唯一标识,集群级的元数据中记录了一个被认为是最新shard的AllocationIID集合,称之为in-sync AllocationI IDs,也就是说,这里的检查请求,1是通过AllocationId检查该shard是否为指定主分片,二是检查该主分片是否在in-sync AllocationI IDs集合中。
判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继续下面的流程。
如果已经发生迁移,则转发请求到迁移的节点。
补充:在高并发下,有概率在该主分片接收到请求时发生故障,已经发生角色迁移。
在开始写之前,检测本次写操作涉及的shard,活跃shard数量是否足够,不足则不执行写入。默认为1,只要主分片可用就执行写入。
循环执行每个Single Write Request,对于每个Request,根据操作类型(CREATE/INDEX/UPDATE/DELETE)选择不同的处理逻辑。
其中,Create/Index是直接新增Doc,Delete是直接根据_id删除Doc,Update会稍微复杂些,我们下面就以Update为例来介绍。
这一步是Update操作的特有步骤,在这里,会将Update请求转换为Index或者Delete请求。首先,会通过GetRequest查询到已经存在的同_id Doc(如果有)的完整字段和值(依赖_source字段),然后和请求中的Doc合并。同时,这里会获取到读到的Doc版本号,记做V1。
这里会解析Doc中各个字段。生成ParsedDocument对象,同时会生成uid Term。在Elasticsearch中,_uid = type # _id,对用户,_Id可见,而Elasticsearch中存储的是_uid。这一部分生成的ParsedDocument中也有Elasticsearch的系统字段,大部分会根据当前内容填充,部分未知的会在后面继续填充ParsedDocument。
Elasticsearch中有个自动更新Mapping的功能,就在这一步生效。会先挑选出Mapping中未包含的新Field,然后判断是否运行自动更新Mapping,如果允许,则更新Mapping。
由于当前是Primary Shard,则会从SequenceNumber Service获取一个sequenceID和Version。SequenceID在Shard级别每次递增1,SequenceID在写入Doc成功后,会用来初始化LocalCheckpoint。Version则是根据当前Doc的最大Version递增1。
遍历请求,处理动态更新字段映射,然后调用InternalEngine#index逐条对doc进行索引。Engine封装了Lucene和translog的调用,对外提供读写接口。
在写入Lucene之前,先生成Sequence Number和Version,Sequence Number每次递增1,Version根据当前doc的最大版本加1。
索引过程为先写Lucene,后写translog。因为Lucene写入时对数据有检查,写操作可能会失败。如果先写translog,写入Lucene 时失败,则还需要对translog进行回滚处理。
根据配置的translog flush策略进行刷盘控制,定时或立即刷盘。
现在已经为要写的副本shard准备了一个列表,循环处理每个shard,跳过unassigned状态的shard,向目标节点发送请求,等待响应。这个过程是异步并行的。
转发请求时会将SequenceID、PrimaryTerm、GlobalCheckPoint、version等传递给副分片。
(这些概念会在数据模型中详细说明,虽然这篇介绍帖还没写出来,哈哈)
在等待Response的过程中,本节点发出了多少个Request,就要等待多少个Response。无论这些Response是成功的还是失败的,直到超时。
收集到全部的Response后,执行finish()。给协调节点返回消息,告知其哪些成功、哪些失败了。
主分片所在节点将发送一个shardFailed请求给Master。
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState,shardEntry, listener);
然后Master会更新集群状态,在新的集群状态中,这个shard将:
根据请求类型是Index还是Delete,选择不同的执行逻辑。这里没有Update,是因为在Primary Node中已经将Update转换成了Index或Delete请求了。
同主分片。
同主分片。
Primary Node中会生成Sequence ID和Version,然后放入ReplicaRequest中,这里只需要从Request中获取到就行。
由于已经在Primary Node中将部分Update请求转换成了Index或Delete请求,这里只需要处理Index和Delete两种请求,不再需要处理Update请求了。比Primary Node会更简单一些。
同主分片。
同主分片。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。