当前位置:   article > 正文

使用RestHighLevelClient-6.4.0客户端实现ES增删改查-操作工具类简单封装_es 6.4 resthighlevelclient 配置

es 6.4 resthighlevelclient 配置

maven依赖,使用了RestHighLevelClient6.4.0版本

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  4. <version>6.4.0</version>
  5. </dependency>

首先RestHighLevelClient配置,采用注入方式,启动就注册客户端bean

  1. @Configuration
  2. public class ESHighLevelRestClient {
  3. public static final int CONNECT_TIMEOUT = 5000;
  4. public static final int SOCKET_TIMEOUT = 60000;
  5. public static final int MAX_RETRY_TIMEOUT = 60000;
  6. public static final int WORK_THREADS = Runtime.getRuntime().availableProcessors();
  7. private final String authEnable = "enable";
  8. // 配置类,里面配置了es的地址,用户,密码
  9. @Autowired
  10. private ElasticsearchConfig elasticsearchConfig;
  11. @Bean
  12. public RestHighLevelClient restHighLevelClient() throws UnsupportedEncodingException {
  13. ArrayList<HttpHost> httpHosts = Lists.newArrayList();
  14. Map<String, String> hostsMap = Splitter.on(',').trimResults().omitEmptyStrings()
  15. .withKeyValueSeparator(":")
  16. .split(elasticsearchConfig.getHosts());
  17. hostsMap.entrySet().stream().forEach(x -> {
  18. httpHosts.add(new HttpHost(x.getKey(), Integer.valueOf(x.getValue()), "http"));
  19. });
  20. RestClientBuilder restClientBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
  21. // set es connection timeout
  22. restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
  23. @Override
  24. public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
  25. return requestConfigBuilder
  26. .setConnectTimeout(CONNECT_TIMEOUT)
  27. .setSocketTimeout(SOCKET_TIMEOUT);
  28. }
  29. }).setMaxRetryTimeoutMillis(MAX_RETRY_TIMEOUT);
  30. // set es client works numbers
  31. restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  32. @Override
  33. public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
  34. return httpClientBuilder.setDefaultIOReactorConfig(
  35. IOReactorConfig.custom().setIoThreadCount(WORK_THREADS).build());
  36. }
  37. });
  38. if (authEnable.equalsIgnoreCase(elasticsearchConfig.getAuth())) {
  39. Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", getToken(elasticsearchConfig.getUsername(), elasticsearchConfig.getPassword()))};
  40. restClientBuilder.setDefaultHeaders(defaultHeaders);
  41. }
  42. return new RestHighLevelClient(restClientBuilder);
  43. }
  44. private String getToken(String username, String password) throws UnsupportedEncodingException {
  45. StringBuilder tokenBuilder = new StringBuilder();
  46. tokenBuilder.append(username);
  47. tokenBuilder.append(":");
  48. tokenBuilder.append(password);
  49. String token = new String(Base64.getEncoder().encode(tokenBuilder.toString().getBytes()), StandardCharsets.UTF_8);
  50. return "Basic " + token;
  51. }
  52. }

封装abstract类,想要操作ES的实体需要继承这个类,重写packageElasticSearchBody()方法,封装查询条件

  1. @Getter
  2. @Setter
  3. public abstract class BaseElasticSearchEntity {
  4. /**
  5. * ES的INDEX
  6. */
  7. private String esIndex;
  8. /**
  9. * ES的TYPE
  10. */
  11. private String esType;
  12. /**
  13. * ES唯一值ID
  14. */
  15. private String esId;
  16. protected BaseElasticSearchEntity() {
  17. }
  18. protected BaseElasticSearchEntity(String esIndex, String esType) {
  19. this.esIndex = esIndex;
  20. this.esType = esType;
  21. }
  22. protected BaseElasticSearchEntity(String esIndex, String esType, String esId) {
  23. this.esIndex = esIndex;
  24. this.esType = esType;
  25. this.esId = esId;
  26. }
  27. /**
  28. * 供子类封装查询条件
  29. *
  30. * @param boolQueryBuilder bool条件
  31. */
  32. public abstract void packageElasticSearchBody(BoolQueryBuilder boolQueryBuilder);
  33. }

