ES Series (5): How a single-document get is processed

  The previous posts covered the bigger, framework-level machinery. Today let's take it easy and cover just one point: as the title says, how es implements the get query for a single record.

1. Semantics of get

  get retrieves a single document from es, looking it up by its primary-key id. The analogous SQL in a relational database would be:

select * from test where id = #{id};

  Of course, as with every keyword in es, get comes with quite a few extra modifiers, for example: specifying which fields to return, a version number, and so on.
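  To make these options concrete, here is a minimal usage sketch in Java (the index name test_index, id 1, routing value user123, and field name title are all made up for illustration; it assumes a NodeClient is in scope, the same client type the handlers below receive). It builds the same GetRequest that the REST layer will assemble later in this post:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

void getExample(NodeClient client) {
    GetRequest getRequest = new GetRequest("test_index", "1"); // index and id to look up
    getRequest.routing("user123");       // optional: custom routing value
    getRequest.preference("_local");     // optional: shard copy preference
    getRequest.realtime(true);           // realtime get is the default
    // only fetch the "title" field from _source
    getRequest.fetchSourceContext(new FetchSourceContext(true, new String[]{"title"}, null));
    client.get(getRequest, ActionListener.wrap(
        resp -> System.out.println(resp.isExists() ? resp.getSourceAsString() : "not found"),
        Throwable::printStackTrace));
}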

2. A brief overview of the get implementation

  Semantically, a get returns at most one record. So although es stores data across a cluster, all we need here is to fetch that one document from some node. In theory, as soon as we can quickly locate which es node holds the document and send it a request, we have our result.

  Moreover, since the lookup is by primary-key id, a well-designed data layout should make this query extremely efficient.

  So, by analyzing how this feature is implemented, we can get a basic picture of how es distributes keys.

3. The get implementation in detail

  get is just one small piece of the es API surface. From the previous post's analysis we already know how to locate the handler that processes get. Let's quickly recap:

// org.elasticsearch.rest.RestController#dispatchRequest  
@Override  
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {  
    try {  
        // try every handler that could match  
        tryAllHandlers(request, channel, threadContext);  
    } catch (Exception e) {  
        try {  
            // on exception, send the error information back to the client  
            channel.sendResponse(new BytesRestResponse(channel, e));  
        } catch (Exception inner) {  
            inner.addSuppressed(e);  
            logger.error(() ->  
                new ParameterizedMessage("failed to send failure response for uri \[{}\]", request.uri()), inner);  
        }  
    }  
}  
// org.elasticsearch.rest.RestController#tryAllHandlers  
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {  
    // copy request header values into the thread context  
    for (final RestHeaderDefinition restHeader : headersToCopy) {  
        final String name = restHeader.getName();  
        final List<String> headerValues = request.getAllHeaderValues(name);  
        if (headerValues != null && headerValues.isEmpty() == false) {  
            final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());  
            if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {  
                channel.sendResponse(  
                    BytesRestResponse.  
                        createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));  
                return;  
            } else {  
                threadContext.putHeader(name, String.join(",", distinctHeaderValues));  
            }  
        }  
    }  
    // error_trace cannot be used when we disable detailed errors  
    // we consume the error_trace parameter first to ensure that it is always consumed  
    if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {  
        channel.sendResponse(  
                BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));  
        return;  
    }

    final String rawPath = request.rawPath();  
    final String uri = request.uri();  
    final RestRequest.Method requestMethod;  
    try {  
        // Resolves the HTTP method and fails if the method is invalid  
        requestMethod = request.method();  
        // Loop through all possible handlers, attempting to dispatch the request  
        // get all candidate handlers; because of wildcards/path variables, several may match  
        Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);  
        while (allHandlers.hasNext()) {  
            final RestHandler handler;  
            // one MethodHandlers entry can serve several HTTP methods  
            final MethodHandlers handlers = allHandlers.next();  
            if (handlers == null) {  
                handler = null;  
            } else {  
                handler = handlers.getHandler(requestMethod);  
            }  
            if (handler == null) {  
              // not finding a handler here doesn't mean the request can't be handled; we may need to keep looking. If it definitely can't be handled, respond to the client and return  
              if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {  
                  return;  
              }  
            } else {  
                // found a handler, invoke it  
                dispatchRequest(request, channel, handler);  
                return;  
            }  
        }  
    } catch (final IllegalArgumentException e) {  
        handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);  
        return;  
    }  
    // If request has not been handled, fallback to a bad request error.  
    // fallback handling  
    handleBadRequest(uri, requestMethod, channel);  
}  
// org.elasticsearch.rest.RestController#dispatchRequest  
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {  
    final int contentLength = request.contentLength();  
    if (contentLength > 0) {  
        final XContentType xContentType = request.getXContentType();  
        if (xContentType == null) {  
            sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);  
            return;  
        }  
        if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {  
            channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,  
                "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));  
            return;  
        }  
    }  
    RestChannel responseChannel = channel;  
    try {  
        // circuit-breaker check  
        if (handler.canTripCircuitBreaker()) {  
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");  
        } else {  
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);  
        }  
        // iff we could reserve bytes for the request we need to send the response also over this channel  
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);  
        // TODO: Count requests double in the circuit breaker if they need copying?  
        if (handler.allowsUnsafeBuffers() == false) {  
            request.ensureSafeBuffers();  
        }  
        if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {  
            // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan  
            // to move away from direct access to system indices, and thus deprecation warnings should not be emitted.  
            // This header is intended for internal use only.  
            client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());  
        }  
        // invoke the handler; filters may run before it  
        handler.handleRequest(request, responseChannel, client);  
    } catch (Exception e) {  
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));  
    }  
}  
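
  One design note on the snippet above: the request body size is charged to the in-flight requests circuit breaker before the handler runs, so a burst of large requests trips the breaker rather than exhausting the heap; the ResourceHandlingHttpChannel wrapper exists so that those reserved bytes can be released once the response goes back out over the same channel.
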
// org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest  
@Override  
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {  
    if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {  
        // CORS - allow for preflight unauthenticated OPTIONS request  
        if (extractClientCertificate) {  
            HttpChannel httpChannel = request.getHttpChannel();  
            SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);  
        }

        final String requestUri = request.uri();  
        authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(  
            authentication -> {  
                if (authentication == null) {  
                    logger.trace("No authentication available for REST request \[{}\]", requestUri);  
                } else {  
                    logger.trace("Authenticated REST request \[{}\] as {}", requestUri, authentication);  
                }  
                secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(  
                    secondaryAuthentication -> {  
                        if (secondaryAuthentication != null) {  
                            logger.trace("Found secondary authentication {} in REST request \[{}\]", secondaryAuthentication, requestUri);  
                        }  
                        RemoteHostHeader.process(request, threadContext);  
                        restHandler.handleRequest(request, channel, client);  
                    },  
                    e -> handleException("Secondary authentication", request, channel, e)));  
            }, e -> handleException("Authentication", request, channel, e)));  
    } else {  
        // pass on to the next handler in the chain  
        restHandler.handleRequest(request, channel, client);  
    }  
}

  Typically, concrete handlers extend BaseRestHandler, so the overall call sequence of handleRequest() looks like this:

(Figure: overall call sequence of handleRequest())

  The code is as follows:

// org.elasticsearch.rest.BaseRestHandler#handleRequest  
@Override  
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {  
    // prepare the request for execution; has the side effect of touching the request parameters  
    // despite the innocuous name, this is the key step: it assembles the logic that will run next  
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response  
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order  
    final SortedSet<String> unconsumedParams =  
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params  
    if (!unconsumedParams.isEmpty()) {  
        final Set<String> candidateParams = new HashSet<>();  
        candidateParams.addAll(request.consumedParams());  
        candidateParams.addAll(responseParams());  
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));  
    }

    if (request.hasContent() && request.isContentConsumed() == false) {  
        throw new IllegalArgumentException("request \[" + request.method() + " " + request.path() + "\] does not support having a body");  
    }

    usageCount.increment();  
    // execute the action  
    // here we simply invoke the consumer assembled above  
    action.accept(channel);  
}
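
  To see the contract in isolation, here is a minimal handler sketch (EchoIdAction and its route are hypothetical, not part of es; it assumes the same 7.x classes used above). Note that parameters must be consumed inside prepareRequest(), because handleRequest() rejects any leftover ones as unrecognized:

import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.*;

public class EchoIdAction extends BaseRestHandler {
    @Override
    public String getName() { return "echo_id_action"; }

    @Override
    public List<Route> routes() { return singletonList(new Route(GET, "/_echo/{id}")); }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
        String id = request.param("id"); // consume the param here, not in the lambda
        return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "text/plain", id));
    }
}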

  As for the handler that processes get: from the route registration at startup we can see it is RestGetAction, and its prepareRequest() implementation embodies how it handles the request.

// org.elasticsearch.rest.action.document.RestGetAction#prepareRequest  
@Override  
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {  
    GetRequest getRequest;  
    if (request.hasParam("type")) {  
        deprecationLogger.deprecate("get\_with\_types", TYPES\_DEPRECATION\_MESSAGE);  
        getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));  
    } else {  
        getRequest = new GetRequest(request.param("index"), request.param("id"));  
    }

    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));  
    getRequest.routing(request.param("routing"));  
    getRequest.preference(request.param("preference"));  
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));  
    if (request.param("fields") != null) {  
        throw new IllegalArgumentException("the parameter \[fields\] is no longer supported, " +  
            "please use \[stored\_fields\] to retrieve stored fields or \[\_source\] to load the field from \_source");  
    }  
    final String fieldsParam = request.param("stored_fields");  
    if (fieldsParam != null) {  
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);  
        if (fields != null) {  
            getRequest.storedFields(fields);  
        }  
    }

    getRequest.version(RestActions.parseVersion(request));  
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));  
    // wrap the actual business logic of the get  
    // and hand it off to the NodeClient  
    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {  
        @Override  
        protected RestStatus getStatus(final GetResponse response) {  
            return response.isExists() ? OK : NOT_FOUND;  
        }  
    });  
}  
// org.elasticsearch.search.fetch.subphase.FetchSourceContext#parseFromRestRequest  
public static FetchSourceContext parseFromRestRequest(RestRequest request) {  
    Boolean fetchSource = null;  
    String[] sourceExcludes = null;  
    String[] sourceIncludes = null;

    String source = request.param("\_source");  
    if (source != null) {  
        if (Booleans.isTrue(source)) {  
            fetchSource = true;  
        } else if (Booleans.isFalse(source)) {  
            fetchSource = false;  
        } else {  
            sourceIncludes = Strings.splitStringByCommaToArray(source);  
        }  
    }

    String sIncludes = request.param("_source_includes");  
    if (sIncludes != null) {  
        sourceIncludes = Strings.splitStringByCommaToArray(sIncludes);  
    }

    String sExcludes = request.param("_source_excludes");  
    if (sExcludes != null) {  
        sourceExcludes = Strings.splitStringByCommaToArray(sExcludes);  
    }

    if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {  
        return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);  
    }  
    return null;  
}
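
  For intuition, the parsing above maps query strings onto fetch-source behavior roughly as follows (values illustrative):

?_source=false                          -> fetchSource = false, no _source returned
?_source=true                           -> fetchSource = true
?_source=title,price                    -> sourceIncludes = ["title", "price"]
?_source_includes=a&_source_excludes=b  -> includes ["a"], excludes ["b"]
no _source parameters at all            -> null, i.e. the default fetch-source behavior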

  The above is the prelude to get processing. We can see the parameters it supports: type, id, refresh, routing, preference, fields, stored_fields, version_type, _source, _source_includes… Quite a handful after all; the more options, the more complexity.

  The supported URL patterns can be seen from the handler's route definitions:

// org.elasticsearch.rest.action.document.RestGetAction#routes  
@Override  
public List<Route> routes() {  
    return unmodifiableList(asList(  
        new Route(GET, "/{index}/\_doc/{id}"),  
        new Route(HEAD, "/{index}/\_doc/{id}"),  
        // Deprecated typed endpoints.  
        new Route(GET, "/{index}/{type}/{id}"),  
        new Route(HEAD, "/{index}/{type}/{id}")));  
}
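
  So, for example, both of the following reach RestGetAction (test_index, job and 1 are illustrative; the second form goes through the deprecated typed route):

GET /test_index/_doc/1
GET /test_index/job/1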

  Ultimately the request is handled internally by TransportGetAction.

// org.elasticsearch.action.support.TransportAction#execute  
/**
 * Use this method when the transport action call should result in creation of a new task associated with the call.
 *
 * This is a typical behavior.
 */
public final Task execute(Request request, ActionListener<Response> listener) {  
    /*
     * While this version of execute could delegate to the TaskListener
     * version of execute that'd add yet another layer of wrapping on the
     * listener and prevent us from using the listener bare if there isn't a
     * task. That just seems like too many objects. Thus the two versions of
     * this method.
     */
    final Releasable unregisterChildNode = registerChildNode(request.getParentTask());  
    final Task task;  
    try {  
        task = taskManager.register("transport", actionName, request);  
    } catch (TaskCancelledException e) {  
        unregisterChildNode.close();  
        throw e;  
    }  
    execute(task, request, new ActionListener<Response>() {  
        @Override  
        public void onResponse(Response response) {  
            try {  
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));  
            } finally {  
                listener.onResponse(response);  
            }  
        }

        @Override  
        public void onFailure(Exception e) {  
            try {  
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));  
            } finally {  
                listener.onFailure(e);  
            }  
        }  
    });  
    return task;  
}

/**
 * Use this method when the transport action should continue to run in the context of the current task
 */
public final void execute(Task task, Request request, ActionListener<Response> listener) {  
    ActionRequestValidationException validationException = request.validate();  
    if (validationException != null) {  
        listener.onFailure(validationException);  
        return;  
    }

    if (task != null && request.getShouldStoreResult()) {  
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);  
    }

    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);  
    requestFilterChain.proceed(task, actionName, request, listener);  
}
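
  The RequestFilterChain at the end lets registered ActionFilters intercept every transport action before doExecute() runs. As a rough sketch of that extension point (TimingActionFilter is hypothetical and not part of the get flow; it assumes the ActionFilter SPI as of 7.x):

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.tasks.Task;

public class TimingActionFilter implements ActionFilter {
    @Override
    public int order() { return 0; } // lower values run earlier in the chain

    @Override
    public <Request extends ActionRequest, Response extends ActionResponse> void apply(
            Task task, String action, Request request,
            ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
        final long startNanos = System.nanoTime();
        // proceed to the next filter and eventually doExecute(), observing the round trip
        chain.proceed(task, action, request, ActionListener.wrap(
            response -> {
                long tookNanos = System.nanoTime() - startNanos; // could be recorded to a metric
                listener.onResponse(response);
            },
            listener::onFailure));
    }
}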

  TransportGetAction extends TransportSingleShardAction, which in turn extends TransportAction, so doExecute() runs TransportSingleShardAction's implementation.

// org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute  
@Override  
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {  
    new AsyncSingleAction(request, listener).start();  
}  
    // org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction  
    private AsyncSingleAction(Request request, ActionListener<Response> listener) {  
        this.listener = listener;

        ClusterState clusterState = clusterService.state();  
        if (logger.isTraceEnabled()) {  
            logger.trace("executing \[{}\] based on cluster state version \[{}\]", request, clusterState.version());  
        }  
        nodes = clusterState.nodes();  
        ClusterBlockException blockException = checkGlobalBlock(clusterState);  
        if (blockException != null) {  
            throw blockException;  
        }

        String concreteSingleIndex;  
        // resolve the index parameter  
        if (resolveIndex(request)) {  
            concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();  
        } else {  
            concreteSingleIndex = request.index();  
        }  
        // assemble the internal request  
        this.internalRequest = new InternalRequest(request, concreteSingleIndex);  
        resolveRequest(clusterState, internalRequest);  
        // check the cluster block state again  
        blockException = checkRequestBlock(clusterState, internalRequest);  
        if (blockException != null) {  
            throw blockException;  
        }  
        // fetch the shard routing info and issue the request to the cluster  
        this.shardIt = shards(clusterState, internalRequest);  
    }  
// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteSingleIndex  
/**
 * Utility method that allows to resolve an index expression to its corresponding single concrete index.
 * Callers should make sure they provide proper {@link org.elasticsearch.action.support.IndicesOptions}
 * that require a single index as a result. The indices resolution must in fact return a single index when
 * using this method, an {@link IllegalArgumentException} gets thrown otherwise.
 *
 * @param state             the cluster state containing all the data to resolve to expression to a concrete index
 * @param request           The request that defines how the an alias or an index need to be resolved to a concrete index
 *                          and the expression that can be resolved to an alias or an index name.
 * @throws IllegalArgumentException if the index resolution lead to more than one index
 * @return the concrete index obtained as a result of the index resolution
 */
public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {  
    String indexExpression = CollectionUtils.isEmpty(request.indices()) ? null : request.indices()[0];  
    Index[] indices = concreteIndices(state, request.indicesOptions(), indexExpression);  
    if (indices.length != 1) {  
        throw new IllegalArgumentException("unable to return a single index as the index and options" +  
            " provided got resolved to multiple indices");  
    }  
    return indices[0];  
}

// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteIndices  
/**
 * Translates the provided index expression into actual concrete indices, properly deduplicated.
 *
 * @param state             the cluster state containing all the data to resolve to expressions to concrete indices
 * @param options           defines how the aliases or indices need to be resolved to concrete indices
 * @param indexExpressions  expressions that can be resolved to alias or index names.
 * @return the resolved concrete indices based on the cluster state, indices options and index expressions
 * @throws IndexNotFoundException if one of the index expressions is pointing to a missing index or alias and the
 * provided indices options in the context don't allow such a case, or if the final result of the indices resolution
 * contains no indices and the indices options in the context don't allow such a case.
 * @throws IllegalArgumentException if one of the aliases resolve to multiple indices and the provided
 * indices options in the context don't allow such a case.
 */
public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) {
    return concreteIndices(state, options, false, indexExpressions);
}

public Index[] concreteIndices(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) {  
    Context context = new Context(state, options, false, false, includeDataStreams,  
        isSystemIndexAccessAllowed());  
    return concreteIndices(context, indexExpressions);  
}

Index[] concreteIndices(Context context, String... indexExpressions) {  
    if (indexExpressions == null || indexExpressions.length == 0) {  
        indexExpressions = new String[]{Metadata.ALL};  
    }  
    Metadata metadata = context.getState().metadata();  
    IndicesOptions options = context.getOptions();  
    // If only one index is specified then whether we fail a request if an index is missing depends on the allow_no_indices  
    // option. At some point we should change this, because there shouldn't be a reason why whether a single index  
    // or multiple indices are specified yield different behaviour.  
    final boolean failNoIndices = indexExpressions.length == 1 ? !options.allowNoIndices() : !options.ignoreUnavailable();  
    List<String> expressions = Arrays.asList(indexExpressions);  
    for (ExpressionResolver expressionResolver : expressionResolvers) {  
        expressions = expressionResolver.resolve(context, expressions);  
    }

    if (expressions.isEmpty()) {  
        if (!options.allowNoIndices()) {  
            IndexNotFoundException infe;  
            if (indexExpressions.length == 1) {  
                if (indexExpressions[0].equals(Metadata.ALL)) {  
                    infe = new IndexNotFoundException("no indices exist", (String)null);  
                } else {  
                    infe = new IndexNotFoundException((String)null);  
                }  
            } else {  
                infe = new IndexNotFoundException((String)null);  
            }  
            infe.setResources("index\_expression", indexExpressions);  
            throw infe;  
        } else {  
            return Index.EMPTY_ARRAY;  
        }  
    }

    boolean excludedDataStreams = false;  
    final Set<Index> concreteIndices = new LinkedHashSet<>(expressions.size());  
    for (String expression : expressions) {  
        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);  
        if (indexAbstraction == null ) {  
            if (failNoIndices) {  
                IndexNotFoundException infe;  
                if (expression.equals(Metadata.ALL)) {  
                    infe = new IndexNotFoundException("no indices exist", expression);  
                } else {  
                    infe = new IndexNotFoundException(expression);  
                }  
                infe.setResources("index\_expression", expression);  
                throw infe;  
            } else {  
                continue;  
            }  
        } else if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.getOptions().ignoreAliases()) {  
            if (failNoIndices) {  
                throw aliasesNotSupportedException(expression);  
            } else {  
                continue;  
            }  
        } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&  
                    context.includeDataStreams() == false) {  
            excludedDataStreams = true;  
            continue;  
        }

        if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.isResolveToWriteIndex()) {  
            IndexMetadata writeIndex = indexAbstraction.getWriteIndex();  
            if (writeIndex == null) {  
                throw new IllegalArgumentException("no write index is defined for alias \[" + indexAbstraction.getName() + "\]." +  
                    " The write index may be explicitly disabled using is\_write\_index=false or the alias points to multiple" +  
                    " indices without one being designated as a write index");  
            }  
            if (addIndex(writeIndex, context)) {  
                concreteIndices.add(writeIndex.getIndex());  
            }  
        } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM && context.isResolveToWriteIndex()) {  
            IndexMetadata writeIndex = indexAbstraction.getWriteIndex();  
            if (addIndex(writeIndex, context)) {  
                concreteIndices.add(writeIndex.getIndex());  
            }  
        } else {  
            if (indexAbstraction.getIndices().size() > 1 && !options.allowAliasesToMultipleIndices()) {  
                String[] indexNames = new String[indexAbstraction.getIndices().size()];  
                int i = 0;  
                for (IndexMetadata indexMetadata : indexAbstraction.getIndices()) {  
                    indexNames[i++] = indexMetadata.getIndex().getName();  
                }  
                throw new IllegalArgumentException(indexAbstraction.getType().getDisplayName() + " [" + expression +  
                    "] has more than one index associated with it " + Arrays.toString(indexNames) +  
                    ", can't execute a single index op");  
            }

            for (IndexMetadata index : indexAbstraction.getIndices()) {  
                if (shouldTrackConcreteIndex(context, options, index)) {  
                    concreteIndices.add(index.getIndex());  
                }  
            }  
        }  
    }

    if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {  
        IndexNotFoundException infe = new IndexNotFoundException((String)null);  
        infe.setResources("index_expression", indexExpressions);  
        if (excludedDataStreams) {  
            // Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.  
            infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");  
        }  
        throw infe;  
    }  
    checkSystemIndexAccess(context, metadata, concreteIndices, indexExpressions);  
    return concreteIndices.toArray(new Index[concreteIndices.size()]);  
}

// org.elasticsearch.action.get.TransportGetAction#resolveRequest  
@Override  
protected void resolveRequest(ClusterState state, InternalRequest request) {  
    // update the routing (request#index here is possibly an alias)  
    request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));  
    // Fail fast on the node that received the request.  
    if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {  
        throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());  
    }  
}

@Override  
protected ShardIterator shards(ClusterState state, InternalRequest request) {  
    return clusterService.operationRouting()  
            .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),  
                request.request().preference());  
}

    // org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start  
    public void start() {  
        if (shardIt == null) {  
            // just execute it on the local node  
            final Writeable.Reader<Response> reader = getResponseReader();  
            transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),  
                new TransportResponseHandler<Response>() {  
                @Override  
                public Response read(StreamInput in) throws IOException {  
                    return reader.read(in);  
                }

                @Override  
                public void handleResponse(final Response response) {  
                    listener.onResponse(response);  
                }

                @Override  
                public void handleException(TransportException exp) {  
                    listener.onFailure(exp);  
                }  
            });  
        } else {  
            perform(null);  
        }  
    }

    private void perform(@Nullable final Exception currentFailure) {  
        Exception lastFailure = this.lastFailure;  
        if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {  
            lastFailure = currentFailure;  
            this.lastFailure = currentFailure;  
        }  
        final ShardRouting shardRouting = shardIt.nextOrNull();  
        if (shardRouting == null) {  
            Exception failure = lastFailure;  
            if (failure == null || isShardNotAvailableException(failure)) {  
                failure = new NoShardAvailableActionException(null,  
                    LoggerMessageFormat.format("No shard available for \[{}\]", internalRequest.request()), failure);  
            } else {  
                logger.debug(() -> new ParameterizedMessage("{}: failed to execute \[{}\]", null,  
                    internalRequest.request()), failure);  
            }  
            listener.onFailure(failure);  
            return;  
        }  
        DiscoveryNode node = nodes.get(shardRouting.currentNodeId());  
        if (node == null) {  
            onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));  
        } else {  
            internalRequest.request().internalShardId = shardRouting.shardId();  
            if (logger.isTraceEnabled()) {  
                logger.trace(  
                        "sending request \[{}\] to shard \[{}\] on node \[{}\]",  
                        internalRequest.request(),  
                        internalRequest.request().internalShardId,  
                        node  
                );  
            }  
            final Writeable.Reader<Response> reader = getResponseReader();  
            transportService.sendRequest(node, transportShardAction, internalRequest.request(),  
                new TransportResponseHandler<Response>() {

                    @Override  
                    public Response read(StreamInput in) throws IOException {  
                        return reader.read(in);  
                    }

                    @Override  
                    public void handleResponse(final Response response) {  
                        listener.onResponse(response);  
                    }

                    @Override  
                    public void handleException(TransportException exp) {  
                        onFailure(shardRouting, exp);  
                    }  
            });  
        }  
    }
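
  Note the failover built into perform(): each invocation pulls the next shard copy from the ShardIterator, and handleException() feeds transport failures back through onFailure(shardRouting, exp), which (in code not shown here) calls perform() again with the next copy, until the iterator is exhausted and the accumulated failure, or a NoShardAvailableActionException, is returned to the listener.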

  sendRequest nominally means sending a request over the network and collecting the result. But for a request targeting the current node, es of course does not actually go over the wire. Taking a request to the local node as the example, let's walk through how es handles the sendRequest leg of a get.

(Figure: sendRequest call sequence diagram)

  The code is as follows:

// org.elasticsearch.transport.TransportService#sendRequest  
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,  
                                                            final TransportRequest request,  
                                                            final TransportResponseHandler<T> handler) {  
    sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);  
}

public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,  
                                                            final TransportRequest request,  
                                                            final TransportRequestOptions options,  
                                                            TransportResponseHandler<T> handler) {  
    final Transport.Connection connection;  
    try {  
        connection = getConnection(node);  
    } catch (final NodeNotConnectedException ex) {  
        // the caller might not handle this so we invoke the handler  
        handler.handleException(ex);  
        return;  
    }  
    sendRequest(connection, action, request, options, handler);  
}

/**
 * Returns either a real transport connection or a local node connection if we are using the local node optimization.
 * @throws NodeNotConnectedException if the given node is not connected
 */
public Transport.Connection getConnection(DiscoveryNode node) {  
    // if the target is the local node, return the local connection directly; no remote access needed  
    if (isLocalNode(node)) {  
        return localNodeConnection;  
    } else {  
        return connectionManager.getConnection(node);  
    }  
}  
private boolean isLocalNode(DiscoveryNode discoveryNode) {  
    return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);  
}

// org.elasticsearch.transport.TransportService#sendRequest  
/**
 * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
 *
 * @param connection the connection to send the request on
 * @param action     the name of the action
 * @param request    the request
 * @param options    the options for this request
 * @param handler    the response handler
 * @param <T>        the type of the transport response
 */
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,  
                                                            final TransportRequest request,  
                                                            final TransportRequestOptions options,  
                                                            final TransportResponseHandler<T> handler) {  
    try {  
        final TransportResponseHandler<T> delegate;  
        if (request.getParentTask().isSet()) {  
            // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual  
            // child task on the target node of the remote cluster.  
            //  ----> a parent task on the local cluster  
            //        |  
            //         ----> a proxy task on the proxy node on the remote cluster  
            //               |  
            //                ----> an actual child task on the target node on the remote cluster  
            // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target  
            // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we  
            // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.  
            final Transport.Connection unwrappedConn = unwrapConnection(connection);  
            final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);  
            delegate = new TransportResponseHandler<T>() {  
                @Override  
                public void handleResponse(T response) {  
                    unregisterChildNode.close();  
                    handler.handleResponse(response);  
                }

                @Override  
                public void handleException(TransportException exp) {  
                    unregisterChildNode.close();  
                    handler.handleException(exp);  
                }

                @Override  
                public String executor() {  
                    return handler.executor();  
                }

                @Override  
                public T read(StreamInput in) throws IOException {  
                    return handler.read(in);  
                }

                @Override  
                public String toString() {  
                    return getClass().getName() + "/\[" + action + "\]:" + handler.toString();  
                }  
            };  
        } else {  
            delegate = handler;  
        }  
        asyncSender.sendRequest(connection, action, request, options, delegate);  
    } catch (final Exception ex) {  
        // the caller might not handle this so we invoke the handler  
        final TransportException te;  
        if (ex instanceof TransportException) {  
            te = (TransportException) ex;  
        } else {  
            te = new TransportException("failure to send", ex);  
        }  
        handler.handleException(te);  
    }  
}

// org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#interceptSender  
@Override  
public AsyncSender interceptSender(AsyncSender sender) {  
    return new AsyncSender() {  
        @Override  
        public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,  
                                                              TransportRequestOptions options, TransportResponseHandler<T> handler) {  
            final boolean requireAuth = shouldRequireExistingAuthentication();  
            // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it  
            // ourselves otherwise we wind up using a version newer than what we can actually send  
            final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

            // Sometimes a system action gets executed like a internal create index request or update mappings request  
            // which means that the user is copied over to system actions so we need to change the user  
            if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {  
                securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,  
                        new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)  
                                , handler), sender, requireAuth), minVersion);  
            } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {  
                AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,  
                        (original) -> sendWithUser(connection, action, request, options,  
                                new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)  
                                        , handler), sender, requireAuth));  
            } else if (securityContext.getAuthentication() != null &&  
                    securityContext.getAuthentication().getVersion().equals(minVersion) == false) {  
                // re-write the authentication since we want the authentication version to match the version of the connection  
                securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,  
                    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,  
                    requireAuth), minVersion);  
            } else {  
                sendWithUser(connection, action, request, options, handler, sender, requireAuth);  
            }  
        }  
    };  
}  
// org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#sendWithUser  
private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,  
                                                        TransportRequestOptions options, TransportResponseHandler<T> handler,  
                                                        AsyncSender sender, final boolean requireAuthentication) {  
    if (securityContext.getAuthentication() == null && requireAuthentication) {  
        // we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch  
        // in tests and may be hit by a user  
        assertNoAuthentication(action);  
        throw new IllegalStateException("there should always be a user when sending a message for action \[" + action + "\]");  
    }

    try {  
        sender.sendRequest(connection, action, request, options, handler);  
    } catch (Exception e) {  
        handler.handleException(new TransportException("failed sending request", e));  
    }  
}  
// org.elasticsearch.transport.TransportService#sendRequestInternal  
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,  
                                                               final TransportRequest request,  
                                                               final TransportRequestOptions options,  
                                                               TransportResponseHandler<T> handler) {  
    if (connection == null) {  
        throw new IllegalStateException("can't send request to a null connection");  
    }  
    DiscoveryNode node = connection.getNode();

    Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);  
    ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);  
    // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring  
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));  
    final TimeoutHandler timeoutHandler;  
    if (options.timeout() != null) {  
        timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);  
        responseHandler.setTimeoutHandler(timeoutHandler);  
    } else {  
        timeoutHandler = null;  
    }  
    try {  
        if (lifecycle.stoppedOrClosed()) {  
            /*
             * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
             * caller. It will only notify if toStop hasn't done the work yet.
             */
            throw new NodeClosedException(localNode);  
        }  
        if (timeoutHandler != null) {  
            assert options.timeout() != null;  
            timeoutHandler.scheduleTimeout(options.timeout());  
        }  
        // send the request  
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream  
    } catch (final Exception e) {  
        // usually happen either because we failed to connect to the node  
        // or because we failed serializing the message  
        final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);  
        // If holderToNotify == null then handler has already been taken care of.  
        if (contextToNotify != null) {  
            if (timeoutHandler != null) {  
                timeoutHandler.cancel();  
            }  
            // callback that an exception happened, but on a different thread since we don't  
            // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current  
            // thread on a best effort basis though.  
            final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);  
            final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;  
            threadPool.executor(executor).execute(new AbstractRunnable() {  
                @Override  
                public void onRejection(Exception e) {  
                    // if we get rejected during node shutdown we don't wanna bubble it up  
                    logger.debug(  
                        () -> new ParameterizedMessage(  
                            "failed to notify response handler on rejection, action: {}",  
                            contextToNotify.action()),  
                        e);  
                }  
                @Override  
                public void onFailure(Exception e) {  
                    logger.warn(  
                        () -> new ParameterizedMessage(  
                            "failed to notify response handler on exception, action: {}",  
                            contextToNotify.action()),  
                        e);  
                }  
                @Override  
                protected void doRun() throws Exception {  
                    contextToNotify.handler().handleException(sendRequestException);  
                }  
            });  
        } else {  
            logger.debug("Exception while sending request, handler likely already notified due to timeout", e);  
        }  
    }  
}  
    // org.elasticsearch.transport.Transport.Connection#sendRequest  
    @Override  
    public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)  
        throws TransportException {  
        sendLocalRequest(requestId, action, request, options);  
    }  
// org.elasticsearch.transport.TransportService#sendLocalRequest  
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {  
    final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);  
    try {  
        onRequestSent(localNode, requestId, action, request, options);  
        onRequestReceived(requestId, action);  
        // look up the handler  
        final RequestHandlerRegistry reg = getRequestHandler(action);  
        if (reg == null) {  
            throw new ActionNotFoundTransportException("Action \[" + action + "\] not found");  
        }  
        final String executor = reg.getExecutor();  
        if (ThreadPool.Names.SAME.equals(executor)) {  
            //noinspection unchecked  
            reg.processMessageReceived(request, channel);  
        } else {  
            threadPool.executor(executor).execute(new AbstractRunnable() {  
                @Override  
                protected void doRun() throws Exception {  
                    //noinspection unchecked  
                    reg.processMessageReceived(request, channel);  
                }

                @Override  
                public boolean isForceExecution() {  
                    return reg.isForceExecution();  
                }

                @Override  
                public void onFailure(Exception e) {  
                    try {  
                        channel.sendResponse(e);  
                    } catch (Exception inner) {  
                        inner.addSuppressed(e);  
                        logger.warn(() -> new ParameterizedMessage(  
                                "failed to notify channel of error message for action \[{}\]", action), inner);  
                    }  
                }

                @Override  
                public String toString() {  
                    return "processing of \[" + requestId + "\]\[" + action + "\]: " + request;  
                }  
            });  
        }

    } catch (Exception e) {  
        try {  
            channel.sendResponse(e);  
        } catch (Exception inner) {  
            inner.addSuppressed(e);  
            logger.warn(  
                () -> new ParameterizedMessage(  
                    "failed to notify channel of error message for action \[{}\]", action), inner);  
        }  
    }  
}  
// org.elasticsearch.transport.RequestHandlerRegistry#processMessageReceived  
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {  
    final Task task = taskManager.register(channel.getChannelType(), action, request);  
    Releasable unregisterTask = () -> taskManager.unregister(task);  
    try {  
        if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {  
            final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();  
            final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);  
            unregisterTask = Releasables.wrap(unregisterTask, stopTracking);  
        }  
        final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);  
        handler.messageReceived(request, taskTransportChannel, task);  
        unregisterTask = null;  
    } finally {  
        Releasables.close(unregisterTask);  
    }  
}  
// org.elasticsearch.action.get.TransportGetAction#asyncShardOperation  
@Override  
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {  
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());  
    IndexShard indexShard = indexService.getShard(shardId.id());  
    if (request.realtime()) { // we are not tied to a refresh cycle here anyway  
        super.asyncShardOperation(request, shardId, listener);  
    } else {  
        indexShard.awaitShardSearchActive(b -> {  
            try {  
                super.asyncShardOperation(request, shardId, listener);  
            } catch (Exception ex) {  
                listener.onFailure(ex);  
            }  
        });  
    }  
}  
// org.elasticsearch.action.support.single.shard.TransportSingleShardAction#asyncShardOperation  
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {  
    threadPool.executor(getExecutor(request, shardId))  
        .execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));  
}
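
  A quick note on the realtime branch above: a realtime get (the default) must return the latest write even if it has not yet been made searchable, so it proceeds immediately, with es internally reading from the translog or forcing a refresh as needed; a non-realtime get instead waits for the shard to become search-active, so it reads a refreshed, searchable view of the data.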

// org.elasticsearch.action.get.TransportGetAction#shardOperation  
// this method does the real work of a get  
@Override  
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {  
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());  
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {  
        indexShard.refresh("refresh\_flag\_get");  
    }  
    // the core get implementation  
    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),  
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());  
    // wrap the result in a GetResponse for convenient response rendering  
    return new GetResponse(result);  
}

  The above is the dispatch framework for get. The basic principle: if a routing rule is configured, honor it; otherwise send the request straight to the right node and fetch the data. The forwarding stages in front of a get look rather involved because they must accommodate remote calls and connection reuse; note how heavily es leans on the wrapper and observer patterns here, both worth learning from. The true core of get is the shardOperation() method. How does it interact with lucene to fetch the data? We'll find out shortly.

  Routing is a major feature of es, and the shard is a core concept. Next, let's look at two details: how es get handles routing, and how it handles shards.

// routing handling; an example request: curl -XGET 'http://localhost:9200/test_index/job/1?routing=user123'  
// org.elasticsearch.action.get.TransportGetAction#resolveRequest  
@Override  
protected void resolveRequest(ClusterState state, InternalRequest request) {  
    // update the routing (request#index here is possibly an alias)  
    // pick up the default routing rule, or validate the routing value passed in  
    request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));  
    // Fail fast on the node that received the request.  
    if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {  
        throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());  
    }  
}  
// org.elasticsearch.cluster.metadata.Metadata#resolveIndexRouting  
/**
 * Returns indexing routing for the given index.
 */
// TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
// in the index,bulk,update and delete apis.
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {  
    if (aliasOrIndex == null) {  
        return routing;  
    }  
    // look up the index's metadata to determine whether the name is actually an alias  
    // for a non-alias index, the routing is returned unchanged  
    IndexAbstraction result = getIndicesLookup().get(aliasOrIndex);  
    if (result == null || result.getType() != IndexAbstraction.Type.ALIAS) {  
        return routing;  
    }  
    IndexAbstraction.Alias alias = (IndexAbstraction.Alias) result;  
    if (result.getIndices().size() > 1) {  
        rejectSingleIndexOperation(aliasOrIndex, result);  
    }  
    AliasMetadata aliasMd = alias.getFirstAliasMetadata();  
    if (aliasMd.indexRouting() != null) {  
        // an alias is not allowed to carry multiple routing values  
        if (aliasMd.indexRouting().indexOf(',') != -1) {  
            throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" +  
                aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");  
        }  
        if (routing != null) {  
            if (!routing.equals(aliasMd.indexRouting())) {  
                throw new IllegalArgumentException("Alias \[" + aliasOrIndex + "\] has index routing associated with it \[" +  
                    aliasMd.indexRouting() + "\], and was provided with routing value \[" + routing + "\], rejecting operation");  
            }  
        }  
        // Alias routing overrides the parent routing (if any).  
        return aliasMd.indexRouting();  
    }  
    return routing;  
}
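
  For example (names illustrative): if an alias user_alias was created over test_index with index routing user123, a get through user_alias automatically uses routing user123, while explicitly passing a conflicting ?routing=user456 is rejected with the IllegalArgumentException above.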

// shard resolution  
// org.elasticsearch.action.get.TransportGetAction#shards  
@Override  
protected ShardIterator shards(ClusterState state, InternalRequest request) {  
    return clusterService.operationRouting()  
            .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),  
                request.request().preference());  
}  
// org.elasticsearch.cluster.routing.OperationRouting#getShards  
public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,  
                               @Nullable String preference) {  
    // first fetch the shard routing table, then build a shard iterator  
    return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),  
        clusterState.nodes(), preference, null, null);  
}  
// org.elasticsearch.cluster.routing.OperationRouting#shards  
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {  
    int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);  
    return clusterState.getRoutingTable().shardRoutingTable(index, shardId);  
}  
// org.elasticsearch.cluster.routing.OperationRouting#indexMetadata  
protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {  
    IndexMetadata indexMetadata = clusterState.metadata().index(index);  
    if (indexMetadata == null) {  
        throw new IndexNotFoundException(index);  
    }  
    return indexMetadata;  
}  
// compute the shard id  
// org.elasticsearch.cluster.routing.OperationRouting#generateShardId  
public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {  
    final String effectiveRouting;  
    final int partitionOffset;

    if (routing == null) {  
        assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";  
        effectiveRouting = id;  
    } else {  
        effectiveRouting = routing;  
    }

    if (indexMetadata.isRoutingPartitionedIndex()) {  
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());  
    } else {  
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation  
        partitionOffset = 0;  
    }

    return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);  
}  
// org.elasticsearch.cluster.routing.OperationRouting#calculateScaledShardId  
private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {  
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size  
    // of original index to hash documents  
    // routingFactor defaults to 1 (it only differs for resized, e.g. shrunk, indices)
    return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();  
}  
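
  The arithmetic above is easy to verify offline. The following minimal sketch substitutes String.hashCode() for ES's Murmur3HashFunction, so the resulting shard numbers will differ from a real cluster; only the formula is the same:

public final class ShardIdDemo {
    // mirrors generateShardId/calculateScaledShardId above, with a stand-in hash function
    static int shardFor(String idOrRouting, int routingNumShards, int routingFactor) {
        int hash = idOrRouting.hashCode(); // placeholder for Murmur3HashFunction.hash(...)
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // an index created with 5 primaries and never shrunk: routingNumShards=5, routingFactor=1
        System.out.println(shardFor("doc-1", 5, 1)); // the same id always lands on the same shard
    }
}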
// org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable(java.lang.String, int)  
/**
 * All shards for the provided index and shard id
 * @return All the shard routing entries for the given index and shard id
 * @throws IndexNotFoundException if provided index does not exist
 * @throws ShardNotFoundException if provided shard id is unknown
 */
public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {  
    IndexRoutingTable indexRouting = index(index);  
    if (indexRouting == null) {  
        throw new IndexNotFoundException(index);  
    }  
    return shardRoutingTable(indexRouting, shardId);  
}  
// org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable  
/**
 * Gets the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable},
 * or throws a {@link ShardNotFoundException} if no shard with the given id is found in the IndexRoutingTable.
 *
 * @param indexRouting IndexRoutingTable
 * @param shardId ShardId
 * @return IndexShardRoutingTable
 */
public static IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {  
    IndexShardRoutingTable indexShard = indexRouting.shard(shardId);  
    if (indexShard == null) {  
        throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));  
    }  
    return indexShard;  
}  
// org.elasticsearch.cluster.routing.OperationRouting#preferenceActiveShardIterator  
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,  
                                                    DiscoveryNodes nodes, @Nullable String preference,  
                                                    @Nullable ResponseCollectorService collectorService,  
                                                    @Nullable Map<String, Long> nodeCounts) {  
    if (preference == null || preference.isEmpty()) {  
        return shardRoutings(indexShard, nodes, collectorService, nodeCounts);  
    }  
    if (preference.charAt(0) == '_') {
        Preference preferenceType = Preference.parse(preference);
        if (preferenceType == Preference.SHARDS) {
            // starts with _shards, so execute on specific ones
            int index = preference.indexOf('|');

            String shards;
            if (index == -1) {
                shards = preference.substring(Preference.SHARDS.type().length() + 1);
            } else {
                shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
            }
            String[] ids = Strings.splitStringByCommaToArray(shards);
            boolean found = false;  
            for (String id : ids) {  
                if (Integer.parseInt(id) == indexShard.shardId().id()) {  
                    found = true;  
                    break;  
                }  
            }  
            if (!found) {  
                return null;  
            }  
            // no more preference  
            if (index == -1 || index == preference.length() - 1) {  
                return shardRoutings(indexShard, nodes, collectorService, nodeCounts);  
            } else {  
                // update the preference and continue  
                preference = preference.substring(index + 1);  
            }  
        }  
        preferenceType = Preference.parse(preference);  
        switch (preferenceType) {  
            case PREFER_NODES:
                final Set<String> nodesIds =
                        Arrays.stream(
                                preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                        ).collect(Collectors.toSet());
                return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
            case LOCAL:
                return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
            case ONLY_LOCAL:
                return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
            case ONLY_NODES:
                String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
            default:
                throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
        }  
    }  
    // if not, then use it as the index  
    int routingHash = Murmur3HashFunction.hash(preference);  
    if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
        // The AllocationService lists shards in a fixed order based on nodes  
        // so earlier versions of this class would have a tendency to  
        // select the same node across different shardIds.  
        // Better overall balancing can be achieved if each shardId opts  
        // for a different element in the list by also incorporating the  
        // shard ID into the hash of the user-supplied preference key.  
        routingHash = 31 * routingHash + indexShard.shardId.hashCode();
    }  
    if (awarenessAttributes.isEmpty()) {  
        return indexShard.activeInitializingShardsIt(routingHash);  
    } else {  
        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);  
    }  
}
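
  The _shards branch above effectively splits a compound preference such as "_shards:0,1|_local" into a shard filter plus a residual preference. A rough standalone rendering of that parsing (names are local to this sketch):

// hypothetical helper mirroring the "_shards:0,1|_local" handling above
static String[] splitShardsPreference(String preference) {
    int pipe = preference.indexOf('|');
    String shardIds = (pipe == -1)
        ? preference.substring("_shards:".length())
        : preference.substring("_shards:".length(), pipe);
    String remainder = (pipe == -1 || pipe == preference.length() - 1)
        ? null                            // no further preference: fall back to shardRoutings(...)
        : preference.substring(pipe + 1); // e.g. "_local", handled by the switch above
    return new String[] { shardIds, remainder };
}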

private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes,  
        @Nullable ResponseCollectorService collectorService, @Nullable Map<String, Long> nodeCounts) {  
    if (awarenessAttributes.isEmpty()) {  
        if (useAdaptiveReplicaSelection) {  
            return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);  
        } else {  
            return indexShard.activeInitializingShardsRandomIt();  
        }  
    } else {  
        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);  
    }  
}  
// org.elasticsearch.cluster.routing.IndexShardRoutingTable#activeInitializingShardsRankedIt  
/**
 * Returns an iterator over active and initializing shards, ordered by the adaptive replica
 * selection formula. It makes sure, though, that ordering is random within active shards of the
 * same (or missing) rank, and that initializing shards are the last to iterate through.
 */
public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,  
                                                      @Nullable Map<String, Long> nodeSearchCounts) {  
    final int seed = shuffler.nextSeed();  
    if (allInitializingShards.isEmpty()) {  
        return new PlainShardIterator(shardId,  
                rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));  
    }

    ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());  
    List<ShardRouting> rankedActiveShards =  
            rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);  
    ordered.addAll(rankedActiveShards);  
    List<ShardRouting> rankedInitializingShards =  
            rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);  
    ordered.addAll(rankedInitializingShards);  
    return new PlainShardIterator(shardId, ordered);  
}

4. The internal logic of get

  The real work is a synchronous call implemented in TransportGetAction's shardOperation():

  First, an overall view of the call sequence:

(sequence diagram: core call flow of the get lookup; image not reproduced here)

// org.elasticsearch.action.get.TransportGetAction#shardOperation  
// this method is where the get operation is actually performed
@Override  
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {  
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());  
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {  
        indexShard.refresh("refresh\_flag\_get");  
    }  
    // the core get implementation
    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),  
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());  
    return new GetResponse(result);  
}  
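
  For reference, the fields consumed above (id, routing, preference, realtime, refresh, stored fields) all come straight off the GetRequest. Building one by hand looks roughly like this (a sketch against the 7.x API; the parameter values are purely illustrative):

// sketch: a GetRequest carrying the parameters used by shards()/shardOperation() above
GetRequest request = new GetRequest("test", "1") // index and _id
    .routing("user1")                            // optional custom routing
    .preference("_local")                        // optional shard-copy preference
    .realtime(true)                              // serve from translog if not yet refreshed
    .refresh(false);                             // don't force a refresh before reading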
// org.elasticsearch.indices.IndicesService#indexServiceSafe  
/**
 * Returns an IndexService for the specified index if it exists, otherwise an {@link IndexNotFoundException} is thrown.
 */
public IndexService indexServiceSafe(Index index) {  
    IndexService indexService = indices.get(index.getUUID());  
    if (indexService == null) {  
        throw new IndexNotFoundException(index);  
    }  
    assert indexService.indexUUID().equals(index.getUUID()) : "uuid mismatch local: " + indexService.indexUUID() +  
        " incoming: " + index.getUUID();  
    return indexService;  
}  
// org.elasticsearch.index.IndexService#getShard  
/**
 * Return the shard with the provided id, or throw an exception if it doesn't exist.
 */
public IndexShard getShard(int shardId) {  
    IndexShard indexShard = getShardOrNull(shardId);  
    if (indexShard == null) {  
        throw new ShardNotFoundException(new ShardId(index(), shardId));  
    }  
    return indexShard;  
}

// org.elasticsearch.index.shard.IndexShard#getService  
public ShardGetService getService() {  
    return this.getService;  
}

// org.elasticsearch.index.get.ShardGetService#get  
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version,
                        VersionType versionType, FetchSourceContext fetchSourceContext) {
    return
        get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
}
// org.elasticsearch.index.get.ShardGetService#get
private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                      long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    currentMetric.inc();  
    try {  
        long now = System.nanoTime();  
        GetResult getResult =  
            innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

        if (getResult.isExists()) {  
            existsMetric.inc(System.nanoTime() - now);  
        } else {  
            missingMetric.inc(System.nanoTime() - now);  
        }  
        return getResult;  
    } finally {  
        currentMetric.dec();  
    }  
}  
// org.elasticsearch.index.get.ShardGetService#innerGet  
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                           long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    // normalize/validate which fields were requested
    fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
    // support typeless gets, i.e. drop the legacy type field; an index holds a single kind of document
    if (type == null || type.equals("_all")) {
        DocumentMapper mapper = mapperService.documentMapper();
        type = mapper == null ? null : mapper.type();
    }

    Engine.GetResult get = null;
    if (type != null) {
        Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
        // fluently build a Get instance and call get() to look up the doc;
        // indexShard was resolved from the shardId earlier, relying on a doc living on exactly one shard
        get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";  
        if (get.exists() == false) {  
            get.close();  
        }  
    }

    if (get == null || get.exists() == false) {  
        return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
    }

    try {  
        // break between having loaded it from translog (so we only have \_source), and having a document to load  
        // load the concrete document by its Lucene docId
        return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);  
    } finally {  
        get.close();  
    }  
}

/**
 * decides what needs to be done based on the request input and always returns a valid non-null FetchSourceContext
 */
private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceContext context, @Nullable String[] gFields) {
    if (context != null) {  
        return context;  
    }  
    if (gFields == null) {
        return FetchSourceContext.FETCH_SOURCE;
    }
    for (String field : gFields) {
        if (SourceFieldMapper.NAME.equals(field)) {
            return FetchSourceContext.FETCH_SOURCE;
        }
    }
    return FetchSourceContext.DO_NOT_FETCH_SOURCE;
}
// org.elasticsearch.index.shard.IndexShard#get  
public Engine.GetResult get(Engine.Get get) {  
    // check whether this shard's state currently allows reads (e.g. a recovering or closed shard does not)
    readAllowed();
    DocumentMapper mapper = mapperService.documentMapper();
    if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {
        return GetResult.NOT_EXISTS;
    }
    // wrapSearcher wraps the searcher that the result will be read from
    return getEngine().get(get, mapper, this::wrapSearcher);
}  
// org.elasticsearch.index.shard.IndexShard#readAllowed  
public void readAllowed() throws IllegalIndexShardStateException {  
    IndexShardState state = this.state; // one time volatile read  
    if (readAllowedStates.contains(state) == false) {  
        throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when shard state is one of " +  
            readAllowedStates.toString());  
    }  
}  
// org.elasticsearch.index.shard.IndexShard#getEngine  
Engine getEngine() {  
    Engine engine = getEngineOrNull();  
    if (engine == null) {  
        throw new AlreadyClosedException("engine is closed");  
    }  
    return engine;  
}  
/**
 * NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
 * closed.
 */
protected Engine getEngineOrNull() {  
    return this.currentEngineReference.get();  
}


// org.elasticsearch.index.engine.InternalEngine#get  
@Override  
public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {  
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();  
    // concurrency is guarded by a ReentrantReadWriteLock
    try (ReleasableLock ignored = readLock.acquire()) {
        // make sure the engine has not been closed
        ensureOpen();
        if (get.realtime()) {
            final VersionValue versionValue;
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            // if the version map has no entry (nothing indexed for this id since the last refresh), this whole block is skipped
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }  
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {  
                    throw new VersionConflictEngineException(shardId, get.id(),  
                        get.versionType().explainConflictForReads(versionValue.version, get.version()));  
                }  
                if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                    get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term  
                )) {  
                    throw new VersionConflictEngineException(shardId, get.id(),  
                        get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);  
                }  
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read from a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {  
                        try {  
                            final Translog.Operation operation = translog.readOperation(versionValue.getLocation());  
                            if (operation != null) {  
                                return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);  
                            }  
                        } catch (IOException e) {  
                            maybeFailEngine("realtime\_get", e); // lets check if the translog has failed with a tragic event  
                            throw new EngineException(shardId, "failed to read operation from translog", e);  
                        }  
                    } else {  
                        trackTranslogLocation.set(true);  
                    }  
                }  
                assert versionValue.seqNo >= 0 : versionValue;  
                refreshIfNeeded("realtime\_get", versionValue.seqNo);  
            }  
            return getFromSearcher(get, acquireSearcher("realtime\_get", SearcherScope.INTERNAL, searcherWrapper));  
        } else {  
            // we expose what has been externally exposed in a point-in-time snapshot via an explicit refresh
            return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));  
        }  
    }  
}  
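
  This is the crux of realtime get: if the version map still holds an entry for the id (i.e. the doc was indexed after the last refresh), the engine either reads the document back from the translog or triggers a targeted refresh before searching; otherwise it falls through to an ordinary Lucene lookup. With realtime=false, only what the last external refresh exposed is visible.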
// org.elasticsearch.index.engine.Engine#ensureOpen()
protected final void ensureOpen() {  
    ensureOpen(null);  
}  
protected final void ensureOpen(Exception suppressed) {  
    if (isClosed.get()) {  
        AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());  
        if (suppressed != null) {  
            ace.addSuppressed(suppressed);  
        }  
        throw ace;  
    }  
}  
// org.elasticsearch.index.engine.LiveVersionMap#acquireLock  
/**
 * Acquires a releasable lock for the given uid. All *UnderLock methods require
 * this lock to be held by the caller, otherwise the visibility guarantees of this version
 * map are broken. We assert that this lock is held when calling these methods.
 * @see KeyedLock
 */
Releasable acquireLock(BytesRef uid) {  
    return keyedLock.acquire(uid);  
}  
// org.elasticsearch.common.util.concurrent.KeyedLock#acquire  
/**
 * Acquires a lock for the given key. The key is compared by its equals method, not by object identity. The lock can be acquired
 * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
 */
public Releasable acquire(T key) {  
    while (true) {  
        KeyLock perNodeLock = map.get(key);  
        if (perNodeLock == null) {  
            ReleasableLock newLock = tryCreateNewLock(key);  
            if (newLock != null) {  
                return newLock;  
            }  
        } else {  
            assert perNodeLock != null;  
            int i = perNodeLock.count.get();  
            // ultimately backed by ReentrantLock; KeyLock adds reference counting on top
            if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {  
                perNodeLock.lock();  
                return new ReleasableLock(key, perNodeLock);  
            }  
        }  
    }  
}

// org.elasticsearch.common.util.concurrent.KeyedLock#tryCreateNewLock  
private ReleasableLock tryCreateNewLock(T key) {  
    // KeyLock extends ReentrantLock  
    KeyLock newLock = new KeyLock(fair);  
    newLock.lock();  
    KeyLock keyLock = map.putIfAbsent(key, newLock);  
    if (keyLock == null) {  
        return new ReleasableLock(key, newLock);  
    }  
    return null;  
}  
// org.elasticsearch.index.engine.InternalEngine#getVersionFromMap  
private VersionValue getVersionFromMap(BytesRef id) {  
    if (versionMap.isUnsafe()) {  
        synchronized (versionMap) {  
            // we are switching from an unsafe map to a safe map. This might happen concurrently  
            // but we only need to do this once since the last operation per ID is to add to the version  
            // map so once we pass this point we can safely lookup from the version map.  
            if (versionMap.isUnsafe()) {  
                refresh("unsafe\_version\_map", SearcherScope.INTERNAL, true);  
            }  
            versionMap.enforceSafeAccess();  
        }  
    }  
    return versionMap.getUnderLock(id);  
}  
// org.elasticsearch.index.engine.LiveVersionMap#getUnderLock(org.apache.lucene.util.BytesRef)  
/**
 * Returns the live version (add or delete) for this uid.
 */
VersionValue getUnderLock(final BytesRef uid) {  
    return getUnderLock(uid, maps);  
}

private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {  
    assert assertKeyedLockHeldByCurrentThread(uid);  
    // First try to get the "live" value:  
    VersionValue value = currentMaps.current.get(uid);  
    if (value != null) {  
        return value;  
    }

    value = currentMaps.old.get(uid);  
    if (value != null) {  
        return value;  
    }  
    // finally check the tombstones (recent deletes); otherwise this returns null
    return tombstones.get(uid);  
}  
// org.elasticsearch.index.engine.Engine#acquireSearcher  
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {  
    SearcherSupplier releasable = null;  
    try {  
        // acquire the searcher; on the realtime path, source=realtime_get
        SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);  
        Searcher searcher = reader.acquireSearcher(source);  
        releasable = null;  
        return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),  
            searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader));  
    } finally {  
        Releasables.close(releasable);  
    }  
}  
// org.elasticsearch.index.engine.Engine#acquireSearcherSupplier  
/**
 * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
 */
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {  
    /* Acquire order here is store -> manager since we need
     * to make sure that the store is not closed before
     * the searcher is acquired. */
    if (store.tryIncRef() == false) {  
        throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());  
    }  
    Releasable releasable = store::decRef;  
    try {  
        // acquire the reader from the reference manager (reference count + 1)
        ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);  
        ElasticsearchDirectoryReader acquire = referenceManager.acquire();  
        SearcherSupplier reader = new SearcherSupplier(wrapper) {  
            @Override  
            public Searcher acquireSearcherInternal(String source) {  
                assert assertSearcherIsWarmedUp(source, scope);  
                return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(),  
                    engineConfig.getQueryCachingPolicy(), () -> {});  
            }

            @Override  
            protected void doClose() {  
                try {  
                    referenceManager.release(acquire);  
                } catch (IOException e) {  
                    throw new UncheckedIOException("failed to close", e);  
                } catch (AlreadyClosedException e) {  
                    // This means there's a bug somewhere: don't suppress it  
                    throw new AssertionError(e);  
                } finally {  
                    store.decRef();  
                }  
            }  
        };  
        releasable = null; // success - hand over the reference to the engine reader  
        return reader;  
    } catch (AlreadyClosedException ex) {  
        throw ex;  
    } catch (Exception ex) {  
        maybeFailEngine("acquire\_reader", ex);  
        ensureOpen(ex); // throw EngineCloseException here if we are already closed  
        logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex);  
        throw new EngineException(shardId, "failed to acquire reader", ex);  
    } finally {  
        Releasables.close(releasable);  
    }  
}  
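
  Putting the pieces together: a caller acquires a point-in-time searcher and releases it when done. Since Engine.Searcher is releasable, the lifecycle can be sketched as follows (identity wrapper for brevity; treat this as an illustration, not the exact call site):

// sketch: acquiring and releasing an engine searcher
try (Engine.Searcher searcher = engine.acquireSearcher("example_get", SearcherScope.EXTERNAL, Function.identity())) {
    // run the _id term lookup against this point-in-time view
    IndexReader reader = searcher.getIndexReader();
}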
    // org.elasticsearch.index.engine.Engine.SearcherSupplier#acquireSearcher  
    public final Searcher acquireSearcher(String source) {  
        if (released.get()) {  
            throw new AlreadyClosedException("SearcherSupplier was closed");  
        }  
        // create the new searcher
        final Searcher searcher = acquireSearcherInternal(source);
        return CAN_MATCH_SEARCH_SOURCE.equals(source) ? searcher : wrapper.apply(searcher);
    }  
    // org.elasticsearch.index.engine.Engine.Searcher#Searcher  
    public Searcher(String source, IndexReader reader,  
                    Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,  
                    Closeable onClose) {  
        super(reader);  
        setSimilarity(similarity);  
        setQueryCache(queryCache);  
        setQueryCachingPolicy(queryCachingPolicy);  
        this.source = source;  
        this.onClose = onClose;  
    }  
// org.elasticsearch.index.shard.IndexShard#wrapSearcher  
private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {  
    assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
        != null : "DirectoryReader must be an instance of ElasticsearchDirectoryReader";
    boolean success = false;  
    try {  
        final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);  
        assert newSearcher != null;  
        success = true;  
        return newSearcher;  
    } catch (IOException ex) {  
        throw new ElasticsearchException("failed to wrap searcher", ex);  
    } finally {  
        if (success == false) {  
            Releasables.close(success, searcher);  
        }  
    }  
}  
// org.elasticsearch.index.shard.IndexShard#wrapSearcher  
static Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher,  
                                    CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper) throws IOException {  
    assert readerWrapper != null;  
    final ElasticsearchDirectoryReader elasticsearchDirectoryReader =  
        ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader());  
    if (elasticsearchDirectoryReader == null) {  
        throw new IllegalStateException("Can't wrap non elasticsearch directory reader");  
    }  
    NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());  
    DirectoryReader reader = readerWrapper.apply(nonClosingReaderWrapper);  
    if (reader != nonClosingReaderWrapper) {  
        if (reader.getReaderCacheHelper() != elasticsearchDirectoryReader.getReaderCacheHelper()) {  
            throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey," +  
                " wrappers must override this method and delegate to the original readers core cache key. Wrapped readers can't be " +  
                "used as cache keys since their are used only per request which would lead to subtle bugs");  
        }  
        if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {  
            // prevent that somebody wraps with a non-filter reader  
            throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");  
        }  
    }

    if (reader == nonClosingReaderWrapper) {  
        return engineSearcher;  
    } else {  
        // we close the reader to make sure wrappers can release resources if needed....  
        // our NonClosingReaderWrapper makes sure that our reader is not closed  
        return new Engine.Searcher(engineSearcher.source(), reader,  
            engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(),  
            () -> IOUtils.close(reader, // this will close the wrappers excluding the NonClosingReaderWrapper  
                engineSearcher)); // this will run the closeable on the wrapped engine reader  
    }  
}

  Once the searcher instance has been acquired, the actual get runs against it.

// org.elasticsearch.index.engine.Engine#getFromSearcher  
protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {  
    final DocIdAndVersion docIdAndVersion;  
    try {  
        // resolve docId and version; this is one of the core read paths
        docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);  
    } catch (Exception e) {  
        Releasables.closeWhileHandlingException(searcher);  
        //TODO: A better exception goes here  
        throw new EngineException(shardId, "Couldn't resolve version", e);  
    }

    if (docIdAndVersion != null) {  
        // check for a version conflict and throw if one is detected
        if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {  
            Releasables.close(searcher);  
            throw new VersionConflictEngineException(shardId, get.id(),  
                    get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));  
        }  
        if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
            get.getIfSeqNo() != docIdAndVersion.seqNo || get.getIfPrimaryTerm() != docIdAndVersion.primaryTerm  
        )) {  
            Releasables.close(searcher);  
            throw new VersionConflictEngineException(shardId, get.id(),  
                get.getIfSeqNo(), get.getIfPrimaryTerm(), docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);  
        }  
    }

    if (docIdAndVersion != null) {  
        // don't release the searcher on this path, it is the  
        // responsibility of the caller to call GetResult.release  
        return new GetResult(searcher, docIdAndVersion, false);  
    } else {  
        Releasables.close(searcher);  
        return GetResult.NOT_EXISTS;
    }  
}  
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#loadDocIdAndVersion  
/**
 * Load the internal doc ID and version for the uid from the reader, returning<ul>
 * <li>null if the uid wasn't found,
 * <li>a doc ID and a version otherwise
 * </ul>
 */
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
    // field: _id
    PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
    List<LeafReaderContext> leaves = reader.leaves();  
    // iterate backwards to optimize for the frequently updated documents  
    // which are likely to be in the last segments  
    for (int i = leaves.size() - 1; i >= 0; i--) {  
        final LeafReaderContext leaf = leaves.get(i);  
        PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
        DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);  
        if (result != null) {  
            return result;  
        }  
    }  
    return null;  
}  
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#getLookupState  
private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
    // We cache on the top level  
    // This means cache entries have a shorter lifetime, maybe as low as 1s with the  
    // default refresh interval and a steady indexing rate, but on the other hand it  
    // proved to be cheaper than having to perform a CHM and a TL get for every segment.  
    // See https://github.com/elastic/elasticsearch/pull/19856.  
    IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();  
    CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
    if (ctl == null) {  
        // First time we are seeing this reader's core; make a new CTL:  
        ctl = new CloseableThreadLocal<>();  
        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
        if (other == null) {  
            // Our CTL won, we must remove it when the reader is closed:  
            cacheHelper.addClosedListener(removeLookupState);  
        } else {  
            // Another thread beat us to it: just use their CTL:  
            ctl = other;  
        }  
    }

    PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
    if (lookupState == null) {
        lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
        for (LeafReaderContext leaf : reader.leaves()) {
            lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), uidField);
        }
        ctl.set(lookupState);
    }

    if (lookupState.length != reader.leaves().size()) {  
        throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());  
    }

    if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
        throw new AssertionError("Index does not consistently use the same uid field: ["
                + uidField + "] != [" + lookupState[0].uidField + "]");
    }

    return lookupState;  
}  

// org.apache.lucene.index.IndexReader#leaves
/**
 * Returns the reader's leaves, or itself if this reader is atomic.
 * This is a convenience method calling {@code this.getContext().leaves()}.
 * @see IndexReaderContext#leaves()
 */
public final List<LeafReaderContext> leaves() {
    return getContext().leaves();
}
// org.apache.lucene.index.CompositeReader#getContext
@Override
public final CompositeReaderContext getContext() {
    ensureOpen();
    // lazy init without thread safety for perf reasons: Building the readerContext twice does not hurt!
    if (readerContext == null) {
        assert getSequentialSubReaders() != null;
        readerContext = CompositeReaderContext.create(this);
    }
    return readerContext;
}
// org.apache.lucene.index.CompositeReaderContext#leaves
@Override
public List<LeafReaderContext> leaves() throws UnsupportedOperationException {
    if (!isTopLevel)
        throw new UnsupportedOperationException("This is not a top-level context.");
    assert leaves != null;
    return leaves;
}
// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#lookupVersion
/** Return null if id is not found.
 * We pass the {@link LeafReaderContext} as an argument so that things
 * still work with reader wrappers that hide some documents while still
 * using the same cache key. Otherwise we'd have to disable caching
 * entirely for these readers.
 */
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
        throws IOException {
    assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
        "context's reader is not the same as the reader class was initialized on.";
    // note: the docId returned here is a segment-local integer
    int docID = getDocID(id, context);

    if (docID != DocIdSetIterator.NO_MORE_DOCS) {
        final long seqNo;
        final long term;
        if (loadSeqNo) {
            seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
            term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
        } else {
            seqNo = UNASSIGNED_SEQ_NO;
            term = UNASSIGNED_PRIMARY_TERM;
        }  
        final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);  
        return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);  
    } else {  
        return null;  
    }  
}  
// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#getDocID
/**
 * returns the internal lucene doc id for the given id bytes.
 * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
 */
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
    // termsEnum can possibly be null here if this leaf contains only no-ops.
    // Lucene API: seekExact positions the enum on the exact term, if present
    if (termsEnum != null && termsEnum.seekExact(id)) {
        final Bits liveDocs = context.reader().getLiveDocs();
        int docID = DocIdSetIterator.NO_MORE_DOCS;
        // there may be more than one matching docID, in the case of nested docs, so we want the last one:
        docsEnum = termsEnum.postings(docsEnum, 0);
        for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
            if (liveDocs != null && liveDocs.get(d) == false) {
                continue;
            }
            docID = d;
        }
        return docID;
    } else {
        return DocIdSetIterator.NO_MORE_DOCS;
    }
}
}

// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#readNumericDocValues  
private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {  
    final NumericDocValues dv = reader.getNumericDocValues(field);  
    if (dv == null || dv.advanceExact(docId) == false) {  
        assert false : "document \[" + docId + "\] does not have docValues for \[" + field + "\]";  
        throw new IllegalStateException("document \[" + docId + "\] does not have docValues for \[" + field + "\]");  
    }  
    return dv.longValue();  
}  
// wrapper around the resolved docId
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {  
    public final int docId;  
    public final long version;  
    public final long seqNo;  
    public final long primaryTerm;  
    public final LeafReader reader;  
    public final int docBase;

    public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {  
        this.docId = docId;  
        this.version = version;  
        this.seqNo = seqNo;  
        this.primaryTerm = primaryTerm;  
        this.reader = reader;  
        this.docBase = docBase;  
    }  
}  
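
  Note the docBase field: the docId is segment-local, so with two 100-doc segments, local docId 7 in the second leaf (docBase=100) corresponds to top-level Lucene doc 107.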
    // org.elasticsearch.index.VersionType#isVersionConflictForReads  
    @Override  
    public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {  
        return isVersionConflict(currentVersion, expectedVersion, false);  
    }  
    private boolean isVersionConflict(long currentVersion, long expectedVersion, boolean deleted) {  
        if (expectedVersion == Versions.MATCH_ANY) {
            return false;  
        }  
        if (expectedVersion == Versions.MATCH_DELETED) {
            return deleted == false;  
        }  
        if (currentVersion != expectedVersion) {  
            return true;  
        }  
        return false;  
    }  
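    // So with currentVersion=3: expectedVersion=MATCH_ANY and expectedVersion=3 both pass,
    // while expectedVersion=2 is a conflict and surfaces as the VersionConflictEngineException
    // thrown from getFromSearcher above.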
    // finally, build the GetResult to return
    // org.elasticsearch.index.engine.Engine.GetResult#GetResult  
    public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {  
        this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);  
    }

    private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {  
        this.exists = exists;  
        this.version = version;  
        this.docIdAndVersion = docIdAndVersion;  
        this.searcher = searcher;  
        this.fromTranslog = fromTranslog;  
        assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader;  
    }

  At this point the id has been resolved to a Lucene docId via the Lucene APIs; next, the document details are loaded by docId:

// org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields  
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] storedFields, FetchSourceContext fetchSourceContext,
                                               Engine.GetResult get, MapperService mapperService) {  
    assert get.exists() : "method should only be called if document could be retrieved";

    // check first if stored fields to be loaded don't contain an object field  
    DocumentMapper docMapper = mapperService.documentMapper();  
    if (storedFields != null) {  
        for (String field : storedFields) {  
            Mapper fieldMapper = docMapper.mappers().getMapper(field);  
            if (fieldMapper == null) {  
                if (docMapper.mappers().objectMappers().get(field) != null) {  
                    // Only fail if we know it is an object field; missing paths / fields shouldn't fail.
                    throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                }  
            }  
        }  
    }

    Map<String, DocumentField> documentFields = null;  
    Map<String, DocumentField> metadataFields = null;  
    BytesReference source = null;  
    DocIdAndVersion docIdAndVersion = get.docIdAndVersion();  
    // force fetching source if we read from translog and need to recreate stored fields  
    boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
            Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
    // build the stored-fields visitor
    FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
        forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
    if (fieldVisitor != null) {  
        try {  
            // read the document's stored fields through the visitor
            docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
        } catch (IOException e) {
            throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
        }  
        source = fieldVisitor.source();

        // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
        if (get.isFromTranslog()) {  
            // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,  
            // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)  
            if (forceSourceForComputingTranslogStoredFields == false) {  
                try {  
                    source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);  
                } catch (IOException e) {  
                    throw new ElasticsearchException("Failed to reapply filters for \[" + id + "\] after reading from translog", e);  
                }  
            } else {  
                // Slow path: recreate stored fields from original source  
                assert source != null : "original source in translog must exist";  
                SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source,  
                    XContentHelper.xContentType(source), fieldVisitor.routing());  
                ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);  
                assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";  
                // update special fields  
                doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);  
                doc.version().setLongValue(docIdAndVersion.version);

                // retrieve stored fields from parsed doc  
                fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);  
                for (IndexableField indexableField : doc.rootDoc().getFields()) {  
                    IndexableFieldType fieldType = indexableField.fieldType();  
                    if (fieldType.stored()) {  
                        FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,  
                            DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);  
                        StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);  
                        if (status == StoredFieldVisitor.Status.YES) {  
                            if (indexableField.numericValue() != null) {  
                                fieldVisitor.objectField(fieldInfo, indexableField.numericValue());  
                            } else if (indexableField.binaryValue() != null) {  
                                fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());  
                            } else if (indexableField.stringValue() != null) {  
                                fieldVisitor.objectField(fieldInfo, indexableField.stringValue());  
                            }  
                        } else if (status == StoredFieldVisitor.Status.STOP) {  
                            break;  
                        }  
                    }  
                }  
                // retrieve source (with possible transformations, e.g. source filters)
                source = fieldVisitor.source();  
            }  
        }

        // put stored fields into result objects  
        if (!fieldVisitor.fields().isEmpty()) {  
            fieldVisitor.postProcess(mapperService::fieldType,  
                mapperService.documentMapper() == null ? null : mapperService.documentMapper().type());  
            documentFields = new HashMap<>();  
            metadataFields = new HashMap<>();  
            for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {  
                if (mapperService.isMetadataField(entry.getKey())) {  
                    metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));  
                } else {  
                    documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));  
                }  
            }  
        }  
    }

    if (source != null) {  
        // apply request-level source filtering  
        if (fetchSourceContext.fetchSource() == false) {  
            source = null;  
        } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {  
            Map<String, Object> sourceAsMap;  
            // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.  
            //  Do we care?  
            Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);  
            XContentType sourceContentType = typeMapTuple.v1();  
            sourceAsMap = typeMapTuple.v2();  
            sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());  
            try {  
                source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));  
            } catch (IOException e) {  
                throw new ElasticsearchException("Failed to get id \[" + id + "\] with includes/excludes set", e);  
            }  
        }  
    }

    if (!fetchSourceContext.fetchSource()) {  
        source = null;  
    }

    if (source != null && get.isFromTranslog()) {  
        // reapply source filters from mapping (possibly also nulling the source)  
        try {  
            source = docMapper.sourceMapper().applyFilters(source, null);  
        } catch (IOException e) {  
            throw new ElasticsearchException("Failed to reapply filters for \[" + id + "\] after reading from translog", e);  
        }  
    }

    if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {  
        Map<String, Object> sourceAsMap;  
        // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
        Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);  
        XContentType sourceContentType = typeMapTuple.v1();  
        sourceAsMap = typeMapTuple.v2();  
        sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());  
        try {  
            source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));  
        } catch (IOException e) {  
            throw new ElasticsearchException("Failed to get type \[" + type + "\] and id \[" + id + "\] with includes/excludes set", e);  
        }  
    }  
    // build a new GetResult to return; source is the final fetched document
    return new GetResult(shardId.getIndexName(), type, id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,  
        get.version(), get.exists(), source, documentFields, metadataFields);  
}

private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
    if (fields == null || fields.length == 0) {  
        return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;  
    }

    return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());  
}
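
  The include/exclude handling above bottoms out in XContentMapValues.filter. A hedged mini-example of what that filtering does to a source map before it is re-serialized:

// sketch: request-level _source filtering as applied above
Map<String, Object> source = Map.of("name", "bob", "age", 33, "bio", "...");
Map<String, Object> filtered = XContentMapValues.filter(source, new String[] { "name" }, new String[0]);
// filtered now contains only {name=bob}; the result is rebuilt with XContentFactory.contentBuilder(...)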

  Finally, a snippet of the underlying Lucene lookup implementation, for reference.

// quickly locate the document holding the given id; effectively an index lookup over the terms dictionary
// org.apache.lucene.codecs.blocktree.SegmentTermsEnum#seekExact(org.apache.lucene.util.BytesRef)
@Override
public boolean seekExact(BytesRef target) throws IOException {

if (fr.index == null) {  
  throw new IllegalStateException("terms index was not loaded");  
}

if (fr.size() > 0 && (target.compareTo(fr.getMin()) < 0 || target.compareTo(fr.getMax()) > 0)) {  
    return false;  
}

term.grow(1 + target.length);

assert clearEOF();

// if (DEBUG) {  
//   System.out.println("\\nBTTR.seekExact seg=" + fr.parent.segment + " target=" + fr.fieldInfo.name + ":" + brToString(target) + " current=" + brToString(term) + " (exists?=" + termExists + ") validIndexPrefix=" + validIndexPrefix);  
//   printSeekState(System.out);  
// }

FST.Arc<BytesRef> arc;  
int targetUpto;  
BytesRef output;

targetBeforeCurrentLength = currentFrame.ord;

if (currentFrame != staticFrame) {

  // We are already seek'd; find the common  
  // prefix of new seek term vs current term and  
  // re-use the corresponding seek state.  For  
  // example, if app first seeks to foobar, then  
  // seeks to foobaz, we can re-use the seek state  
  // for the first 5 bytes.

  // if (DEBUG) {  
  //   System.out.println("  re-use current seek state validIndexPrefix=" + validIndexPrefix);  
  // }

  arc = arcs[0];
  assert arc.isFinal();
  output = arc.output();
  targetUpto = 0;

  SegmentTermsEnumFrame lastFrame = stack[0];
  assert validIndexPrefix <= term.length();

  final int targetLimit = Math.min(target.length, validIndexPrefix);

  int cmp = 0;

  // TODO: reverse vLong byte order for better FST  
  // prefix output sharing

  // First compare up to valid seek frames:  
  while (targetUpto < targetLimit) {
    cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
    // if (DEBUG) {
    //    System.out.println("    cycle targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")"   + " arc.output=" + arc.output + " output=" + output);
    // }
    if (cmp != 0) {
      break;
    }
    arc = arcs[1+targetUpto];
    assert arc.label() == (target.bytes[target.offset + targetUpto] & 0xFF): "arc.label=" + (char) arc.label() + " targetLabel=" + (char) (target.bytes[target.offset + targetUpto] & 0xFF);
    if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
      output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
    }
    if (arc.isFinal()) {
      lastFrame = stack[1+lastFrame.ord];
    }
    targetUpto++;
  }
  }

  if (cmp == 0) {  
    final int targetUptoMid = targetUpto;

    // Second compare the rest of the term, but  
    // don't save arc/output/frame; we only do this  
    // to find out if the target term is before,  
    // equal or after the current term  
    final int targetLimit2 = Math.min(target.length, term.length());  
    while (targetUpto < targetLimit2) {  
      cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
      // if (DEBUG) {
      //    System.out.println("    cycle2 targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")");
      // }  
      if (cmp != 0) {  
        break;  
      }  
      targetUpto++;  
    }

    if (cmp == 0) {  
      cmp = term.length() - target.length;  
    }  
    targetUpto = targetUptoMid;  
  }

  if (cmp < 0) {  
    // Common case: target term is after current  
    // term, ie, app is seeking multiple terms  
    // in sorted order  
    // if (DEBUG) {  
    //   System.out.println("  target is after current (shares prefixLen=" + targetUpto + "); frame.ord=" + lastFrame.ord);  
    // }  
    currentFrame = lastFrame;

  } else if (cmp > 0) {  
    // Uncommon case: target term  
    // is before current term; this means we can  
    // keep the currentFrame but we must rewind it  
    // (so we scan from the start)  
    targetBeforeCurrentLength = lastFrame.ord;  
    // if (DEBUG) {  
    //   System.out.println("  target is before current (shares prefixLen=" + targetUpto + "); rewind frame ord=" + lastFrame.ord);  
    // }  
    currentFrame = lastFrame;  
    currentFrame.rewind();  
  } else {  
    // Target is exactly the same as current term  
    assert term.length() == target.length;  
    if (termExists) {  
      // if (DEBUG) {  
      //   System.out.println("  target is same as current; return true");  
      // }  
      return true;  
    } else {  
      // if (DEBUG) {  
      //   System.out.println("  target is same as current but term doesn't exist");  
      // }  
    }  
    //validIndexPrefix = currentFrame.depth;  
    //term.length = target.length;  
    //return termExists;  
  }

} else {

  targetBeforeCurrentLength = -1;  
  arc = fr.index.getFirstArc(arcs[0]);

  // Empty string prefix must have an output (block) in the index!  
  assert arc.isFinal();  
  assert arc.output() != null;

  // if (DEBUG) {  
  //   System.out.println("    no seek state; push root frame");  
  // }

  output = arc.output();

  currentFrame = staticFrame;

  //term.length = 0;  
  targetUpto = 0;  
  currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), 0);
}

// if (DEBUG) {  
//   System.out.println("  start index loop targetUpto=" + targetUpto + " output=" + output + " currentFrame.ord=" + currentFrame.ord + " targetBeforeCurrentLength=" + targetBeforeCurrentLength);  
// }

// We are done sharing the common prefix with the incoming target and where we are currently seek'd; now continue walking the index:  
while (targetUpto < target.length) {

  final int targetLabel = target.bytes[target.offset + targetUpto] & 0xFF;

  final FST.Arc<BytesRef> nextArc = fr.index.findTargetArc(targetLabel, arc, getArc(1+targetUpto), fstReader);

  if (nextArc == null) {

    // Index is exhausted  
    // if (DEBUG) {  
    //   System.out.println("    index: index exhausted label=" + ((char) targetLabel) + " " + toHex(targetLabel));  
    // }

    validIndexPrefix = currentFrame.prefix;  
    //validIndexPrefix = targetUpto;

    currentFrame.scanToFloorFrame(target);

    if (!currentFrame.hasTerms) {  
      termExists = false;  
      term.setByteAt(targetUpto, (byte) targetLabel);  
      term.setLength(1+targetUpto);  
      // if (DEBUG) {  
      //   System.out.println("  FAST NOT\_FOUND term=" + brToString(term));  
      // }  
      return false;  
    }

    currentFrame.loadBlock();

    final SeekStatus result = currentFrame.scanToTerm(target, true);  
    if (result == SeekStatus.FOUND) {  
      // if (DEBUG) {  
      //   System.out.println("  return FOUND term=" + term.utf8ToString() + " " + term);  
      // }  
      return true;  
    } else {  
      // if (DEBUG) {  
      //   System.out.println("  got " + result + "; return NOT\_FOUND term=" + brToString(term));  
      // }  
      return false;  
    }  
  } else {  
    // Follow this arc  
    arc = nextArc;  
    term.setByteAt(targetUpto, (byte) targetLabel);  
    // Aggregate output as we go:  
    assert arc.output() != null;  
    if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
      output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
    }

    // if (DEBUG) {
    //   System.out.println("    index: follow label=" + toHex(target.bytes[target.offset + targetUpto]&0xff) + " arc.output=" + arc.output + " arc.nfo=" + arc.nextFinalOutput);
    // }
    // }  
    targetUpto++;

    if (arc.isFinal()) {  
      //if (DEBUG) System.out.println("    arc is final!");  
      currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), targetUpto);
      //if (DEBUG) System.out.println("    curFrame.ord=" + currentFrame.ord + " hasTerms=" + currentFrame.hasTerms);  
    }  
  }  
}

//validIndexPrefix = targetUpto;  
validIndexPrefix = currentFrame.prefix;

currentFrame.scanToFloorFrame(target);

// Target term is entirely contained in the index:  
if (!currentFrame.hasTerms) {  
  termExists = false;  
  term.setLength(targetUpto);  
  // if (DEBUG) {  
  //   System.out.println("  FAST NOT\_FOUND term=" + brToString(term));  
  // }  
  return false;  
}

currentFrame.loadBlock();

final SeekStatus result = currentFrame.scanToTerm(target, true);  
if (result == SeekStatus.FOUND) {  
  // if (DEBUG) {  
  //   System.out.println("  return FOUND term=" + term.utf8ToString() + " " + term);  
  // }  
  return true;  
} else {  
  // if (DEBUG) {  
  //   System.out.println("  got result " + result + "; return NOT\_FOUND term=" + term.utf8ToString());  
  // }

  return false;  
}  

}
// org.apache.lucene.codecs.blocktree.SegmentTermsEnum#postings
@Override
public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
  assert !eof;
  //if (DEBUG) {
  //System.out.println("BTTR.docs seg=" + segment);
  //}
  // Decode the current term's metadata (postings file pointers etc.) before delegating
  currentFrame.decodeMetaData();
  //if (DEBUG) {
  //System.out.println(" state=" + currentFrame.state);
  //}
  return fr.parent.postingsReader.postings(fr.fieldInfo, currentFrame.state, reuse, flags);
}
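  Putting the two methods above together: seekExact positions the terms enum on the target _id term by walking the FST index, and postings then yields the matching segment-local docId. Below is a minimal sketch against a raw Lucene reader — not ES source; the index path, field name and term value are made-up assumptions:

import java.nio.file.Paths;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

public class SeekExactDemo {
  public static void main(String[] args) throws Exception {
    try (IndexReader reader = DirectoryReader.open(FSDirectory.open(Paths.get("/tmp/index")))) {
      for (LeafReaderContext leaf : reader.leaves()) {
        Terms terms = leaf.reader().terms("_id");         // per-segment terms dictionary
        if (terms == null) {
          continue;
        }
        TermsEnum te = terms.iterator();                  // a SegmentTermsEnum underneath
        if (te.seekExact(new BytesRef("my-doc-id"))) {    // the FST-guided seek shown above
          PostingsEnum pe = te.postings(null, PostingsEnum.NONE);
          int segDocId = pe.nextDoc();                    // docId relative to this segment
          if (segDocId != DocIdSetIterator.NO_MORE_DOCS) {
            System.out.println("global docId = " + (leaf.docBase + segDocId));
          }
        }
      }
    }
  }
}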
// Fetch the document's stored fields by docId
// org.apache.lucene.index.FilterLeafReader#document
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
  ensureOpen();
  in.document(docID, visitor);
}
// org.apache.lucene.index.CodecReader#document
@Override
public final void document(int docID, StoredFieldVisitor visitor) throws IOException {
  checkBounds(docID);
  getFieldsReader().visitDocument(docID, visitor);
}
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#visitDocument
@Override
public void visitDocument(int docID, StoredFieldVisitor visitor)
    throws IOException {

  final SerializedDocument doc = document(docID);

  for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
    final long infoAndBits = doc.in.readVLong();
    final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
    final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

    final int bits = (int) (infoAndBits & TYPE_MASK);
    assert bits <= NUMERIC_DOUBLE : "bits=" + Integer.toHexString(bits);

  switch(visitor.needsField(fieldInfo)) {  
    case YES:  
      readField(doc.in, visitor, fieldInfo, bits);  
      break;  
    case NO:  
      if (fieldIDX == doc.numStoredFields - 1) {// don't skipField on last field value; treat like STOP  
        return;  
      }  
      skipField(doc.in, bits);  
      break;  
    case STOP:  
      return;  
  }  
}  

}
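  The per-field header decoded above packs the field number and the value type into a single VLong. A standalone illustration follows; the constant values are assumptions mirroring CompressingStoredFieldsWriter, where TYPE_BITS is the bit width of the largest type code (NUMERIC_DOUBLE = 5, so 3 bits):

// Illustrative decode of the infoAndBits header read by visitDocument() above.
// TYPE_BITS = 3 is an assumption mirroring CompressingStoredFieldsWriter.
public class FieldHeaderDemo {
  static final int TYPE_BITS = 3;                         // bitsRequired(NUMERIC_DOUBLE = 5)
  static final long TYPE_MASK = (1L << TYPE_BITS) - 1;    // low bits carry the value type

  public static void main(String[] args) {
    long infoAndBits = (42L << TYPE_BITS) | 5L;           // field #42, type code 5
    int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);  // -> 42
    int bits = (int) (infoAndBits & TYPE_MASK);           // -> 5 (NUMERIC_DOUBLE)
    System.out.println("fieldNumber=" + fieldNumber + " bits=" + bits);
  }
}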
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#document
SerializedDocument document(int docID) throws IOException {
  // Locate the block holding the document, then pull the document out of it
  if (state.contains(docID) == false) {
    fieldsStream.seek(indexReader.getStartPointer(docID));
    state.reset(docID);
  }
  assert state.contains(docID);
  return state.document(docID);
}
// org.apache.lucene.codecs.compressing.FieldsIndexReader#getStartPointer
@Override
long getStartPointer(int docID) {
  FutureObjects.checkIndex(docID, maxDoc);
  // Binary-search for the chunk whose first docID is the floor of the target docID
  long blockIndex = docs.binarySearch(0, numChunks, docID);
  if (blockIndex < 0) {
    blockIndex = -2 - blockIndex;
  }
  return startPointers.get(blockIndex);
}
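  The "-2 - blockIndex" line is the standard floor trick on top of binary search: on a miss, binarySearch returns -(insertionPoint) - 1, and the chunk containing docID is the one just before the insertion point. A self-contained illustration with java.util.Arrays (the doc-base values are made up):

public class FloorBlockDemo {
  public static void main(String[] args) {
    long[] docBases = {0, 128, 256, 384};   // made-up: chunk i starts at docBases[i]
    int docID = 200;
    int idx = java.util.Arrays.binarySearch(docBases, docID);
    if (idx < 0) {
      // binarySearch returned -(insertionPoint) - 1, hence
      // floor index = insertionPoint - 1 = -idx - 1 - 1 = -2 - idx
      idx = -2 - idx;
    }
    System.out.println("docID " + docID + " lives in chunk " + idx);  // -> chunk 1
  }
}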
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.BlockState#document
/**
 * Get the serialized representation of the given docID. This docID has
 * to be contained in the current block.
 */
SerializedDocument document(int docID) throws IOException {
  if (contains(docID) == false) {
    throw new IllegalArgumentException();
  }

  final int index = docID - docBase;
  final int offset = Math.toIntExact(offsets[index]);
  final int length = Math.toIntExact(offsets[index+1]) - offset;
  final int totalLength = Math.toIntExact(offsets[chunkDocs]);
  final int numStoredFields = Math.toIntExact(this.numStoredFields[index]);

  final BytesRef bytes;  
  if (merging) {  
    bytes = this.bytes;  
  } else {  
    bytes = new BytesRef();  
  }

  final DataInput documentInput;  
  if (length == 0) {  
    // empty  
    documentInput = new ByteArrayDataInput();  
  } else if (merging) {  
    // already decompressed  
    documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);  
  } else if (sliced) {  
    fieldsStream.seek(startPointer);  
    decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);  
    documentInput = new DataInput() {

      int decompressed = bytes.length;

      void fillBuffer() throws IOException {  
        assert decompressed <= length;  
        if (decompressed == length) {  
          throw new EOFException();  
        }  
        final int toDecompress = Math.min(length - decompressed, chunkSize);  
        decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);  
        decompressed += toDecompress;  
      }

      @Override  
      public byte readByte() throws IOException {  
        if (bytes.length == 0) {  
          fillBuffer();  
        }  
        --bytes.length;  
        return bytes.bytes[bytes.offset++];
      }

      @Override  
      public void readBytes(byte[] b, int offset, int len) throws IOException {
        while (len > bytes.length) {  
          System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);  
          len -= bytes.length;  
          offset += bytes.length;  
          fillBuffer();  
        }  
        System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);  
        bytes.offset += len;  
        bytes.length -= len;  
      }

    };  
  } else {  
    fieldsStream.seek(startPointer);  
    decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);  
    assert bytes.length == length;  
    documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);  
  }

  return new SerializedDocument(documentInput, length, numStoredFields);  
}
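  To close the stored-fields chain, here is a hedged sketch of the caller's side: a custom StoredFieldVisitor that asks only for the _source field, exercising the YES/NO/STOP protocol that visitDocument() dispatches on. The reader and global docId are assumed to come from the term seek shown earlier:

import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.StoredFieldVisitor;

// Collects only the _source field and skips everything else.
class SourceOnlyVisitor extends StoredFieldVisitor {
  byte[] source;

  @Override
  public Status needsField(FieldInfo fieldInfo) {
    // YES -> readField(), NO -> skipField(), exactly as in visitDocument() above
    return "_source".equals(fieldInfo.name) ? Status.YES : Status.NO;
  }

  @Override
  public void binaryField(FieldInfo fieldInfo, byte[] value) {
    source = value;   // _source is stored as a binary field
  }
}

// Usage (reader/globalDocId as obtained earlier):
//   SourceOnlyVisitor visitor = new SourceOnlyVisitor();
//   reader.document(globalDocId, visitor);
//   byte[] source = visitor.source;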

5. Several data structures

  The GET path is fronted by a handful of data structures: GetRequest carries the incoming request, DocIdAndVersion the result of the lookup, GetResult the actual hit, and GetResponse the outgoing response. All four have already appeared in the walkthrough above; they are collected here for reference, with a short end-to-end sketch first (see below).
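  A hedged end-to-end sketch using the 7.x high-level REST client; client construction is omitted, and the index name and id are illustrative:

import java.util.Map;

import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

public class GetDemo {
  // `client` is an already-built RestHighLevelClient; "test"/"1" are made up.
  static void getDoc(RestHighLevelClient client) throws Exception {
    GetRequest request = new GetRequest("test", "1")
        .realtime(true)                                    // serve from translog if not yet refreshed
        .fetchSourceContext(FetchSourceContext.FETCH_SOURCE);
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    if (response.isExists()) {                             // backed by GetResult.exists
      long version = response.getVersion();                // GetResult.version
      Map<String, Object> source = response.getSourceAsMap();
      System.out.println("v" + version + " -> " + source);
    }
  }
}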

0. The request structure: GetRequest

// org.elasticsearch.action.get.GetRequest
public class GetRequest extends SingleShardRequest<GetRequest> implements RealtimeRequest {

private String type;  
private String id;  
private String routing;  
private String preference;

private String[] storedFields;

private FetchSourceContext fetchSourceContext;

private boolean refresh = false;

boolean realtime = true;

private VersionType versionType = VersionType.INTERNAL;  
private long version = Versions.MATCH_ANY;

public GetRequest() {  
    type = MapperService.SINGLE_MAPPING_NAME;
}

GetRequest(StreamInput in) throws IOException {  
    super(in);  
    type = in.readString();  
    id = in.readString();  
    routing = in.readOptionalString();  
    if (in.getVersion().before(Version.V_7_0_0)) {
        in.readOptionalString();  
    }  
    preference = in.readOptionalString();  
    refresh = in.readBoolean();  
    storedFields = in.readOptionalStringArray();  
    realtime = in.readBoolean();

    this.versionType = VersionType.fromValue(in.readByte());  
    this.version = in.readLong();  
    fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);  
}  
...  

}

1. The docId-and-version carrier: DocIdAndVersion

// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {  
    public final int docId;  
    public final long version;  
    public final long seqNo;  
    public final long primaryTerm;  
    public final LeafReader reader;  
    public final int docBase;

    public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {  
        this.docId = docId;  
        this.version = version;  
        this.seqNo = seqNo;  
        this.primaryTerm = primaryTerm;  
        this.reader = reader;  
        this.docBase = docBase;  
    }  
}
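  Note that docId here is segment-relative; together with the stored docBase it recovers the reader-global document id. A one-line illustration (the variable name is assumed):

// docIdAndVersion.docId is relative to its segment; docBase restores the global view:
int globalDocId = docIdAndVersion.docBase + docIdAndVersion.docId;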

2. The result structure: GetResult

// org.elasticsearch.index.get.GetResult
public class GetResult implements Writeable, Iterable<DocumentField>, ToXContentObject {

public static final String _INDEX = "_index";
public static final String _TYPE = "_type";
public static final String _ID = "_id";
private static final String _VERSION = "_version";
private static final String _SEQ_NO = "_seq_no";
private static final String _PRIMARY_TERM = "_primary_term";
private static final String FOUND = "found";
private static final String FIELDS = "fields";

private String index;  
private String type;  
private String id;  
private long version;  
private long seqNo;  
private long primaryTerm;  
private boolean exists;  
private final Map<String, DocumentField> documentFields;  
private final Map<String, DocumentField> metaFields;  
private Map<String, Object> sourceAsMap;  
private BytesReference source;  
private byte[] sourceAsBytes;

public GetResult(StreamInput in) throws IOException {  
    index = in.readString();  
    type = in.readOptionalString();  
    id = in.readString();  
    if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
        seqNo = in.readZLong();  
        primaryTerm = in.readVLong();  
    } else {  
        seqNo = UNASSIGNED_SEQ_NO;
        primaryTerm = UNASSIGNED_PRIMARY_TERM;
    }  
    version = in.readLong();  
    exists = in.readBoolean();  
    if (exists) {  
        source = in.readBytesReference();  
        if (source.length() == 0) {  
            source = null;  
        }  
        if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
            documentFields = readFields(in);  
            metaFields = readFields(in);  
        } else {  
            Map<String, DocumentField> fields = readFields(in);  
            documentFields = new HashMap<>();  
            metaFields = new HashMap<>();  
            fields.forEach((fieldName, docField) ->  
                (MapperService.META_FIELDS_BEFORE_7DOT8.contains(fieldName) ? metaFields : documentFields).put(fieldName, docField));
        }  
    } else {  
        metaFields = Collections.emptyMap();  
        documentFields = Collections.emptyMap();  
    }  
}

public GetResult(String index, String type, String id, long seqNo, long primaryTerm, long version, boolean exists,  
                 BytesReference source, Map<String, DocumentField> documentFields, Map<String, DocumentField> metaFields) {  
    this.index = index;  
    this.type = type;  
    this.id = id;  
    this.seqNo = seqNo;  
    this.primaryTerm = primaryTerm;  
    assert (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) || (seqNo >= 0 && primaryTerm >= 1) :
        "seqNo: " + seqNo + " primaryTerm: " + primaryTerm;
    assert exists || (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) :
        "doc not found but seqNo/primaryTerm are set";
    this.version = version;  
    this.exists = exists;  
    this.source = source;  
    this.documentFields = documentFields == null ? emptyMap() : documentFields;  
    this.metaFields = metaFields == null ? emptyMap() : metaFields;  
}  
...  

}

3. The response structure: GetResponse

// org.elasticsearch.action.get.GetResponse
public class GetResponse extends ActionResponse implements Iterable<DocumentField>, ToXContentObject {

GetResult getResult;

GetResponse(StreamInput in) throws IOException {  
    super(in);  
    getResult = new GetResult(in);  
}

public GetResponse(GetResult getResult) {  
    this.getResult = getResult;  
}  
...  

}
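  Since GetResponse merely wraps the GetResult above, reading it is straightforward. A hedged sketch of iterating its stored fields and rendering it as JSON, with `response` assumed to be the GetResponse obtained from the client call earlier:

// Hedged sketch: `response` is the GetResponse from the earlier get call;
// iteration and rendering both delegate straight to the wrapped GetResult.
for (org.elasticsearch.common.document.DocumentField field : response) {  // Iterable<DocumentField>
  System.out.println(field.getName() + " = " + field.getValues());
}
String json = org.elasticsearch.common.Strings.toString(response);        // ToXContentObject -> JSON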