当前位置:   article > 正文

Elasticsearch 2.0以上版本根据条件批量删除Java如何实现_elasticsearch根据字段删除数据java

elasticsearch根据字段删除数据java

Elasticsearch在2.0以前版本,删除操作有两种方式,一种是通过id来进行删除,但是这种方式一般不常用,因为id不容易得到;另一种方式是通过先查询操作,然后删除,也就是通过client.prepareDeleteByQuery这种方式来根据条件批量删除数据:

  1. DeleteByQueryResponse response = client.prepareDeleteByQuery("library")
  2. .setQuery(QueryBuilders.termQuery("title", "ElasticSearch"))
  3. .execute().actionGet();

但是Delete by Query在2.0版本及其以上的版本已经被移除了,因为这种方式会自动强制刷新,所以在大量索引并发的情况下,会很快造成内存溢出。

详情可查看:https://www.elastic.co/guide/en/elasticsearch/client/java-api/1.7/delete-by-query.html

那么在2.0以后的版本,我们如何来进行批量的删除呢?

我们可以先通过Search API查询,然后得到需要删除的批量数据的id,然后再通过id来删除,但是这种方式在大批量数据的删除的时候,依然是行不通的。

具体实现代码:

  1. public void deleteByTerm(Client client){
  2. BulkRequestBuilder bulkRequest = client.prepareBulk();
  3. SearchResponse response = client.prepareSearch("megacorp").setTypes("employee")
  4. .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
  5. .setQuery(QueryBuilders.termQuery("first_name", "xiaoming"))
  6. .setFrom(0).setSize(20).setExplain(true).execute().actionGet();
  7. for(SearchHit hit : response.getHits()){
  8. String id = hit.getId();
  9. bulkRequest.add(client.prepareDelete("megacorp", "employee", id).request());
  10. }
  11. BulkResponse bulkResponse = bulkRequest.get();
  12. if (bulkResponse.hasFailures()) {
  13. for(BulkItemResponse item : bulkResponse.getItems()){
  14. System.out.println(item.getFailureMessage());
  15. }
  16. }else {
  17. System.out.println("delete ok");
  18. }
  19. }

同样通过delete-by-query插件,我们还可以根据type来批量删除数据,这种方式能够删除大批量的数据,他是现将要删除的数据一个一个做标记,然后再删除,于是效率会比较低。下面是官网的说明:https://www.elastic.co/guide/en/elasticsearch/plugins/2.3/plugins-delete-by-query.html

Queries which match large numbers of documents may run for a long time, as every document has to be deleted individually. Don’t use delete-by-query to clean out all or most documents in an index. Rather create a new index and perhaps reindex the documents you want to keep. 

可见这种删除方式并不适合大批量数据的删除,因为效率真的是很低,我是亲身体验过了。


这种方式需要先引入delete-by-query插件包,然后使用插件的api来删除:

  1. <dependency>
  2. <groupId>org.elasticsearch.plugin</groupId>
  3. <artifactId>delete-by-query</artifactId>
  4. <version>2.3.2</version>
  5. </dependency>


