当前位置:   article > 正文

java 使用RestHighLevelClient 请求es聚合数据_java es restclient aggregation 转map

java es restclient aggregation 转map

java 使用RestHighLevelClient 查询es聚合数据

本文介绍如何在 Java 中使用 RestHighLevelClient 向 ES 发起聚合查询:先把嵌套的 ES 聚合参数拼接成对应的 AggregationBuilder,再把拼接好的查询参数和聚合参数放入请求,直接发送即可。

代码:

相关的依赖引用和参数类的代码,请参见《使用RestHighLevelClient 请求ES数据》一文。

1,参数拼接

  1. import org.apache.commons.collections4.ListUtils;
  2. import org.apache.commons.collections4.MapUtils;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.commons.lang3.math.NumberUtils;
  5. import org.elasticsearch.script.Script;
  6. import org.elasticsearch.search.aggregations.AggregationBuilder;
  7. import org.elasticsearch.search.aggregations.AggregationBuilders;
  8. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
  9. import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
  10. import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
  11. import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
  12. import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
  13. import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
  14. import org.elasticsearch.search.builder.SearchSourceBuilder;
  15. import org.joda.time.DateTimeZone;
  16. import java.util.List;
  17. import java.util.Map;
  18. public class StrTermNextScript {
  19.  
  20.     /**
  21.      *  字符型的拼接
  22.      *   第一层用 searchSourceBuilder 加
  23.      */
  24.     public static void joinEsStrAggs(Map<String,Object> aggsMap, SearchSourceBuilder searchSourceBuilder) {
  25.         Map<String,Object> aggsNameMap = (Map<String, Object>) aggsMap.get("aggs");
  26.         AggregationBuilder aggregationBuilder;
  27.         for (Map.Entry name : aggsNameMap.entrySet()) {
  28.             String aggName = name.getKey().toString();
  29.             Map<String,Object> valueMap = (Map<String, Object>) name.getValue();
  30.             for (Map.Entry value : valueMap.entrySet()) {
  31.                 String aggType = value.getKey().toString();
  32.                 Map<String,Object> fieldMap = (Map<String, Object>) value.getValue();
  33.                 aggregationBuilder = setAggsBuilders(aggName,aggType,fieldMap);
  34.                 if("terms".equals(aggType)){
  35.                     Map<String,Object> innerAggsMap = (Map<String, Object>) valueMap.get("aggs");
  36.                     if(MapUtils.isNotEmpty(innerAggsMap)){
  37.                         joinSecEsStrAggs(innerAggsMap, aggregationBuilder);
  38.                     }
  39.                 }
  40.                 if(aggregationBuilder != null){
  41.                     searchSourceBuilder.aggregation(aggregationBuilder);
  42.                 }
  43.             }
  44.         }
  45.     }
  46.     /**
  47.      *  字符型的拼接
  48.      * 第二层用 aggregationBuilder 的 subAggregation 方法添加后面的内容
  49.      */
  50.     private static void joinSecEsStrAggs(Map<String,Object> aggsNameMap, AggregationBuilder allAggregationBuilder) {
  51.         AggregationBuilder aggregationBuilder;
  52.         for (Map.Entry name : aggsNameMap.entrySet()) {
  53.             String aggName = name.getKey().toString();
  54.             Map<String,Object> valueMap = (Map<String, Object>) name.getValue();
  55.             for (Map.Entry value : valueMap.entrySet()) {
  56.                 String aggType = value.getKey().toString();
  57.                 Map<String,Object> fieldMap = (Map<String, Object>) value.getValue();
  58.                 aggregationBuilder = setAggsBuilders(aggName,aggType,fieldMap);
  59.                 if("terms".equals(aggType)){
  60.                     Map<String,Object> innerAggsMap = (Map<String, Object>) valueMap.get("aggs");
  61.                     if(MapUtils.isNotEmpty(innerAggsMap)){
  62.                         joinSecEsStrAggs(innerAggsMap, aggregationBuilder);
  63.                     }
  64.                 }
  65.                 if("bucket_script".equals(aggType)){
  66.                     BucketScriptPipelineAggregationBuilder bucketScript = getBucketScript(aggName,fieldMap);
  67.                     if(aggregationBuilder != null){
  68.                         aggregationBuilder.subAggregation(bucketScript);
  69.                     }else{
  70.                         allAggregationBuilder.subAggregation(bucketScript);
  71.                     }
  72.                 }
  73.                 if(aggregationBuilder != null){
  74.                     allAggregationBuilder.subAggregation(aggregationBuilder);
  75.                 }
  76.             }
  77.         }
  78.     }
  79.     private static BucketScriptPipelineAggregationBuilder getBucketScript(String aggName, Map<String, Object> aggParam){
  80.         // 这边还得把之前的添加到一块
  81.         Map<String, String> bucketsPathHashMap = (Map<String, String>) aggParam.get("buckets_path");
  82.         return PipelineAggregatorBuilders.bucketScript(aggName, bucketsPathHashMap,
  83.                 new Script(MapUtils.getString(aggParam,"script"))).format("#.##");
  84.     }
  85.     /**
  86.      *  根据聚合类型 获取对应的聚合builder
  87.      * @param aggName  聚合名称
  88.      * @param aggType  聚合类型
  89.      * @param aggParam  聚合参数
  90.      * @return AggregationBuilder
  91.      */
  92.     private static AggregationBuilder setAggsBuilders(String aggName, String aggType,  Map<String, Object> aggParam){
  93.         String fieldValue = MapUtils.getString(aggParam, "field");
  94.         AggregationBuilder aggregationBuilder = null;
  95.         if("terms".equals(aggType)){
  96.             int size = MapUtils.getIntValue(aggParam, "size");
  97.             if(size == NumberUtils.INTEGER_ZERO){
  98.                 size = 10;
  99.             }
  100.             aggregationBuilder = AggregationBuilders.terms(aggName).field(fieldValue).size(size);
  101.         }else if("sum".equals(aggType)){
  102.             aggregationBuilder = AggregationBuilders.sum(aggName).field(fieldValue);
  103.         }else if("avg".equals(aggType)){
  104.             aggregationBuilder = AggregationBuilders.avg(aggName).field(fieldValue);
  105.         }else if("max".equals(aggType)){
  106.             aggregationBuilder = AggregationBuilders.max(aggName).field(fieldValue);
  107.         }else if("min".equals(aggType)){
  108.             aggregationBuilder = AggregationBuilders.min(aggName).field(fieldValue);
  109.         }else if("value_count".equals(aggType)){
  110.             aggregationBuilder = AggregationBuilders.count(aggName).field(fieldValue);
  111.         }else if ("cardinality".equals(aggType)) {// 去重之后总数
  112.             aggregationBuilder = AggregationBuilders.cardinality(aggName).field(fieldValue);
  113.         } else if ("histogram".equals(aggType)) {// 直方图
  114.             Double interval = MapUtils.getDouble(aggParam, "interval");
  115.             Long minDocCount = MapUtils.getLong(aggParam, "min_doc_count");
  116.             aggregationBuilder = AggregationBuilders.histogram(aggName).field(fieldValue).interval(interval)
  117.                     .minDocCount(minDocCount);
  118.         } else if ("date_histogram".equals(aggType)) {// 日期直方图
  119.             String interval = MapUtils.getString(aggParam, "interval");
  120.             String format = MapUtils.getString(aggParam, "format");
  121.             Long minDocCount = MapUtils.getLong(aggParam, "min_doc_count");
  122.             DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram(aggName)
  123.                     .field(fieldValue).dateHistogramInterval(new DateHistogramInterval(interval))
  124.                     .minDocCount(minDocCount).timeZone(DateTimeZone.forOffsetHours(8));
  125.             if (StringUtils.isNotEmpty(format)) {
  126.                 dateHistogramAggregationBuilder.format(format);
  127.             }
  128.             aggregationBuilder = dateHistogramAggregationBuilder;
  129.         } else if ("range".equals(aggType)) {// 范围
  130.             String fieleName = MapUtils.getString(aggParam, "field");
  131.             List<Map<String, Object>> ranges = (List<Map<String, Object>>) aggParam.get("ranges");
  132.             RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range(aggName).field(fieleName);
  133.             ListUtils.emptyIfNull(ranges).forEach(e -> {
  134.                 Double from = MapUtils.getDouble(e, "from");
  135.                 Double to = MapUtils.getDouble(e, "to");
  136.                 rangeAggregationBuilder.addRange(from, to);
  137.             });
  138.             aggregationBuilder = rangeAggregationBuilder;
  139.         } else if ("date_range".equals(aggType)) {// 日期范围
  140.             String fieleName = MapUtils.getString(aggParam, "field");
  141.             String format = MapUtils.getString(aggParam, "format");
  142.             List<Map<String, Object>> ranges = (List<Map<String, Object>>) aggParam.get("ranges");
  143.             DateRangeAggregationBuilder dateRangeAggregationBuilder = AggregationBuilders.dateRange(aggName)
  144.                     .field(fieleName).timeZone(DateTimeZone.forOffsetHours(8));
  145.             if (StringUtils.isNotEmpty(format)) {
  146.                 dateRangeAggregationBuilder.format(format);
  147.             }
  148.             ListUtils.emptyIfNull(ranges).forEach(e -> {
  149.                 String from = MapUtils.getString(e, "from");
  150.                 String to = MapUtils.getString(e, "to");
  151.                 dateRangeAggregationBuilder.addRange(from, to);
  152.             });
  153.             aggregationBuilder = dateRangeAggregationBuilder;
  154.         }
  155.         return aggregationBuilder;
  156.     }
  157. }

