ElasticSearch RestHighLevelClient 通用操作
阅读原文时间:2023年07月10日阅读:1

项目中使用到ElasticSearch作为搜索引擎。而ES的环境搭建自然是十分简单,且本身就适应于分布式环境,因此这块就不多赘述。而其本身特性和查询语句这篇博文不会介绍,如果有机会会深入介绍。

​ 所以这篇博文主要还是介绍Java客户端中如何使用查询搜索引擎中的数据。而使用的Java客户端是官方新推出的RestHighLevelClient,使用Http连接查询结果。但是网上相关资料较少,只有官网的api介绍。所以本文以一个小demo介绍RestHighLevelClient的使用。

项目依赖:
dependencies {

// Elasticsearch Java High Level REST Client; artifact page:
// https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '5.6.2'
}

一般配置Java Client

// Java客户端生成工厂
public class ESClientFactory {

private static final String HOST = "127.0.0.1";  
private static final int PORT = 9200;  
private static final String SCHEMA = "http";  
private static final int CONNECT\_TIME\_OUT = 1000;  
private static final int SOCKET\_TIME\_OUT = 30000;  
private static final int CONNECTION\_REQUEST\_TIME\_OUT = 500;

private static final int MAX\_CONNECT\_NUM = 100;  
private static final int MAX\_CONNECT\_PER\_ROUTE = 100;

private static HttpHost HTTP\_HOST = new HttpHost(HOST,PORT,SCHEMA);  
private static boolean uniqueConnectTimeConfig = false;  
private static boolean uniqueConnectNumConfig = true;  
private static RestClientBuilder builder;  
private static RestClient restClient;  
private static RestHighLevelClient restHighLevelClient;

static {  
    init();  
}

public static void init(){  
    builder = RestClient.builder(HTTP\_HOST);  
    if(uniqueConnectTimeConfig){  
        setConnectTimeOutConfig();  
    }  
    if(uniqueConnectNumConfig){  
        setMutiConnectConfig();  
    }  
    restClient = builder.build();  
    restHighLevelClient = new RestHighLevelClient(restClient);  
}

// 主要关于异步httpclient的连接延时配置  
public static void setConnectTimeOutConfig(){  
    builder.setRequestConfigCallback(new RequestConfigCallback() {

        @Override  
        public Builder customizeRequestConfig(Builder requestConfigBuilder) {  
            requestConfigBuilder.setConnectTimeout(CONNECT\_TIME\_OUT);  
            requestConfigBuilder.setSocketTimeout(SOCKET\_TIME\_OUT);  
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION\_REQUEST\_TIME\_OUT);  
            return requestConfigBuilder;  
        }  
    });  
}  
// 主要关于异步httpclient的连接数配置  
public static void setMutiConnectConfig(){  
    builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {

        @Override  
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {  
            httpClientBuilder.setMaxConnTotal(MAX\_CONNECT\_NUM);  
            httpClientBuilder.setMaxConnPerRoute(MAX\_CONNECT\_PER\_ROUTE);  
            return httpClientBuilder;  
        }  
    });  
}

public static RestClient getClient(){  
    return restClient;  
}

public static RestHighLevelClient getHighLevelClient(){  
    return restHighLevelClient;  
}

