3.3_springBoot2.1.x检索之RestHighLevelClient方式
阅读原文时间:2023年07月10日阅读:1

1、版本依赖

注意对 transport client不了解先阅读官方文档:

transport client(传送门)

这里需要版本匹配,如失败查看官网或百度。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.jiatp</groupId>
    <artifactId>springboot-03-rest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-03-rest</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java Low Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java High Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- json转换 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.配置客户端

ElasticsearchConfig.java

package com.jiatp.springboot.config;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host}")
    private String host;
    @Value("${elasticsearch.port}")
    private int port;
    @Value("${elasticsearch.schema}")
    private String schema;
    @Value("${elasticsearch.connectTimeOut}")
    private int connectTimeOut;
    @Value("${elasticsearch.socketTimeOut}")
    private int socketTimeOut;
    @Value("${elasticsearch.connectionRequestTimeOut}")
    private int connectionRequestTimeOut;
    @Value("${elasticsearch.maxConnectNum}")
    private int maxConnectNum;
    @Value("${elasticsearch.maxConnectPerRoute}")
    private int maxConnectPerRoute;
    private HttpHost httpHost;
    private boolean uniqueConnectTimeConfig = true;
    private boolean uniqueConnectNumConfig = true;
    private RestClientBuilder builder;
    private RestHighLevelClient client;

    /**
     * 返回一个RestHighLevelClient
     *
     * @return
     */
    @Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClient")
    public RestHighLevelClient client() {
        httpHost= new HttpHost(host, port, schema);
        builder = RestClient.builder(httpHost);
        if (uniqueConnectTimeConfig) {
            setConnectTimeOutConfig();
        }
        if (uniqueConnectNumConfig) {
            setMutiConnectConfig();
        }
        client = new RestHighLevelClient(builder);
        return client;
    }

    /**
     * 异步httpclient的连接延时配置
     */
    public void setConnectTimeOutConfig() {
        builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeOut);
                requestConfigBuilder.setSocketTimeout(socketTimeOut);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                return requestConfigBuilder;
            }
        });
    }

    /**
     * 异步httpclient的连接数配置
     */
    public void setMutiConnectConfig() {
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            }
        });
    }

    /**
     * 关闭连接
     */
    public void close() {
        if (client != null) {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

application.yml

elasticsearch:
  host: 192.168.x.x
  port: 9200
  schema: http
  connectTimeOut: 1000
  socketTimeOut: 30000
  connectionRequestTimeOut: 500
  maxConnectNum: 100
  maxConnectPerRoute: 100

3、测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot03RestApplicationTests {

    @Qualifier(value = "restHighLevelClient")
    @Autowired
    RestHighLevelClient restHighLevelClient;

    String indexName="student";
    String esType="msg";

    @Test
    public void contextLoads() throws IOException{
        RestClient restClient = RestClient.builder(
                new HttpHost("192.168.56.101", 9200, "http")).build();
        //(1) 执行一个基本的方法,验证es集群是否搭建成功
        Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
        System.out.println(EntityUtils.toString(response.getEntity()));

    }

当现实create时则表明没问题。

其它测试:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot03RestApplicationTests {

    @Qualifier(value = "restHighLevelClient")
    @Autowired
    RestHighLevelClient restHighLevelClient;

    String indexName="student";
    String esType="msg";

    @Test
    public void contextLoads() throws IOException{
        RestClient restClient = RestClient.builder(
                new HttpHost("192.168.56.101", 9200, "http")).build();
        //(1) 执行一个基本的方法,验证es集群是否搭建成功
        Response response = restClient.performRequest("GET", "/", Collections.singletonMap("pretty", "true"));
        System.out.println(EntityUtils.toString(response.getEntity()));

    }

    //创建索引
    @Test
    public void createIndex(){

        //index名必须全小写,否则报错
        String index ="book";
        CreateIndexRequest request = new CreateIndexRequest(index);
        try {
            CreateIndexResponse indexResponse = restHighLevelClient.indices().create(request);
            if (indexResponse.isAcknowledged()) {
                System.out.println("创建索引成功");

            } else {
                System.out.println("创建索引失败");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //检查索引
    @Test
    public void findIndex()throws Exception{

        try {
            Response response = restHighLevelClient.getLowLevelClient().performRequest("HEAD", "book");
            boolean exist = response.getStatusLine().getReasonPhrase().equals("OK");
            System.out.println(exist);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //插入数据
    @Test
    public void addData(){

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 3);
        jsonObject.put("age", 26);
        jsonObject.put("name", "wangwu");
        jsonObject.put("date", new Date());
        IndexRequest indexRequest = new IndexRequest(indexName, esType, "2").source(jsonObject);

        try {
            IndexResponse indexResponse = restHighLevelClient.index(indexRequest);
            System.out.println(indexResponse.getId());
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
    /*
    * 使用XContentBuilder添加数据
    * */
    @Test
    public void addData1() throws Exception{

        XContentBuilder builder = jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "jiatp");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();
        IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "3")
                .source(builder).routing("my_route");//可以添加指定路由
        IndexResponse response = restHighLevelClient.index(indexRequest);
        System.out.println(response.status().name());

    }
    /*
     * 使用Object key-pairs对象键
     * */
    @Test
    public void addData2() throws Exception{

        IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "3")
                .source("user", "kimchy",
                        "postDate", new Date(),
                        "message", "trying out Elasticsearch");
        IndexResponse response = restHighLevelClient.index(indexRequest);
        System.out.println(response.status().name());

    }
    //异步方式
    @Test
    public void testAddAsync() throws InterruptedException {
        ActionListener listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                System.out.println("Async:" + indexResponse.status().name());
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    // Todo
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    // Todo
                }
                // 处理成功分片小于总分片的情况
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // Todo
                }
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("AsyncFailure:" + e.getMessage());
                e.printStackTrace();
            }
        };

        IndexRequest indexRequest = new IndexRequest("twitter", "t_doc", "4")
                .source("user", "luxi",
                        "postDate", new Date(),
                        "message", "trying out Elasticsearch");

        restHighLevelClient.indexAsync(indexRequest, listener);  // 异步方式
        Thread.sleep(2000);

    }

    /*
    * 查询
    *
    * */
    // 指定routing的数据,查询也要指定
    @Test
    public void searchRoute()throws Exception{

        GetRequest request = new GetRequest("twitter", "t_doc", "3").routing("my_route");   // 指定routing的数据,查询也要指定
        GetResponse response = restHighLevelClient.get(request);
        System.out.println(response.getSourceAsString());
    }
    //查询-额外参数  异步获取
    @Test
    public void  getOneOp() throws IOException, InterruptedException {
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse documentFields) {
                System.out.println(documentFields.getSourceAsString());
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("Error:" + e.getMessage());
                e.printStackTrace();
            }
        };

        GetRequest request = new GetRequest("twitter", "t_doc", "2");
        String[] includes = new String[]{"message", "*Date"};   // 包含的字段
        String[] excludes = Strings.EMPTY_ARRAY;                 // 排除的字段
        FetchSourceContext fetchSourceContext =
                new FetchSourceContext(true, includes, excludes);
        request.fetchSourceContext(fetchSourceContext);
        restHighLevelClient.getAsync(request,listener);
        Thread.sleep(2000);

    }

    //查询所有
    @Test
    public void searchAll(){
        HttpEntity entity = new NStringEntity(
                "{ \"query\": { \"match_all\": {}}}",
                ContentType.APPLICATION_JSON);
        String endPoint = "/" + indexName + "/" + esType + "/_search";
        try {
            Response response = restHighLevelClient.getLowLevelClient()
                    .performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));
        } catch(IOException e) {
            e.printStackTrace();
        }

    }

    //条件查询  姓名:李四
    @Test
    public void test(){
        try {
            String endPoint = "/" + indexName + "/" + esType + "/_search";

            IndexRequest indexRequest = new IndexRequest();
            XContentBuilder builder;
            try {
                builder = JsonXContent.contentBuilder()
                        .startObject()
                        .startObject("query")
                        .startObject("match")
                        .field("name.keyword", "lisi")
                        .endObject()
                        .endObject()
                        .endObject();
                indexRequest.source(builder);
            } catch (IOException e) {
                e.printStackTrace();
            }

            String source = indexRequest.source().utf8ToString();
            System.out.println(source);

            HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);

            Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //条件查询 叫kimchy的
    @Test
    public void testSearch(){
        try {
        SearchRequest searchRequest = new SearchRequest("twitter");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
        sourceBuilder.from(0);
        sourceBuilder.size(5);
        searchRequest.source(sourceBuilder);
        SearchResponse response = restHighLevelClient.search(searchRequest);
            System.out.println("Hits:" + response.getHits().totalHits);
            response.getHits().forEach(e -> {
                System.out.println(e.getSourceAsString()); });

        } catch(IOException e) {
            e.printStackTrace();
        }

    }

       /**
        * * 查询名字等于 lisi
        * 并且年龄在20和40之间
        */
    @Test
    public void serarchFuhe(){
        try {
            String endPoint = "/" + indexName + "/" + esType + "/_search";

            IndexRequest indexRequest = new IndexRequest();
            XContentBuilder builder;
            try {

                builder = JsonXContent.contentBuilder()
                        .startObject()
                        .startObject("query")
                        .startObject("bool")
                        .startObject("must")
                        .startObject("match")
                        .field("name.keyword", "lisi")
                        .endObject()
                        .endObject()
                        .startObject("filter")
                        .startObject("range")
                        .startObject("age")
                        .field("gte", "20")
                        .field("lte", "40")
                        .endObject()
                        .endObject()
                        .endObject()
                        .endObject()
                        .endObject()
                        .endObject();
                indexRequest.source(builder);
            } catch (IOException e) {
                e.printStackTrace();
            }

           String source = indexRequest.source().utf8ToString();
            System.out.println(source);

            HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);

            Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    /**
     * 存在即更新【输出:OK】
     * OK
     * {"C":"Carambola","A":"Apple","B":"Banana"}
     * 不存在则创建【输出:CREATED】
     * CREATED
     * {"C":"Carambola"}
     * 开启scriptedUpsert【在文档不存在情况下输出:CREATED】
     * {"A" : "Apple","B" : "Banana","C" : "Carambola"}
     */
    @Test
    public void testUpdate() throws IOException {
        UpdateRequest request = new UpdateRequest("twitter", "t_doc", "7")
                .script(new Script(ScriptType.INLINE,"painless",
                        "ctx._source.A='Apple';ctx._source.B='Banana'",Collections.EMPTY_MAP))
                // 如果文档不存在,使用upsert方法定义一些内容,这些内容将作为新文档插入
                .upsert(jsonBuilder()
                        .startObject()
                        .field("C","Carambola")
                        .endObject());
        request.timeout(TimeValue.timeValueSeconds(2)); // 2秒超时
        //request.scriptedUpsert(true);   // 无论文档是否存在,脚本都必须运行
        UpdateResponse update = restHighLevelClient.update(request);
        System.out.println(update.status().name());

    }

    //删除
    @Test
    public void delete(){

        String endPoint = "/" + indexName + "/" + esType + "/_delete_by_query";

        /**
         * 删除条件
         */
        IndexRequest indexRequest = new IndexRequest();
        XContentBuilder builder;
        try {
            builder = JsonXContent.contentBuilder()
                    .startObject()
                    .startObject("query")
                    .startObject("term")
                    //name中包含deleteText
                    .field("name.keyword", "wangwu")
                    .endObject()
                    .endObject()
                    .endObject();
            indexRequest.source(builder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        String source = indexRequest.source().utf8ToString();

        HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
        try {
            Response response = restHighLevelClient.getLowLevelClient().performRequest("POST", endPoint, Collections.<String, String>emptyMap(), entity);
            System.out.println(EntityUtils.toString(response.getEntity()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

可看api进行测试,https://blog.csdn.net/jatpen/article/details/102631110

或者查看官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.4/java-rest-high-supported-apis.html