jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之search模块(server端)

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 1971 次 | 发布:2016-04-04 10:34 p.m.

继续接着上一篇的来说啊,当client端将search的请求发送到某一个node之后,剩下的事情就是server端来处理了,具体包括哪些步骤呢?

过程

一、首先我们来看看接收地方其实就是在org.elasticsearch.action.search.TransportSearchAction中,收到请求之后会判断请求的index的shard是否只有一个,如果是一个的话,那么会强制将请求的type设置为QUERY_AND_FETCH,因为所以的事情在此shard上就能够做完了。所以如果设置了routing,而让请求落在了一个shard上时,搜索的效率会高很多的原因。

二、根据不同的type来确定不同的处理方式,这里补充一下,上一篇可能忘记说了,search的type一般来说分为“DFS_QUERY_THEN_FETCH、QUERY_THEN_FETCH、DFS_QUERY_AND_FETCH、QUERY_AND_FETCH”这四种,还有“SCAN、COUNT”在ES2.X里面其实已经被舍弃掉了。我们一般都是用的默认的QUERY_THEN_FETCH,上面说的一个shard的除外。所以本篇就只讨论这种情况了。

三、得到搜索的index所涉及的shard,并依次执行: 1、获取该shard所在的node并执行sendExecuteFirstPhase,实际上是向node发送了一个“QUERY”的请求:

transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
    @Override
    public QuerySearchResult newInstance() {
        return new QuerySearchResult();
    }
});

2、node接收到"QUERY"的请求之后,执行executeQueryPhase 首先是创建一个search的context,

SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher, defaultSearchTimeout);

创建的具体过程就不详细说了,之后做的事情还是有parseSource、对size做判断(2.X里面最大不超过10000,可以通过配置文件配置)、……

最重要的其实是loadOrExecuteQueryPhase(request, context, queryPhase);,具体的内容是首先从cache里面执行query,如果cache里面没有找到,才会执行queryPhase:queryPhase.execute(context);;里面的处理逻辑就比较复杂了,但是最重要的是searcher.search(query, collector);,其实是调用了Lucene里面IndexSeartcher的search方法。

3、如此一来,第一阶段的query已经做完了,,接下来便是fetch的执行,入口在onFirstPhaseResult这里,在底层同样是向node发送一个“FETCH”请求咯:

 searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
                @Override
                public void onResponse(FetchSearchResult result) {
                    result.shardTarget(shardTarget);
                    fetchResults.set(shardIndex, result);
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }
        …………

4、node接收到“fetch”请求之后,执行executeFetchPhase:

fetch的核心代码如下:

for (int index = 0; index < context.docIdsToLoadSize(); index++) {
    int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
    int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
    LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
    int subDocId = docId - subReaderContext.docBase;

    final InternalSearchHit searchHit;
    try {
        int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
        if (rootDocId != -1) {
            searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, extractFieldNames, loadAllStored, fieldNames, subReaderContext);
        } else {
            searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, extractFieldNames, subReaderContext);
        }
    } catch (IOException e) {
        throw ExceptionsHelper.convertToElastic(e);
    }

    hits[index] = searchHit;
    hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        if (fetchSubPhase.hitExecutionNeeded(context)) {
            fetchSubPhase.hitExecute(context, hitContext);
        }
    }
}

大意就是轮流通过之前query结果中的docid,然后创建出InternalSearchHit的集合,并将之放在fetchResult中context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));,并将之返回到发送fetch的node。

四、到目前为止,该获取的数据都已经拿到了,现在要做的则是要把个node的返回结果做merge,merge的操作由SearchPhaseController来控制:

final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
        fetchResults, request);

具体的过程就不细说了,大体就是该排序的就做排序,有aggs的就做aggs……

然后我们来注意一下,返回的结果张啥样吧:

private InternalSearchResponse internalResponse;

private String scrollId;

private int totalShards;

private int successfulShards;

private ShardSearchFailure[] shardFailures;

private long tookInMillis;

其中InternalSearchResponse则为:

private InternalSearchHits hits;

private InternalAggregations aggregations;

private Suggest suggest;

private boolean timedOut;

五、通过listener将上面的结果返回:listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));给发出接收search请求的node,也就是上一篇说道的client。

总结

这样知道了,为什么返回的结果长那么个鬼样子了。整个过程的话算是走马观花地走了一遍了,其实里面还有很多detail的东西没用讲到,看一张图就知道了:

包括他们分别的具体实现什么的,所以一个查询牵扯到的东西实在太多,等有时间再去深究,可都是财富。


转载请注明出处:http://www.opscoder.info/es_search_server.html

其他分类: