当前位置:   article > 正文

elasticsearch学习(七):es客户端RestHighLevelClient

resthighlevelclient

本文主要是对 elasticsearch-rest-high-level-client 是学习总结。

1、es端口:

默认情况下,ElasticSearch使用两个端口来监听外部TCP流量。

  • 9200端口:用于所有通过HTTP协议进行的API调用。包括搜索、聚合、监控、以及其他任何使用HTTP协议的请求。所有的客户端库都会使用该端口与ElasticSearch进行交互。
  • 9300端口:是一个自定义的二进制协议,用于集群中各节点之间的通信。用于诸如集群变更、主节点选举、节点加入/离开、分片分配等事项。

以往,9300端口也被用于客户端库的连接,然而这种类型的交互在我们的官方客户端已被废弃,其他地方也不支持。

2、es的java客户端

客户端优点缺点说明
Java Low Level Rest Client与ES版本之间没有关系,适用于作为所有版本ES的客户端 
Java High Level Rest Client使用最多使用需与ES版本保持一致基于Low Level Rest Client,它提供了更多的接口。注意:7.15版本之后将被弃用
TransportClient使用Transport 接口进行通信,能够使用ES集群中的一些特性,性能最好JAR包版本需与ES集群版本一致,ES集群升级,客户端也跟着升级到相同版本过时产品,7版本之后不再支持
Elasticsearch Java API Client最新的es客户端文档少 

详细的elasticsearch java客户端发展史详见:https://blog.csdn.net/cloudbigdata/article/details/126296206

3、RestHighLevelClient介绍

JavaREST客户端有两种模式:

  • Java Low Level REST Client:ES官方的低级客户端。低级别的客户端通过http与Elasticearch集群通信。
  • Java High Level REST Client:ES官方的高级客户端。基于上面的低级客户端,也是通过HTTP与ES集群进行通信。它提供了更多的接口。

注意事项:

        客户端(Client) Jar包的版本尽量不要大于Elasticsearch本体的版本,否则可能出现客户端中使用的某些API在Elasticsearch中不支持。

4、springboot集成RestHighLevelClient

        下面介绍下 SpringBoot 如何通过 elasticsearch-rest-high-level-client 工具操作ElasticSearch。当然也可以通过spring-data-elasticsearch来操作ElasticSearch,而本文仅是 elasticsearch-rest-high-level-client 的案例介绍。

        这里需要说一下,能使用RestHighLevelClient尽量使用它,为什么不推荐使用 Spring 家族封装的 spring-data-elasticsearch。主要原因是灵活性和更新速度,Spring 将 ElasticSearch 过度封装,让开发者很难跟 ES 的 DSL 查询语句进行关联。再者就是更新速度,ES 的更新速度是非常快,但是 spring-data-elasticsearch 更新速度比较缓慢。并且spring-data-elasticsearch在Elasticsearch6.x和7.x版本上的Java API差距很大,如果升级版本需要花点时间来了解。spring-data-elasticsearch的底层其实也是基于elasticsearch-rest-high-level-client的api。

4.1、maven依赖

  1. <!--引入es-high-level-client相关依赖 start-->
  2. <dependency>
  3. <groupId>org.elasticsearch</groupId>
  4. <artifactId>elasticsearch</artifactId>
  5. <version>6.8.2</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.elasticsearch.client</groupId>
  9. <artifactId>elasticsearch-rest-client</artifactId>
  10. <version>6.8.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.elasticsearch.client</groupId>
  14. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  15. <version>6.8.2</version>
  16. </dependency>
  17. <!--引入es-high-level-client相关依赖 end-->
  18. <!--加入json解析 start-->
  19. <dependency>
  20. <groupId>com.alibaba</groupId>
  21. <artifactId>fastjson</artifactId>
  22. <version>1.2.28</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>commons-lang</groupId>
  26. <artifactId>commons-lang</artifactId>
  27. <version>2.6</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>commons-io</groupId>
  31. <artifactId>commons-io</artifactId>
  32. <version>2.6</version>
  33. </dependency>
  34. <!--加入json解析 end-->

4.2、es配置

4.2.1、application.yml 配置文件

  1. # es集群名称
  2. elasticsearch.clusterName=single-node-cluster
  3. #es用户名
  4. elasticsearch.userName=elastic
  5. #es密码
  6. elasticsearch.password=elastic
  7. # es host ip 地址(集群):本次使用的是单机模式
  8. elasticsearch.hosts=43.142.243.124:9200
  9. # es 请求方式
  10. elasticsearch.scheme=http
  11. # es 连接超时时间
  12. elasticsearch.connectTimeOut=1000
  13. # es socket 连接超时时间
  14. elasticsearch.socketTimeOut=30000
  15. # es 请求超时时间
  16. elasticsearch.connectionRequestTimeOut=500
  17. # es 最大连接数
  18. elasticsearch.maxConnectNum=100
  19. # es 每个路由的最大连接数
  20. elasticsearch.maxConnectNumPerRoute=100

4.2.2、java 连接配置类

 写一个 Java 配置类读取 application 中的配置信息:

  1. package com.example.demo.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.http.HttpHost;
  4. import org.apache.http.auth.AuthScope;
  5. import org.apache.http.auth.UsernamePasswordCredentials;
  6. import org.apache.http.client.CredentialsProvider;
  7. import org.apache.http.impl.client.BasicCredentialsProvider;
  8. import org.elasticsearch.client.RestClient;
  9. import org.elasticsearch.client.RestClientBuilder;
  10. import org.elasticsearch.client.RestHighLevelClient;
  11. import org.elasticsearch.client.transport.TransportClient;
  12. import org.elasticsearch.common.settings.Settings;
  13. import org.elasticsearch.common.transport.TransportAddress;
  14. import org.elasticsearch.transport.client.PreBuiltTransportClient;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.annotation.Value;
  18. import org.springframework.boot.context.properties.ConfigurationProperties;
  19. import org.springframework.context.annotation.Bean;
  20. import org.springframework.context.annotation.Configuration;
  21. import java.net.InetAddress;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. /**
  25. * restHighLevelClient 客户端配置类
  26. */
  27. @Slf4j
  28. @Data
  29. @Configuration
  30. @ConfigurationProperties(prefix = "elasticsearch")
  31. public class ElasticsearchConfig {
  32. // es host ip 地址(集群)
  33. private String hosts;
  34. // es用户名
  35. private String userName;
  36. // es密码
  37. private String password;
  38. // es 请求方式
  39. private String scheme;
  40. // es集群名称
  41. private String clusterName;
  42. // es 连接超时时间
  43. private int connectTimeOut;
  44. // es socket 连接超时时间
  45. private int socketTimeOut;
  46. // es 请求超时时间
  47. private int connectionRequestTimeOut;
  48. // es 最大连接数
  49. private int maxConnectNum;
  50. // es 每个路由的最大连接数
  51. private int maxConnectNumPerRoute;
  52. /**
  53. * 如果@Bean没有指定bean的名称,那么这个bean的名称就是方法名
  54. */
  55. @Bean(name = "restHighLevelClient")
  56. public RestHighLevelClient restHighLevelClient() {
  57. // 拆分地址
  58. // List<HttpHost> hostLists = new ArrayList<>();
  59. // String[] hostList = hosts.split(",");
  60. // for (String addr : hostList) {
  61. // String host = addr.split(":")[0];
  62. // String port = addr.split(":")[1];
  63. // hostLists.add(new HttpHost(host, Integer.parseInt(port), scheme));
  64. // }
  65. // // 转换成 HttpHost 数组
  66. // HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{});
  67. // 此处为单节点es
  68. String host = hosts.split(":")[0];
  69. String port = hosts.split(":")[1];
  70. HttpHost httpHost = new HttpHost(host,Integer.parseInt(port));
  71. // 构建连接对象
  72. RestClientBuilder builder = RestClient.builder(httpHost);
  73. // 设置用户名、密码
  74. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  75. credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(userName,password));
  76. // 连接延时配置
  77. builder.setRequestConfigCallback(requestConfigBuilder -> {
  78. requestConfigBuilder.setConnectTimeout(connectTimeOut);
  79. requestConfigBuilder.setSocketTimeout(socketTimeOut);
  80. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
  81. return requestConfigBuilder;
  82. });
  83. // 连接数配置
  84. builder.setHttpClientConfigCallback(httpClientBuilder -> {
  85. httpClientBuilder.setMaxConnTotal(maxConnectNum);
  86. httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute);
  87. httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  88. return httpClientBuilder;
  89. });
  90. return new RestHighLevelClient(builder);
  91. }
  92. }

