当前位置:   article > 正文

ElasticSearch(es)使用游标读取全部数据

下面的示例代码使用 scroll(游标)方式分批读取 ElasticSearch 索引中的全部数据,读取结束后清理游标并关闭客户端:
import com.cdel.utils.EsClientServiceFactoryByENV;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;

import java.util.Map;

/**
 * Reads every document of one index/type through the Elasticsearch scroll API,
 * one batch at a time, and hands each hit's source map to the processing step.
 *
 * <p>The scroll context and the transport client are released in a
 * {@code finally} block so that a failure in the middle of the iteration does
 * not leave the search context open until its keep-alive expires.
 */
public class bulkGetFaqAccV4 {
    private static final Logger logger = LogManager.getLogger(bulkGetFaqAccV4.class);

    private static final TransportClient client = EsClientServiceFactoryByENV.getInstance().getClient();

    /**
     * Scrolls through the whole source index and releases the scroll context
     * and the client afterwards, whether or not the iteration succeeded.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Source index and document type to read from.
        String srcIndex = "xxx";
        String desType = "xxx";

        // Keep-alive of the scroll context; it is renewed on every round trip.
        TimeValue keepAlive = TimeValue.timeValueMinutes(20);

        // Most recent scroll id, tracked outside the try so that cleanup can use it.
        String scrollId = null;
        try {
            // Open the scroll: match everything, 1000 hits per batch.
            SearchResponse response = client.prepareSearch(srcIndex)
                    .setTypes(desType)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setScroll(keepAlive)
                    .setSize(1000)
                    .execute().actionGet();
            scrollId = response.getScrollId();

            long totalHits = response.getHits().getTotalHits();
            logger.info("---------一共有{}个", totalHits);

            // An empty batch marks the end of the scroll.
            while (response.getHits().getHits().length > 0) {
                SearchHits hits = response.getHits();
                for (SearchHit hit : hits.getHits()) {
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                    // TODO: per-document processing logic goes here
                    // (create a bulk request with client.prepareBulk() at this point
                    // if the processed documents have to be written back).
                }

                // Fetch the next batch and remember the id it returns for the next round.
                response = client.prepareSearchScroll(scrollId)
                        .setScroll(keepAlive)
                        .execute().actionGet();
                scrollId = response.getScrollId();
            }
        } finally {
            try {
                releaseScroll(scrollId);
            } finally {
                client.close();
            }
        }
    }

    /**
     * Clears the given scroll context on a best-effort basis: failures are logged
     * rather than thrown, so they cannot mask an exception from the main loop.
     *
     * @param scrollId id of the scroll to clear; {@code null} means none was opened
     */
    private static void releaseScroll(String scrollId) {
        if (scrollId == null) {
            return;
        }
        try {
            ClearScrollRequest request = new ClearScrollRequest();
            request.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = client.clearScroll(request).actionGet();
            if (!clearScrollResponse.isSucceeded()) {
                logger.warn("Failed to clear scroll context {}", scrollId);
            }
        } catch (RuntimeException e) {
            logger.warn("Error while clearing scroll context " + scrollId, e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/658458
推荐阅读
相关标签
  

闽ICP备14008679号