Spring and RestHighLevelClient
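  • There are two ways to connect to Elasticsearch: the TCP transport protocol and HTTP.

I have been using es a lot recently. Until now I had always used spring-data-elasticsearch, Spring's wrapper around it. A few things about spring-data-elasticsearch are painful:

  • It is built on the TCP protocol (I am not sure whether it supports HTTP; an expert at company XX Cloud recommended HTTP, which also seems to be the official recommendation?)

  • Matching its version to the Elasticsearch version is a real nuisance

  • It is awkward to use

  • For these reasons I decided to drop spring-data-elasticsearch and build my own wheel.

  • Following the official documentation, we use RestHighLevelClient to implement basic es queries.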


The official description:

The Java REST Client comes in 2 flavors:

Java Low Level REST Client: the official low-level client for Elasticsearch. It allows to communicate with an Elasticsearch cluster through http. Leaves requests marshalling and responses un-marshalling to users. It is compatible with all Elasticsearch versions.

Java High Level REST Client: the official high-level client for Elasticsearch. Based on the low-level client, it exposes API specific methods and takes care of requests marshalling and responses un-marshalling.

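  • Two clients are provided, the Java Low Level REST Client and the Java High Level REST Client:

    • The Java Low Level REST Client is compatible with all Elasticsearch versions (no more version headaches)
    • It talks to the Elasticsearch cluster over HTTP (as the expert recommended)
    • The Java High Level REST Client is built on the Java Low Level REST Client and adds higher-level, API-specific methods
  • So the obvious choice is RestHighLevelClient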


Integrating RestHighLevelClient with Spring

  1. Building the Elasticsearch client
  • The RestHighLevelClient constructor shows that the client can be built from a RestClientBuilder. A simple demo:

    /**
     * Connect timeout, in milliseconds
     */
    private static final int CONNECT_TIMEOUT = 5000;
    /**
     * Socket (read) timeout, in milliseconds
     */
    private static final int SOCKET_TIMEOUT = 40000;
    /**
     * Timeout for obtaining a connection from the pool, in milliseconds
     */
    private static final int CONNECTION_REQUEST_TIMEOUT = 1000;
    /**
     * Maximum total connections
     */
    private static final int MAX_CONNECT_NUM = 100;
    /**
     * Maximum connections per route
     */
    private static final int MAX_CONNECT_ROUTE = 100;

    @Bean(name = "elasticsearchClient", destroyMethod = "close")
    public RestHighLevelClient client() {
        // "host" and 9200 are placeholders; the port must be an int, not a String
        RestClientBuilder builder = RestClient.builder(new HttpHost("host", 9200, "http"));
        // Request-level settings (timeouts)
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT);
            return requestConfigBuilder;
        });
        // Settings for the underlying HTTP client (connection pool limits)
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_ROUTE);
            return httpClientBuilder;
        });
        builder.setFailureListener(new RestClient.FailureListener() {
            @Override
            public void onFailure(HttpHost host) {
                // TODO do something when failed
                super.onFailure(host);
            }
        });
        return new RestHighLevelClient(builder);
    }
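  • The builder accepts a number of callbacks and parameters; see the RestClientBuilder source for the exact API.

  • With the client configured we can build a few simple helpers on top of it. For example, a search returns only 10 hits by default and from + size is capped by index.max_result_window (10,000 by default), so we can wrap a method that fetches every matching document: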

    public List<SearchHit> searchAll(SearchRequest searchRequest) {
        try {
            List<SearchHit> hits = new ArrayList<>(16);
            // Keep the scroll context alive for 10 minutes between round trips
            searchRequest.scroll(TimeValue.timeValueMinutes(10));
            SearchResponse search = client.search(searchRequest);
            SearchHit[] page = search.getHits().getHits();
            // Stop on the first empty page; this also works when no size was set on the request
            while (page.length > 0) {
                hits.addAll(Arrays.asList(page));
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(search.getScrollId());
                searchScrollRequest.scroll(TimeValue.timeValueMinutes(10));
                search = client.searchScroll(searchScrollRequest);
                page = search.getHits().getHits();
            }
            // Release the scroll context instead of waiting for it to time out
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(search.getScrollId());
            client.clearScroll(clearScrollRequest);
            return hits;
        } catch (IOException e) {
            log.error("Get message error.", e);
            return null;
        }
    }
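
    The termination rule of a scroll-style loop can be tried out without a cluster. The following is a minimal stand-alone sketch, not Elasticsearch code: ScrollDrainSketch, drain and pagesOf are hypothetical names, and the IntFunction stands in for the client.search / client.searchScroll calls. It keeps fetching until a page comes back empty, which does not depend on knowing the page size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Stand-alone model of a scroll-style drain loop; no Elasticsearch classes are used. */
final class ScrollDrainSketch {

    private ScrollDrainSketch() {
    }

    /**
     * Collects every page until the source returns an empty one.
     *
     * @param fetchPage hypothetical stand-in for the search/scroll calls:
     *                  given a zero-based page number, returns that page's hits
     */
    static <T> List<T> drain(IntFunction<List<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        int pageNo = 0;
        List<T> page = fetchPage.apply(pageNo);
        // An empty page signals exhaustion, whatever the page size was.
        while (!page.isEmpty()) {
            all.addAll(page);
            pageNo++;
            page = fetchPage.apply(pageNo);
        }
        return all;
    }

    /** Fake data source for experiments: slices a list into pages of the given size. */
    static <T> IntFunction<List<T>> pagesOf(List<T> data, int pageSize) {
        return pageNo -> {
            int from = Math.min(pageNo * pageSize, data.size());
            int to = Math.min(from + pageSize, data.size());
            return data.subList(from, to);
        };
    }
}
```

    Calling ScrollDrainSketch.drain(ScrollDrainSketch.pagesOf(data, 3)) returns all elements of data in order, whether or not the total is an exact multiple of the page size.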
  • With this method in place we can query common data. A simple usage example:

    BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
    boolBuilder.filter(QueryBuilders.termQuery("type", 0));
    boolBuilder.filter(QueryBuilders.termsQuery("id.keyword", id));
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime");
    rangeQueryBuilder.gte(startTime);
    rangeQueryBuilder.lte(endTime);
    boolBuilder.filter(rangeQueryBuilder);

    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.size(9999);
    sourceBuilder.fetchSource(new String[]{"field1", "field2", "field3"}, new String[]{});
    sourceBuilder.query(boolBuilder);

    SearchRequest searchRequest = new SearchRequest("index");
    searchRequest.source(sourceBuilder);
    List<SearchHit> searchHits = repository.searchAll(searchRequest);
  • See the official documentation for details of the API.


Updated 2019-10-28

    IndexRequest indexRequest = new IndexRequest(index, type, id);
    indexRequest.source(entityMapper.mapToString(map), Requests.INDEX_CONTENT_TYPE);
    return client.index(indexRequest);
  • In the official API, IndexRequest provides several overloads of the source method.

    • Note the source(Map source) and source(Map source, XContentType contentType) overloads: when a Map is passed in, the types of its values are checked.

    • The source:

      public IndexRequest source(Map source, XContentType contentType) throws ElasticsearchGenerationException {
          try {
              XContentBuilder builder = XContentFactory.contentBuilder(contentType);
              builder.map(source);
              return source(builder);
          } catch (IOException e) {
              throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
          }
      }

  • builder.map iterates over the entries and calls unknownValue to check each value in turn:

  • The source:

    private XContentBuilder map(Map values, boolean ensureNoSelfReferences) throws IOException {
        if (values == null) {
            return this.nullValue();
        } else {
            if (ensureNoSelfReferences) {
                ensureNoSelfReferences(values);
            }

            this.startObject();
            Iterator var3 = values.entrySet().iterator();
            while(var3.hasNext()) {
                Entry<String, ?> value = (Entry)var3.next();
                this.field((String)value.getKey());
                this.unknownValue(value.getValue(), false);
            }

            this.endObject();
            return this;
        }
    }

  • Source of the checking method:

    private void unknownValue(Object value, boolean ensureNoSelfReferences) throws IOException {
        if (value == null) {
            this.nullValue();
        } else {
            XContentBuilder.Writer writer = (XContentBuilder.Writer)WRITERS.get(value.getClass());
            if (writer != null) {
                writer.write(this, value);
            } else if (value instanceof Path) {
                this.value((Path)value);
            } else if (value instanceof Map) {
                Map<String, ?> valueMap = (Map)value;
                this.map(valueMap, ensureNoSelfReferences);
            } else if (value instanceof Iterable) {
                this.value((Iterable)value, ensureNoSelfReferences);
            } else if (value instanceof Object[]) {
                this.values((Object[])value, ensureNoSelfReferences);
            } else if (value instanceof ToXContent) {
                this.value((ToXContent)value);
            } else {
                if (!(value instanceof Enum)) {
                    throw new IllegalArgumentException("cannot write xcontent for unknown value of type " + value.getClass());
                }
                this.value(Objects.toString(value));
            }
        }
    }
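
    The dispatch in unknownValue boils down to a writer lookup keyed by the value's exact class, followed by a few instanceof fallbacks and an exception for everything else. The sketch below is a heavily simplified stand-alone model of that idea, not the Elasticsearch implementation: ValueDispatchSketch and its small WRITERS table are hypothetical, it emits unescaped JSON-like text, and it only handles a few types. It shows why a Map whose values are all known types goes through, while a single value of an unregistered class makes the whole call fail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Simplified model of a class-keyed value dispatch; not the Elasticsearch code. */
final class ValueDispatchSketch {

    // Writers keyed by the exact runtime class of the value (hypothetical, tiny table).
    private static final Map<Class<?>, Function<Object, String>> WRITERS = new HashMap<>();

    static {
        WRITERS.put(String.class, v -> "\"" + v + "\"");
        WRITERS.put(Integer.class, Object::toString);
        WRITERS.put(Long.class, Object::toString);
        WRITERS.put(Boolean.class, Object::toString);
    }

    private ValueDispatchSketch() {
    }

    /** Renders a value as JSON-like text (no escaping), or throws for unknown types. */
    static String write(Object value) {
        if (value == null) {
            return "null";
        }
        Function<Object, String> writer = WRITERS.get(value.getClass());
        if (writer != null) {
            return writer.apply(value);
        }
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (sb.length() > 1) {
                    sb.append(',');
                }
                // Nested values go through the same dispatch, so one bad value fails the whole map.
                sb.append('"').append(e.getKey()).append("\":").append(write(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof Iterable) {
            StringBuilder sb = new StringBuilder("[");
            for (Object item : (Iterable<?>) value) {
                if (sb.length() > 1) {
                    sb.append(',');
                }
                sb.append(write(item));
            }
            return sb.append(']').toString();
        }
        throw new IllegalArgumentException(
                "cannot write xcontent for unknown value of type " + value.getClass());
    }
}
```

    In this model, a map containing only strings, numbers and lists is written without trouble, but a map holding an arbitrary object (for example a custom POJO) throws IllegalArgumentException.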
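  • To avoid this pitfall, serialize the Map to a JSON string yourself and pass the string instead (JSON.toJSONString here is presumably fastjson's). For example:

    IndexRequest indexRequest = new IndexRequest(index, type, id);
    indexRequest.source(JSON.toJSONString(map), Requests.INDEX_CONTENT_TYPE);
    client.index(indexRequest);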