操作ES的mapper

  1. public interface ElasticSearchMapper<T extends BaseElasticSearchEntity> {
  2. /**
  3. * 创建ES索引库
  4. *
  5. * @param index index
  6. * @param type type
  7. * @param mapping 映射,为json字符串,
  8. * 例如:{\"properties\":{\"name\":{\"type\":\"keyword\"},\"date1\":{\"type\":\"date\"},\"date2\":{\"type\":\"date\"},\"updateTime\":{\"type\":\"date\"}}}
  9. * @return 是否创建成功
  10. * @throws IOException 异常
  11. */
  12. boolean createIndex(String index, String type, String mapping) throws IOException;
  13. /**
  14. * 删除索引
  15. *
  16. * @param index index
  17. * @return 是否删除成功
  18. * @throws IOException 异常
  19. */
  20. boolean deleteIndex(String index) throws IOException;
  21. /**
  22. * 根据id查询数据是否存在于ES中
  23. *
  24. * @param entry index和type和id
  25. * @return 是否存在
  26. */
  27. boolean isExists(T entry);
  28. /**
  29. * 插入ES,指定id
  30. *
  31. * @param entry 实体
  32. * @return 插入结果
  33. */
  34. String insert(T entry);
  35. /**
  36. * 更新ES,指定id
  37. *
  38. * @param entry 实体
  39. * @return 更新结果
  40. */
  41. String update(T entry);
  42. /**
  43. * 删除ES,指定id
  44. *
  45. * @param entry index和type和id
  46. * @return 删除结果
  47. */
  48. String delete(T entry);
  49. /**
  50. * 根据id查询ES
  51. *
  52. * @param entry index和type和id
  53. * @return 查询数据结果json字符串
  54. */
  55. String selectById(T entry);
  56. /**
  57. * 多条件查询,正序排序
  58. *
  59. * @param entry 封装的查询条件
  60. * @param sortField 排序字段的字段名,如:updateTime
  61. * @param page 页码
  62. * @param length 每页条数
  63. * @return 查询结果,es封装
  64. */
  65. ElasticSearchResponseEntity selectByMultiConditionAsc(T entry, String sortField, Integer page, Integer length);
  66. /**
  67. * 多条件查询,倒序排序
  68. *
  69. * @param entry 封装的查询条件
  70. * @param sortField 排序字段的字段名,如:updateTime
  71. * @param page 页码
  72. * @param length 每页条数
  73. * @return 查询结果,es封装
  74. */
  75. ElasticSearchResponseEntity selectByMultiConditionDesc(T entry, String sortField, Integer page, Integer length);
  76. }