public static void close() {  
    if (restClient != null) {  
        try {  
            restClient.close();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}

}

public class ESClientSpringFactory {

public static int CONNECT\_TIMEOUT\_MILLIS = 1000;  
public static int SOCKET\_TIMEOUT\_MILLIS = 30000;  
public static int CONNECTION\_REQUEST\_TIMEOUT\_MILLIS = 500;  
public static int MAX\_CONN\_PER\_ROUTE = 10;  
public static int MAX\_CONN\_TOTAL = 30;

private static HttpHost HTTP\_HOST;  
private RestClientBuilder builder;  
private RestClient restClient;  
private RestHighLevelClient restHighLevelClient;

private static ESClientSpringFactory esClientSpringFactory = new ESClientSpringFactory();

private ESClientSpringFactory(){}

public static ESClientSpringFactory build(HttpHost httpHost,  
        Integer maxConnectNum, Integer maxConnectPerRoute){  
    HTTP\_HOST = httpHost;  
    MAX\_CONN\_TOTAL = maxConnectNum;  
    MAX\_CONN\_PER\_ROUTE = maxConnectPerRoute;  
    return  esClientSpringFactory;  
}

public static ESClientSpringFactory build(HttpHost httpHost,Integer connectTimeOut, Integer socketTimeOut,  
        Integer connectionRequestTime,Integer maxConnectNum, Integer maxConnectPerRoute){  
    HTTP\_HOST = httpHost;  
    CONNECT\_TIMEOUT\_MILLIS = connectTimeOut;  
    SOCKET\_TIMEOUT\_MILLIS = socketTimeOut;  
    CONNECTION\_REQUEST\_TIMEOUT\_MILLIS = connectionRequestTime;  
    MAX\_CONN\_TOTAL = maxConnectNum;  
    MAX\_CONN\_PER\_ROUTE = maxConnectPerRoute;  
    return  esClientSpringFactory;  
}

public void init(){  
    builder = RestClient.builder(HTTP\_HOST);  
    setConnectTimeOutConfig();  
    setMutiConnectConfig();  
    restClient = builder.build();  
    restHighLevelClient = new RestHighLevelClient(restClient);  
    System.out.println("init factory");  
}  
// 配置连接时间延时  
public void setConnectTimeOutConfig(){  
    builder.setRequestConfigCallback(new RequestConfigCallback() {

        @Override  
        public Builder customizeRequestConfig(Builder requestConfigBuilder) {  
            requestConfigBuilder.setConnectTimeout(CONNECT\_TIMEOUT\_MILLIS);  
            requestConfigBuilder.setSocketTimeout(SOCKET\_TIMEOUT\_MILLIS);  
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION\_REQUEST\_TIMEOUT\_MILLIS);  
            return requestConfigBuilder;  
        }  
    });  
}  
// 使用异步httpclient时设置并发连接数  
public void setMutiConnectConfig(){  
    builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {

        @Override  
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {  
            httpClientBuilder.setMaxConnTotal(MAX\_CONN\_TOTAL);  
            httpClientBuilder.setMaxConnPerRoute(MAX\_CONN\_PER\_ROUTE);  
            return httpClientBuilder;  
        }  
    });  
}

public RestClient getClient(){  
    return restClient;  
}

public RestHighLevelClient getRhlClient(){  
    return restHighLevelClient;  
}

public void close() {  
    if (restClient != null) {  
        try {  
            restClient.close();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    System.out.println("close client");  
}

}

两种配置方法从本质上都是对client进行配置,且达到相同目的。

​ 由于Client配置为单例模式,在Spring中的生命周期随着容器开始结束而开始结束。在定义bean创建和销毁方法后会自动关闭连接。

​ 但是使用一般Java配置时,需要手动关闭。如果在web项目中,可以使用监听器,随着项目的生命周期手动调用开启关闭。

客户端演示
接下来就是最简单的几个demo,校验这种客户端的有效性,同时也为大家试验如何使用这种Java客户端:

数据准备
首先准备要操作的数据:创建一个index为demo,type为demo的新闻索引。

/PUT http://{{host}}:{{port}}/demo
{
"mappings":{
"demo":{
"properties":{
"title":{
"type":"text"
},
"tag":{
"type":"keyword"
},
"publishTime":{
"type":"date"
}
}
}
}
}

插入数据
API格式

/POST http://{{host}}:{{port}}/demo/demo/
{
"title":"中国产小型无人机的“对手”来了,俄微型拦截导弹便宜量又多",
"tag":"军事",
"publishTime":"2018-01-24T23:59:30Z"
}

Java Client

/**
 * Simple bean for one news document: a title, a tag and a publish time.
 *
 * <p>The publish time is kept as text, e.g. {@code "2018-01-24T23:59:30Z"}.
 */
public class News {

    private String title;
    private String tag;
    private String publishTime;

    /** Creates an empty item; every property starts out as {@code null}. */
    public News() {
        this(null, null, null);
    }

    /**
     * Creates a fully populated item.
     *
     * @param title       headline text
     * @param tag         category label
     * @param publishTime publish timestamp as text
     */
    public News(String title, String tag, String publishTime) {
        this.title = title;
        this.tag = tag;
        this.publishTime = publishTime;
    }

    /** @return the headline text */
    public String getTitle() {
        return this.title;
    }

    /** @param title the headline text */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return the category label */
    public String getTag() {
        return this.tag;
    }

    /** @param tag the category label */
    public void setTag(String tag) {
        this.tag = tag;
    }

    /** @return the publish timestamp as text */
    public String getPublishTime() {
        return this.publishTime;
    }

    /** @param publishTime the publish timestamp as text */
    public void setPublishTime(String publishTime) {
        this.publishTime = publishTime;
    }

}

/**
 * Indexes a single news document through the high-level REST client injected from the
 * Spring test context.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/spring-context.xml")
public class FreeClientTest {

    private String index;

    private String type;

    @Autowired
    private RestHighLevelClient rhlClient;

    /** Targets the "demo" index and "demo" type before each test. */
    @Before
    public void prepare() {
        index = "demo";
        type = "demo";
    }

    /**
     * Serializes one {@code News} item to JSON and indexes it.
     *
     * @throws IOException if the request fails; propagated so that the test is reported as
     *                     failed instead of passing after a printed stack trace
     */
    @Test
    public void addTest() throws IOException {
        News news = new News();
        news.setTitle("中国产小型无人机的“对手”来了,俄微型拦截导弹便宜量又多");
        news.setTag("军事");
        news.setPublishTime("2018-01-24T23:59:30Z");

        IndexRequest indexRequest = new IndexRequest(index, type);
        indexRequest.source(JsonUtil.toString(news), XContentType.JSON);
        rhlClient.index(indexRequest);
    }

}