4.3、mybatis配置

  1. package com.example.test.dao;
  2. import com.example.test.beans.Goods;
  3. import java.util.List;
  4. public interface GoodsMapper {
  5. /**
  6. * 查询所有
  7. */
  8. List<Goods> findAll();
  9. }
  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  3. <mapper namespace="com.example.test.dao.GoodsMapper">
  4. <select id="findAll" resultType="com.example.test.beans.Goods">
  5. select `id`,
  6. `title`,
  7. `price`,
  8. `stock`,
  9. `saleNum`,
  10. `createTime`,
  11. `categoryName`,
  12. `brandName`,
  13. `status`,
  14. `spec`
  15. from goods
  16. </select>
  17. </mapper>

4.4、实体对象

  1. package com.example.test.beans;
  2. import com.alibaba.fastjson.annotation.JSONField;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. import lombok.experimental.Accessors;
  7. import java.math.BigDecimal;
  8. import java.util.Date;
  9. public class Goods {
  10. /**
  11. * 商品编号
  12. */
  13. private Long id;
  14. /**
  15. * 商品标题
  16. */
  17. private String title;
  18. /**
  19. * 商品价格
  20. */
  21. private BigDecimal price;
  22. /**
  23. * 商品库存
  24. */
  25. private Integer stock;
  26. /**
  27. * 商品销售数量
  28. */
  29. private Integer saleNum;
  30. /**
  31. * 商品分类
  32. */
  33. private String categoryName;
  34. /**
  35. * 商品品牌
  36. */
  37. private String brandName;
  38. /**
  39. * 上下架状态
  40. */
  41. private Integer status;
  42. /**
  43. * 说明书
  44. */
  45. private String spec;
  46. /**
  47. * 商品创建时间
  48. */
  49. @JSONField(format = "yyyy-MM-dd HH:mm:ss")
  50. private Date createTime;
  51. public Goods() {
  52. }
  53. public Goods(Long id, String title, BigDecimal price, Integer stock, Integer saleNum, String categoryName, String brandName, Integer status, String spec, Date createTime) {
  54. this.id = id;
  55. this.title = title;
  56. this.price = price;
  57. this.stock = stock;
  58. this.saleNum = saleNum;
  59. this.categoryName = categoryName;
  60. this.brandName = brandName;
  61. this.status = status;
  62. this.spec = spec;
  63. this.createTime = createTime;
  64. }
  65. public Long getId() {
  66. return id;
  67. }
  68. public void setId(Long id) {
  69. this.id = id;
  70. }
  71. public String getTitle() {
  72. return title;
  73. }
  74. public void setTitle(String title) {
  75. this.title = title;
  76. }
  77. public BigDecimal getPrice() {
  78. return price;
  79. }
  80. public void setPrice(BigDecimal price) {
  81. this.price = price;
  82. }
  83. public Integer getStock() {
  84. return stock;
  85. }
  86. public void setStock(Integer stock) {
  87. this.stock = stock;
  88. }
  89. public Integer getSaleNum() {
  90. return saleNum;
  91. }
  92. public void setSaleNum(Integer saleNum) {
  93. this.saleNum = saleNum;
  94. }
  95. public String getCategoryName() {
  96. return categoryName;
  97. }
  98. public void setCategoryName(String categoryName) {
  99. this.categoryName = categoryName;
  100. }
  101. public String getBrandName() {
  102. return brandName;
  103. }
  104. public void setBrandName(String brandName) {
  105. this.brandName = brandName;
  106. }
  107. public Integer getStatus() {
  108. return status;
  109. }
  110. public void setStatus(Integer status) {
  111. this.status = status;
  112. }
  113. public Date getCreateTime() {
  114. return createTime;
  115. }
  116. public void setCreateTime(Date createTime) {
  117. this.createTime = createTime;
  118. }
  119. public String getSpec() {
  120. return spec;
  121. }
  122. public void setSpec(String spec) {
  123. this.spec = spec;
  124. }
  125. @Override
  126. public String toString() {
  127. return "Goods{" +
  128. "id=" + id +
  129. ", title='" + title + '\'' +
  130. ", price=" + price +
  131. ", stock=" + stock +
  132. ", saleNum=" + saleNum +
  133. ", categoryName='" + categoryName + '\'' +
  134. ", brandName='" + brandName + '\'' +
  135. ", status=" + status +
  136. ", spec='" + spec + '\'' +
  137. ", createTime=" + createTime +
  138. '}';
  139. }
  140. }

5、索引操作service

 IndexTestService:

  1. package com.example.test.service.es;
  2. import org.elasticsearch.cluster.metadata.MappingMetaData;
  3. import java.util.Map;
  4. public interface IndexTestService {
  5. public boolean indexCreate() throws Exception;
  6. public Map<String,Object> getMapping(String indexName) throws Exception;
  7. public boolean indexDelete(String indexName) throws Exception;
  8. public boolean indexExists(String indexName) throws Exception;
  9. }

IndexTestServiceImpl : 

  1. package com.example.test.service.impl.es;
  2. import com.example.test.service.es.IndexTestService;
  3. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  4. import org.elasticsearch.action.support.master.AcknowledgedResponse;
  5. import org.elasticsearch.client.IndicesClient;
  6. import org.elasticsearch.client.RequestOptions;
  7. import org.elasticsearch.client.RestHighLevelClient;
  8. import org.elasticsearch.client.indices.CreateIndexRequest;
  9. import org.elasticsearch.client.indices.CreateIndexResponse;
  10. import org.elasticsearch.client.indices.GetIndexRequest;
  11. import org.elasticsearch.client.indices.GetIndexResponse;
  12. import org.elasticsearch.cluster.metadata.MappingMetaData;
  13. import org.elasticsearch.common.xcontent.XContentType;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.stereotype.Service;
  16. import java.util.Map;
  17. /**
  18. * 索引服务类
  19. */
  20. @Service
  21. public class IndexTestServiceImpl implements IndexTestService {
  22. @Autowired
  23. RestHighLevelClient restHighLevelClient;
  24. @Override
  25. public boolean indexCreate() throws Exception {
  26. // 1、创建 创建索引request 参数:索引名mess
  27. CreateIndexRequest indexRequest = new CreateIndexRequest("goods");
  28. // 2、设置索引的settings
  29. // 3、设置索引的mappings
  30. String mapping = "{\n" +
  31. "\n" +
  32. "\t\t\"properties\": {\n" +
  33. "\t\t \"brandName\": {\n" +
  34. "\t\t\t\"type\": \"keyword\"\n" +
  35. "\t\t },\n" +
  36. "\t\t \"categoryName\": {\n" +
  37. "\t\t\t\"type\": \"keyword\"\n" +
  38. "\t\t },\n" +
  39. "\t\t \"createTime\": {\n" +
  40. "\t\t\t\"type\": \"date\",\n" +
  41. "\t\t\t\"format\": \"yyyy-MM-dd HH:mm:ss\"\n" +
  42. "\t\t },\n" +
  43. "\t\t \"id\": {\n" +
  44. "\t\t\t\"type\": \"long\"\n" +
  45. "\t\t },\n" +
  46. "\t\t \"price\": {\n" +
  47. "\t\t\t\"type\": \"double\"\n" +
  48. "\t\t },\n" +
  49. "\t\t \"saleNum\": {\n" +
  50. "\t\t\t\"type\": \"integer\"\n" +
  51. "\t\t },\n" +
  52. "\t\t \"status\": {\n" +
  53. "\t\t\t\"type\": \"integer\"\n" +
  54. "\t\t },\n" +
  55. "\t\t \"stock\": {\n" +
  56. "\t\t\t\"type\": \"integer\"\n" +
  57. "\t\t },\n" +
  58. "\t\t\"spec\": {\n" +
  59. "\t\t\t\"type\": \"text\",\n" +
  60. "\t\t\t\"analyzer\": \"ik_max_word\",\n" +
  61. "\t\t\t\"search_analyzer\": \"ik_smart\"\n" +
  62. "\t\t },\n" +
  63. "\t\t \"title\": {\n" +
  64. "\t\t\t\"type\": \"text\",\n" +
  65. "\t\t\t\"analyzer\": \"ik_max_word\",\n" +
  66. "\t\t\t\"search_analyzer\": \"ik_smart\"\n" +
  67. "\t\t }\n" +
  68. "\t\t}\n" +
  69. " }";
  70. // 4、 设置索引的别名
  71. // 5、 发送请求
  72. // 5.1 同步方式发送请求
  73. IndicesClient indicesClient = restHighLevelClient.indices();
  74. indexRequest.mapping(mapping, XContentType.JSON);
  75. // 请求服务器
  76. CreateIndexResponse response = indicesClient.create(indexRequest, RequestOptions.DEFAULT);
  77. return response.isAcknowledged();
  78. }
  79. /**
  80. * 获取表结构
  81. * GET goods/_mapping
  82. */
  83. @Override
  84. public Map<String, Object> getMapping(String indexName) throws Exception {
  85. IndicesClient indicesClient = restHighLevelClient.indices();
  86. // 创建get请求
  87. GetIndexRequest request = new GetIndexRequest(indexName);
  88. // 发送get请求
  89. GetIndexResponse response = indicesClient.get(request, RequestOptions.DEFAULT);
  90. // 获取表结构
  91. Map<String, MappingMetaData> mappings = response.getMappings();
  92. Map<String, Object> sourceAsMap = mappings.get(indexName).getSourceAsMap();
  93. return sourceAsMap;
  94. }
  95. /**
  96. * 删除索引库
  97. */
  98. @Override
  99. public boolean indexDelete(String indexName) throws Exception {
  100. IndicesClient indicesClient = restHighLevelClient.indices();
  101. // 创建delete请求方式
  102. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
  103. // 发送delete请求
  104. AcknowledgedResponse response = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT);
  105. return response.isAcknowledged();
  106. }
  107. /**
  108. * 判断索引库是否存在
  109. */
  110. @Override
  111. public boolean indexExists(String indexName) throws Exception {
  112. IndicesClient indicesClient = restHighLevelClient.indices();
  113. // 创建get请求
  114. GetIndexRequest request = new GetIndexRequest(indexName);
  115. // 判断索引库是否存在
  116. boolean result = indicesClient.exists(request, RequestOptions.DEFAULT);
  117. return result;
  118. }
  119. }

