当前位置:   article > 正文

Java(elasticsearch-rest-high-level-client)使用Es

elasticsearch-rest-high-level-client

1、导入依赖

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  4. <version>7.10.2</version>
  5. </dependency>

2、配置application.yml

es集群配置

  1. es:
  2. uris: http://XXX.XXX.XXX.XXX:9200,http://XXX.XXX.XXX.XXX:9200,http://XXX.XXX.XXX.XXX:9200
  3. connectTimeout: 10000
  4. soTimeout: 5000

第二种

  1. es:
  2. url: xxx.xxx.xxx.xxx;xxx.xxx.xxx.xxx;xxx.xxx.xxx.xxx
  3. port: 9200
  4. connectTimeout: 10000
  5. soTimeout: 5000

单个es配置

  1. es:
  2. url: xxx.xxx.xxx.xxx
  3. port: 9200
  4. connectTimeout: 10000
  5. soTimeout: 5000

3、初始化Es配置

es集群初始化配置

  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import org.apache.http.HttpHost;
  4. import org.opensearch.client.RestClient;
  5. import org.opensearch.client.RestClientBuilder;
  6. import org.opensearch.client.RestHighLevelClient;
  7. import org.springframework.beans.factory.InitializingBean;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Arrays;
  12. @Component
  13. public class ElasticsearchConfig implements InitializingBean {
  14. @Setter
  15. @Getter
  16. @Value("${els.event.uris:}")
  17. private String[] uris;
  18. @Setter
  19. @Getter
  20. @Value("${els.event.connectTimeout:1000}")
  21. private Integer connectTimeout;
  22. @Setter
  23. @Getter
  24. @Value("${els.event.soTimeout:1000}")
  25. private Integer soTimeout;
  26. /**
  27. * After properties set.
  28. *
  29. */
  30. @Override
  31. public void afterPropertiesSet() {
  32. }
  33. /**
  34. * To string string.
  35. *
  36. * @return the string
  37. */
  38. @Override
  39. public String toString() {
  40. return "ElasticsearchConfig{" + "uris='" + Arrays.toString(uris) + '\'' + ", port='" + port + '\'' + '}';
  41. }
  42. @Bean(destroyMethod = "close", name = "client")
  43. public RestHighLevelClient initRestClient() {
  44. HttpHost[] httpHosts = Arrays.stream(uris).map(HttpHost::create).toArray(HttpHost[]::new);
  45. RestClientBuilder builder = RestClient.builder(httpHosts)
  46. .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
  47. .setConnectTimeout(connectTimeout)
  48. .setSocketTimeout(soTimeout)
  49. );
  50. return new RestHighLevelClient(builder);
  51. }
  52. }

 第二种,自己拼接

  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import org.apache.http.HttpHost;
  4. import org.elasticsearch.client.RestClient;
  5. import org.elasticsearch.client.RestClientBuilder;
  6. import org.elasticsearch.client.RestHighLevelClient;
  7. import org.springframework.beans.factory.InitializingBean;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Arrays;
  12. @Component
  13. public class ElasticsearchConfig implements InitializingBean {
  14. @Setter
  15. @Getter
  16. @Value("${es.url:}")
  17. private String url;
  18. @Setter
  19. @Getter
  20. @Value("${es.schema:http}")
  21. private String schema;
  22. @Setter
  23. @Getter
  24. @Value("${es.port:9200}")
  25. private Integer port;
  26. @Setter
  27. @Getter
  28. @Value("${es.connectTimeout:1000}")
  29. private Integer connectTimeout;
  30. @Setter
  31. @Getter
  32. @Value("${es.soTimeout:1000}")
  33. private Integer soTimeout;
  34. /**
  35. * After properties set.
  36. *
  37. */
  38. @Override
  39. public void afterPropertiesSet() {
  40. }
  41. /**
  42. * To string string.
  43. *
  44. * @return the string
  45. */
  46. @Override
  47. public String toString() {
  48. return "ElasticsearchConfig{" + "url='" + url + '\'' + ", port='" + port + '\''
  49. + '}';
  50. }
  51. @Bean(destroyMethod = "close", name = "client")
  52. public RestHighLevelClient initRestClient() {
  53. String[] array = url.split(SeparatorConstants.SEMI_COLON);
  54. HttpHost[] hosts = new HttpHost[array.length];
  55. for (int i = 0; i < array.length; i++) {
  56. hosts[i] = new HttpHost(array[i], port, schema);
  57. }
  58. RestClientBuilder builder = RestClient.builder(hosts)
  59. .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
  60. .setConnectTimeout(connectTimeout)
  61. .setSocketTimeout(soTimeout)
  62. );
  63. return new RestHighLevelClient(builder);
  64. }
  65. }