两种方式均可插入数据。如果有大量数据这样插入未免效率太低,接下来看一看批量插入数据。

批量插入数据
API格式

/POST http://{{host}}:{{port}}/_bulk
{"index":{"_index":"demo","_type":"demo"}}
{"title":"中印边防军于拉达克举行会晤 强调维护边境和平","tag":"军事","publishTime":"2018-01-27T08:34:00Z"}
{"index":{"_index":"demo","_type":"demo"}}
{"title":"费德勒收郑泫退赛礼 进决赛战西里奇","tag":"体育","publishTime":"2018-01-26T14:34:00Z"}
{"index":{"_index":"demo","_type":"demo"}}
{"title":"欧文否认拿动手术威胁骑士 兴奋全明星联手詹皇","tag":"体育","publishTime":"2018-01-26T08:34:00Z"}
{"index":{"_index":"demo","_type":"demo"}}
{"title":"皇马官方通告拉莫斯伊斯科伤情 将缺阵西甲关键战","tag":"体育","publishTime":"2018-01-26T20:34:00Z"}

  Java Client

/**
 * Indexes several news documents in one bulk request through the high-level REST client
 * injected from the Spring test context.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/spring-context.xml")
public class FreeClientTest {

    private String index;

    private String type;

    @Autowired
    private RestHighLevelClient rhlClient;

    /** Targets the "demo" index and "demo" type before each test. */
    @Before
    public void prepare() {
        index = "demo";
        type = "demo";
    }

    /**
     * Sends all generated index requests as a single bulk request.
     *
     * @throws IOException if the request itself fails; propagated so that the test is reported
     *                     as failed instead of passing after a printed stack trace
     */
    @Test
    public void batchAddTest() throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (IndexRequest indexRequest : generateRequests()) {
            bulkRequest.add(indexRequest);
        }
        // The bulk call can return normally while individual items were rejected, so the
        // per-item outcome has to be checked explicitly.
        if (rhlClient.bulk(bulkRequest).hasFailures()) {
            throw new AssertionError("bulk request reported failures for one or more items");
        }
    }

    /** @return index requests for the four sample news items */
    public List<IndexRequest> generateRequests() {
        List<IndexRequest> requests = new ArrayList<>();
        requests.add(generateNewsRequest("中印边防军于拉达克举行会晤 强调维护边境和平", "军事", "2018-01-27T08:34:00Z"));
        requests.add(generateNewsRequest("费德勒收郑泫退赛礼 进决赛战西里奇", "体育", "2018-01-26T14:34:00Z"));
        requests.add(generateNewsRequest("欧文否认拿动手术威胁骑士 兴奋全明星联手詹皇", "体育", "2018-01-26T08:34:00Z"));
        requests.add(generateNewsRequest("皇马官方通告拉莫斯伊斯科伤情 将缺阵西甲关键战", "体育", "2018-01-26T20:34:00Z"));
        return requests;
    }

    /**
     * Builds an index request whose JSON source is a {@code News} item with the given values.
     *
     * @param title       headline text
     * @param tag         category label
     * @param publishTime publish timestamp as text
     * @return an index request for the configured index and type
     */
    public IndexRequest generateNewsRequest(String title, String tag, String publishTime) {
        News news = new News();
        news.setTitle(title);
        news.setTag(tag);
        news.setPublishTime(publishTime);

        IndexRequest indexRequest = new IndexRequest(index, type);
        indexRequest.source(JsonUtil.toString(news), XContentType.JSON);
        return indexRequest;
    }

}