测试代码:

  1. package com.example.test;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.serializer.SerializerFeature;
  4. import com.example.common.utils.java.StackTraceUtil;
  5. import com.example.common.utils.java.UtilMisc;
  6. import com.example.test.beans.Goods;
  7. import com.example.test.service.es.DocumentTestService;
  8. import com.example.test.service.es.EsQueryDataService;
  9. import com.example.test.service.es.IndexTestService;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.elasticsearch.rest.RestStatus;
  12. import org.junit.Test;
  13. import org.junit.runner.RunWith;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.boot.test.context.SpringBootTest;
  16. import org.springframework.test.context.junit4.SpringRunner;
  17. import java.math.BigDecimal;
  18. import java.util.Date;
  19. import java.util.List;
  20. import java.util.Map;
  21. @Slf4j
  22. @RunWith(SpringRunner.class)
  23. @SpringBootTest
  24. public class ElasticsearchTest1 {
  25. @Autowired
  26. IndexTestService indexTestService;
  27. /**
  28. * 创建索引库和映射表结构
  29. * 注意:索引一般不会这么创建
  30. */
  31. @Test
  32. public void indexCreate() {
  33. boolean flag = false;
  34. try {
  35. flag = indexTestService.indexCreate();
  36. } catch (Exception e) {
  37. log.error("创建索引失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  38. }
  39. System.out.println("创建索引是否成功:" + flag);
  40. }
  41. /**
  42. * 获取索引表结构
  43. */
  44. @Test
  45. public void getMapping() {
  46. try {
  47. Map<String, Object> indexMap = indexTestService.getMapping("goods");
  48. // 将bean 转化为格式化后的json字符串
  49. String pretty1 = JSON.toJSONString(indexMap, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
  50. SerializerFeature.WriteDateUseDateFormat);
  51. log.info("索引信息:{}", pretty1);
  52. } catch (Exception e) {
  53. log.error("获取索引失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  54. }
  55. }
  56. /**
  57. * 删除索引库
  58. *
  59. */
  60. @Test
  61. public void deleteIndex() {
  62. boolean flag = false;
  63. try {
  64. flag = indexTestService.indexDelete("goods");
  65. } catch (Exception e) {
  66. log.error("删除索引库失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  67. }
  68. System.out.println("删除索引库是否成功:" + flag);
  69. }
  70. /**
  71. * 校验索引库是否存在
  72. *
  73. */
  74. @Test
  75. public void indexExists() {
  76. boolean flag = false;
  77. try {
  78. flag = indexTestService.indexExists("goods");
  79. } catch (Exception e) {
  80. log.error("校验索引库是否存在,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  81. }
  82. System.out.println("索引库是否存在:" + flag);
  83. }
  84. }

6、文档操作service

        测试数据:https://pan.baidu.com/s/1A_ckKV7wsLJQJoeeALgkig?pwd=r68c

DocumentTestService:

  1. package com.example.test.service.es;
  2. import com.example.test.beans.Goods;
  3. import org.elasticsearch.rest.RestStatus;
  4. import java.io.IOException;
  5. public interface DocumentTestService {
  6. public RestStatus addDocument(String indexName, String type, Goods goods) throws IOException;
  7. public Goods getDocument(String indexName, String type, String id) throws Exception;
  8. public RestStatus updateDocument(String indexName, String type, Goods goods) throws IOException;
  9. public RestStatus deleteDocument(String indexName, String type, String id) throws IOException;
  10. public RestStatus batchImportGoodsData() throws IOException;
  11. }

DocumentTestServiceImpl : 

  1. package com.example.test.service.impl.es;
  2. import com.alibaba.fastjson.JSON;
  3. import com.example.common.utils.ObjectUtil;
  4. import com.example.common.utils.java.BeanMapUtils;
  5. import com.example.test.beans.Goods;
  6. import com.example.test.dao.GoodsMapper;
  7. import com.example.test.service.es.DocumentTestService;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.elasticsearch.action.bulk.BulkRequest;
  10. import org.elasticsearch.action.bulk.BulkResponse;
  11. import org.elasticsearch.action.delete.DeleteRequest;
  12. import org.elasticsearch.action.delete.DeleteResponse;
  13. import org.elasticsearch.action.get.GetRequest;
  14. import org.elasticsearch.action.get.GetResponse;
  15. import org.elasticsearch.action.index.IndexRequest;
  16. import org.elasticsearch.action.index.IndexResponse;
  17. import org.elasticsearch.action.update.UpdateRequest;
  18. import org.elasticsearch.action.update.UpdateResponse;
  19. import org.elasticsearch.client.RequestOptions;
  20. import org.elasticsearch.client.RestHighLevelClient;
  21. import org.elasticsearch.common.xcontent.XContentType;
  22. import org.elasticsearch.rest.RestStatus;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.stereotype.Service;
  25. import javax.annotation.Resource;
  26. import java.io.IOException;
  27. import java.util.List;
  28. import java.util.Map;
  29. /**
  30. * 文档服务类
  31. */
  32. @Slf4j
  33. @Service
  34. public class DocumentTestServiceImpl implements DocumentTestService {
  35. @Autowired
  36. RestHighLevelClient restHighLevelClient;
  37. @Resource
  38. GoodsMapper goodsMapper;
  39. /**
  40. * 增加文档信息
  41. */
  42. @Override
  43. public RestStatus addDocument(String indexName, String type, Goods goods) throws IOException {
  44. // 默认类型为_doc
  45. type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
  46. // 将对象转为json
  47. String data = JSON.toJSONString(goods);
  48. // 创建索引请求对象
  49. IndexRequest indexRequest = new IndexRequest(indexName,type).id(goods.getId() + "").source(data, XContentType.JSON);
  50. // 执行增加文档
  51. IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  52. RestStatus status = response.status();
  53. log.info("创建状态:{}", status);
  54. return status;
  55. }
  56. /**
  57. * 获取文档信息
  58. */
  59. @Override
  60. public Goods getDocument(String indexName, String type, String id) throws Exception {
  61. // 默认类型为_doc
  62. type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
  63. // 创建获取请求对象
  64. GetRequest getRequest = new GetRequest(indexName, type, id);
  65. GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
  66. Map<String, Object> sourceAsMap = response.getSourceAsMap();
  67. Goods goods = BeanMapUtils.mapToBean(sourceAsMap,Goods.class);
  68. return goods;
  69. }
  70. /**
  71. * 更新文档信息
  72. */
  73. @Override
  74. public RestStatus updateDocument(String indexName, String type, Goods goods) throws IOException {
  75. // 默认类型为_doc
  76. type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
  77. // 将对象转为json
  78. String data = JSON.toJSONString(goods);
  79. // 创建索引请求对象
  80. UpdateRequest updateRequest = new UpdateRequest(indexName, type, String.valueOf(goods.getId()));
  81. // 设置更新文档内容
  82. updateRequest.doc(data, XContentType.JSON);
  83. // 执行更新文档
  84. UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
  85. log.info("创建状态:{}", response.status());
  86. RestStatus status = response.status();
  87. log.info("更新文档信息响应状态:{}", status);
  88. return status;
  89. }
  90. /**
  91. * 删除文档信息
  92. */
  93. @Override
  94. public RestStatus deleteDocument(String indexName, String type, String id) throws IOException {
  95. // 默认类型为_doc
  96. type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
  97. // 创建删除请求对象
  98. DeleteRequest deleteRequest = new DeleteRequest(indexName, type, id);
  99. // 执行删除文档
  100. DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
  101. RestStatus status = response.status();
  102. log.info("删除文档响应状态:{}", status);
  103. return status;
  104. }
  105. @Override
  106. public RestStatus batchImportGoodsData() throws IOException {
  107. //1.查询所有数据,mysql
  108. List<Goods> goodsList = goodsMapper.findAll();
  109. //2.bulk导入
  110. BulkRequest bulkRequest = new BulkRequest();
  111. //2.1 循环goodsList,创建IndexRequest添加数据
  112. for (Goods goods : goodsList) {
  113. //将goods对象转换为json字符串
  114. String data = JSON.toJSONString(goods);//map --> {}
  115. IndexRequest indexRequest = new IndexRequest("goods","_doc");
  116. indexRequest.id(goods.getId() + "").source(data, XContentType.JSON);
  117. bulkRequest.add(indexRequest);
  118. }
  119. BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  120. return response.status();
  121. }
  122. }

测试代码:

  1. package com.example.test;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.serializer.SerializerFeature;
  4. import com.example.common.utils.java.StackTraceUtil;
  5. import com.example.common.utils.java.UtilMisc;
  6. import com.example.test.beans.Goods;
  7. import com.example.test.service.es.DocumentTestService;
  8. import com.example.test.service.es.EsQueryDataService;
  9. import com.example.test.service.es.IndexTestService;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.elasticsearch.rest.RestStatus;
  12. import org.junit.Test;
  13. import org.junit.runner.RunWith;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.boot.test.context.SpringBootTest;
  16. import org.springframework.test.context.junit4.SpringRunner;
  17. import java.math.BigDecimal;
  18. import java.util.Date;
  19. import java.util.List;
  20. import java.util.Map;
  21. @Slf4j
  22. @RunWith(SpringRunner.class)
  23. @SpringBootTest
  24. public class ElasticsearchTest1 {
  25. @Autowired
  26. DocumentTestService documentTestService;
  27. /**
  28. * 添加文档
  29. *
  30. */
  31. @Test
  32. public void addDocument() {
  33. // 创建商品信息
  34. Goods goods = new Goods();
  35. goods.setId(1L);
  36. goods.setTitle("Apple iPhone 13 Pro (A2639) 256GB 远峰蓝色 支持移动联通电信5G 双卡双待手机");
  37. goods.setPrice(new BigDecimal("8799.00"));
  38. goods.setStock(1000);
  39. goods.setSaleNum(599);
  40. goods.setCategoryName("手机");
  41. goods.setBrandName("Apple");
  42. goods.setStatus(0);
  43. goods.setCreateTime(new Date());
  44. // 返回状态
  45. RestStatus restStatus = null;
  46. try {
  47. restStatus = documentTestService.addDocument("goods","_doc", goods);
  48. } catch (Exception e) {
  49. log.error("添加文档失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  50. }
  51. System.out.println("添加文档响应状态:" + restStatus);
  52. }
  53. @Test
  54. public void getDocument() {
  55. // 返回信息
  56. Goods goods = null;
  57. try {
  58. goods = documentTestService.getDocument("goods", "_doc", "1");
  59. } catch (Exception e) {
  60. log.error("查询文档失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  61. }
  62. System.out.println("查询的文档信息:" + goods);
  63. }
  64. @Test
  65. public void updateDocument() {
  66. // 创建商品信息
  67. Goods goods = new Goods();
  68. goods.setTitle("Apple iPhone 13 Pro Max (A2644) 256GB 远峰蓝色 支持移动联通电信5G 双卡双待手机");
  69. goods.setPrice(new BigDecimal("9999"));
  70. goods.setId(1L);
  71. // 返回状态
  72. RestStatus restStatus = null;
  73. try {
  74. restStatus = documentTestService.updateDocument("goods", "_doc", goods);
  75. } catch (Exception e) {
  76. log.error("更新文档失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  77. }
  78. System.out.println("更新文档响应状态:" + restStatus);
  79. }
  80. @Test
  81. public void deleteDocument() {
  82. // 返回状态
  83. RestStatus restStatus = null;
  84. try {
  85. restStatus = documentTestService.deleteDocument("goods", "_doc", "1");
  86. } catch (Exception e) {
  87. log.error("删除文档失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  88. }
  89. System.out.println("删除文档响应状态:" + restStatus);
  90. }
  91. /**
  92. * 批量导入测试数据
  93. */
  94. @Test
  95. public void importDocument() {
  96. // 返回状态
  97. RestStatus restStatus = null;
  98. try {
  99. restStatus = documentTestService.batchImportGoodsData();
  100. } catch (Exception e) {
  101. log.error("批量导入数据失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  102. }
  103. System.out.println("批量导入数据响应状态:" + restStatus);
  104. }
  105. }

7、DSL高级查询操作

EsQueryDataService:

  1. package com.example.test.service.es;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import java.util.Map;
  5. public interface EsQueryDataService {
  6. public <T> List<T> termQuery(String indexName, String columnName, Object value, Class<T> classz);
  7. public <T> List<T> termsQuery(String indexName, String columnName, Object[] dataArgs, Class<T> classz);
  8. public <T> List<T> matchAllQuery(String indexName, Class<T> classz, int startIndex, int pageSize, List<String> orderList, String columnName, Object value);
  9. public <T> List<T> matchPhraseQuery(String indexName, Class<T> classz, String columnName, Object value);
  10. public <T> List<T> matchMultiQuery(String indexName, Class<T> classz, String[] fields, Object text);
  11. public <T> List<T> wildcardQuery(String indexName, Class<T> classz,String field, String text);
  12. public <T> List<T> fuzzyQuery(String indexName, Class<T> classz, String field, String text);
  13. public <T> List<T> boolQuery(String indexName,Class<T> beanClass);
  14. public void metricQuery(String indexName);
  15. public void bucketQuery(String indexName,String bucketField, String bucketFieldAlias);
  16. public void subBucketQuery(String indexName,String bucketField, String bucketFieldAlias,String avgFiled,String avgFiledAlias);
  17. public void subSubAgg(String indexName);
  18. }

EsQueryDataServiceImpl : 

  1. package com.example.test.service.impl.es;
  2. import com.alibaba.fastjson.JSON;
  3. import com.example.common.exception.myexception.MyBusinessException;
  4. import com.example.common.utils.ObjectUtil;
  5. import com.example.common.utils.java.StackTraceUtil;
  6. import com.example.test.beans.Goods;
  7. import com.example.test.service.es.EsQueryDataService;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.poi.ss.formula.functions.T;
  10. import org.elasticsearch.action.search.SearchRequest;
  11. import org.elasticsearch.action.search.SearchResponse;
  12. import org.elasticsearch.client.RequestOptions;
  13. import org.elasticsearch.client.RestHighLevelClient;
  14. import org.elasticsearch.common.text.Text;
  15. import org.elasticsearch.common.unit.Fuzziness;
  16. import org.elasticsearch.index.query.*;
  17. import org.elasticsearch.rest.RestStatus;
  18. import org.elasticsearch.search.SearchHit;
  19. import org.elasticsearch.search.SearchHits;
  20. import org.elasticsearch.search.aggregations.AggregationBuilder;
  21. import org.elasticsearch.search.aggregations.AggregationBuilders;
  22. import org.elasticsearch.search.aggregations.Aggregations;
  23. import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
  24. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  25. import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
  26. import org.elasticsearch.search.aggregations.metrics.avg.ParsedAvg;
  27. import org.elasticsearch.search.aggregations.metrics.max.ParsedMax;
  28. import org.elasticsearch.search.aggregations.metrics.min.ParsedMin;
  29. import org.elasticsearch.search.builder.SearchSourceBuilder;
  30. import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
  31. import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
  32. import org.elasticsearch.search.sort.SortBuilder;
  33. import org.elasticsearch.search.sort.SortOrder;
  34. import org.springframework.beans.factory.annotation.Autowired;
  35. import org.springframework.stereotype.Service;
  36. import javax.rmi.CORBA.Util;
  37. import java.io.IOException;
  38. import java.lang.reflect.InvocationTargetException;
  39. import java.lang.reflect.Method;
  40. import java.util.ArrayList;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.Set;
  44. @Slf4j
  45. @Service
  46. public class EsQueryDataServiceImpl implements EsQueryDataService {
  47. @Autowired
  48. RestHighLevelClient restHighLevelClient;
  49. /**
  50. * 精确查询(termQuery)
  51. */
  52. @Override
  53. public <T> List<T> termQuery(String indexName, String field, Object value, Class<T> beanClass) {
  54. // 查询的数据列表
  55. List<T> list = new ArrayList<>();
  56. try {
  57. // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
  58. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  59. searchSourceBuilder.query(QueryBuilders.termQuery(field, value));
  60. // 执行查询es数据
  61. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  62. } catch (IOException e) {
  63. log.error("精确查询数据失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  64. throw new MyBusinessException("99999","精确查询数据失败");
  65. }
  66. return list;
  67. }
  68. /**
  69. * terms:多个查询内容在一个字段中进行查询
  70. */
  71. @Override
  72. public <T> List<T> termsQuery(String indexName, String field, Object[] dataArgs, Class<T> beanClass) {
  73. // 查询的数据列表
  74. List<T> list = new ArrayList<>();
  75. try {
  76. // 构建查询条件(注意:termsQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
  77. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  78. searchSourceBuilder.query(QueryBuilders.termsQuery(field, dataArgs));
  79. // 展示100条,默认只展示10条记录
  80. searchSourceBuilder.size(100);
  81. // 执行查询es数据
  82. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  83. } catch (IOException e) {
  84. log.error("单字段多内容查询数据失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  85. throw new MyBusinessException("99999","单字段多内容查询数据失败");
  86. }
  87. return list;
  88. }
  89. /**
  90. * 匹配查询符合条件的所有数据,并设置分页
  91. */
  92. @Override
  93. public <T> List<T> matchAllQuery(String indexName, Class<T> beanClass, int startIndex, int pageSize, List<String> orderList, String field, Object value) {
  94. // 查询的数据列表
  95. List<T> list = new ArrayList<>();
  96. try {
  97. // 创建查询源构造器
  98. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  99. // 构建查询条件
  100. if (!ObjectUtil.isEmptyObject(field) && !ObjectUtil.isEmptyObject(value)) {
  101. MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery(field, value);
  102. searchSourceBuilder.query(matchQueryBuilder);
  103. } else {
  104. MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
  105. searchSourceBuilder.query(matchAllQueryBuilder);
  106. }
  107. // 设置分页
  108. searchSourceBuilder.from(startIndex);
  109. searchSourceBuilder.size(pageSize);
  110. // 设置排序
  111. if (orderList != null) {
  112. for(String order : orderList) {
  113. // -开头代表:倒序
  114. boolean flag = order.startsWith("-");
  115. SortOrder sort = flag ? SortOrder.DESC: SortOrder.ASC;
  116. order = flag ? order.substring(1) : order;
  117. searchSourceBuilder.sort(order, sort);
  118. }
  119. }
  120. // 执行查询es数据
  121. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  122. } catch (IOException e) {
  123. log.error("查询所有数据失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  124. throw new MyBusinessException("99999","查询所有数据失败");
  125. }
  126. return list;
  127. }
  128. /**
  129. * 词语匹配查询
  130. */
  131. @Override
  132. public <T> List<T> matchPhraseQuery(String indexName, Class<T> beanClass, String field, Object value) {
  133. // 查询的数据列表
  134. List<T> list = new ArrayList<>();
  135. try {
  136. // 构建查询条件
  137. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  138. searchSourceBuilder.query(QueryBuilders.matchPhraseQuery(field, value));
  139. // 执行查询es数据
  140. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  141. } catch (IOException e) {
  142. log.error("词语匹配查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  143. throw new MyBusinessException("99999","词语匹配查询失败");
  144. }
  145. return list;
  146. }
  147. /**
  148. * 内容在多字段中进行查询
  149. */
  150. @Override
  151. public <T> List<T> matchMultiQuery(String indexName, Class<T> beanClass, String[] fields, Object text) {
  152. // 查询的数据列表
  153. List<T> list = new ArrayList<>();
  154. try {
  155. // 构建查询条件
  156. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  157. // 设置查询条件
  158. searchSourceBuilder.query(QueryBuilders.multiMatchQuery(text, fields));
  159. // 执行查询es数据
  160. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  161. } catch (IOException e) {
  162. log.error("词语匹配查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  163. throw new MyBusinessException("99999","词语匹配查询失败");
  164. }
  165. return list;
  166. }
  167. /**
  168. * 通配符查询(wildcard):会对查询条件进行分词。还可以使用通配符 ?(任意单个字符) 和 * (0个或多个字符)
  169. *
  170. * *:表示多个字符(0个或多个字符)
  171. * ?:表示单个字符
  172. */
  173. @Override
  174. public <T> List<T> wildcardQuery(String indexName, Class<T> beanClass,String field, String text) {
  175. // 查询的数据列表
  176. List<T> list = new ArrayList<>();
  177. try {
  178. // 构建查询条件
  179. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  180. searchSourceBuilder.query(QueryBuilders.wildcardQuery(field, text));
  181. // 执行查询es数据
  182. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  183. } catch (IOException e) {
  184. log.error("通配符查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  185. throw new MyBusinessException("99999","通配符查询失败");
  186. }
  187. return list;
  188. }
  189. /**
  190. * 模糊查询所有以 “三” 结尾的商品信息
  191. */
  192. @Override
  193. public <T> List<T> fuzzyQuery(String indexName, Class<T> beanClass, String field, String text) {
  194. // 查询的数据列表
  195. List<T> list = new ArrayList<>();
  196. try {
  197. // 构建查询条件
  198. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  199. searchSourceBuilder.query(QueryBuilders.fuzzyQuery(field, text).fuzziness(Fuzziness.AUTO));
  200. // 执行查询es数据
  201. queryEsData(indexName, beanClass, list, searchSourceBuilder);
  202. } catch (IOException e) {
  203. log.error("通配符查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  204. throw new MyBusinessException("99999","通配符查询失败");
  205. }
  206. return list;
  207. }
  208. /**
  209. * boolQuery 查询
  210. * 高亮展示标题搜索字段
  211. * 设置出参返回字段
  212. *
  213. * 案例:查询从2018-2022年间标题含 三星 的商品信息
  214. */
  215. @Override
  216. public <T> List<T> boolQuery(String indexName,Class<T> beanClass) {
  217. // 查询的数据列表
  218. List<T> list = new ArrayList<>();
  219. try {
  220. // 创建 Bool 查询构建器
  221. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  222. // 构建查询条件
  223. boolQueryBuilder.must(QueryBuilders.matchQuery("title", "三星")); // 标题
  224. boolQueryBuilder.must(QueryBuilders.matchQuery("spec", "联通3G"));// 说明书
  225. boolQueryBuilder.filter().add(QueryBuilders.rangeQuery("createTime").format("yyyy").gte("2018").lte("2022")); // 创建时间
  226. // 构建查询源构建器
  227. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  228. searchSourceBuilder.query(boolQueryBuilder);
  229. searchSourceBuilder.size(100);
  230. // 甚至返回字段
  231. // 如果查询的属性很少,那就使用includes,而excludes设置为空数组
  232. // 如果排序的属性很少,那就使用excludes,而includes设置为空数组
  233. String[] includes = {"title", "categoryName", "price"};
  234. String[] excludes = {};
  235. searchSourceBuilder.fetchSource(includes, excludes);
  236. // 高亮设置
  237. // 设置高亮三要素: field: 你的高亮字段 , preTags :前缀 , postTags:后缀
  238. HighlightBuilder highlightBuilder = new HighlightBuilder().field("title").preTags("<font color='red'>").postTags("</font>");
  239. highlightBuilder.field("spec").preTags("<font color='red'>").postTags("</font>");
  240. searchSourceBuilder.highlighter(highlightBuilder);
  241. // 创建查询请求对象,将查询对象配置到其中
  242. SearchRequest searchRequest = new SearchRequest(indexName);
  243. searchRequest.source(searchSourceBuilder);
  244. // 执行查询,然后处理响应结果
  245. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  246. // 根据状态和数据条数验证是否返回了数据
  247. if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits() > 0) {
  248. SearchHits hits = searchResponse.getHits();
  249. for (SearchHit hit : hits) {
  250. // 将 JSON 转换成对象
  251. T bean = JSON.parseObject(hit.getSourceAsString(), beanClass);
  252. // 获取高亮的数据
  253. HighlightField highlightField = hit.getHighlightFields().get("title");
  254. System.out.println("高亮名称:" + highlightField.getFragments()[0].string());
  255. // 替换掉原来的数据
  256. Text[] fragments = highlightField.getFragments();
  257. if (fragments != null && fragments.length > 0) {
  258. StringBuilder title = new StringBuilder();
  259. for (Text fragment : fragments) {
  260. title.append(fragment);
  261. }
  262. // 获取method对象,其中包含方法名称和参数列表
  263. Method setTitle = beanClass.getMethod("setTitle", String.class);
  264. if (setTitle != null) {
  265. // 执行method,bean为实例对象,后面是方法参数列表;setTitle没有返回值
  266. setTitle.invoke(bean, title.toString());
  267. }
  268. }
  269. list.add(bean);
  270. }
  271. }
  272. } catch (Exception e) {
  273. log.error("布尔查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  274. throw new MyBusinessException("99999", "布尔查询失败");
  275. }
  276. return list;
  277. }
  278. /**
  279. * 聚合查询 : 聚合查询一定是【先查出结果】,然后对【结果使用聚合函数】做处理.
  280. *
  281. * Metric 指标聚合分析。常用的操作有:avg:求平均、max:最大值、min:最小值、sum:求和等
  282. *
  283. * 案例:分别获取最贵的商品和获取最便宜的商品
  284. */
  285. @Override
  286. public void metricQuery(String indexName) {
  287. try {
  288. // 构建查询条件
  289. MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
  290. // 创建查询源构造器
  291. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  292. searchSourceBuilder.query(matchAllQueryBuilder);
  293. // 获取最贵的商品
  294. AggregationBuilder maxPrice = AggregationBuilders.max("maxPrice").field("price");
  295. searchSourceBuilder.aggregation(maxPrice);
  296. // 获取最便宜的商品
  297. AggregationBuilder minPrice = AggregationBuilders.min("minPrice").field("price");
  298. searchSourceBuilder.aggregation(minPrice);
  299. // 创建查询请求对象,将查询对象配置到其中
  300. SearchRequest searchRequest = new SearchRequest(indexName);
  301. searchRequest.source(searchSourceBuilder);
  302. // 执行查询,然后处理响应结果
  303. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  304. Aggregations aggregations = searchResponse.getAggregations();
  305. ParsedMax max = aggregations.get("maxPrice");
  306. log.info("最贵的价格:" + max.getValue());
  307. ParsedMin min = aggregations.get("minPrice");
  308. log.info("最便宜的价格:" + min.getValue());
  309. } catch (Exception e) {
  310. log.error("指标聚合分析查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  311. throw new MyBusinessException("99999", "指标聚合分析查询失败");
  312. }
  313. }
  314. /**
  315. * 聚合查询: 聚合查询一定是【先查出结果】,然后对【结果使用聚合函数】做处理.
  316. *
  317. * Bucket 分桶聚合分析 : 对查询出的数据进行分组group by,再在组上进行游标聚合
  318. *
  319. * 案例:根据品牌进行聚合查询
  320. */
  321. @Override
  322. public void bucketQuery(String indexName,String bucketField, String bucketFieldAlias) {
  323. try {
  324. // 构建查询条件
  325. MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
  326. // 创建查询源构造器
  327. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  328. searchSourceBuilder.query(matchAllQueryBuilder);
  329. // 根据bucketField进行分组查询
  330. TermsAggregationBuilder aggBrandName = AggregationBuilders.terms(bucketFieldAlias).field(bucketField);
  331. searchSourceBuilder.aggregation(aggBrandName);
  332. // 创建查询请求对象,将查询对象配置到其中
  333. SearchRequest searchRequest = new SearchRequest(indexName);
  334. searchRequest.source(searchSourceBuilder);
  335. // 执行查询,然后处理响应结果
  336. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  337. Aggregations aggregations = searchResponse.getAggregations();
  338. ParsedStringTerms aggBrandName1 = aggregations.get(bucketField); // 分组结果数据
  339. for (Terms.Bucket bucket : aggBrandName1.getBuckets()) {
  340. log.info(bucket.getKeyAsString() + "====" + bucket.getDocCount());
  341. }
  342. } catch (IOException e) {
  343. log.error("分桶聚合分析查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  344. throw new MyBusinessException("99999", "分桶聚合分析查询失败");
  345. }
  346. }
  347. /**
  348. * 子聚合聚合查询
  349. * Bucket 分桶聚合分析
  350. *
  351. * 案例:根据商品分类进行分组查询,并且获取分类商品中的平均价格
  352. */
  353. @Override
  354. public void subBucketQuery(String indexName,String bucketField, String bucketFieldAlias,String avgFiled,String avgFiledAlias) {
  355. try {
  356. // 构建查询条件
  357. MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
  358. // 创建查询源构造器
  359. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  360. searchSourceBuilder.query(matchAllQueryBuilder);
  361. // 根据 bucketField进行分组查询,并且获取分类信息中 指定字段的平均值
  362. TermsAggregationBuilder subAggregation = AggregationBuilders.terms(bucketFieldAlias).field(bucketField)
  363. .subAggregation(AggregationBuilders.avg(avgFiledAlias).field(avgFiled));
  364. searchSourceBuilder.aggregation(subAggregation);
  365. // 创建查询请求对象,将查询对象配置到其中
  366. SearchRequest searchRequest = new SearchRequest(indexName);
  367. searchRequest.source(searchSourceBuilder);
  368. // 执行查询,然后处理响应结果
  369. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  370. Aggregations aggregations = searchResponse.getAggregations();
  371. ParsedStringTerms aggBrandName1 = aggregations.get(bucketFieldAlias);
  372. for (Terms.Bucket bucket : aggBrandName1.getBuckets()) {
  373. // 获取聚合后的 组内字段平均值,注意返回值不是Aggregation对象,而是指定的ParsedAvg对象
  374. ParsedAvg avgPrice = bucket.getAggregations().get(avgFiledAlias);
  375. log.info(bucket.getKeyAsString() + "====" + avgPrice.getValueAsString());
  376. }
  377. } catch (IOException e) {
  378. log.error("分桶聚合分析查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  379. throw new MyBusinessException("99999", "分桶聚合分析查询失败");
  380. }
  381. }
  382. /**
  383. * 综合聚合查询
  384. *
  385. * 根据商品分类聚合,获取每个商品类的平均价格,并且在商品分类聚合之上子聚合每个品牌的平均价格
  386. */
  387. @Override
  388. public void subSubAgg(String indexName) {
  389. try {
  390. // 构建查询条件
  391. MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
  392. // 创建查询源构造器
  393. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  394. searchSourceBuilder.query(matchAllQueryBuilder);
  395. // 注意这里聚合写的位置不要写错,很容易搞混,错一个括号就不对了
  396. TermsAggregationBuilder subAggregation = AggregationBuilders.terms("categoryNameAgg").field("categoryName")
  397. .subAggregation(AggregationBuilders.avg("categoryNameAvgPrice").field("price"))
  398. .subAggregation(AggregationBuilders.terms("brandNameAgg").field("brandName")
  399. .subAggregation(AggregationBuilders.avg("brandNameAvgPrice").field("price")));
  400. searchSourceBuilder.aggregation(subAggregation);
  401. // 创建查询请求对象,将查询对象配置到其中
  402. SearchRequest searchRequest = new SearchRequest(indexName);
  403. searchRequest.source(searchSourceBuilder);
  404. // 执行查询,然后处理响应结果
  405. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  406. //获取总记录数
  407. log.info("totalHits = " + searchResponse.getHits().getTotalHits());
  408. // 获取聚合信息
  409. Aggregations aggregations = searchResponse.getAggregations();
  410. ParsedStringTerms categoryNameAgg = aggregations.get("categoryNameAgg");
  411. //获取值返回
  412. for (Terms.Bucket bucket : categoryNameAgg.getBuckets()) {
  413. // 获取聚合后的分类名称
  414. String categoryName = bucket.getKeyAsString();
  415. // 获取聚合命中的文档数量
  416. long docCount = bucket.getDocCount();
  417. // 获取聚合后的分类的平均价格,注意返回值不是Aggregation对象,而是指定的ParsedAvg对象
  418. ParsedAvg avgPrice = bucket.getAggregations().get("categoryNameAvgPrice");
  419. System.out.println(categoryName + "======平均价:" + avgPrice.getValue() + "======数量:" + docCount);
  420. ParsedStringTerms brandNameAgg = bucket.getAggregations().get("brandNameAgg");
  421. for (Terms.Bucket brandeNameAggBucket : brandNameAgg.getBuckets()) {
  422. // 获取聚合后的品牌名称
  423. String brandName = brandeNameAggBucket.getKeyAsString();
  424. // 获取聚合后的品牌的平均价格,注意返回值不是Aggregation对象,而是指定的ParsedAvg对象
  425. ParsedAvg brandNameAvgPrice = brandeNameAggBucket.getAggregations().get("brandNameAvgPrice");
  426. log.info(" " + brandName + "======" + brandNameAvgPrice.getValue());
  427. }
  428. }
  429. } catch (IOException e) {
  430. log.error("综合聚合查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  431. throw new MyBusinessException("99999", "综合聚合查询失败");
  432. }
  433. }
  434. /**
  435. * 执行es查询
  436. * @param indexName
  437. * @param beanClass
  438. * @param list
  439. * @param searchSourceBuilder
  440. * @param <T>
  441. * @throws IOException
  442. */
  443. private <T> void queryEsData(String indexName, Class<T> beanClass, List<T> list, SearchSourceBuilder searchSourceBuilder) throws IOException {
  444. // 创建查询请求对象,将查询对象配置到其中
  445. SearchRequest searchRequest = new SearchRequest(indexName);
  446. searchRequest.source(searchSourceBuilder);
  447. // 执行查询,然后处理响应结果
  448. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  449. // 根据状态和数据条数验证是否返回了数据
  450. if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits() > 0) {
  451. SearchHits hits = searchResponse.getHits();
  452. for (SearchHit hit : hits) {
  453. // 将 JSON 转换成对象
  454. Goods userInfo = JSON.parseObject(hit.getSourceAsString(), Goods.class);
  455. // 将 JSON 转换成对象
  456. T bean = JSON.parseObject(hit.getSourceAsString(), beanClass);
  457. list.add(bean);
  458. }
  459. }
  460. }
  461. }

测试代码:

  1. package com.example.test;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.serializer.SerializerFeature;
  4. import com.example.common.utils.java.StackTraceUtil;
  5. import com.example.common.utils.java.UtilMisc;
  6. import com.example.test.beans.Goods;
  7. import com.example.test.service.es.EsQueryDataService;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.elasticsearch.rest.RestStatus;
  10. import org.junit.Test;
  11. import org.junit.runner.RunWith;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.boot.test.context.SpringBootTest;
  14. import org.springframework.test.context.junit4.SpringRunner;
  15. import java.math.BigDecimal;
  16. import java.util.Date;
  17. import java.util.List;
  18. import java.util.Map;
  19. @Slf4j
  20. @RunWith(SpringRunner.class)
  21. @SpringBootTest
  22. public class ElasticsearchTest1 {
  23. @Autowired
  24. EsQueryDataService esQueryDataService;
  25. /**
  26. * 单字段精确查询
  27. */
  28. @Test
  29. public void termQuery() {
  30. // 返回数据
  31. List<Goods> goodsList = null;
  32. try {
  33. goodsList = esQueryDataService.termQuery("goods", "title", "华为", Goods.class);
  34. } catch (Exception e) {
  35. log.error("单字段精确查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  36. }
  37. System.out.println("单字段精确查询结果:" + goodsList);
  38. }
  39. /**
  40. * 单字段多内容精确查询
  41. */
  42. @Test
  43. public void termsQuery() {
  44. // 返回数据
  45. List<Goods> goodsList = null;
  46. try {
  47. String[] args = {"华为", "OPPO", "TCL"};
  48. goodsList = esQueryDataService.termsQuery("goods", "title", args, Goods.class);
  49. } catch (Exception e) {
  50. log.error("单字段多内容精确查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  51. }
  52. System.out.println("单字段多内容精确查询结果:" + goodsList);
  53. }
  54. /**
  55. * 单字段匹配分页查询
  56. */
  57. @Test
  58. public void matchQuery() {
  59. // 返回数据
  60. List<Goods> goodsList = null;
  61. try {
  62. List<String> orderList = UtilMisc.toList("-price","-saleNum");
  63. goodsList = esQueryDataService.matchAllQuery("goods", Goods.class,0,3,orderList,"title", "华为");
  64. } catch (Exception e) {
  65. log.error("匹配查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  66. }
  67. System.out.println("匹配查询结果:" + goodsList);
  68. }
  69. /**
  70. * 单字段多内容精确查询
  71. */
  72. @Test
  73. public void matchPhraseQuery() {
  74. // 返回数据
  75. List<Goods> goodsList = null;
  76. try {
  77. goodsList = esQueryDataService.matchPhraseQuery("goods", Goods.class,"title", "华为");
  78. } catch (Exception e) {
  79. log.error("词语匹配查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  80. }
  81. System.out.println("词语匹配查询结果:" + goodsList);
  82. }
  83. /**
  84. * 内容在多字段中进行查询
  85. */
  86. @Test
  87. public void matchMultiQuery() {
  88. // 返回数据
  89. List<Goods> goodsList = null;
  90. try {
  91. String[] fields = {"title", "categoryName"};
  92. goodsList = esQueryDataService.matchMultiQuery("goods", Goods.class,fields,"手机");
  93. } catch (Exception e) {
  94. log.error("内容在多字段中进行查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  95. }
  96. System.out.println("内容在多字段中进行查询结果:" + goodsList);
  97. }
  98. /**
  99. * 通配符查询
  100. *
  101. * 查询所有以 “三” 结尾的商品信息
  102. */
  103. @Test
  104. public void wildcardQuery() {
  105. // 返回数据
  106. List<Goods> goodsList = null;
  107. try {
  108. goodsList = esQueryDataService.wildcardQuery("goods", Goods.class,"title","*三");
  109. } catch (Exception e) {
  110. log.error("通配符查询查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  111. }
  112. System.out.println("通配符查询结果:" + goodsList);
  113. }
  114. /**
  115. * 模糊查询
  116. *
  117. * 模糊查询所有以 “三” 结尾的商品信息
  118. */
  119. @Test
  120. public void fuzzyQuery() {
  121. // 返回数据
  122. List<Goods> goodsList = null;
  123. try {
  124. goodsList = esQueryDataService.fuzzyQuery("goods", Goods.class,"title","三");
  125. } catch (Exception e) {
  126. log.error("模糊查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  127. }
  128. System.out.println("模糊查询结果:" + goodsList);
  129. }
  130. @Test
  131. public void boolQuery() {
  132. // 返回数据
  133. List<Goods> goodsList = null;
  134. try {
  135. goodsList = esQueryDataService.boolQuery("goods", Goods.class);
  136. } catch (Exception e) {
  137. log.error("布尔查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e));
  138. }
  139. System.out.println("布尔查询结果:" + goodsList);
  140. }
  141. /**
  142. * Metric 指标聚合分析
  143. */
  144. @Test
  145. public void metricQuery() {
  146. esQueryDataService.metricQuery("goods");
  147. }
  148. /**
  149. * Bucket 分桶聚合分析
  150. */
  151. @Test
  152. public void bucketQuery() {
  153. esQueryDataService.bucketQuery("goods","brandName","brandNameName");
  154. }
  155. /**
  156. * 子聚合聚合查询
  157. */
  158. @Test
  159. public void subBucketQuery() {
  160. esQueryDataService.subBucketQuery("goods","brandName","brandNameName","price","avgPrice");
  161. }
  162. /**
  163. * 综合聚合查询
  164. */
  165. @Test
  166. public void subSubAgg() {
  167. esQueryDataService.subSubAgg("goods");
  168. }
  169. }

 参考文章:SpringBoot整合RestHighLevelClient案例

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/68344
推荐阅读
相关标签
  

闽ICP备14008679号