mapper的实现类,具体增删改查方法

  1. @Slf4j
  2. @Repository("elasticSearchMapperImpl")
  3. public class ElasticSearchMapperImpl<T extends BaseElasticSearchEntity> implements ElasticSearchMapper<T> {
  4. @Autowired
  5. private RestHighLevelClient restClient;
  6. @Override
  7. public boolean createIndex(String index, String type, String mapping) throws IOException {
  8. CreateIndexRequest indexRequest = new CreateIndexRequest(index);
  9. indexRequest.mapping(type, mapping, XContentType.JSON);
  10. IndicesClient indicesClient = restClient.indices();
  11. CreateIndexResponse createIndexResponse = indicesClient.create(indexRequest, RequestOptions.DEFAULT);
  12. return createIndexResponse.isAcknowledged();
  13. }
  14. @Override
  15. public boolean deleteIndex(String index) throws IOException {
  16. DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
  17. IndicesClient indicesClient = restClient.indices();
  18. AcknowledgedResponse delete = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT);
  19. return delete.isAcknowledged();
  20. }
  21. @Override
  22. public boolean isExists(T entry) {
  23. GetRequest getRequest = new GetRequest(entry.getEsIndex(), entry.getEsType(), entry.getEsId());
  24. getRequest.fetchSourceContext(new FetchSourceContext(false));
  25. getRequest.storedFields("_none_");
  26. try {
  27. boolean exists = restClient.exists(getRequest, RequestOptions.DEFAULT);
  28. log.info("查询ES是否存在数据,isExists:{},id:{}", exists, getRequest.id());
  29. return exists;
  30. } catch (IOException e) {
  31. throw new RuntimeException(e);
  32. }
  33. }
  34. @Override
  35. public String insert(T entry) {
  36. IndexRequest request = new IndexRequest(entry.getEsIndex(), entry.getEsType(), entry.getEsId());
  37. request.source(JSONObject.toJSONString(entry), XContentType.JSON);
  38. request.create(true);
  39. IndexResponse response;
  40. try {
  41. response = restClient.index(request, RequestOptions.DEFAULT);
  42. String name = response == null ? null : response.getResult().name();
  43. log.info("ES执行插入:index:{},type:{},id:{}", request.index(), request.type(), request.id());
  44. return name;
  45. } catch (IOException e) {
  46. throw new RuntimeException(e);
  47. }
  48. }
  49. @Override
  50. public String update(T entry) {
  51. UpdateRequest request = new UpdateRequest(entry.getEsIndex(), entry.getEsType(), entry.getEsId());
  52. request.doc(JSONObject.toJSONString(entry), XContentType.JSON);
  53. UpdateResponse response;
  54. try {
  55. response = restClient.update(request, RequestOptions.DEFAULT);
  56. String name = response == null ? null : response.getResult().name();
  57. log.info("ES执行更新:index:{},type:{},id:{}", request.index(), request.type(), request.id());
  58. return name;
  59. } catch (IOException e) {
  60. throw new RuntimeException(e);
  61. }
  62. }
  63. @Override
  64. public String delete(T entry) {
  65. DeleteRequest request = new DeleteRequest(entry.getEsIndex(), entry.getEsType(), entry.getEsId());
  66. try {
  67. DeleteResponse response = restClient.delete(request, RequestOptions.DEFAULT);
  68. String name = response == null ? null : response.getResult().name();
  69. log.info("ES执行删除:index:{},type:{},id:{}", request.index(), request.type(), request.id());
  70. return name;
  71. } catch (IOException e) {
  72. e.printStackTrace();
  73. }
  74. return null;
  75. }
  76. @Override
  77. public String selectById(T entry) {
  78. GetRequest request = new GetRequest(entry.getEsIndex(), entry.getEsType(), entry.getEsId());
  79. try {
  80. GetResponse response = restClient.get(request, RequestOptions.DEFAULT);
  81. if (response.isExists()) {
  82. return response.getSourceAsString();
  83. } else {
  84. return null;
  85. }
  86. } catch (IOException e) {
  87. throw new RuntimeException(e);
  88. }
  89. }
  90. @Override
  91. public ElasticSearchResponseEntity selectByMultiConditionDesc(T entry, String sortField, Integer page, Integer length) {
  92. return packageSelectData(selectByMultiCondition(entry, sortField, SortOrder.DESC, page, length));
  93. }
  94. @Override
  95. public ElasticSearchResponseEntity selectByMultiConditionAsc(T entry, String sortField, Integer page, Integer length) {
  96. return packageSelectData(selectByMultiCondition(entry, sortField, SortOrder.ASC, page, length));
  97. }
  98. /**
  99. * 封装成ElasticSearchResponseEntity返回
  100. *
  101. * @param searchHits es查询的数据
  102. * @return ElasticSearchResponseEntity
  103. */
  104. private ElasticSearchResponseEntity packageSelectData(SearchHits searchHits) {
  105. if (searchHits == null || searchHits.totalHits < 1) {
  106. return new ElasticSearchResponseEntity(0L, new LinkedList<>());
  107. }
  108. List<String> list = new LinkedList<>();
  109. Arrays.stream(searchHits.getHits()).forEach(hit -> {
  110. String source = hit.getSourceAsString();
  111. log.debug("ES查询数据:{}", source);
  112. list.add(source);
  113. });
  114. return new ElasticSearchResponseEntity(searchHits.getTotalHits(), list);
  115. }
  116. /**
  117. * 多条件查询,包含排序规则和分页功能
  118. *
  119. * @param entry 封装的查询条件
  120. * @param sortField 排序字段的字段名
  121. * 如果在mapping中未指定排序字段的类型为精确类型(如date或keyword),则此排序字段必须加上 .keyword 后缀,表示精准匹配,否则报错:[type=search_phase_execution_exception, reason=all shards failed]
  122. * 例如:username.keyword,如果mapping指定了字段类型为date或者keyword,则直接传字段名即可,例如:keywordName1、updateTime
  123. * @param sortOrder 排序规则,正序还是倒序
  124. * @return 查询结果,es封装
  125. */
  126. private SearchHits selectByMultiCondition(T entry, String sortField, SortOrder sortOrder, Integer page, Integer length) {
  127. SearchRequest searchRequest = new SearchRequest(entry.getEsIndex());
  128. searchRequest.types(entry.getEsType());
  129. try {
  130. BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
  131. entry.packageElasticSearchBody(boolBuilder);
  132. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  133. sourceBuilder.sort(new FieldSortBuilder(sortField).order(sortOrder));
  134. sourceBuilder.query(boolBuilder);
  135. setPaging(sourceBuilder, page, length);
  136. searchRequest.source(sourceBuilder);
  137. SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
  138. return searchResponse.getHits();
  139. } catch (Exception e) {
  140. log.error("===》 es【多条件】查询,报错!entry={}", searchRequest.source().toString(), e);
  141. return null;
  142. }
  143. }
  144. /**
  145. * 设置es查询的分页配置
  146. *
  147. * ElasticSearchConstant.MAX_SIZE = ES默认10000条,这个自定义的常量无法超过默认值
  148. * @param sourceBuilder builder
  149. * @param page 页码
  150. * @param length 每页条数
  151. */
  152. private void setPaging(SearchSourceBuilder sourceBuilder, Integer page, Integer length) {
  153. if (page != null && length != null) {
  154. length = Math.min(length, ElasticSearchConstant.MAX_SIZE);
  155. int from = (page - 1) * length;
  156. sourceBuilder.from(from);
  157. sourceBuilder.size(length);
  158. }
  159. }
  160. }

