当前位置:   article > 正文

java操作ElasticSearch之批量操作_elasticsearch批量删除数据

elasticsearch批量删除数据

pom文件

  1. # 版本号
  2. <version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>
  3.     <dependency>
  4. <groupId>org.elasticsearch.client</groupId>
  5. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  6. <version>${version.elasticsearch-rest-high-level-client}</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.elasticsearch.client</groupId>
  10. <artifactId>elasticsearch-rest-client</artifactId>
  11. <version>${version.elasticsearch-rest-high-level-client}</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.elasticsearch</groupId>
  15. <artifactId>elasticsearch</artifactId>
  16. <version>${version.elasticsearch-rest-high-level-client}</version>
  17. </dependency>

RestHighLevelClient(操作ES的客户端,使用时注入bean即可)

  1. public RestHighLevelClient restHighLevelClient() {
  2. RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
  3. .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  4. @Override
  5. public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
  6. return httpClientBuilder
  7. .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
  8. }
  9. });
  10. return new RestHighLevelClient(rclientBuilder);

DeleteRequest批量删除

  1. /**
  2. * @Description: 根据es主键删除数据
  3. * @Params:
  4. * @Return:
  5. * @Author: Mr.myq
  6. * @Date: 2022/12/2011:04
  7. */
  8. @Override
  9. public void deleteAllByIds(List<String> ids) {
  10. if (!CollectionUtils.isEmpty(ids)) {
  11. try {
  12. List<List<String>> outherList = SubListUtil.splitList(ids, count);
  13. for (List<String> innerList : outherList) {
  14. //批量删除数据
  15. BulkRequest request = new BulkRequest();
  16. request.timeout("60s");
  17. for (String id : innerList) {
  18. // 类型json/_doc
  19. DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
  20. request.add(source);
  21. }
  22. BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
  23. log.debug("ES批量删除数据 是否有失败 : " + response.hasFailures());
  24. }
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. log.error("ES批量删除数据: " + e.toString());
  28. throw new RuntimeException("ES批量删除数据: {}", e);
  29. } finally {
  30. ids.clear();
  31. }
  32. }
  33. }

BulkRequest批量新增

  1. /**
  2. * 批量更新es数据
  3. *
  4. * @param esDateList
  5. */
  6. private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
  7. if (CollectionUtils.isEmpty(esDateList)) {
  8. return;
  9. }
  10. BulkRequest bulkRequest = new BulkRequest();
  11. bulkRequest.timeout("200s");
  12. try {
  13. for (int i = 0; i < esDateList.size(); i++) {
  14. CmsGoodsEntity object = esDateList.get(i);
  15. Map<String, Object> param = new LinkedHashMap<>();
  16. String goodid = object.getGoodid();
  17. param.put("tariffs_rate", object.getTariffsRate());
  18. param.put("inspection_charges_fee", object.getInspectionChargesFee());
  19. param.put("rates", object.getRates());
  20. UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
  21. updateRequest.doc(param);
  22. bulkRequest.add(updateRequest);
  23. }
  24. // 操作ES
  25. BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
  26. log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. throw new RuntimeException("***********批量更新ES数据异常***********");
  30. } finally {
  31. esDateList.clear();
  32. }
  33. }

DeleteByQueryRequest根据查询条件删除所有ES数据

  1. /**
  2. * @Description: 根据查看条件删除
  3. * @Params:
  4. * @Return:
  5. * @Author: Mr.myq
  6. * @Date: 2023/2/1415:25
  7. */
  8. @Override
  9. public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
  10. //通过QueryBuilders中的搜索逻辑
  11. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  12. //1 设置条件
  13. //设置删除条件: key = value
  14. TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
  15. if (!StringUtils.isEmpty(hisp)) {
  16. TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
  17. queryBuilder.must(hispTermQueryBuilder);
  18. }
  19. TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
  20. queryBuilder.must(sourceTermQueryBuilder);
  21. queryBuilder.must(supplierIdTermQueryBuilder);
  22. //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
  23. DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
  24. deleteByQueryRequest.setTimeout("6000s");
  25. deleteByQueryRequest.setQuery(queryBuilder);
  26. //指定删除索引
  27. deleteByQueryRequest.indices(restClientConfig.getIndex());
  28. deleteByQueryRequest.setConflicts("proceed");
  29. try {
  30. //3 通过deleteByQuery来发起删除请求
  31. BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
  32. if (deleteResponse.getDeleted() >= 1) {
  33. log.info("deleteData,删除成功,删除文档条数: " + deleteResponse.getDeleted() + " ,indexName:" + restClientConfig.getIndex());
  34. }
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. log.error("无法连接到ES目标服务器");
  38. throw new TargetServerException("无法连接到ES目标服务器");
  39. }
  40. }