无论通过哪种方式,现在ES中已插入五条文档数据。那么现在就可以通过多种多样的查询方式获得需要的数据了。

查询数据

查询目标:2018年1月26日早八点到晚八点关于费德勒的前十条体育新闻的标题

API 格式

/POST http://{{host}}:{{port}}/demo/demo/_search
{
"from":"0",
"size":"10",
"_source":["title"],
"query":{
"bool":{
"must":[
{
"match":{
"title":"费德勒"
}
},
{
"term":{"tag":"体育"}
},
{
"range":{
"publishTime":{
"gte":"2018-01-26T08:00:00Z",
"lte":"2018-01-26T20:00:00Z"
}
}
}
]
}
}

}

  Java Client

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/spring-context.xml")
public class FreeClientTest {

private String index;

private String type;

@Autowired  
private RestHighLevelClient rhlClient;

@Before  
public void prepare() {  
    index = "demo";  
    type = "demo";  
}

@Test  
public void queryTest(){  
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();  
    sourceBuilder.from(0);  
    sourceBuilder.size(10);  
    sourceBuilder.fetchSource(new String\[\]{"title"}, new String\[\]{});  
    MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "费德勒");  
    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("tag", "体育");  
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime");  
    rangeQueryBuilder.gte("2018-01-26T08:00:00Z");  
    rangeQueryBuilder.lte("2018-01-26T20:00:00Z");  
    BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();  
    boolBuilder.must(matchQueryBuilder);  
    boolBuilder.must(termQueryBuilder);  
    boolBuilder.must(rangeQueryBuilder);  
    sourceBuilder.query(boolBuilder);  
    SearchRequest searchRequest = new SearchRequest(index);  
    searchRequest.types(type);  
    searchRequest.source(sourceBuilder);  
    try {  
        SearchResponse response = rhlClient.search(searchRequest);  
        System.out.println(response);  
    } catch (IOException e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
    }  
}

}

更新文档

如果插入了错误的数据,想要更改或者在文档中新增新的数据,那么就需要更新文档了。

演示 将费德勒的新闻的tag更改为网球类型:

API格式

/POST http://{{host}}:{{port}}/demo/demo/AWE1fnSx00f4t28WJ4D6/_update
{
"doc":{
"tag":"网球"
}
}

  Java Client

/**
 * Applies a partial-document update to one document through the high-level REST client
 * injected from the Spring test context.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/spring-context.xml")
public class FreeClientTest {

    private String index;

    private String type;

    private String id;

    @Autowired
    private RestHighLevelClient rhlClient;

    /** Targets one fixed document id in the "demo" index and "demo" type before each test. */
    @Before
    public void prepare() {
        index = "demo";
        type = "demo";
        id = "AWE1fnSx00f4t28WJ4D6";
    }

    /**
     * Sets the {@code tag} field of the target document to "网球".
     *
     * @throws IOException if the request fails; propagated so that the test is reported as
     *                     failed instead of passing after a printed stack trace
     */
    @Test
    public void updateTest() throws IOException {
        Map<String, String> map = new HashMap<>();
        map.put("tag", "网球");

        UpdateRequest updateRequest = new UpdateRequest(index, type, id);
        updateRequest.doc(map);
        rhlClient.update(updateRequest);
    }

}

以上介绍了最简单的doc文档更改

ID方式删除

API格式
/DELETE http://{{host}}:{{port}}/delete_demo/demo/AWExGSdW00f4t28WAPen

Java 客户端

public class ElkDaoTest extends BaseTest{

@Autowired  
private RestHighLevelClient rhlClient;

private String index;

private String type;

private String id;

@Before  
public void prepare(){  
    index = "delete\_demo";  
    type = "demo";  
    id = "AWExGSdW00f4t28WAPeo";  
}

@Test  
public void delete(){  
    DeleteRequest deleteRequest = new DeleteRequest(index,type,id);  
    DeleteResponse response = null;  
    try {  
        response = rhlClient.delete(deleteRequest);  
    } catch (IOException e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
    }  
    System.out.println(response);  
}  

}

同样删除成功。