单个es初始化配置

  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import org.apache.http.HttpHost;
  4. import org.elasticsearch.client.RestClient;
  5. import org.elasticsearch.client.RestClientBuilder;
  6. import org.elasticsearch.client.RestHighLevelClient;
  7. import org.springframework.beans.factory.InitializingBean;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Arrays;
  12. @Component
  13. public class ElasticsearchConfig implements InitializingBean {
  14. @Setter
  15. @Getter
  16. @Value("${es.url:}")
  17. private String url;
  18. @Setter
  19. @Getter
  20. @Value("${es.schema:http}")
  21. private String schema;
  22. @Setter
  23. @Getter
  24. @Value("${es.port:9200}")
  25. private Integer port;
  26. @Setter
  27. @Getter
  28. @Value("${es.connectTimeout:1000}")
  29. private Integer connectTimeout;
  30. @Setter
  31. @Getter
  32. @Value("${es.soTimeout:1000}")
  33. private Integer soTimeout;
  34. /**
  35. * After properties set.
  36. *
  37. */
  38. @Override
  39. public void afterPropertiesSet() {
  40. }
  41. /**
  42. * To string string.
  43. *
  44. * @return the string
  45. */
  46. @Override
  47. public String toString() {
  48. return "ElasticsearchConfig{" + "url='" + url + '\'' + ", port='" + port + '\''
  49. + '}';
  50. }
  51. @Bean(destroyMethod = "close", name = "client")
  52. public RestHighLevelClient initRestClient() {
  53. RestClientBuilder builder = RestClient.builder(new HttpHost(url, port, schema))
  54. .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
  55. .setConnectTimeout(connectTimeout)
  56. .setSocketTimeout(soTimeout)
  57. );
  58. return new RestHighLevelClient(builder);
  59. }
  60. }

4、建立实体类

5、增删改方法

推荐学习示例: SpringBoot-Learn/elasticsearch at master · wupeixuan/SpringBoot-Learn · GitHub

注意坑点:查询方法默认查询10条,不管分不分页查询,因此,注意给size够用的大小

