赞
踩
- /**
- * @Description 聚合查询(注意聚合的字段必须是数值类型,不然会报错)
- * @param tableName index名
- * @param equalsCondition 等值条件
- * @param rangeCondition 范围条件
- * @param needAggrFields 需要被聚合的字段(当然,这里是不同的字段,不同的聚合)
- * @return
- */
- public static Map<String, Object> aggregationQuery(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, List<String> needAggrFields){
- Map<String, Object> resultMap = new HashMap<>();
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- //等值查询
- if (null != equalsCondition && !equalsCondition.isEmpty()){
- for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
- String key = entry.getKey();
- //由于我创建索引的时候使用字符串不分词使用的.keyword类型
- if (key.endsWith("_s")){
- queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
- }else{
- queryValueBuild(boolQueryBuilder, key, entry.getValue());
- }
- }
- }
- //范围查询
- if (null != rangeCondition && !rangeCondition.isEmpty()){
- rangeValueBuild(boolQueryBuilder, rangeCondition);
- }
- sourceBuilder.query(boolQueryBuilder);
-
- //聚合。可以使用具体的Sum或者max等,但是stats的有总记录数,最值,总和以及平均值,更全面些
- if (!CollectionUtils.isEmpty(needAggrFields)){
- StatsAggregationBuilder statsAggr = null;
- for (String aggField : needAggrFields){
- //自定义要统计的字段的聚合名字为:字段名+_count_nums
- statsAggr = AggregationBuilders.stats(aggField + "_count_nums").field(aggField);
- }
- sourceBuilder.aggregation(statsAggr);
- SearchResponse response = executeSearch(tableName, sourceBuilder);
- for (String aggField : needAggrFields){
- ParsedStats stats = response.getAggregations().get(aggField + "_count_nums");
- Map<String, String> statMap= new HashMap<>();
- statMap.put("recordNum", String.valueOf(stats.getCount()));//总记录数
- statMap.put("sumValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getSum()));//求和
- statMap.put("avgValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getAvg()));//求平均值
- statMap.put("maxValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getMax()));//求最大值
- statMap.put("minValue", new DecimalFormat(DefineConstant.ZERP_MONEY).format(stats.getMin()));//求最小值
- resultMap.put(aggField + "_count_result", statMap);
- }
- }
- log.info("aggs result:{}",resultMap);
- return resultMap;
- }
tips:代码中的一些私有方法在Java与Elasticsearch的API实现(一)——条件查询这篇文章中已经封装好
例如聚合客户号为111666,交易月份为2020-03或者2020-04的交易金额字段(注意聚合的字段必须是数值类型的,不然会报错)
响应报文以及日志打印
此处使用的分组字段可以自由定义,因为传入的分组字段是一个集合。由于逻辑有点复杂,为了便于理解,有些重复的代码我就没有提取出来;若有更好的数据处理方法,欢迎一起交流学习。
主要思想:组装查询时由内而外层层嵌套;处理返回数据时,把结果看成一棵树,再将树平级化。由于所有叶子节点的层级都是一致的,先获取所有叶子节点,再获取当前节点的父节点,层层逆推,最终补全每一行的数据,完成树的平级化。
- /**
- * @Description 根据指定字段进行分组,然后聚合
- * @param tableName index名
- * @param equalsCondition 等值条件
- * @param rangeCondition 范围条件
- * @param needAggrField 需要被聚合的字段(此处就只聚合一个字段,多字段的分别聚合在上述中已实现)
- * @param groupByFields 分组的字段,一般业务涉及不到多字段分组,此处就实现复杂的,简单的也就更容易了
- * @return
- */
- public static Map<String, Object> groupAggregationQuery(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, String needAggrField,List<String> groupByFields){
- Map<String, Object> result = new HashMap<>(4);
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- //等值查询
- if (!CollectionUtils.isEmpty(equalsCondition)){
- for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
- String key = entry.getKey();
- //由于我创建索引的时候使用字符串不分词使用的.keyword类型
- if (key.endsWith("_s")){
- queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
- }else{
- queryValueBuild(boolQueryBuilder, key, entry.getValue());
- }
- }
- }
- //范围查询
- if (!CollectionUtils.isEmpty(rangeCondition)){
- rangeValueBuild(boolQueryBuilder, rangeCondition);
- }
- sourceBuilder.query(boolQueryBuilder);
- //由于这种递归是内部不停的嵌套的,所以组装查询语句的时候是由内而外的思想
-
- //聚合。可以使用具体的Sum或者max等,但是stats的有总记录数,最值,总和以及平均值,更全面些,自定义要统计的字段的聚合名字为:字段名+_count_nums
- StatsAggregationBuilder statsAggr = AggregationBuilders.stats(needAggrField + "_count_nums").field(needAggrField);
-
- //分组
- TermsAggregationBuilder termsGroupAggs = null;
- if (!CollectionUtils.isEmpty(groupByFields)){
- if (groupByFields.size() == 1){
- //此处groupByFields.get() + "_group",是为了便于定义分组的名而自行定义的分组名,可以不同,便于区别,自己能用即可
- termsGroupAggs = AggregationBuilders.terms(groupByFields.get(0) + "_group")
- .field(groupByFields.get(0).endsWith("_s") ? groupByFields.get(0) + ".keyword" : groupByFields.get(0)).subAggregation(statsAggr);
- } else{
- termsGroupAggs = AggregationBuilders.terms(groupByFields.get(0) + "_group")
- .field(groupByFields.get(0).endsWith("_s") ? groupByFields.get(0) + ".keyword" : groupByFields.get(0));
- if (groupByFields.size() == 2){
- termsGroupAggs.subAggregation(AggregationBuilders.terms(groupByFields.get(1) + "_group")
- .field(groupByFields.get(1).endsWith("_s") ? groupByFields.get(1) + ".keyword" : groupByFields.get(1))
- .subAggregation(statsAggr));
- }else {
- List<TermsAggregationBuilder> termAggs = new ArrayList<>();
- for (String groupField : groupByFields){
- termAggs.add(AggregationBuilders.terms(groupField + "_group")
- .field(groupField.endsWith("_s") ? groupField + ".keyword" : groupField));
- }
- TermsAggregationBuilder tempAggr = null;
- for (int i = groupByFields.size()-1 ; i>1 ;i--){
- if (null == tempAggr){
- tempAggr = termAggs.get(i-1).subAggregation(termAggs.get(i).subAggregation(statsAggr));
- }else {
- tempAggr = termAggs.get(i-1).subAggregation(tempAggr);
- }
- if (i-2 == 0){
- termsGroupAggs.subAggregation(tempAggr);
- }
- }
- }
- }
- }
- if (null == termsGroupAggs){
- sourceBuilder.aggregation(statsAggr);
- }else {
- sourceBuilder.aggregation(termsGroupAggs);
- }
- SearchResponse searchResponse = executeSearch(tableName, sourceBuilder);
-
- //处理返回的树形结构数据,转化成数组的平级表状数据
- List<Map<String, Object>> groupAggsMapList = new ArrayList<>();
- //定义子父级的关系
- List<Map<String,String>> fatherSonMaps = new ArrayList<>();
- if (!CollectionUtils.isEmpty(groupByFields)){
- ParsedStringTerms strTerms;
- if (groupByFields.size() == 1){
- strTerms = searchResponse.getAggregations().get(groupByFields.get(0) + "_group");
- for (Terms.Bucket bucket : strTerms.getBuckets()){
- ParsedStats stats = bucket.getAggregations().get(needAggrField + "_count_nums");
- Map<String, Object> statMap = new HashMap<>();
- //返回的结果呈现,分组的字段以及字段对应的组值,还有对应组聚合的结果(需要啥取啥,此处取求和),自定义聚合结果的键名使用聚合字段加_count_result
- statMap.put(groupByFields.get(0), bucket.getKeyAsString());
- statMap.put(needAggrField + "_count_result", stats.getSumAsString());
- groupAggsMapList.add(statMap);
- }
- }else {
- ParsedStringTerms strTermsFirst = searchResponse.getAggregations().get(groupByFields.get(0) + "_group");
- ParsedStringTerms strTermsTemp = null;
- List<ParsedStringTerms> strTermsTempList = new ArrayList<>();
- for (int i = 0; i < groupByFields.size(); i++) {
- if (null == strTermsTemp){
- strTermsTemp = strTermsFirst;
- }else {
- List buckets = strTermsTemp.getBuckets();
- if (CollectionUtils.isEmpty(strTermsTempList)){
- for (Object bucket : buckets){
- strTermsTemp = ((Terms.Bucket) bucket).getAggregations().get(groupByFields.get(i) + "_group");
- strTermsTempList.add(strTermsTemp);
- /**获取子父级关系的集合**/
- Map<String,String> fatherChildMap = new HashMap<>();
- fatherChildMap.put("time",String.valueOf(i-1));//循环的次数
- fatherChildMap.put(groupByFields.get(i-1),((Terms.Bucket)bucket).getKeyAsString());
- fatherChildMap.put("childNum",String.valueOf(strTermsTemp.getBuckets().size()));
- fatherSonMaps.add(fatherChildMap);
- /**获取子父级关系的集合**/
- if (i+1 == groupByFields.size()){
- for (Terms.Bucket statsBucket : strTermsTemp.getBuckets()){
- Map<String, Object> statMap = new HashMap<>();
- ParsedStats stats = statsBucket.getAggregations().get(needAggrField + "_count_nums");
- statMap.put(groupByFields.get(i), statsBucket.getKeyAsString());
- statMap.put(needAggrField + "_count_result", stats.getSumAsString());
- groupAggsMapList.add(statMap);
- }
- }
- }
- }else {
- List<ParsedStringTerms> tempStringTerms = new ArrayList<>();
- tempStringTerms.addAll(strTermsTempList);
- strTermsTempList.clear();
- for (ParsedStringTerms stringTerms : tempStringTerms){
- buckets = stringTerms.getBuckets();
- for (Object bucket : buckets){
- strTermsTemp = ((Terms.Bucket) bucket).getAggregations().get(groupByFields.get(i) + "_group");
- strTermsTempList.add(strTermsTemp);
- /**获取子父级关系的集合**/
- Map<String,String> fatherChildMap = new HashMap<>();
- fatherChildMap.put("time",String.valueOf(i-1));//循环的次数
- fatherChildMap.put(groupByFields.get(i-1),((Terms.Bucket)bucket).getKeyAsString());
- fatherChildMap.put("childNum",String.valueOf(strTermsTemp.getBuckets().size()));
- fatherSonMaps.add(fatherChildMap);
- /**获取子父级关系的集合**/
- if (i+1 == groupByFields.size()){
- for (Terms.Bucket statsBucket : strTermsTemp.getBuckets()){
- Map<String, Object> statMap = new HashMap<>();
- ParsedStats stats = statsBucket.getAggregations().get(needAggrField + "_count_nums");
- statMap.put(groupByFields.get(i), statsBucket.getKeyAsString());
- statMap.put(needAggrField + "_count_result", stats.getSumAsString());
- groupAggsMapList.add(statMap);
- }
- }
- }
- }
- }
-
- }
- }
- //组装并平级化树
- List[] timeNodeArrays = new ArrayList[groupByFields.size()-1];
- for (int x = 0; x<groupByFields.size()-1; x++){
- timeNodeArrays[x] = new ArrayList();
- for (Map<String,String> fatherSonMap : fatherSonMaps){
- if (String.valueOf(x).equals(fatherSonMap.get("time"))){
- timeNodeArrays[x].add(fatherSonMap);
- }
- }
- }
- if (groupByFields.size()>2){
- for (int n = 0; n<groupByFields.size() - 2; n++){
- int tempNum = 0;
- for (Object timeNodeMap : timeNodeArrays[n]){
- int timeNodeChildNum = Integer.valueOf(((Map<String,String>)timeNodeMap).get("childNum"));
- for (int m = tempNum; m<tempNum+timeNodeChildNum; m++){
- for (int j = 0;j<=n;j++){
- ((List<Map<String,String>>)timeNodeArrays[n+1]).get(m).put(groupByFields.get(j), (((Map<String, String>) timeNodeMap).get(groupByFields.get(j))));
- }
- }
- tempNum += timeNodeChildNum;
- }
- }
- }
- //处理最后的结果,补齐对应的字段
- int tempNumLast = 0;
- for (Object timeNodeMap : timeNodeArrays[groupByFields.size()-2]){
- int timeNodeChildNum = Integer.valueOf(((Map<String,String>)timeNodeMap).get("childNum"));
- for (int m = tempNumLast; m<tempNumLast+timeNodeChildNum; m++){
- for (int j = 0;j<=groupByFields.size()-2;j++){
- groupAggsMapList.get(m).put(groupByFields.get(j), (((Map<String, String>) timeNodeMap).get(groupByFields.get(j))));
- }
- }
- tempNumLast += timeNodeChildNum;
- }
- }
- }
- result.put("resultAggs", groupAggsMapList);
- return result;
- }
例如获得客户号为111666,根据交易月份txn_month_s、收支类型txn_re_type_s、交易渠道txn_chnl_s分组,然后聚合求和交易金额txn_amt_d
响应结果以及日志打印
满足该客户号的原始记录如下,再看上面的返回结果,确实已按字段分组,而且对应的聚合值也清晰地展示出来了。
- /**
- * @Description 游标查询
- * @param tableName
- * @param equalsCondition
- * @param rangeCondition
- * @param orderBy
- * @param pageSize 每页大小
- * @param scrollId 游标id
- * @return
- */
- public static Map<String, Object> queryEsByScroll(String tableName, Map<String, Object> equalsCondition, Map<String, Object> rangeCondition, List<String> orderBy, int pageSize, String scrollId){
- RestHighLevelClient client = EsClient.getInstance();
- List<Map<String, Object>> queryList = new ArrayList<>();
- Map<String, Object> result = new HashMap<>();
- Scroll scroll = new Scroll(TimeValue.timeValueMillis(1L));
- SearchResponse searchResponse = null;
- SearchHit[] searchHits = null;
- try {
- if (StringUtils.isEmpty(scrollId)){
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- //等值查询
- if (null != equalsCondition && !equalsCondition.isEmpty()){
- for (Map.Entry<String ,Object> entry : equalsCondition.entrySet()){
- String key = entry.getKey();
- //由于我创建索引的时候使用字符串不分词使用的.keyword类型
- if (key.endsWith("_s")){
- queryValueBuild(boolQueryBuilder, key + ".keyword", entry.getValue());
- }else{
- queryValueBuild(boolQueryBuilder, key, entry.getValue());
- }
- }
- }
- //范围查询
- if (null != rangeCondition && !rangeCondition.isEmpty()){
- rangeValueBuild(boolQueryBuilder, rangeCondition);
- }
- //排序
- if (!CollectionUtils.isEmpty(orderBy)){
- buildSort(sourceBuilder, orderBy);
- }
- sourceBuilder.query(boolQueryBuilder);
- sourceBuilder.size(pageSize);
- SearchRequest searchRequest = new SearchRequest(tableName);
- searchRequest.types(DefineConstant.SEARCH_REQUEST_TYPE);
- searchRequest.source(sourceBuilder);
- searchRequest.scroll(scroll);
- searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
- }else{
- SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
- scrollRequest.scroll(scroll);
- searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
- }
- if (null != searchResponse){
- scrollId = searchResponse.getScrollId();
- searchHits = searchResponse.getHits().getHits();
- //获取结果
- if (null != searchHits && searchHits.length > 0){
- for (SearchHit hit : searchHits){
- queryList .add(hit.getSourceAsMap());
- }
- log.info("searchStatus:" + searchResponse.status() + ", hitNum" + searchHits.length + ", searchTook:" + searchResponse.getTook());
-
- }
- }
- }catch (IOException ie){
- log.error(ie.getMessage());
- } finally {
- if (!StringUtils.isEmpty(scrollId)){
- if (null == searchHits || searchHits.length != pageSize){
- ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
- clearScrollRequest.addScrollId(scrollId);
- try {
- ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
- log.error("clearStatus:" + clearScrollResponse.status() + ", clearSucceed:" + clearScrollResponse.isSucceeded());
- } catch (IOException e) {
- log.error(e.getMessage());
- }
- scrollId = "";
- }
- }
- }
- result.put("pageList", queryList);
- result.put("scrollId",scrollId);
- return result;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。