返回值封装实体

  1. public class ElasticSearchResponseEntity {
  2. private Long total;// 查询总数
  3. private List<String> list;// 数据集合
  4. public ElasticSearchResponseEntity(Long total, List<String> list) {
  5. this.total = total;
  6. this.list = list;
  7. }
  8. public Long getTotal() {
  9. return total;
  10. }
  11. public void setTotal(Long total) {
  12. this.total = total;
  13. }
  14. public List<String> getList() {
  15. return list;
  16. }
  17. public void setList(List<String> list) {
  18. this.list = list;
  19. }
  20. }

使用:

0.ES查询所需的QueryBuilder的封装,简单封装一下,如有需要可以自行增减

  1. public class QueryBuilder {
  2. /**
  3. * 大于等于
  4. */
  5. public static final String GTE = "gte";
  6. /**
  7. * 小于等于
  8. */
  9. public static final String LTE = "lte";
  10. /**
  11. * 大于
  12. */
  13. public static final String GT = "gt";
  14. /**
  15. * 小于
  16. */
  17. public static final String LT = "lt";
  18. /**
  19. * 不会对搜索词进行分词处理,而是作为一个整体与目标字段进行匹配,若完全匹配,则可查询到。
  20. *
  21. * @param key
  22. * @param value
  23. * @return
  24. */
  25. public static TermQueryBuilder term(String key, Object value) {
  26. return QueryBuilders.termQuery(key, value);
  27. }
  28. /**
  29. * 一次匹配多个值,即 in()查询
  30. *
  31. * @param key key
  32. * @param values 值集合
  33. * @return TermsQueryBuilder
  34. */
  35. public static TermsQueryBuilder terms(String key, Collection<?> values) {
  36. return QueryBuilders.termsQuery(key, values);
  37. }
  38. /**
  39. * 一次匹配多个值,即 in()查询,keyword全值匹配,精确查询
  40. *
  41. * @param key
  42. * @param values
  43. * @return
  44. */
  45. public static TermsQueryBuilder termsKeyword(String key, Collection<?> values) {
  46. return QueryBuilders.termsQuery(key + ".keyword", values);
  47. }
  48. /**
  49. * 会将搜索词分词,再与目标查询字段进行匹配,若分词中的任意一个词与目标字段匹配上,则可查询到。
  50. *
  51. * @param key
  52. * @param value
  53. * @return
  54. */
  55. public static MatchQueryBuilder match(String key, Object value) {
  56. return QueryBuilders.matchQuery(key, value);
  57. }
  58. /**
  59. * 分词模糊查询
  60. *
  61. * @param key
  62. * @param value
  63. * @return
  64. */
  65. public static FuzzyQueryBuilder fuzzy(String key, Object value) {
  66. return QueryBuilders.fuzzyQuery(key, value);
  67. }
  68. /**
  69. * 范围查询
  70. *
  71. * @param key
  72. * @param value
  73. * @return
  74. */
  75. public static RangeQueryBuilder range(String key, Object value, String rangeType) {
  76. RangeQueryBuilder builder = QueryBuilders.rangeQuery(key);
  77. switch (rangeType) {
  78. case GTE:
  79. return builder.gte(value);
  80. case LTE:
  81. return builder.lte(value);
  82. case GT:
  83. return builder.gt(value);
  84. case LT:
  85. return builder.lt(value);
  86. default:
  87. return builder.gt(value);
  88. }
  89. }

1.定义数据实体类,继承BaseElasticSearchEntity,并重写父类方法,封装查询条件(省略getter/setter/toString/空参构造器等)

  1. public class Student extends BaseElasticSearchEntity {
  2. private Long id;
  3. private String name;
  4. private Date birth;
  5. private String nikename;
  6. // 父类构造器,用来操作ES指定的索引和Type
  7. public Student(String esIndex, String esType, String esId) {
  8. super(esIndex, esType, esId);
  9. }
  10. public Student(String esIndex, String esType) {
  11. super(esIndex, esType);
  12. }
  13. // 重写父类方法,封装查询的条件
  14. @Override
  15. public void packageElasticSearchBody(BoolQueryBuilder boolQueryBuilder) {
  16. // 相当于IN查询
  17. if (StringUtils.isNotEmpty(this.name)) {
  18. List<String> nameList = new ArrayList<>(Arrays.asList(this.name.split(",")));
  19. boolQueryBuilder.must(QueryBuilder.termsKeyword("name", nameList));
  20. }
  21. // 日期范围查询,日期存入yyyy-MM-dd HH:mm:ss格式时配置不当可能会报错,建议存入时间戳
  22. if (StringUtils.isNotEmpty(this.startTime)) {
  23. boolQueryBuilder.must(QueryBuilder.range("birth", DateUtil.convertTimeToLong(this.startTime, DateUtil.YYYY_MM_DD_HH_MM_SS), QueryBuilder.GTE));
  24. }
  25. if (StringUtils.isNotEmpty(this.endTime)) {
  26. boolQueryBuilder.must(QueryBuilder.range("birth", DateUtil.convertTimeToLong(this.endTime, DateUtil.YYYY_MM_DD_HH_MM_SS), QueryBuilder.LTE));
  27. }
  28. // match精准查询
  29. if (StringUtils.isNotEmpty(this.nikename)) {
  30. boolQueryBuilder.must(QueryBuilder.match("nikename", this.carId));
  31. }
  32. // 其他查询要求可以自行百度
  33. }
  34. }

2.service层的操作,忽略了业务逻辑,省略接口,直接上实现类了

  1. @Slf4j
  2. @Service("studentElasticSearchService")
  3. public class StudentElasticSearchServiceImpl implements StudentElasticSearchService {
  4. @Autowired
  5. @Qualifier("elasticSearchMapperImpl")
  6. private ElasticSearchMapper<Student> elasticSearchMapper;
  7. private final String TYPE = "student_type";
  8. private final String INDEX_NAME = "student_index";
  9. @Override
  10. public boolean isExists(String id) {
  11. return elasticSearchMapper.isExists(new Student(INDEX_NAME, TYPE, id));
  12. }
  13. @Override
  14. public String insert(Student entry) {
  15. entry.setEsIndex(INDEX_NAME);
  16. entry.setEsType(TYPE);
  17. entry.setEsId(entry.getId());
  18. return elasticSearchMapper.insert(entry);
  19. }
  20. @Override
  21. public Student selectById(String id) {
  22. String jsonString = elasticSearchMapper.selectById(new Student(INDEX_NAME, TYPE, id));
  23. return StringUtils.isEmpty(jsonString) ? null : JSONObject.parseObject(jsonString, Student.class);
  24. }
  25. @Override
  26. public String update(Student entry) {
  27. entry.setEsIndex(INDEX_NAME);
  28. entry.setEsType(TYPE);
  29. entry.setEsId(entry.getId());
  30. return elasticSearchMapper.update(entry);
  31. }
  32. @Override
  33. public String delete(String id) {
  34. return elasticSearchMapper.delete(new Student(INDEX_NAME, TYPE, id));
  35. }
  36. @Override
  37. public ElasticSearchResponseEntity selectByMultiCondition(Student entry, Integer page, Integer length) {
  38. try {
  39. entry.setEsIndex(INDEX_NAME);
  40. entry.setEsType(TYPE);
  41. return elasticSearchMapper.selectByMultiConditionDesc(entry, "birth", page, length);
  42. } catch (Exception e) {
  43. log.error("出错!", e);
  44. }
  45. return null;
  46. }
  47. }

3.直接调用service层的接口,传入封装的实体或者唯一id,即可操作ES的增删改查,代码就不用贴了,到这一步,傻子都会用了

 

如有更好的建议和写法,请告知我,先感谢能让我提升代码质量的任何朋友。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/68698
推荐阅读
相关标签
  

闽ICP备14008679号