searchSourceBuilder.size(userIds.size());

  1. import com.alibaba.fastjson.JSON;
  2. import org.apache.commons.collections.CollectionUtils;
  3. import org.elasticsearch.action.bulk.BulkRequest;
  4. import org.elasticsearch.action.bulk.BulkResponse;
  5. import org.elasticsearch.action.delete.DeleteRequest;
  6. import org.elasticsearch.action.index.IndexRequest;
  7. import org.elasticsearch.action.index.IndexResponse;
  8. import org.elasticsearch.action.search.SearchRequest;
  9. import org.elasticsearch.action.search.SearchResponse;
  10. import org.elasticsearch.action.update.UpdateRequest;
  11. import org.elasticsearch.action.update.UpdateResponse;
  12. import org.elasticsearch.client.RequestOptions;
  13. import org.elasticsearch.client.RestHighLevelClient;
  14. import org.elasticsearch.common.xcontent.XContentType;
  15. import org.elasticsearch.index.query.BoolQueryBuilder;
  16. import org.elasticsearch.index.query.QueryBuilders;
  17. import org.elasticsearch.rest.RestStatus;
  18. import org.elasticsearch.search.SearchHit;
  19. import org.elasticsearch.search.builder.SearchSourceBuilder;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.stereotype.Service;
  24. import java.io.IOException;
  25. import java.util.ArrayList;
  26. import java.util.List;
  27. @Service
  28. public class EsServiceImpl implements EsService {
  29. private static final Logger LOGGER = LoggerFactory.getLogger(EsService.class);
  30. @Autowired
  31. private RestHighLevelClient client;
  32. private final String INDEX = "index"; // 索引名称
  33. /**
  34. * 单条添加
  35. * @param document
  36. * @return {@link Boolean}
  37. */
  38. @Override
  39. public Boolean addDocument(UserDocument document) {
  40. LOGGER.info("Add Document begin");
  41. // .id 给每个Document生成id 可以根据id搜素和删除,不用es自动生成的id
  42. IndexRequest indexRequest = new IndexRequest(INDEX).id(document.getId()).source(JSON.toJSONString(document), XContentType.JSON);
  43. try {
  44. IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
  45. LOGGER.info("Add Document end");
  46. return response.status().equals(RestStatus.OK);
  47. } catch (IOException exception) {
  48. LOGGER.error("Add Document Exception: ", exception);
  49. }
  50. return false;
  51. }
  52. /**
  53. * 批量添加
  54. * @param list
  55. * @return {@link Boolean}
  56. */
  57. @Override
  58. public Boolean bulkDocument(List<UserDocument> list) {
  59. LOGGER.info("Bulk Document begin");
  60. BulkRequest bulkRequest = new BulkRequest();
  61. try {
  62. if (CollectionUtils.isNotEmpty(list)) {
  63. for (UserDocument document: list) {
  64. IndexRequest indexRequest = new IndexRequest(INDEX).id(document.getId()).source(JSON.toJSONString(document), XContentType.JSON);
  65. bulkRequest.add(indexRequest);
  66. }
  67. BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  68. LOGGER.info("Bulk Document end");
  69. return response.status().equals(RestStatus.OK);
  70. }
  71. } catch (IOException exception) {
  72. LOGGER.error("Bulk Document Exception: ", exception);
  73. }
  74. return false;
  75. }
  76. /**
  77. * 单条更新
  78. * @param document
  79. * @return {@link Boolean}
  80. */
  81. @Override
  82. public Boolean update(UserDocument document) {
  83. LOGGER.info("Bulk Update begin");
  84. try {
  85. UpdateRequest updateRequest = new UpdateRequest(INDEX, document.getId()).doc(JSON.toJSONString(document), XContentType.JSON);
  86. UpdateResponse update = client.update(updateRequest, RequestOptions.DEFAULT);
  87. LOGGER.info("Bulk Update end");
  88. return update.status().equals(RestStatus.OK);
  89. } catch (IOException exception) {
  90. LOGGER.error("Bulk Update Exception: ", exception);
  91. }
  92. return false;
  93. }
  94. /**
  95. * 批量更新
  96. * @param list
  97. * @return {@link Boolean}
  98. */
  99. @Override
  100. public Boolean bulkUpdate(List<UserDocument> list) {
  101. LOGGER.info("Bulk Update begin");
  102. BulkRequest bulkRequest = new BulkRequest();
  103. try {
  104. if (CollectionUtils.isNotEmpty(list)) {
  105. for (UserDocument document : list) {
  106. UpdateRequest updateRequest = new UpdateRequest(INDEX, document.getId()).doc(JSON.toJSONString(document), XContentType.JSON);
  107. bulkRequest.add(updateRequest);
  108. }
  109. BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
  110. LOGGER.info("Bulk Update end");
  111. return response.status().equals(RestStatus.OK);
  112. }
  113. } catch (IOException exception) {
  114. LOGGER.error("Bulk Update Exception: ", exception);
  115. }
  116. return false;
  117. }
  118. /**
  119. * 根据某条件批量删除
  120. * @param userIds
  121. */
  122. @Override
  123. public void deleteDocumentByUserIds(List<Long> userIds) {
  124. LOGGER.info("Bulk Delete begin");
  125. SearchRequest searchRequest = new SearchRequest();
  126. searchRequest.indices(INDEX);
  127. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  128. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  129. boolQueryBuilder.must(QueryBuilders.termsQuery("userId", userIds));
  130. searchSourceBuilder.size(userIds.size());
  131. searchSourceBuilder.query(boolQueryBuilder);
  132. searchRequest.source(searchSourceBuilder);
  133. SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
  134. SearchHit[] searchHit = search.getHits().getHits();
  135. List<UserDocument> list = new ArrayList<>();
  136. if (searchHit.length > 0) {
  137. for (SearchHit hit : searchHit) {
  138. list.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
  139. }
  140. }
  141. BulkRequest bulkRequest = new BulkRequest();
  142. try {
  143. if (CollectionUtils.isNotEmpty(list)) {
  144. for (UserDocument document : list) {
  145. DeleteRequest deleteRequest = new DeleteRequest(INDEX, document.getId());
  146. bulkRequest.add(deleteRequest);
  147. }
  148. client.bulk(bulkRequest, RequestOptions.DEFAULT);
  149. }
  150. LOGGER.info("Bulk Delete end");
  151. } catch (IOException exception) {
  152. LOGGER.error("Bulk Delete Exception: ", exception);
  153. }
  154. }
  155. }

