protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest request, Task task) throws IOException {
    boolean valid;
    String explanation = null;
    String error = null;
    ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(
        request.shardId(),
        request.nowInMillis(),
        request.filteringAliases()
    );
    SearchContext searchContext = searchService.createSearchContext(shardSearchLocalRequest, SearchService.NO_TIMEOUT);
    try {
        ParsedQuery parsedQuery = searchContext.getSearchExecutionContext().toQuery(request.query());
        searchContext.parsedQuery(parsedQuery);
        searchContext.preProcess();
        valid = true;
        explanation = explain(searchContext, request.rewrite());
    } catch (QueryShardException | ParsingException e) {
        valid = false;
        error = e.getDetailedMessage();
    } catch (AssertionError e) {
        valid = false;
        error = e.getMessage();
    } finally {
        Releasables.close(searchContext);
    }
    return new ShardValidateQueryResponse(request.shardId(), valid, explanation, error);
}
public ValidateQueryRequestBuilder setQuery(QueryBuilder queryBuilder) {
request.query(queryBuilder);
return this;
}
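For context, a hedged sketch of driving this builder through the client's admin API; the index name and query value here are placeholders:

ValidateQueryResponse response = client.admin()
    .indices()
    .prepareValidateQuery("my-index")                         // yields a ValidateQueryRequestBuilder
    .setQuery(QueryBuilders.queryStringQuery("user:kimchy"))  // the setQuery(...) shown above
    .setExplain(true)                                         // request the per-shard explanation
    .get();
boolean valid = response.isValid();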
// RestValidateQueryAction, excerpted:
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
    ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(Strings.splitStringByCommaToArray(request.param("index")));
    // when the query arrives as URL parameters, build it from them:
    validateQueryRequest.query(RestActions.urlParamsToQueryBuilder(request));
    // ...
}

// Inside RestActions.urlParamsToQueryBuilder (shown in full below), the "q"
// parameter becomes a query_string query:
String queryString = request.param("q");
QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(queryString);
queryBuilder.analyzer(request.param("analyzer"));

// A body-parsing failure is reported back on the channel:
channel -> { handleException(validateQueryRequest, finalBodyParsingException.getMessage(), channel); }
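For orientation, a hedged sketch of exercising this endpoint through the low-level REST client; the index name and query value are placeholders, and response handling is elided:

// assumes an org.elasticsearch.client.RestClient instance named restClient
Request validate = new Request("GET", "/my-index/_validate/query");
validate.addParameter("q", "user:kimchy");   // consumed by urlParamsToQueryBuilder
validate.addParameter("explain", "true");    // ask for the explanation computed in shardOperation
Response response = restClient.performRequest(validate);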
toQuery calls the subclass's doToQuery function:
public final Query toQuery(SearchExecutionContext context) throws IOException {
    Query query = doToQuery(context);
    if (query != null) {
        if (boost != DEFAULT_BOOST) {
            if (query instanceof MatchNoDocsQuery == false) {
                query = new BoostQuery(query, boost);
            }
        }
        if (queryName != null) {
            context.addNamedQuery(queryName, query);
        }
    }
    return query;
}
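A quick illustration of what this wrapping means for callers, using stock query builders (the field and values are arbitrary):

// boost != DEFAULT_BOOST -> toQuery wraps the Lucene query in a BoostQuery;
// queryName != null      -> the query is registered (surfaced as matched_queries in responses)
QueryBuilder qb = QueryBuilders.termQuery("user", "kimchy")
    .boost(2.0f)           // becomes new BoostQuery(termQuery, 2.0f)
    .queryName("by_user"); // becomes context.addNamedQuery("by_user", query)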
Obtaining the QueryStringQueryBuilder:
public static QueryBuilder urlParamsToQueryBuilder(RestRequest request) {
    String queryString = request.param("q");
    if (queryString == null) {
        List<String> unconsumedParams = Arrays.stream(queryStringParams).filter(key -> request.param(key) != null).toList();
        if (unconsumedParams.isEmpty() == false) {
            // this would lead to a non-descriptive error from RestBaseHandler#unrecognized later, so throw a better IAE here
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "request [%s] contains parameters %s but missing query string parameter 'q'.",
                    request.path(),
                    unconsumedParams.toString()
                )
            );
        }
        return null;
    }
    QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(queryString);
    queryBuilder.defaultField(request.param("df"));
    queryBuilder.analyzer(request.param("analyzer"));
    queryBuilder.analyzeWildcard(request.paramAsBoolean("analyze_wildcard", false));
    queryBuilder.lenient(request.paramAsBoolean("lenient", null));
    String defaultOperator = request.param("default_operator");
    if (defaultOperator != null) {
        queryBuilder.defaultOperator(Operator.fromString(defaultOperator));
    }
    return queryBuilder;
}
Obtaining the QueryStringQueryParser:
protected Query doToQuery(SearchExecutionContext context) {
    QueryStringQueryParser queryParser;
    // ... field resolution elided ...
    queryParser = new QueryStringQueryParser(context, resolvedFields, isLenient);
    // ...
}
// Test-harness excerpt: the builder is rewritten against the context before toQuery is called:
SearchExecutionContext context = createSearchExecutionContext();
QB firstQuery = createTestQueryBuilder();
QB controlQuery = copyQuery(firstQuery);
QueryBuilder rewritten = rewriteQuery(firstQuery, new SearchExecutionContext(context));
Query firstLuceneQuery = rewritten.toQuery(context);
public interface Rewriteable<T> {
    static <T extends Rewriteable<T>> void rewriteAndFetch(
        T original,
        QueryRewriteContext context,
        ActionListener<T> rewriteResponse,
        int iteration
    ) {
        T builder = original;
        try {
            for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder; rewrittenBuilder = builder.rewrite(context)) {
                builder = rewrittenBuilder;
                if (iteration++ >= MAX_REWRITE_ROUNDS) {
                    // this is some protection against user provided queries if they don't obey the contract of rewrite
                    // we allow 16 rounds and then we fail to prevent infinite loops
                    throw new IllegalStateException(
                        "too many rewrite rounds, rewriteable might return new objects even if they are not rewritten"
                    );
                }
                if (context.hasAsyncActions()) {
                    T finalBuilder = builder;
                    final int currentIterationNumber = iteration;
                    context.executeAsyncActions(
                        ActionListener.wrap(
                            n -> rewriteAndFetch(finalBuilder, context, rewriteResponse, currentIterationNumber),
                            rewriteResponse::onFailure
                        )
                    );
                    return;
                }
            }
            rewriteResponse.onResponse(builder);
        } catch (IOException | IllegalArgumentException | ParsingException ex) {
            rewriteResponse.onFailure(ex);
        }
    }
}
public abstract class AbstractQueryBuilder<QB extends AbstractQueryBuilder<QB>> implements QueryBuilder {

    static Collection<Query> toQueries(Collection<QueryBuilder> queryBuilders, SearchExecutionContext context) throws QueryShardException,
        IOException {
        List<Query> queries = new ArrayList<>(queryBuilders.size());
        for (QueryBuilder queryBuilder : queryBuilders) {
            Query query = queryBuilder.rewrite(context).toQuery(context);
            if (query != null) {
                queries.add(query);
            }
        }
        return queries;
    }

    // toQuery(SearchExecutionContext) is shown above: it applies the boost and
    // the named-query registration around doToQuery.
}
MultiMatchQueryParser parser = new MultiMatchQueryParser(searchExecutionContext);
Map<String, Float> fieldNames = new HashMap<>();
fieldNames.put("field", 1.0f);
fieldNames.put("field_split", 1.0f);
fieldNames.put("field_normalizer", 1.0f);
fieldNames.put("field_split_normalizer", 1.0f);
Query query = parser.parse(MultiMatchQueryBuilder.Type.BEST_FIELDS, fieldNames, "Foo Bar", null);
DisjunctionMaxQuery expected = new DisjunctionMaxQuery(
    Arrays.asList(
        new TermQuery(new Term("field_normalizer", "foo bar")),
        new TermQuery(new Term("field", "Foo Bar")),
        new BooleanQuery.Builder().add(new TermQuery(new Term("field_split", "Foo")), BooleanClause.Occur.SHOULD)
            .add(new TermQuery(new Term("field_split", "Bar")), BooleanClause.Occur.SHOULD)
            .build(),
        new BooleanQuery.Builder().add(new TermQuery(new Term("field_split_normalizer", "foo")), BooleanClause.Occur.SHOULD)
            .add(new TermQuery(new Term("field_split_normalizer", "bar")), BooleanClause.Occur.SHOULD)
            .build()
    ),
    0.0f
);
public class MultiMatchQueryBuilder extends AbstractQueryBuilder<MultiMatchQueryBuilder> {

    protected Query doToQuery(SearchExecutionContext context) {
        MultiMatchQueryParser multiMatchQuery = new MultiMatchQueryParser(context);
        multiMatchQuery.setAnalyzer(analyzer);
        multiMatchQuery.setOccur(operator.toBooleanClauseOccur());
        multiMatchQuery.setTieBreaker(tieBreaker);
        return multiMatchQuery.parse(type, newFieldsBoosts, value, minimumShouldMatch);
    }
}
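For reference, a small hedged example of building the multi_match query whose doToQuery is shown above (field names invented):

MultiMatchQueryBuilder mm = QueryBuilders.multiMatchQuery("Foo Bar", "title", "body")
    .type(MultiMatchQueryBuilder.Type.BEST_FIELDS) // selects the buildFieldQueries branch below
    .tieBreaker(0.3f)                              // passed through to the DisjunctionMaxQuery
    .operator(Operator.AND);                       // becomes the BooleanClause occur via setOccur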
public class MultiMatchQueryParser extends MatchQueryParser {

    public Query parse(MultiMatchQueryBuilder.Type type, Map<String, Float> fieldNames, Object value, String minimumShouldMatch) {
        final List<Query> queries = switch (type) {
            case PHRASE, PHRASE_PREFIX, BEST_FIELDS, MOST_FIELDS, BOOL_PREFIX -> buildFieldQueries(
                type,
                fieldNames,
                value,
                minimumShouldMatch
            );
            case CROSS_FIELDS -> buildCrossFieldQuery(fieldNames, value, minimumShouldMatch, tieBreaker);
        };
        return combineGrouped(queries, tieBreaker);
        // combineGrouped ultimately wraps the grouped queries:
        //     return new DisjunctionMaxQuery(groupQuery, tieBreaker);
    }
}
This is the core of cross-field recall: it produces a list of queries joined by SHOULD clauses.
The leaves are built by CrossFieldsQueryBuilder; a hedged sketch of the resulting shape follows.
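To make that shape concrete, a hedged sketch of the kind of Lucene query cross_fields recall ends up with for the text "foo bar" over two fields. BlendedTermQuery here is Lucene's (Elasticsearch ships its own variant), and the structure is an approximation rather than the exact output:

// per token, one blended term query across the fields (term statistics are blended);
// the per-token queries are then joined with SHOULD clauses
Query foo = new BlendedTermQuery.Builder()
    .add(new Term("title", "foo"))
    .add(new Term("body", "foo"))
    .build();
Query bar = new BlendedTermQuery.Builder()
    .add(new Term("title", "bar"))
    .add(new Term("body", "bar"))
    .build();
Query crossFields = new BooleanQuery.Builder()
    .add(foo, BooleanClause.Occur.SHOULD)
    .add(bar, BooleanClause.Occur.SHOULD)
    .build();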
Analyzer analyzer = new StandardAnalyzer();
Path indexPath = Files.createTempDirectory("tempIndex");
Directory directory = FSDirectory.open(indexPath);
IndexWriterConfig config = new IndexWriterConfig(analyzer);
IndexWriter iwriter = new IndexWriter(directory, config);
Document doc = new Document();
String text = "This is the text to be indexed.";
doc.add(new Field("fieldname", text, TextField.TYPE_STORED));
iwriter.addDocument(doc);
iwriter.close();

// Now search the index:
DirectoryReader ireader = DirectoryReader.open(directory);
IndexSearcher isearcher = new IndexSearcher(ireader);
// Parse a simple query that searches for "text":
QueryParser parser = new QueryParser("fieldname", analyzer);
Query query = parser.parse("text");
ScoreDoc[] hits = isearcher.search(query, 10).scoreDocs;
assertEquals(1, hits.length);
// Iterate through the results:
for (int i = 0; i < hits.length; i++) {
    Document hitDoc = isearcher.doc(hits[i].doc);
    assertEquals("This is the text to be indexed.", hitDoc.get("fieldname"));
}
ireader.close();
directory.close();
IOUtils.rm(indexPath);
public TransportSearchAction(
    ThreadPool threadPool,
    CircuitBreakerService circuitBreakerService,
    TransportService transportService,
    SearchService searchService,
    SearchTransportService searchTransportService,
    SearchPhaseController searchPhaseController,
    ClusterService clusterService,
    ActionFilters actionFilters,
    IndexNameExpressionResolver indexNameExpressionResolver,
    NamedWriteableRegistry namedWriteableRegistry,
    ExecutorSelector executorSelector
)

// The constructor registers the shard-level request handlers:
SearchTransportService.registerRequestHandler(transportService, searchService);
transportService.registerRequestHandler(
    QUERY_ID_ACTION_NAME,
    ThreadPool.Names.SAME,
    QuerySearchRequest::new,
    (request, channel, task) -> {
        searchService.executeQueryPhase(
            request,
            (SearchShardTask) task,
            new ChannelActionListener<>(channel, QUERY_ID_ACTION_NAME, request)
        );
    }
);

// SearchService.executeQueryPhase runs the query phase on the data node:
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener)

runAsync(getExecutor(readerContext.indexShard()), () -> {
    readerContext.setAggregatedDfs(request.dfs());
    try (
        SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
        SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
    ) {
        searchContext.searcher().setAggregatedDfs(request.dfs());
        QueryPhase.execute(searchContext);
        if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
            // no hits, we can release the context since there will be no fetch phase
            freeReaderContext(readerContext.id());
        }
        executor.success();
        // Pass the rescoreDocIds to the queryResult to send them to the coordinating node
        // and receive them back in the fetch phase. We also pass the rescoreDocIds to the
        // LegacyReaderContext in case the search state needs to stay in the data node.
        final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds();
        searchContext.queryResult().setRescoreDocIds(rescoreDocIds);
        readerContext.setRescoreDocIds(rescoreDocIds);
        return searchContext.queryResult();
    } catch (Exception e) {
        assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
        logger.trace("Query phase failed", e);
        // we handle the failure in the failure listener below
        throw e;
    }
}, wrapFailureListener(listener, readerContext, markAsUsed));

/* Query phase of a search request, used to run the query and get back from each shard
 * information about the matching documents (document ids and score or sort criteria)
 * so that matches can be reduced on the coordinating node */
// The QueryPhase class executes the search and gathers the results:
public static void execute(SearchContext searchContext) {
    AggregationPhase.preProcess(searchContext);       // pre-process
    boolean rescore = executeInternal(searchContext); // execute
    QuerySearchResult queryResult = searchContext.queryResult();
    Query query = searchContext.rewrittenQuery();     // get the rewritten query
    boolean shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
    searcher.search(query, queryCollector);           // call the Lucene API
    ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
    if (rescore) { // only if we do a regular search
        RescorePhase.execute(searchContext);
    }
    SuggestPhase.execute(searchContext);
    AggregationPhase.execute(searchContext);
    searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());
}

// ContextIndexSearcher is the bridge to the Lucene API:
public class ContextIndexSearcher extends IndexSearcher implements Releasable

void search(Query query, Collector results)
    query = rewrite(query); // rewrite the query:
    Query query = original;
    for (Query rewrittenQuery = query.rewrite(reader); rewrittenQuery != query; rewrittenQuery = query.rewrite(reader)) {
        query = rewrittenQuery;
    }
    search(leafContexts, createWeight(query, results.scoreMode(), 1), results);
    weight = wrapWeight(weight);
    collector.setWeight(weight);
    for (LeafReaderContext ctx : leaves) { // search each subreader
        searchLeaf(ctx, weight, collector);
        leafCollector = collector.getLeafCollector(ctx);
        Bits liveDocs = ctx.reader().getLiveDocs();
        BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
        BulkScorer bulkScorer = weight.bulkScorer(ctx);
        bulkScorer.score(leafCollector, liveDocs);
    }

// An example of rewriting, from MultiPhraseQuery:
public Query rewrite(IndexReader reader) throws IOException {
    if (termArrays.length == 0) {
        return new MatchNoDocsQuery("empty MultiPhraseQuery");
    } else if (termArrays.length == 1) { // optimize one-term case
        Term[] terms = termArrays[0];
        BooleanQuery.Builder builder = new BooleanQuery.Builder();
        for (Term term : terms) {
            builder.add(new TermQuery(term), BooleanClause.Occur.SHOULD);
        }
        return builder.build();
    } else {
        return super.rewrite(reader);
    }
}
If you need a custom scoring scheme for search results, with stronger control than the built-in queries provide, you can implement your own tree of query, weight and scorer objects; a hedged sketch follows these points.
The query tree abstracts the user's search request (for example, a boolean expression).
The weight tree abstracts the query's per-search statistics (for example, TF-IDF values).
The scorer tree provides the interfaces for scoring documents and for explaining those scores.
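A minimal sketch of such a custom triple, assuming the Lucene 8/9 API (the class name FixedScoreAllDocsQuery is invented; ConstantScoreWeight and ConstantScoreScorer stand in for fully hand-written Weight and Scorer subclasses):

import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.*;

// Query tree node: abstracts the request ("match every doc, fixed score")
public final class FixedScoreAllDocsQuery extends Query {
    private final float score;

    public FixedScoreAllDocsQuery(float score) { this.score = score; }

    @Override
    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) {
        // Weight tree node: per-search state (here just the boosted score)
        return new ConstantScoreWeight(this, boost * score) {
            @Override
            public Scorer scorer(LeafReaderContext context) throws IOException {
                // Scorer tree node: iterates one segment's docs and scores each one
                return new ConstantScoreScorer(
                    this, score(), scoreMode,
                    DocIdSetIterator.all(context.reader().maxDoc())
                );
            }

            @Override
            public boolean isCacheable(LeafReaderContext ctx) { return true; }
        };
    }

    @Override
    public void visit(QueryVisitor visitor) { visitor.visitLeaf(this); }

    @Override
    public String toString(String field) { return "FixedScoreAllDocsQuery(" + score + ")"; }

    @Override
    public boolean equals(Object other) {
        return other instanceof FixedScoreAllDocsQuery
            && ((FixedScoreAllDocsQuery) other).score == score;
    }

    @Override
    public int hashCode() { return Float.hashCode(score); }
}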
Sub-query results are grouped by clause type: must, should, filter, and must_not.
Depending on how many should, must, and filter clauses there are, execution becomes:
pure conjunction,
pure disjunction,
or a conjunction-disjunction mix.
The combinations of query clauses roughly fall into the following classes (a small builder example follows the list):
Multiple must clauses, e.g. "+leon +Andy": the query->weight->scorer conversion produces a ConjunctionScorer tree, which intersects the postings lists of all leaf nodes.
must combined with must_not, e.g. "+leon -Andy": the conversion produces a ReqExclScorer (required/excluded) tree, which returns the must postings while removing the documents found in the must_not postings.
must combined with should, e.g. "+Leon Andy": the conversion produces a ReqOptSumScorer (required/optional) tree, which returns the must postings and raises the score of documents that also appear in the should postings.
Multiple should clauses, e.g. "Leon Andy": the conversion produces a DisjunctionSumScorer tree, which takes the union of the postings lists of all leaf nodes.
should combined with must_not, e.g. "Leon -Andy": the conversion produces a ReqExclScorer tree, as above.
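A hedged illustration of those combinations with plain Lucene builders (the field name is invented); the comments note the scorer each combination is expected to produce:

import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;

BooleanQuery mustMust = new BooleanQuery.Builder()               // "+leon +andy"
    .add(new TermQuery(new Term("name", "leon")), BooleanClause.Occur.MUST)
    .add(new TermQuery(new Term("name", "andy")), BooleanClause.Occur.MUST)
    .build();                                                    // -> ConjunctionScorer

BooleanQuery mustNot = new BooleanQuery.Builder()                // "+leon -andy"
    .add(new TermQuery(new Term("name", "leon")), BooleanClause.Occur.MUST)
    .add(new TermQuery(new Term("name", "andy")), BooleanClause.Occur.MUST_NOT)
    .build();                                                    // -> ReqExclScorer

BooleanQuery mustShould = new BooleanQuery.Builder()             // "+leon andy"
    .add(new TermQuery(new Term("name", "leon")), BooleanClause.Occur.MUST)
    .add(new TermQuery(new Term("name", "andy")), BooleanClause.Occur.SHOULD)
    .build();                                                    // -> ReqOptSumScorer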
ConjunctionScorer is a composite query tree implementing boolean AND: the postings lists found for all terms must be intersected.
Consider intersecting two postings lists. The obvious approach is a nested loop: the outer loop walks posting list 1, and for each element the inner loop scans posting list 2. This costs O(num(posting list 1) * num(posting list 2)) comparisons.
Suppose an index holds 1,000,000 documents of 1,000 words each, and that, empirically, such a collection has about 500,000 distinct terms; then each term's postings list holds about 2,000 entries (1,000,000 * 1,000 / 500,000).
With the nested loop, that is O(2,000 * 2,000) = 4,000,000 comparisons for a single intersection. Real web-scale corpora are far larger than this example, so the comparison count grows even further. Is there a more efficient algorithm?
The naive algorithm ignores one fact: in Lucene, the docID list of any term is stored in the .frq file sorted by docID, ordered by the globally consistent docID. A sketch of the merge-style intersection this enables follows.
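Because both lists are sorted by docID, a single linear merge (and, with skip lists, a sub-linear advance()) replaces the nested loop. A minimal sketch with plain int arrays:

import java.util.ArrayList;
import java.util.List;

// intersect two docID lists that are sorted ascending; O(n + m) instead of O(n * m)
static List<Integer> intersect(int[] a, int[] b) {
    List<Integer> out = new ArrayList<>();
    int i = 0, j = 0;
    while (i < a.length && j < b.length) {
        if (a[i] < b[j]) {
            i++;              // with a skip list this would be advance(b[j])
        } else if (a[i] > b[j]) {
            j++;              // likewise advance(a[i])
        } else {
            out.add(a[i]);    // docID present in both postings lists
            i++;
            j++;
        }
    }
    return out;
}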
When intersecting multiple postings lists, advance() moves through the skip list (the skipper).
approximations: the candidate iterator of each sub-query.
BlockMaxConjunctionScorer$DocIdSetIterator:
A doc is a hit only if every approximation contains it.
score: the sum of the doc's scores across all sub-queries.
moveToNextBlock: when target > upTo (the last doc of the current block), move on to the next block.
advanceShallow(target): move every sub-iterator to the block that contains target (a pseudocode sketch follows).
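A hedged, Java-flavored pseudocode sketch of that block bookkeeping; the names mirror the notes above rather than Lucene's exact source, and the clauses field is hypothetical:

// conceptual: keep every clause's impact data positioned on the block holding target
void advanceShallow(int target) throws IOException {
    for (ImpactsEnum clause : clauses) {
        clause.advanceShallow(target); // each sub-iterator moves to the block containing target
    }
}

// conceptual: when the candidate doc leaves the current block, recompute the block
// boundary (upTo) and the block-max score before iterating further
void moveToNextBlock(int target) throws IOException {
    advanceShallow(target);            // reposition all clauses
    // upTo     = min over all clauses of their block's last doc
    // maxScore = sum of the clauses' block-max scores
}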
BlockImpactsDocsEnum: the skip-list-backed implementation.
advanceShallow(target): advance to the block containing target.
SlowImpactsEnum: for postings lists with few entries, skipping is not worth it.
ConjunctionScorer has a member array scorers with one element per leaf of the query tree, i.e. one TermScorer per atomic term.
Again taking the query "Leon and Andy and Shawn": ConjunctionScorer.scorers holds three elements, the TermScorer objects for Term("Leon"), Term("Andy"), and Term("Shawn"). As analyzed earlier, TermScorer can read a term's postings; its docs and freqs arrays store the <docID, docFreq> postings entries for that term.
Unions of multiple postings lists use a heap.
DisiPriorityQueue
Read the heap top and call updateTop(); as long as the new top sits on the same doc as the previous top, keep calling updateTop().
For the query "Leon and Andy and Shawn" (run as a disjunction), the DisjunctionSumScorer.subScorers member is a linked list in which each element again represents one term's postings list. DisjunctionSumScorer takes the union of those postings and returns the union's document numbers from nextDoc().
DisjunctionSumScorer, DisjunctionMaxScorer
Once the postings are merged, a doc that matched several terms has its per-term scores combined at scoring time, either by sum or by max. A heap-based sketch follows.
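A minimal sketch of the heap-based union using java.util.PriorityQueue over (docID, score) cursors; Lucene's DisiPriorityQueue is a specialized version of the same idea, and the types here are invented:

import java.util.PriorityQueue;

final class DisjunctionUnionSketch {

    // one cursor per term: docs[] sorted ascending, scores[] aligned with docs[]
    static final class Cursor {
        final int[] docs;
        final float[] scores;
        int pos = 0;
        Cursor(int[] docs, float[] scores) { this.docs = docs; this.scores = scores; }
        int doc() { return docs[pos]; }
        boolean exhausted() { return pos >= docs.length; }
    }

    // union of several postings; per doc, scores combine by sum (DisjunctionSumScorer);
    // replacing the sum with a max would model DisjunctionMaxScorer
    static void unionBySum(Cursor[] subScorers) {
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((x, y) -> Integer.compare(x.doc(), y.doc()));
        for (Cursor c : subScorers) {
            if (!c.exhausted()) heap.add(c);
        }
        while (!heap.isEmpty()) {
            int doc = heap.peek().doc();
            float sum = 0f;
            // drain every cursor currently on `doc` (the "new top equals top" loop),
            // summing their scores; DisiPriorityQueue does this with updateTop()
            while (!heap.isEmpty() && heap.peek().doc() == doc) {
                Cursor c = heap.poll();
                sum += c.scores[c.pos];
                c.pos++;
                if (!c.exhausted()) heap.add(c); // re-insert, repositioned on its next doc
            }
            System.out.println("doc=" + doc + " score=" + sum);
        }
    }
}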
When there are no more than 16 terms, the query is rewritten into a boolean query of SHOULD clauses.
Otherwise, a bit set is built with DocIdSetBuilder.
TermInSetQuery$ConstantScoreWeight.rewrite
DocIdSetBuilder
add(iter):
If a bitset already exists: when iter is itself a FixedBitSet (a plain bitmap, not a roaring bitmap), the two bitsets are ORed together; otherwise iter is iterated and its docs are set in the bitset.
Otherwise, iter's docs are appended to the buffer, which is converted to a bitset once a threshold is reached.
The built set is then traversed through a doc-ID iterator class; a usage sketch follows.
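A small hedged sketch of the DocIdSetBuilder side (Lucene API; maxDoc and perTermPostings are placeholders for the segment's doc count and the matched terms' postings iterators):

import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.DocIdSetBuilder;

// collect the matching docs of many postings lists into one DocIdSet
DocIdSetBuilder builder = new DocIdSetBuilder(maxDoc);
for (DocIdSetIterator postings : perTermPostings) {
    builder.add(postings); // buffers docIDs; upgrades to a FixedBitSet past a threshold
}
DocIdSet set = builder.build();
DocIdSetIterator it = set.iterator();
for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
    // consume each matching docID in order
}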
ReqExclScorer has a member Scorer reqScorer for the part that must match (required) and a member DocIdSetIterator exclDisi for the part that must not match. ReqExclScorer returns the set difference of the two postings lists: the docs of reqScorer minus the document numbers present in exclDisi.
When nextDoc() is called, it first fetches reqScorer's next document number; the toNonExcluded() function then checks whether that doc is ruled out by exclDisi. If not, the doc is returned; if it is excluded, the next doc is fetched and checked, and so on, until a surviving doc is found or NO_MORE_DOCS is returned. A sketch of that loop follows.
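A minimal sketch of the filtering loop just described, written against DocIdSetIterator (the method mirrors the prose, not Lucene's exact source):

import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;

// skip over required docs that also appear in the excluded iterator
static int toNonExcluded(DocIdSetIterator reqScorer, DocIdSetIterator exclDisi) throws IOException {
    int doc = reqScorer.nextDoc();
    while (doc != DocIdSetIterator.NO_MORE_DOCS) {
        int exclDoc = exclDisi.docID();
        if (exclDoc < doc) {
            exclDoc = exclDisi.advance(doc); // catch the exclusion iterator up to doc
        }
        if (exclDoc != doc) {
            return doc;                      // not excluded: this doc survives
        }
        doc = reqScorer.nextDoc();           // excluded: try the next required doc
    }
    return DocIdSetIterator.NO_MORE_DOCS;
}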