03-springboot整合elasticsearch-源码初识
阅读原文时间:2023年07月10日阅读:1

    前面两个小节已经知道了spring boot怎么整合es,以及es的简单使用,但是springboot中是怎么和es服务器交互的。我们可以简单了解一下。要看一下源码

在看源码的同时,先要对springboot请求ES服务器的原理了解一下,ES官网(https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update.html)给出了很详细的说明,可以自行进行了解。

1.RestClient es交互的基础服务器

对于单机es,一般使用的是ElasticsearchOperations

1.1 数据存储的具体过程

本质上还是使用ElasticsearchRestTemplate进行操作。

数据存储发起操作

1.存储数据
String documentId = operations.index(indexQuery,indexCoordinates);

2.index操作的过程
@Override
    public String index(IndexQuery query, IndexCoordinates index) {

        maybeCallbackBeforeConvertWithQuery(query, index);//实体类再保存操作之前的回调方法

        IndexRequest request = requestFactory.indexRequest(query, index);//==获取indexRequest的过程==
        String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());//==与ES服务器交互过程==

        // We should call this because we are not going through a mapper.
        Object queryObject = query.getObject();
        if (queryObject != null) {
            setPersistentEntityId(queryObject, documentId);
        }

        maybeCallbackAfterSaveWithQuery(query, index);//实体类再保存之后的回调方法

        return documentId;
    }

查看IndexRequest的创建过程如下

    public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
        String indexName = index.getIndexName();

        IndexRequest indexRequest;

        if (query.getObject() != null) {
            String id = StringUtils.isEmpty(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
            // If we have a query id and a document id, do not ask ES to generate one.
            if (id != null) {
                indexRequest = new IndexRequest(indexName).id(id);
            } else {
                indexRequest = new IndexRequest(indexName);
            }
                        /**
                         * 1.将传来的object转成Map,再转成json串
                         * 2.将Object的json串转成字节BytesReference,请求的ContentType设置为Request.JSON方式
                         */
            indexRequest.source(elasticsearchConverter.mapObject(query.getObject()).toJson(), Requests.INDEX_CONTENT_TYPE);
        }
                // 省略一部分代码。。。。。
        return indexRequest;
    }

请求数据处理过程

public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
        return performRequestAndParseEntity(indexRequest, RequestConverters::index, options, IndexResponse::fromXContent, emptySet());
    }

1.RequestConverters::index 这个过程创建Request对象

static Request index(IndexRequest indexRequest) {
        String method = Strings.hasLength(indexRequest.id()) ? HttpPut.METHOD_NAME : HttpPost.METHOD_NAME; //根据有无ID选择传输方式是PUT还是POST
        boolean isCreate = (indexRequest.opType() == DocWriteRequest.OpType.CREATE);
        //拼接请求的uri
        String endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), isCreate ? "_create" : null);
        Request request = new Request(method, endpoint);
        //增加request的请求参数
        Params parameters = new Params(request);
        parameters.withRouting(indexRequest.routing());
        parameters.withParent(indexRequest.parent());
        parameters.withTimeout(indexRequest.timeout());
        parameters.withVersion(indexRequest.version());
        parameters.withVersionType(indexRequest.versionType());
        parameters.withIfSeqNo(indexRequest.ifSeqNo());
        parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
        parameters.withPipeline(indexRequest.getPipeline());
        parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
        parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
        //将请求的参数变成byte[]
        BytesRef source = indexRequest.source().toBytesRef();
        ContentType contentType = createContentType(indexRequest.getContentType());
        request.setEntity(new NByteArrayEntity(source.bytes, source.offset, source.length, contentType));
        return request;
    }

2.创建response

3.创建空的集合 : new EmptySet<>();

4. 给ES服务器发送数据

  private <Req, Resp> Resp internalPerformRequest(Req request,
                                            CheckedFunction<Req, Request, IOException> requestConverter,
                                            RequestOptions options,
                                            CheckedFunction<Response, Resp, IOException> responseConverter,
                                            Set<Integer> ignores) throws IOException {
        Request req = requestConverter.apply(request);
        req.setOptions(options);
        Response response;
        try {
            response = client.performRequest(req);
        } catch (ResponseException e) {
            if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
                try {
                    return responseConverter.apply(e.getResponse());
                } catch (Exception innerException) {
                    throw parseResponseException(e);
                }
            }
            throw parseResponseException(e);
        }

        try {
            return responseConverter.apply(response);
        } catch(Exception e) {
            throw new IOException("Unable to parse response body for " + response, e);
        }
    }

RestClien请求发送的过程

1.发送请求过程
 public Response performRequest(Request request) throws IOException {
        SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(request, listener);
        return listener.get();
    }
  //创建请求的url,创建request对象
 void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
        Map<String, String> requestParams = new HashMap<>(request.getParameters());
        String ignoreString = requestParams.remove("ignore");
        Set<Integer> ignoreErrorCodes;
        if (ignoreString == null) {
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes = Collections.singleton(404);
            } else {
                ignoreErrorCodes = Collections.emptySet();
            }
        } else {
            String[] ignoresArray = ignoreString.split(",");
            ignoreErrorCodes = new HashSet<>();
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes.add(404);
            }
            for (String ignoreCode : ignoresArray) {
                try {
                    ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
                }
            }
        }
        URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);//创建url和请求参数拼接
        HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());//创建request对象
        setHeaders(httpRequest, request.getOptions().getHeaders());
        FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
        long startTime = System.nanoTime();
        performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
                request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
                request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);//发送过程
    }

  //数据发送,完成后将响应信息封装进response对象中
 private void performRequestAsync(final long startTime, final NodeTuple<Iterator<Node>> nodeTuple, final HttpRequestBase request,
                                     final Set<Integer> ignoreErrorCodes,
                                     final WarningsHandler thisWarningsHandler,
                                     final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                     final FailureTrackingResponseListener listener) {
        final Node node = nodeTuple.nodes.next(); //获取注册的节点
        final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
        final HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer =
            httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
        final HttpClientContext context = HttpClientContext.create();
        context.setAuthCache(nodeTuple.authCache);
        client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) { //执行完成后的回调方法
                try {
                    RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
                    int statusCode = httpResponse.getStatusLine().getStatusCode();
                    Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
                    if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
                        onResponse(node);
                        if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                            listener.onDefinitiveFailure(new WarningFailureException(response));
                        } else {
                            listener.onSuccess(response);
                        }
                    } else {
                        ResponseException responseException = new ResponseException(response);
                        if (isRetryStatus(statusCode)) {
                            //mark host dead and retry against next one
                            onFailure(node);
                            retryIfPossible(responseException);
                        } else {
                            //mark host alive and don't retry, as the error should be a request problem
                            onResponse(node);
                            listener.onDefinitiveFailure(responseException);
                        }
                    }
                } catch(Exception e) {
                    listener.onDefinitiveFailure(e);
                }
            }
        });
    }

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章