6、查询方法

普通查询--根据某条件查询

  1. @Override
  2. public List<UserDocument> getUserDocuments(List<Long> userIds) throws IOException {
  3. SearchRequest searchRequest = new SearchRequest();
  4. searchRequest.indices(INDEX);
  5. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  6. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  7. boolQueryBuilder.must(QueryBuilders.termsQuery("userId", userIds));
  8. searchSourceBuilder.size(userIds.size());
  9. searchSourceBuilder.query(boolQueryBuilder);
  10. searchRequest.source(searchSourceBuilder);
  11. SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
  12. SearchHit[] searchHit = search.getHits().getHits();
  13. List<UserDocument> list = new ArrayList<>();
  14. if (searchHit.length > 0) {
  15. for (SearchHit hit : searchHit) {
  16. list.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
  17. }
  18. }
  19. return list;
  20. }

分页查询

如若不知道怎么写,可用sql转es

mysql语句在线转换elasticsearch查询语句

  1. public BaseModelPageDTO<RestEventEs> baseSearch(EventPageSearchQuery query) {
  2. BaseModelPageDTO<RestEventEs> basePageDTO = new BaseModelPageDTO<>();
  3. try {
  4. LOGGER.info("search begin");
  5. LOGGER.info("index = {}", INDEX);
  6. // 1.创建 SearchRequest搜索请求
  7. SearchRequest searchRequest = new SearchRequest();
  8. searchRequest.indices(INDEX);
  9. // 2.创建 SearchSourceBuilder条件构造。
  10. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  11. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  12. // 查询条件
  13. BoolQueryBuilder rootFilter = new BoolQueryBuilder();
  14. BoolQueryBuilder filterQuery = QueryBuilders.boolQuery();
  15. filterQuery.must(QueryBuilders.matchPhraseQuery("status", true));
  16. BoolQueryBuilder boolQueryBuilder1 = QueryBuilders.boolQuery();
  17. // wildcardQuery 使用模糊查询要加.keyword才能查询出来
  18. boolQueryBuilder1.should(QueryBuilders.wildcardQuery("username.keyword", "niki"));
  19. boolQueryBuilder1.should(QueryBuilders.matchQuery("phone", "1231234"));
  20. filterQuery.must(boolQueryBuilder1);
  21. BoolQueryBuilder mineOrAuthedQuery = new BoolQueryBuilder();
  22. filterQuery.must(QueryBuilders.termsQuery("appId", "wechat"));
  23. mineOrAuthedQuery.should(filterQuery);
  24. // 3.将 SearchSourceBuilder 添加到 SearchRequest中
  25. boolQueryBuilder.filter(rootFilter);
  26. searchSourceBuilder.query(rootFilter);
  27. // 折叠重复的数据
  28. searchSourceBuilder.collapse(new CollapseBuilder("id"));
  29. AggregationBuilder aggregation = AggregationBuilders.cardinality("total_size").field("id");
  30. searchSourceBuilder.aggregation(aggregation);
  31. // 设置分页
  32. searchSourceBuilder.from((query.getPageNum() - 1) * query.getPageSize());
  33. searchSourceBuilder.size(query.getPageSize());
  34. // 排序
  35. searchSourceBuilder.sort("createDate", SortOrder.ASC);
  36. searchSourceBuilder.sort("updateDate", SortOrder.DESC);
  37. searchRequest.source(searchSourceBuilder);
  38. // 4.执行查询
  39. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
  40. Aggregations aggregations = searchResponse.getAggregations();
  41. Cardinality cardinality = aggregations.get("total_size");
  42. List<RestEventEs> restEventEs = this.getRestEventEs(searchResponse);
  43. setBaseSearchPageDto(basePageDTO, restEventEs, cardinality, query);
  44. LOGGER.info("search end");
  45. return basePageDTO;
  46. } catch (IOException exception) {
  47. LOGGER.error("search error", exception);
  48. }
  49. return basePageDTO;
  50. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/565905
推荐阅读
相关标签
  

闽ICP备14008679号