赞
踩
前面已经介绍了ElasticSearch的写入流程,了解了ElasticSearch写入时的分布式特性的相关原理。ElasticSearch作为一款具有强大搜索功能的存储引擎,它的读取是什么样的呢?读取相比写入简单得多,但是在使用过程中有哪些需要我们注意的呢?本篇文章会进行详细的分析。
在前面的文章我们已经知道ElasticSearch的读取分为两种GET和SEARCH。这两种操作是有一定的差异的,下面我们先对这两种核心的数据读取方式进行一一分析。
(图片来自官网)
以下是从主分片或者副本分片检索文档的步骤顺序:
注意:
在协调节点有个http_server_worker线程池。收到读请求后它的具体过程为:
数据节点上有一个get线程池。收到了请求后,处理过程为:
- private class ShardTransportHandler implements TransportRequestHandler<Request> {
- @Override
- public void messageReceived(final Request request, final TransportChannel channel, Task task) {
- asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
- }
- }
- if (request.refresh() && !request.realtime()) {
- indexShard.refresh("refresh_flag_get");
- }
- GetResult result = indexShard.getService().get(
- request.type(), request.id(),
- request.storedFields(), request.realtime(),
- request.version(), request.versionType(),
- request.fetchSourceContext());
- private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
- ................
- Engine.GetResult get = null;
- ............
- get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm).version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
- ..........
- if (get == null || get.exists() == false) {
- return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
- }
- try {
- return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
- } finally {
- get.close();
- }
- public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
- try (ReleasableLock ignored = readLock.acquire()) {
- ensureOpen();
- SearcherScope scope;
- if (get.realtime()) {
- VersionValue versionValue = null;
- try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
- // we need to lock here to access the version map to do this truly in RT
- versionValue = getVersionFromMap(get.uid().bytes());
- }
- if (versionValue != null) {
- if (versionValue.isDelete()) {
- return GetResult.NOT_EXISTS;
- }
- 。。。。。。
- //刷盘操作
- refreshIfNeeded("realtime_get", versionValue.seqNo);
注意:
get过程会加读锁。处理realtime选项,如果为true,则先判断是否有数据可以刷盘,然后调用Searcher进行读取。Searcher是对IndexSearcher的封装在早期realtime为true则会从tranlog中读取,后面只会从index的lucene读取了。即实时的数据只在lucene之中。
对于Search类请求,ElasticSearch请求是查询lucene的Segment,前面的写入详情流程也分析了,新增的文档会定时的refresh到磁盘中,所以搜索是属于近实时的。而且因为没有文档id,你不知道你要检索的文档在哪个分配上,需要将索引的所有的分片都去搜索下,然后汇总。ElasticSearch的search一般有两个搜索类型
所有的搜索系统一般都是两阶段查询:
第一阶段查询到匹配的docID,第二阶段再查询DocID对应的完整文档。这种在ElasticSearch中称为query_then_fetch,另一种就是一阶段查询的时候就返回完整Doc,在ElasticSearch中叫query_and_fetch,一般第二种适用于只需要查询一个Shard的请求。因为这种一次请求就能将数据请求到,减少交互次数,二阶段的原因是需要多个分片聚合汇总,如果数据量太大那么会影响网络传输效率,所以第一阶段会先返回id。
除了上述的这两种查询外,还有一种三阶段查询的情况。
搜索里面有一种算分逻辑是根据TF和DF来计算score的,而在普通的查询中,第一阶段去每个Shard中独立查询时携带条件算分都是独立的,即Shard中的TF和DF也是独立的。虽然从统计学的基础上数据量多的情况下,每一个分片的TF和DF在整体上会趋向于准确。但是总会有情况导致局部的TF和DF不准的情况出现。
ElasticSearch为了解决这个问题引入了DFS查询。
比如DFS_query_then_fetch,它在每次查询时会先收集所有Shard中的TF和DF值,然后将这些值带入请求中,再次执行query_then_fetch,这样算分的时候TF和DF就是准确的,类似的有DFS_query_and_fetch。这种查询的优势是算分更加精准,但是效率会变差。
另一种选择是用BM25代替TF/DF模型。
在ElasticSearch7.x,用户没法指定以下两种方式:DFS_query_and_fetch和query_and_fetch。
注:这两种算分的算法模型在《ElasticSearch实战篇》有介绍:
这里query_then_fetch具体的搜索的流程图如下:
(图片来自官网)
查询阶段包含以下四个步骤:
以上就是ElasticSearch的search的详细流程,下面会对每一步进行进一步的说明。
协调节点处理query请求的线程池为:
http_server_work
负责该解析功能的类为:org.elasticsearch.rest.action.search.RestSearchAction
- @Override
- public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
- SearchRequest searchRequest = new SearchRequest();
- IntConsumer setSize = size -> searchRequest.source().size(size);
- request.withContentOrSourceParamParserOrNull(parser ->
- parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
- 。。。。。。。。。。。。
- };
- }
主要将restquest的参数封装成SearchRequest
这样SearchRequest请求发送给TransportSearchAction处理
将索引涉及到的shard列表或者有跨集群访问相关的shard列表合并
- private void executeSearch(...........) {
- ........
- //本集群的列表分片列表
- localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
- .map(it -> new SearchShardIterator(
- searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
- .collect(Collectors.toList());
- .......
- //远程集群的分片列表
- final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
- .......
- }
如果有多个分片位于同一个节点,仍然会发送多次请求
- public final void run() {
- ......
- for (final SearchShardIterator iterator : toSkipShardsIts) {
- assert iterator.skip();
- skipShard(iterator);
- }
- ......
- ......
- if (shardsIts.size() > 0) {
- //遍历分片发送请求
- for (int i = 0; i < shardsIts.size(); i++) {
- final SearchShardIterator shardRoutings = shardsIts.get(i);
- assert shardRoutings.skip() == false;
- assert shardItIndexMap.containsKey(shardRoutings);
- int shardIndex = shardItIndexMap.get(shardRoutings);
- //执行shard请求
- performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
- }
- ......
shardsIts为搜索涉及的所有分片,而shardRoutings.nextOrNull()会从分片的所有副本分片选出一个分片来请求。
onShardSuccess对收集到的结果进行合并,这里需要检查所有的请求是否都已经有了回复。
然后才会判断要不要进行executeNextPhase
- private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
- successfulOps.incrementAndGet();
- AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
- if (shardFailures != null) {
- shardFailures.set(result.getShardIndex(), null);
- }
- successfulShardExecution(shardIt);
- }
- private void successfulShardExecution(SearchShardIterator shardsIt) {
- ......
- //计数器累加
- final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
- //是不是所有分都已经回复,然后调用onPhaseDone();
- if (xTotalOps == expectedTotalOps) {
- onPhaseDone();
- } else if (xTotalOps > expectedTotalOps) {
- throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",
- new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures()));
- }
- }
当返回结果的分片数等于预期的总分片数时,协调节点会进入当前Phase的结束处理,启动下一个阶段Fetch Phase的执行。onPhaseDone()会executeNextPhase来执行下一个阶段。
当触发了executeNextPhase方法将触发fetch阶段
上一步的executeNextPhase方法触发Fetch阶段,Fetch阶段的起点为FetchSearchPhase#innerRun函数,从查询阶段的shard列表中遍历,跳过查询结果为空的 shard。其中也会封装一些分页信息的数据。
- private void executeFetch(....){
- //发送请求
- context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
- new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
- //处理成功的消息
- @Override
- public void innerOnResponse(FetchSearchResult result) {
- try {
- progressListener.notifyFetchResult(shardIndex);
- counter.onResult(result);
- } catch (Exception e) {
- context.onPhaseFailure(FetchSearchPhase.this, "", e);
- }
- }
- //处理失败的消息
- @Override
- public void onFailure(Exception e) {
- ........
- }
- });
- }
使用了countDown多线程工具,fetchResults存储某个分片的结果,每收到一个shard的数据就countDoun一下,当都完毕后,触发finishPhase。接着会进行下一步:
CountedCollector:
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults, docIdsToLoad.length, finishPhase, context);
finishPhase:
- final Runnable finishPhase = ()
- -> moveToNextPhase(searchPhaseController, queryResults, reducedQueryPhase, queryAndFetchOptimization ?
- queryResults : fetchResults.getAtomicArray());
执行字段折叠功能,有兴趣可以研究下。即ExpandSearchPhase模块。ES 5.3版本以后支持的Field Collapsing查询。通过该类查询可以轻松实现按Field值进行分类,每个分类获取排名前N的文档。如在菜单行为日志中按菜单名称(用户管理、角色管理等)分类,获取每个菜单排名点击数前十的员工。用户也可以按Field进行Aggregation实现类似功能,但Field Collapsing会更易用、高效。
ExpandSearchPhase执行完了,就返回给客户端结果了。
context.sendSearchResponse(searchResponse, queryResults);
处理数据节点请求的线程池为:search
根据前面的两个阶段,数据节点主要处理协调节点的两类请求:query和fetch
这里响应的请求就是第一阶段的query请求
- transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,
- (request, channel, task) -> {
- //执行查询
- searchService.executeQueryPhase(request, keepStatesInContext(channel.getVersion()), (SearchShardTask) task,
- //注册结果监听器
- new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));
- });
executeQueryPhase:
- public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext,
- SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
- ...........
- final IndexShard shard = getShard(request);
- rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
- @Override
- public void onResponse(ShardSearchRequest orig) {
- .......
- //执行真正的请求
- runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
- }
- @Override
- public void onFailure(Exception exc) {
- listener.onFailure(exc);
- }
- });
- }
executeQueryPhase会执行loadOrExecuteQueryPhase方法
- private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
- final boolean canCache = indicesService.canCache(request, context);
- context.getQueryShardContext().freezeContext();
- if (canCache) {
- indicesService.loadIntoContext(request, context, queryPhase);
- } else {
- queryPhase.execute(context);
- }
- }
这里判断是否从缓存查询,默认启用缓存,缓存的算法默认为LRU,即删除最近最少使用的数据。如果不启用缓存则会执行queryPhase.execute(context);底层调用lucene进行检索,并且进行聚合。
- public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
- .......
- //聚合预处理
- aggregationPhase.preProcess(searchContext);
- .......
- //全文检索并打分
- rescorePhase.execute(searchContext);
- .......
- //自动补全和纠错
- suggestPhase.execute(searchContext);
- //实现聚合
- aggregationPhase.execute(searchContext);
- .......
-
- }
关键点:
- transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
- (request, channel, task) -> {
- searchService.executeFetchPhase(request, (SearchShardTask) task,
- new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));
- });
ElasticSearch查询分为两类,一类为GET,另一类为SEARCH。它们使用场景不同。
本文主要分析了ElasticSearch分布式查询主体流程,并未对lucene部分进行分析,有兴趣的可以自行查找相关资料。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。