当前位置:   article > 正文

Java与Elasticsearch的API实现工具类(三)——聚合查询、多字段分组聚合、游标查询_bool must 多字段aggregations分组

bool must 多字段aggregations分组

相关代码已上传gitee

https://gitee.com/gangye/elasticsearch_demo

一、聚合查询

  1. /**
  2. * @Description 聚合查询(注意聚合的字段必须是数值类型,不然会报错)
  3. * @param tableName index名
  4. * @param equalsCondition 等值条件
  5. * @param rangeCondition 范围条件
  6. * @param needAggrFields 需要被聚合的字段(当然,这里是不同的字段,不同的聚合)
  7. * @return
  8. */
  9. public static Map<String, Object> aggregationQuery(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, List<String> needAggrFields){
  10. Map<String, Object> resultMap = new HashMap<>();
  11. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  12. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  13. //等值查询
  14. if (null != equalsCondition && !equalsCondition.isEmpty()){
  15. for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
  16. String key = entry.getKey();
  17. //由于我创建索引的时候使用字符串不分词使用的.keyword类型
  18. if (key.endsWith("_s")){
  19. queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
  20. }else{
  21. queryValueBuild(boolQueryBuilder, key, entry.getValue());
  22. }
  23. }
  24. }
  25. //范围查询
  26. if (null != rangeCondition && !rangeCondition.isEmpty()){
  27. rangeValueBuild(boolQueryBuilder, rangeCondition);
  28. }
  29. sourceBuilder.query(boolQueryBuilder);
  30. //聚合。可以使用具体的Sum或者max等,但是stats的有总记录数,最值,总和以及平均值,更全面些
  31. if (!CollectionUtils.isEmpty(needAggrFields)){
  32. StatsAggregationBuilder statsAggr = null;
  33. for (String aggField : needAggrFields){
  34. //自定义要统计的字段的聚合名字为:字段名+_count_nums
  35. statsAggr = AggregationBuilders.stats(aggField + "_count_nums").field(aggField);
  36. }
  37. sourceBuilder.aggregation(statsAggr);
  38. SearchResponse response = executeSearch(tableName, sourceBuilder);
  39. for (String aggField : needAggrFields){
  40. ParsedStats stats = response.getAggregations().get(aggField + "_count_nums");
  41. Map<String, String> statMap= new HashMap<>();
  42. statMap.put("recordNum", String.valueOf(stats.getCount()));//总记录数
  43. statMap.put("sumValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getSum()));//求和
  44. statMap.put("avgValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getAvg()));//求平均值
  45. statMap.put("maxValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getMax()));//求最大值
  46. statMap.put("minValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getMin()));//求最小值
  47. resultMap.put(aggField + "_count_result", statMap);
  48. }
  49. }
  50. log.info("aggs result:{}",resultMap);
  51. return resultMap;
  52. }

 tips:代码中用到的一些私有方法,已在《Java与Elasticsearch的API实现(一)——条件查询》这篇文章中封装好。

 例如:聚合客户号为111666、交易月份为2020-03或2020-04的交易金额字段(注意:被聚合的字段必须是数值类型,否则会报错)。

响应报文以及日志打印

二、多字段分组,然后聚合

此处使用的分组字段可以自由定义,因为传入的分组字段是一个集合。由于逻辑有点复杂,为了便于理解,有些重复的代码我没有提取出来;若有更好的数据处理方法,欢迎一起交流学习。

主要思路:组装查询语句时,由内而外层层嵌套;处理返回结果时,把结果看成一棵树,再将树平级化(扁平化)。由于所有叶子节点的层级一致,因此先获取叶子节点,再逐层获取其父节点,层层逆推补全数据,最终得到平级化的表格数据。

  1. /**
  2. * @Description 根据指定字段进行分组,然后聚合
  3. * @param tableName index名
  4. * @param equalsCondition 等值条件
  5. * @param rangeCondition 范围条件
  6. * @param needAggrField 需要被聚合的字段(此处就只聚合一个字段,多字段的分别聚合在上述中已实现)
  7. * @param groupByFields 分组的字段,一般业务涉及不到多字段分组,此处就实现复杂的,简单的也就更容易了
  8. * @return
  9. */
  10. public static Map<String, Object> groupAggregationQuery(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, String needAggrField,List<String> groupByFields){
  11. Map<String, Object> result = new HashMap<>(4);
  12. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  13. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  14. //等值查询
  15. if (!CollectionUtils.isEmpty(equalsCondition)){
  16. for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
  17. String key = entry.getKey();
  18. //由于我创建索引的时候使用字符串不分词使用的.keyword类型
  19. if (key.endsWith("_s")){
  20. queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
  21. }else{
  22. queryValueBuild(boolQueryBuilder, key, entry.getValue());
  23. }
  24. }
  25. }
  26. //范围查询
  27. if (!CollectionUtils.isEmpty(rangeCondition)){
  28. rangeValueBuild(boolQueryBuilder, rangeCondition);
  29. }
  30. sourceBuilder.query(boolQueryBuilder);
  31. //由于这种递归是内部不停的嵌套的,所以组装查询语句的时候是由内而外的思想
  32. //聚合。可以使用具体的Sum或者max等,但是stats的有总记录数,最值,总和以及平均值,更全面些,自定义要统计的字段的聚合名字为:字段名+_count_nums
  33. StatsAggregationBuilder statsAggr = AggregationBuilders.stats(needAggrField + "_count_nums").field(needAggrField);
  34. //分组
  35. TermsAggregationBuilder termsGroupAggs = null;
  36. if (!CollectionUtils.isEmpty(groupByFields)){
  37. if (groupByFields.size() == 1){
  38. //此处groupByFields.get() + "_group",是为了便于定义分组的名而自行定义的分组名,可以不同,便于区别,自己能用即可
  39. termsGroupAggs = AggregationBuilders.terms(groupByFields.get(0) + "_group")
  40. .field(groupByFields.get(0).endsWith("_s") ? groupByFields.get(0) + ".keyword" : groupByFields.get(0)).subAggregation(statsAggr);
  41. } else{
  42. termsGroupAggs = AggregationBuilders.terms(groupByFields.get(0) + "_group")
  43. .field(groupByFields.get(0).endsWith("_s") ? groupByFields.get(0) + ".keyword" : groupByFields.get(0));
  44. if (groupByFields.size() == 2){
  45. termsGroupAggs.subAggregation(AggregationBuilders.terms(groupByFields.get(1) + "_group")
  46. .field(groupByFields.get(1).endsWith("_s") ? groupByFields.get(1) + ".keyword" : groupByFields.get(1))
  47. .subAggregation(statsAggr));
  48. }else {
  49. List<TermsAggregationBuilder> termAggs = new ArrayList<>();
  50. for (String groupField : groupByFields){
  51. termAggs.add(AggregationBuilders.terms(groupField + "_group")
  52. .field(groupField.endsWith("_s") ? groupField + ".keyword" : groupField));
  53. }
  54. TermsAggregationBuilder tempAggr = null;
  55. for (int i = groupByFields.size()-1 ; i>1 ;i--){
  56. if (null == tempAggr){
  57. tempAggr = termAggs.get(i-1).subAggregation(termAggs.get(i).subAggregation(statsAggr));
  58. }else {
  59. tempAggr = termAggs.get(i-1).subAggregation(tempAggr);
  60. }
  61. if (i-2 == 0){
  62. termsGroupAggs.subAggregation(tempAggr);
  63. }
  64. }
  65. }
  66. }
  67. }
  68. if (null == termsGroupAggs){
  69. sourceBuilder.aggregation(statsAggr);
  70. }else {
  71. sourceBuilder.aggregation(termsGroupAggs);
  72. }
  73. SearchResponse searchResponse = executeSearch(tableName, sourceBuilder);
  74. //处理返回的树形结构数据,转化成数组的平级表状数据
  75. List<Map<String, Object>> groupAggsMapList = new ArrayList<>();
  76. //定义子父级的关系
  77. List<Map<String,String>> fatherSonMaps = new ArrayList<>();
  78. if (!CollectionUtils.isEmpty(groupByFields)){
  79. ParsedStringTerms strTerms;
  80. if (groupByFields.size() == 1){
  81. strTerms = searchResponse.getAggregations().get(groupByFields.get(0) + "_group");
  82. for (Terms.Bucket bucket : strTerms.getBuckets()){
  83. ParsedStats stats = bucket.getAggregations().get(needAggrField + "_count_nums");
  84. Map<String, Object> statMap = new HashMap<>();
  85. //返回的结果呈现,分组的字段以及字段对应的组值,还有对应组聚合的结果(需要啥取啥,此处取求和),自定义聚合结果的键名使用聚合字段加_count_result
  86. statMap.put(groupByFields.get(0), bucket.getKeyAsString());
  87. statMap.put(needAggrField + "_count_result", stats.getSumAsString());
  88. groupAggsMapList.add(statMap);
  89. }
  90. }else {
  91. ParsedStringTerms strTermsFirst = searchResponse.getAggregations().get(groupByFields.get(0) + "_group");
  92. ParsedStringTerms strTermsTemp = null;
  93. List<ParsedStringTerms> strTermsTempList = new ArrayList<>();
  94. for (int i = 0; i < groupByFields.size(); i++) {
  95. if (null == strTermsTemp){
  96. strTermsTemp = strTermsFirst;
  97. }else {
  98. List buckets = strTermsTemp.getBuckets();
  99. if (CollectionUtils.isEmpty(strTermsTempList)){
  100. for (Object bucket : buckets){
  101. strTermsTemp = ((Terms.Bucket) bucket).getAggregations().get(groupByFields.get(i) + "_group");
  102. strTermsTempList.add(strTermsTemp);
  103. /**获取子父级关系的集合**/
  104. Map<String,String> fatherChildMap = new HashMap<>();
  105. fatherChildMap.put("time",String.valueOf(i-1));//循环的次数
  106. fatherChildMap.put(groupByFields.get(i-1),((Terms.Bucket)bucket).getKeyAsString());
  107. fatherChildMap.put("childNum",String.valueOf(strTermsTemp.getBuckets().size()));
  108. fatherSonMaps.add(fatherChildMap);
  109. /**获取子父级关系的集合**/
  110. if (i+1 == groupByFields.size()){
  111. for (Terms.Bucket statsBucket : strTermsTemp.getBuckets()){
  112. Map<String, Object> statMap = new HashMap<>();
  113. ParsedStats stats = statsBucket.getAggregations().get(needAggrField + "_count_nums");
  114. statMap.put(groupByFields.get(i), statsBucket.getKeyAsString());
  115. statMap.put(needAggrField + "_count_result", stats.getSumAsString());
  116. groupAggsMapList.add(statMap);
  117. }
  118. }
  119. }
  120. }else {
  121. List<ParsedStringTerms> tempStringTerms = new ArrayList<>();
  122. tempStringTerms.addAll(strTermsTempList);
  123. strTermsTempList.clear();
  124. for (ParsedStringTerms stringTerms : tempStringTerms){
  125. buckets = stringTerms.getBuckets();
  126. for (Object bucket : buckets){
  127. strTermsTemp = ((Terms.Bucket) bucket).getAggregations().get(groupByFields.get(i) + "_group");
  128. strTermsTempList.add(strTermsTemp);
  129. /**获取子父级关系的集合**/
  130. Map<String,String> fatherChildMap = new HashMap<>();
  131. fatherChildMap.put("time",String.valueOf(i-1));//循环的次数
  132. fatherChildMap.put(groupByFields.get(i-1),((Terms.Bucket)bucket).getKeyAsString());
  133. fatherChildMap.put("childNum",String.valueOf(strTermsTemp.getBuckets().size()));
  134. fatherSonMaps.add(fatherChildMap);
  135. /**获取子父级关系的集合**/
  136. if (i+1 == groupByFields.size()){
  137. for (Terms.Bucket statsBucket : strTermsTemp.getBuckets()){
  138. Map<String, Object> statMap = new HashMap<>();
  139. ParsedStats stats = statsBucket.getAggregations().get(needAggrField + "_count_nums");
  140. statMap.put(groupByFields.get(i), statsBucket.getKeyAsString());
  141. statMap.put(needAggrField + "_count_result", stats.getSumAsString());
  142. groupAggsMapList.add(statMap);
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }
  149. }
  150. //组装并平级化树
  151. List[] timeNodeArrays = new ArrayList[groupByFields.size()-1];
  152. for (int x = 0; x<groupByFields.size()-1; x++){
  153. timeNodeArrays[x] = new ArrayList();
  154. for (Map<String,String> fatherSonMap : fatherSonMaps){
  155. if (String.valueOf(x).equals(fatherSonMap.get("time"))){
  156. timeNodeArrays[x].add(fatherSonMap);
  157. }
  158. }
  159. }
  160. if (groupByFields.size()>2){
  161. for (int n = 0; n<groupByFields.size() - 2; n++){
  162. int tempNum = 0;
  163. for (Object timeNodeMap : timeNodeArrays[n]){
  164. int timeNodeChildNum = Integer.valueOf(((Map<String,String>)timeNodeMap).get("childNum"));
  165. for (int m = tempNum; m<tempNum+timeNodeChildNum; m++){
  166. for (int j = 0;j<=n;j++){
  167. ((List<Map<String,String>>)timeNodeArrays[n+1]).get(m).put(groupByFields.get(j), (((Map<String, String>) timeNodeMap).get(groupByFields.get(j))));
  168. }
  169. }
  170. tempNum += timeNodeChildNum;
  171. }
  172. }
  173. }
  174. //处理最后的结果,补齐对应的字段
  175. int tempNumLast = 0;
  176. for (Object timeNodeMap : timeNodeArrays[groupByFields.size()-2]){
  177. int timeNodeChildNum = Integer.valueOf(((Map<String,String>)timeNodeMap).get("childNum"));
  178. for (int m = tempNumLast; m<tempNumLast+timeNodeChildNum; m++){
  179. for (int j = 0;j<=groupByFields.size()-2;j++){
  180. groupAggsMapList.get(m).put(groupByFields.get(j), (((Map<String, String>) timeNodeMap).get(groupByFields.get(j))));
  181. }
  182. }
  183. tempNumLast += timeNodeChildNum;
  184. }
  185. }
  186. }
  187. result.put("resultAggs", groupAggsMapList);
  188. return result;
  189. }

例如获得客户号为111666,根据交易月份txn_month_s、收支类型txn_re_type_s、交易渠道txn_chnl_s分组,然后聚合求和交易金额txn_amt_d

响应结果以及日志打印

 下面是原始数据中满足客户号条件的记录;再对照上面的返回结果,可以看到数据确实已按字段分组,且对应的值也清晰展示。

三、游标查询

  1. /**
  2. * @Description 游标查询
  3. * @param tableName
  4. * @param equalsCondition
  5. * @param rangeCondition
  6. * @param orderBy
  7. * @param pageSize 每页大小
  8. * @param scrollId 游标id
  9. * @return
  10. */
  11. public static Map<String, Object> queryEsByScroll(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, List<String> orderBy, int pageSize, String scrollId){
  12. RestHighLevelClient client = EsClient.getInstance();
  13. List<Map<String, Object>> queryList = new ArrayList<>();
  14. Map<String, Object> result = new HashMap<>();
  15. Scroll scroll = new Scroll(TimeValue.timeValueMillis(1L));
  16. SearchResponse searchResponse = null;
  17. SearchHit[] searchHits = null;
  18. try {
  19. if (StringUtils.isEmpty(scrollId)){
  20. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  21. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  22. //等值查询
  23. if (null != equalsCondition && !equalsCondition.isEmpty()){
  24. for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
  25. String key = entry.getKey();
  26. //由于我创建索引的时候使用字符串不分词使用的.keyword类型
  27. if (key.endsWith("_s")){
  28. queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
  29. }else{
  30. queryValueBuild(boolQueryBuilder, key, entry.getValue());
  31. }
  32. }
  33. }
  34. //范围查询
  35. if (null != rangeCondition && !rangeCondition.isEmpty()){
  36. rangeValueBuild(boolQueryBuilder, rangeCondition);
  37. }
  38. //排序
  39. if (!CollectionUtils.isEmpty(orderBy)){
  40. buildSort(sourceBuilder, orderBy);
  41. }
  42. sourceBuilder.query(boolQueryBuilder);
  43. sourceBuilder.size(pageSize);
  44. SearchRequest searchRequest = new SearchRequest(tableName);
  45. searchRequest.types(DefineConstant.SEARCH_REQUEST_TYPE);
  46. searchRequest.source(sourceBuilder);
  47. searchRequest.scroll(scroll);
  48. searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
  49. }else{
  50. SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
  51. scrollRequest.scroll(scroll);
  52. searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
  53. }
  54. if (null != searchResponse){
  55. scrollId = searchResponse.getScrollId();
  56. searchHits = searchResponse.getHits().getHits();
  57. //获取结果
  58. if (null != searchHits && searchHits.length > 0){
  59. for (SearchHit hit : searchHits){
  60. queryList .add(hit.getSourceAsMap());
  61. }
  62. log.info("searchStatus:" + searchResponse.status() + ", hitNum" + searchHits.length + ", searchTook:" + searchResponse.getTook());
  63. }
  64. }
  65. }catch (IOException ie){
  66. log.error(ie.getMessage());
  67. } finally {
  68. if (!StringUtils.isEmpty(scrollId)){
  69. if (null == searchHits || searchHits.length != pageSize){
  70. ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
  71. clearScrollRequest.addScrollId(scrollId);
  72. try {
  73. ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
  74. log.error("clearStatus:" + clearScrollResponse.status() + ", clearSucceed:" + clearScrollResponse.isSucceeded());
  75. } catch (IOException e) {
  76. log.error(e.getMessage());
  77. }
  78. scrollId = "";
  79. }
  80. }
  81. }
  82. result.put("pageList", queryList);
  83. result.put("scrollId",scrollId);
  84. return result;
  85. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/68101
推荐阅读
相关标签
  

闽ICP备14008679号