UpdateByQueryRequest批量更新

  1. /**
  2. * @Description: 根据查看条件更新
  3. * @Params:
  4. * @Return:
  5. * @Author: Mr.myq
  6. * @Date: 2023/2/1415:25
  7. */
  8. public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
  9. //通过QueryBuilders中的搜索逻辑
  10. BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  11. //1 设置条件
  12. //设置删除条件: key = value
  13. TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
  14. if (!StringUtils.isEmpty(hisp)) {
  15. TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
  16. queryBuilder.must(hispTermQueryBuilder);
  17. }
  18. TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
  19. queryBuilder.must(sourceTermQueryBuilder);
  20. queryBuilder.must(supplierIdTermQueryBuilder);
  21. //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
  22. UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
  23. updateByQueryRequest.setTimeout("6000s");
  24. updateByQueryRequest.setQuery(queryBuilder);
  25. // 这个实际上就是对应给脚本中传参数的对象。
  26. HashMap<String, Object> params = new HashMap<>(16);
  27. params.put("hobby", "修改的参数value");
  28. final Script script = new Script(
  29. ScriptType.INLINE, "painless",
  30. "ctx._source.hobby = params.hobby",
  31. params);
  32. updateByQueryRequest.setScript(script);
  33. //指定索引
  34. updateByQueryRequest.indices(restClientConfig.getIndex());
  35. updateByQueryRequest.setConflicts("proceed");
  36. try {
  37. //3 通过deleteByQuery来发起删除请求
  38. BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
  39. if (deleteResponse.getUpdated() >= 1) {
  40. log.info("更新成功,更新文档条数: " + deleteResponse.getUpdated() + " ,indexName:" + restClientConfig.getIndex());
  41. }
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. log.error("无法连接到ES目标服务器");
  45. throw new TargetServerException("无法连接到ES目标服务器");
  46. }
  47. }

BulkRequest批量删除

  1. /**
  2. * 批量更新es数据
  3. *
  4. * @param esDateList
  5. */
  6. private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
  7. if (CollectionUtils.isEmpty(esDateList)) {
  8. return;
  9. }
  10. BulkRequest bulkRequest = new BulkRequest();
  11. bulkRequest.timeout("600s");
  12. RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
  13. try {
  14. for (int i = 0; i < esDateList.size(); i++) {
  15. CmsGoodsEntity object = esDateList.get(i);
  16. Map<String, Object> param = new LinkedHashMap<>();
  17. String goodid = object.getGoodid();
  18. param.put("tariffs_rate", object.getTariffsRate());
  19. param.put("inspection_charges_fee", object.getInspectionChargesFee());
  20. param.put("rates", object.getRates());
  21. UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
  22. updateRequest.doc(param);
  23. bulkRequest.add(updateRequest);
  24. }
  25. // 操作ES
  26. BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  27. log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
  28. } catch (IOException e) {
  29. log.error("批量更新ES数据异常,e", e);
  30. e.printStackTrace();
  31. throw new RuntimeException("ES批量更新数据异常:{}" + e);
  32. } finally {
  33. try {
  34. restHighLevelClient.close();
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. esDateList.clear();
  39. }
  40. }

Script更新

  1. public void updateHobby(String user, ESEntity esEntity) throws IOException {
  2. final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
  3. // 这个实际上就是对应给脚本中传参数的对象。
  4. HashMap<String, Object> params = new HashMap<>(16);
  5. params.put("hobby", "和女朋友爬山");
  6. final Script script = new Script(
  7. ScriptType.INLINE, "painless",
  8. "ctx._source.hobby = params.hobby",
  9. params);
  10. updateByQuery(queryBuilder, "索引名称", script);
  11. }

出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。

ElasticSearch超级实用API描述

以上代码需要变动一下,将一些参数替换掉。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/68816?site
推荐阅读
相关标签
  

闽ICP备14008679号