当前位置:   article > 正文

springboot项目elasticsearch7.x降版本到elasticsearch6.x实践_spring data elasticsearch 切换es版本

spring data elasticsearch 切换es版本

背景

因为客户使用的es版本为6.3.2,而项目开发阶段一直用的是es7.9.3,所以需要对项目中的es做降版本开发。这里记录一下实践开发中遇到的一系列问题。本文的前提是项目中已经实现了es7.9.3版本的对接及增删改查接口。

Es7.9.3整合springboot

主要是两块内容:配置ElasticSearchConfig,以及开发ElasticsearchService来实现es的增删改查接口,代码如下:

  1. @Slf4j
  2. @Configuration
  3. public class ElasticSearchConfig {
  4. @Value("${elasticsearch.host}")
  5. private String esHost;
  6. @Value("${elasticsearch.port}")
  7. private Integer esPort;
  8. @Value("${elasticsearch.username}")
  9. private String userName;
  10. @Value("${elasticsearch.password}")
  11. private String password;
  12. @Bean(name = "restHighLevelClient")
  13. public RestHighLevelClient tqHighLevelClient() {
  14. HttpHost host = new HttpHost(esHost, esPort, "http");
  15. RestClientBuilder builder= RestClient.builder(host);
  16. CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  17. //设置用户名和密码:
  18. credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
  19. builder.setHttpClientConfigCallback(httpClientBuilder -> {
  20. RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
  21. .setConnectTimeout(28_800_000)
  22. .setSocketTimeout(28_800_000)
  23. .setConnectionRequestTimeout(28_800_000);
  24. httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
  25. httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
  26. // httpclient保活策略
  27. httpClientBuilder.setKeepAliveStrategy(
  28. CustomConnectionKeepAliveStrategy.getInstance(10));
  29. return httpClientBuilder;
  30. });
  31. RestHighLevelClient restClient = new RestHighLevelClient(builder);
  32. return restClient;
  33. }
  34. }
  1. @Component
  2. @Slf4j
  3. public class ElasticsearchService {
  4. @Autowired
  5. private RestHighLevelClient tqHighLevelClient;
  6. private static final String COMMA_SEPARATE = ",";
  7. /**
  8. * 创建索引
  9. * @param index
  10. * @return
  11. */
  12. public boolean createIndex(String index) throws IOException {
  13. if (isIndexExist(index)) {
  14. log.error("Index is exits!");
  15. return false;
  16. }
  17. //1.创建索引请求
  18. CreateIndexRequest request = new CreateIndexRequest(index);
  19. //2.执行客户端请求
  20. org.elasticsearch.client.indices.CreateIndexResponse response = tqHighLevelClient.indices()
  21. .create(request, RequestOptions.DEFAULT);
  22. return response.isAcknowledged();
  23. }
  24. /**
  25. * 判断索引是否存在
  26. * @param index
  27. * @return
  28. */
  29. public boolean isIndexExist(String index) throws IOException {
  30. GetIndexRequest request = new GetIndexRequest(index);
  31. boolean exists = tqHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  32. // tqHighLevelClient.close();
  33. return exists;
  34. }
  35. /**
  36. * 删除索引
  37. * @param index
  38. * @return
  39. */
  40. public boolean deleteIndex(String index) throws IOException {
  41. if (!isIndexExist(index)) {
  42. log.error("Index is not exits!");
  43. return false;
  44. }
  45. DeleteIndexRequest request = new DeleteIndexRequest(index);
  46. AcknowledgedResponse delete = tqHighLevelClient.indices()
  47. .delete(request, RequestOptions.DEFAULT);
  48. return delete.isAcknowledged();
  49. }
  50. /**
  51. * 数据添加,自定义id
  52. * @param object 要增加的数据
  53. * @param index 索引,类似数据库
  54. * @param id 数据ID,为null时es随机生成
  55. * @return
  56. */
  57. public String addData(Object object, String index, String id) throws IOException {
  58. //创建请求
  59. IndexRequest request = new IndexRequest(index);
  60. //规则 put /test_index/_doc/1
  61. request.id(id);
  62. request.timeout(TimeValue.timeValueSeconds(1));
  63. //将数据放入请求 json
  64. IndexRequest source = request.source(JSON.toJSONString(object), XContentType.JSON);
  65. //客户端发送请求
  66. IndexResponse response = tqHighLevelClient.index(request, RequestOptions.DEFAULT);
  67. return response.getId();
  68. }
  69. /**
  70. * 数据添加 随机id
  71. * @param object 要增加的数据
  72. * @param index 索引,类似数据库
  73. * @return
  74. */
  75. public String addData(Object object, String index) throws IOException {
  76. Class<?> clazz = object.getClass();
  77. Field[] fields = clazz.getDeclaredFields();
  78. String idStr = UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
  79. for (Field field : fields) {
  80. field.setAccessible(true);
  81. //判断字段是否包含注解
  82. try {
  83. Object id = field.get(object);
  84. if (field.isAnnotationPresent(ESId.class)) {
  85. if (id != null) {
  86. idStr = id.toString();
  87. }
  88. break;
  89. }
  90. } catch (IllegalAccessException e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. return addData(object, index, idStr);
  95. }
  96. /**
  97. * 通过ID删除数据
  98. *
  99. * @param index 索引,类似数据库
  100. * @param id 数据ID
  101. * @return
  102. */
  103. public void deleteDataById(String index, String id) throws IOException {
  104. DeleteRequest request = new DeleteRequest(index, id);
  105. tqHighLevelClient.delete(request, RequestOptions.DEFAULT);
  106. }
  107. public void deleteDataByQuery(String index, String id) throws IOException {
  108. DeleteByQueryRequest request = new DeleteByQueryRequest(index);
  109. request.setQuery(new TermQueryBuilder("fileId",id));
  110. tqHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
  111. }
  112. /**
  113. * 通过ID 更新数据
  114. *
  115. * @param object 要更新数据
  116. * @param index 索引,类似数据库
  117. * @param id 数据ID
  118. * @return
  119. */
  120. public void updateDataById(Object object, String index, String id) throws IOException {
  121. UpdateRequest update = new UpdateRequest(index, id);
  122. update.timeout("1s");
  123. update.doc(JSON.toJSONString(object), XContentType.JSON);
  124. tqHighLevelClient.update(update, RequestOptions.DEFAULT);
  125. }
  126. /**
  127. * 通过ID获取数据
  128. *
  129. * @param index 索引,类似数据库
  130. * @param id 数据ID
  131. * @param fields 需要显示的字段,逗号分隔(缺省为全部字段)
  132. * @return
  133. */
  134. public Map<String, Object> searchDataById(String index, String id, String fields) throws IOException {
  135. GetRequest request = new GetRequest(index, id);
  136. if (StringUtils.isNotEmpty(fields)) {
  137. //只查询特定字段。如果需要查询所有字段则不设置该项。
  138. request.fetchSourceContext(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
  139. }
  140. GetResponse response = tqHighLevelClient.get(request, RequestOptions.DEFAULT);
  141. return response.getSource();
  142. }
  143. public <T> T searchDataById(String index, String id, Class<T> tClass) throws IOException {
  144. GetRequest request = new GetRequest(index, id);
  145. GetResponse response = tqHighLevelClient.get(request, RequestOptions.DEFAULT);
  146. return JSON.parseObject(response.getSourceAsString(), tClass);
  147. }
  148. /**
  149. * 查询全量(其实也是分页循环查询)
  150. *
  151. * @param indices 索引名称,多个时逗号分隔
  152. * @param searchSourceBuilder 查询的builder
  153. * @param options 请求类型
  154. * @param tClass 返回对象的类
  155. * @param propertyNamingStrategy 转换策略(PropertyNamingStrategyConstant.class)
  156. * @param <T> 泛型
  157. * @return 返回查询出来的全量数据
  158. * @throws IOException 抛出io异常
  159. */
  160. public <T> List<T> searchAll(@NonNull String indices, @NonNull SearchSourceBuilder searchSourceBuilder,
  161. @NonNull RequestOptions options, @NonNull Class<T> tClass, Integer propertyNamingStrategy) throws IOException {
  162. String[] indexArray = indices.split(COMMA_SEPARATE);
  163. Objects.requireNonNull(indices, "indices must not be null");
  164. for (String index : indexArray) {
  165. Objects.requireNonNull(index, "index must not be null");
  166. }
  167. if (ORIGINAL.equals(propertyNamingStrategy)) {
  168. } else if (PropertyNamingStrategyConstant.UNDERSCORE_TO_CAMEL.equals(propertyNamingStrategy)) {
  169. } else {
  170. throw new RuntimeException("propertyNamingStrategy is not found");
  171. }
  172. return searchAll(indexArray, null, searchSourceBuilder, options, tClass, propertyNamingStrategy);
  173. }
  174. }

问题一:Springboot版本与es版本不一致

如图所示,项目中springboot版本为2.4.2,与其对应的es版本是7.9.3;我们要把es版本改为6.3.2,其对应的springboot版本为2.1.x。此时需要做一个选择:是否要降低springboot版本以适应es6.3.2。因为项目比较复杂,降springboot版本影响较大,为避免出现其他jar版本冲突或者api不兼容问题,如非必须,不考虑降低springboot版本。

多模块项目中需要在parent.pom中显式定义es相关jar的版本,不然子模块中引入的仍会是springboot默认管理的es7.9.3版本。定义如下:

  <!-- Dependency version management -->
  <dependencyManagement>
    <dependencies>
      <!-- Spring Boot managed versions, imported as a BOM -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <!--
        Elasticsearch artifacts pinned explicitly so that child modules do not pick up the
        version managed by Spring Boot. ${elasticsearch.version} is not defined in this
        fragment; it is expected to be declared in this POM's <properties>.
      -->
      <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
      </dependency>
    </dependencies>
  </dependencyManagement>

问题二:重写es6.3.2的api

因为es6.3.2的api与7.9.3版本不一致,需要重新编写ElasticsearchService。

增删改demo参考es6.3.2增删改查demo

问题三:业务需求多字段分组聚合计算词频分页查询

业务需求,根据关键词检索文献,要求文献按上传时间排序,分页查询结果,因上传的文献存在超大文档,会对超大文档进行切分上传到es中,所以此需求需要根据fileId,createTime分组,汇总词频值,根据createTime排序后进行分页。

注意:因为没找到排序、分页的方法,所以获取到所有结果,在java应用中排序分页...

  1. SearchRequest searchRequest = new SearchRequest(index);
  2. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  3. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  4. boolQueryBuilder.should(QueryBuilders.wildcardQuery("text.keyword", "*" + dto.getKeyword() + "*")); // "contains keyword" match on text.keyword
  5. sourceBuilder.query(boolQueryBuilder);
  6. sourceBuilder.fetchSource(false);   // document sources are not fetched; only the aggregation below is read
  7. Map<String, Object> paramsMap = new HashMap<>();   
  8. paramsMap.put("key", dto.getKeyword());   // the keyword is passed to the script as parameter "key"
  9. // Term-frequency script, written in Painless (a Java-like language)
  10. String scriptStr = "xxx";  // placeholder; the real script body is not shown in this listing
  11. Script script = new Script(ScriptType.INLINE, "painless", scriptStr, paramsMap);
  12. // Build the aggregation: composite buckets keyed by (fileId, createTime)
  13. List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
  14. sources.add(new TermsValuesSourceBuilder("fileId").field("fileId.keyword"));
  15. sources.add(new TermsValuesSourceBuilder("createTime").field("createTime.keyword"));
  16. CompositeAggregationBuilder compositeAggregationBuilder = new CompositeAggregationBuilder("multi_field_agg", sources);
  17. compositeAggregationBuilder.size(10000); // all buckets are requested at once; sorting and paging are done in Java, per the article
  18. compositeAggregationBuilder.subAggregation(AggregationBuilders.sum("total_keywordCount"). script(script)); // per-bucket sum of the script value
  19. sourceBuilder.aggregation(compositeAggregationBuilder);
  20. searchRequest.source(sourceBuilder);
  21. SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
  22. // The hit count covers every matching document, so it is not an accurate "total" for paging; results must be de-duplicated first
  23. long total = searchResponse.getHits().getTotalHits();
  24. // Handle the search result
  25. RestStatus restStatus = searchResponse.status();
  26. if (restStatus != RestStatus.OK) {   
  27. log.error("检索报错 restStatus={}",restStatus);   
  28. throw new RuntimeException("文献检索报错");
  29. }
  30. Aggregation agg = searchResponse.getAggregations().get("multi_field_agg");

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/68298
推荐阅读
相关标签
  

闽ICP备14008679号