具体实现代码:

  1. import java.net.InetAddress;
  2. import java.net.UnknownHostException;
  3. import java.util.ResourceBundle;
  4. import java.util.Stack;
  5. import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
  6. import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
  7. import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
  8. import org.elasticsearch.action.search.SearchRequestBuilder;
  9. import org.elasticsearch.action.search.SearchResponse;
  10. import org.elasticsearch.action.search.SearchType;
  11. import org.elasticsearch.client.Client;
  12. import org.elasticsearch.client.transport.TransportClient;
  13. import org.elasticsearch.common.settings.Settings;
  14. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  15. import org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import com.xgd.log.common.ExceptionUtil;
  19. public class EsDeleteByType {
  20. private static final Logger logger = LoggerFactory.getLogger(EsDeleteByType.class);
  21. private Client client;
  22. private static ResourceBundle getEsConfig(){
  23. return ResourceBundle.getBundle("elasticsearch");
  24. }
  25. private void getClient(){
  26. String clusterName = getEsConfig().getString("clusterName");
  27. String hosts = getEsConfig().getString("hosts");
  28. if (hosts == null || clusterName == null) {
  29. throw new IllegalArgumentException("hosts or clusterName was null.");
  30. }
  31. Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
  32. client = TransportClient.builder()
  33. .addPlugin(DeleteByQueryPlugin.class)
  34. .settings(settings).build();
  35. String[] hostsArray = hosts.split(",");
  36. for(String hostAndPort : hostsArray){
  37. String[] tmpArray = hostAndPort.split(":");
  38. try {
  39. client = ((TransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(tmpArray[0]), Integer.valueOf(tmpArray[1])));
  40. } catch (NumberFormatException e) {
  41. logger.error(ExceptionUtil.getTrace(e));
  42. } catch (UnknownHostException e) {
  43. logger.error(ExceptionUtil.getTrace(e));
  44. }
  45. }
  46. }
  47. /**
  48. * 判断一个index中的type是否有数据
  49. * @param index
  50. * @param type
  51. * @return
  52. * @throws Exception
  53. */
  54. public Boolean existDocOfType(String index, String type) throws Exception {
  55. SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type)
  56. .setSearchType(SearchType.QUERY_THEN_FETCH)
  57. .setSize(1);
  58. SearchResponse response = builder.execute().actionGet();
  59. long docNum = response.getHits().getTotalHits();
  60. if (docNum == 0) {
  61. return false;
  62. }
  63. return true;
  64. }
  65. /**
  66. * 根据type来删除数据
  67. * @param index
  68. * @param types
  69. * @return
  70. */
  71. public long deleteDocByType(String index, String[] types) {
  72. getClient();
  73. long oldTime = System.currentTimeMillis();
  74. StringBuilder b = new StringBuilder();
  75. b.append("{\"query\":{\"match_all\":{}}}");
  76. DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
  77. .setIndices(index).setTypes(types)
  78. .setSource(b.toString())
  79. .execute().actionGet();
  80. Stack<String> allTypes = new Stack<String>();
  81. for(String type : types){
  82. allTypes.add(type);
  83. }
  84. while(!allTypes.isEmpty()){
  85. String type = allTypes.pop();
  86. while(true){
  87. try {
  88. if (existDocOfType(index, type) == false) {
  89. break;
  90. }
  91. } catch (Exception e) {
  92. logger.error("queryError: " + e.getMessage());
  93. }
  94. }
  95. }
  96. System.out.println(System.currentTimeMillis() - oldTime);
  97. return response.getTotalDeleted();
  98. }
  99. }



那么当我们在开发中,使用到elasticsearch的时候,总会涉及到大批量数据的删除,我们要怎么办呢?

经过很长时间的纠结,我发现使用elasticsearch存储数据的时候,千万不要把所有数据都存储于一个index,这样一个是不利于查询的效率,一个是不利于后面的删除,既然我们不能index中去删除部分的大批量数据,那么我们为啥不改变一种思路呢,就是分索引,然后通过索引来删除数据,例如:我在生产上面,每天有5亿的数据,那么我每天在集群中生成一个index用于存储这5亿的数据,如果我们的elasticsearch集群对数据只要求保存7天的数据,超过7天的数据就可以删除了,这样我们可以通过index直接删除7天以前的数据,这种方式,我们在查询的时候不会在所有数据中查询,只需要在所要查询的时间段内查询,便提高了查询的效率,同时删除效率的问题也解决了,能够很快删除不需要的数据,释放掉磁盘空间。

针对于elasticsearch大批量数据删除效率的问题,目前官网上面也没有一个特别好的解决办法,这种方式算是目前还算能行得通的方式了。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/672459
推荐阅读
相关标签
  

闽ICP备14008679号