以上是聚合参数拼接的部分,下面贴出调用它发起聚合查询的代码。

2,查询聚合getEsAggs

  1. public void getEsAggs(EsQuery esQuery) {
  2.     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  3.     SearchRequest searchRequest = new SearchRequest();
  4.     // 查询的索引
  5.     searchRequest.indices(esQuery.getSourceIndex());
  6.   
  7.     String sourceExpr = esQuery.getSourceExpr();
  8.     String queryStr = "";
  9.     JSONObject searchJson = JSONObject.parseObject(sourceExpr);
  10.     // 参数,带query 和 aggs的, 只有aggs的
  11.     WrapperQueryBuilder wrapperQueryBuilder = null;
  12.     if (!StringUtils.isEmpty(sourceExpr)) {
  13.         // 查询参数
  14.         if (sourceExpr.contains("query")) {// 带有query的查询
  15.             queryStr = searchJson.getJSONObject("query").toJSONString();
  16.         }
  17.         if (StringUtils.isNotEmpty(queryStr)) {
  18.             wrapperQueryBuilder = QueryBuilders.wrapperQuery(queryStr);
  19.         }
  20.         // 聚合参数
  21.         JSONObject aggs = searchJson.getJSONObject("aggregations");
  22.         StrTermNextScript.joinEsStrAggs(aggs, searchSourceBuilder);
  23.     }
  24.     if (wrapperQueryBuilder != null) {
  25.         searchSourceBuilder.query(wrapperQueryBuilder);
  26.     }
  27.     searchSourceBuilder.size(0);
  28.     searchRequest.source(searchSourceBuilder);
  29.     System.out.println("aggExpr " + searchSourceBuilder.toString());
  30.     try {
  31.         SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  32.         logger.info(searchResponse.toString());
  33.         Aggregations aggregations = searchResponse.getAggregations();
  34.         List<Aggregation> aggregationList = aggregations.asList();
  35.         for (Aggregation aggregation : aggregationList) {
  36.             logger.info(JSON.toJSONString(aggregation));
  37.             Terms histogram = (Terms) aggregation;
  38.             for (Terms.Bucket bucket : histogram.getBuckets()) {
  39.                 logger.info(bucket.getKeyAsString());
  40.                 logger.info("" + bucket.getDocCount());
  41.             }
  42.         }
  43.     } catch (IOException e) {
  44.         e.printStackTrace();
  45.     }
  46. }

聚合查询时把 size 设置为 0,这样响应中只返回聚合结果,不返回命中的文档。

总结:

    ES 聚合请求包括查询(query)部分和聚合(aggs)部分。查询部分只需处理查询条件本身,分页、排序等参数不用处理。聚合部分的参数拼接好之后,剩下的就是从响应中取值;而对返回值的解析,还需要再花些时间处理。

下一篇: 《平铺es聚合数据》

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/68596
推荐阅读
相关标签
  

闽ICP备14008679号