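Grouping, Aggregation, and Nested Queries
An aggregation query is the Elasticsearch counterpart of the SQL aggregate functions: sum, maximum, minimum, and average.
A nested query covers the case where a field stored in ES is itself an object, and the grouping or aggregation has to work on the values of that object's properties.
Combining grouping and aggregation with the Elasticsearch Java API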
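As a rough in-memory analogy for the metric side of this (sum, max, min, average), the sketch below computes the same figures over a plain list of numbers with the JDK only. It does not call Elasticsearch; the class name MetricAnalogy and the sample values are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;

class MetricAnalogy {
    // Computes count, sum, min, max and average of the values in a single pass.
    static LongSummaryStatistics summarize(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).summaryStatistics();
    }

    public static void main(String[] args) {
        LongSummaryStatistics stats = summarize(Arrays.asList(3L, 5L, 10L));
        System.out.println("sum=" + stats.getSum()
                + " min=" + stats.getMin()
                + " max=" + stats.getMax()
                + " avg=" + stats.getAverage());
    }
}
```

In Elasticsearch the same figures come from metric aggregations evaluated on the server, over the documents that match the query.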
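// Create a grouping (terms aggregation) on field_one.
// Note: Java has no power operator; 2^31-1 would be a bitwise XOR, so use Integer.MAX_VALUE.
AggregationBuilder oneAgg =
    AggregationBuilders.terms("fieldOne").field("field_one").size(Integer.MAX_VALUE);
// Create a second one
AggregationBuilder twoAgg =
    AggregationBuilders.terms("fieldTwo").field("field_two").size(Integer.MAX_VALUE);
// Option 1: combine the two groupings
searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg));
AggregationBuilder threeAgg =
    AggregationBuilders.sum("fieldThree").field("field_three");
// Option 2: group by field_one and field_two, then sum field_three within each group
searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg.subAggregation(threeAgg)));
// Option 3: sum field_three over all matching documents, without any grouping
searchSourceBuilder.aggregation(threeAgg);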
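One detail about the bucket size is worth checking in isolation. If the size argument is meant to be the largest int (2 to the power 31, minus 1), keep in mind that Java has no exponent operator: `^` is bitwise XOR and binds more loosely than `-`. The small sketch below uses only the JDK; the class and method names are made up for illustration.

```java
class BucketSizeCheck {
    // Parsed as 2 ^ (31 - 1), i.e. 2 XOR 30, which is a small number.
    static int xorExpression() {
        return 2 ^ 31 - 1;
    }

    // The largest value an int can hold, which is 2 to the power 31, minus 1.
    static int maxBucketSize() {
        return Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(xorExpression());  // prints 28
        System.out.println(maxBucketSize());  // prints 2147483647
    }
}
```

So a terms aggregation sized with the XOR expression returns at most 28 buckets. A very large size is not free either, since every bucket has to be built and shipped back, so a bound that matches the expected number of distinct values is usually the safer choice.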
A simple grouping query
// Paging settings (fields of the enclosing class)
private Integer start = 0;
private Integer length = 20;
EsPage esPage = new EsPage(start, length);
// Sort settings
List<EsSort> esSorts = Lists.newArrayList(
    new EsSort("userId", SortOrder.ASC), new EsSort(id, SortOrder.DESC));
// Query conditions
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
queryBuilder.must(new TermQueryBuilder("userName", "xxxx"));
// Assemble the search request
SearchRequest searchRequest = new SearchRequest("user_list");
// 1. Add the ordinary query conditions
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
    .query(queryBuilder)
    // size(0): return no hits; the aggregation still runs over every document matching the query
    .size(0)
    .from(0)
    .fetchSource(false);
// Assemble the aggregation
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("group_by_userId")
    // Field to group on
    .field("userId")
    // Paging: request enough buckets to reach the end of the current page
    .size(esPage.getPage() + esPage.getSize());
// 2. Add the aggregation
searchSourceBuilder.aggregation(aggregationBuilder);
// 3. Add the sort conditions
for (EsSort sort : esSorts) {
    searchSourceBuilder.sort(sort.getSortFiled(), sort.getSortOrder());
}
searchRequest.source(searchSourceBuilder);
try {
    SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
    // Total number of matching documents
    long sumTotal = searchResponse.getHits().getTotalHits().value;
    if (sumTotal >= MAX_PAGE_DEPTH) {
        throw new EsDataException("View aggregation over more than 10000 records is not supported yet; please reduce the size of the aggregation task");
    }
    // sumTotal: total number of matching documents
    // searchResponse.getAggregations(): the aggregated data
    return Pair.of(sumTotal, searchResponse.getAggregations());
} catch (IOException e) {
    EsExceptionUtil.dealIOException(e);
} catch (ElasticsearchException e1) {
    EsExceptionUtil.dealElasticsearchException(e1);
}
Parsing the aggregation result
// Total number of matching documents
int count = pair.getLeft().intValue();
// The aggregated data
Aggregations aggregations = pair.getRight();
// Look the aggregation up by the name it was given when it was built
Terms terms = aggregations.get("group_by_userId");
List<Long> userIds = Lists.newArrayList();
if (terms != null && CollectionUtils.isNotEmpty(terms.getBuckets())) {
    // terms.getBuckets(): one bucket per group produced by the aggregation
    for (Terms.Bucket bucket : terms.getBuckets()) {
        // The bucket key is the value of the grouped field: userId
        long key = NumberUtils.toLong(bucket.getKeyAsString(), 0);
        if (key == 0) {
            continue;
        }
        userIds.add(key);
    }
}
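The terms aggregation in this example is sized to reach the end of the requested page, so the buckets belonging to earlier pages are still part of the response. The sketch below shows one way to trim them on the client side. It assumes the first EsPage argument is an offset and the second a page length; the original does not define EsPage, so that reading is an interpretation. BucketPager and page are hypothetical names, and plain integers stand in for the buckets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BucketPager {
    // Returns the elements in [from, from + size), or an empty list when the range is empty.
    static <T> List<T> page(List<T> buckets, int from, int size) {
        if (from < 0 || size <= 0 || from >= buckets.size()) {
            return Collections.emptyList();
        }
        // Compute the end index in long arithmetic so from + size cannot overflow.
        int to = (int) Math.min((long) buckets.size(), (long) from + size);
        return new ArrayList<>(buckets.subList(from, to));
    }

    public static void main(String[] args) {
        List<Integer> buckets = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(page(buckets, 2, 2));  // prints [3, 4]
        System.out.println(page(buckets, 4, 10)); // prints [5]
    }
}
```

The cost of this approach grows with the page offset, because every earlier bucket is fetched and then discarded.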
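The above is the simplest, single-level aggregation query. More complex aggregations follow the same pattern: they are aggregations placed inside one another, like Russian dolls.
A complex aggregation query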
public Map<String, Map<String, Long>> aggTwoArgs(String indices, QueryBuilder queryBuilder,
        String args1, String args2, int i) throws IOException {
    Map<String, Map<String, Long>> map = new HashMap<>();
    // Set the index to query
    SearchRequest request = new SearchRequest().indices(indices);
    // Build the search
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // Number of hits to return (0: aggregation results only)
    sourceBuilder.size(0);
    // Add the query conditions
    sourceBuilder.query(queryBuilder);
    // Set the field to aggregate on and the number of buckets.
    // terms(args1) sets the name of this aggregation;
    // field(args1 + ".keyword") sets the field, where the .keyword suffix targets the un-analyzed sub-field.
    TermsAggregationBuilder agg1 = AggregationBuilders.terms(args1).field(args1 + ".keyword").size(i);
    // Sub-aggregation: terms(args2) sets its name in the response, field(args2 + ".keyword") its field
    TermsAggregationBuilder agg2 = AggregationBuilders.terms(args2).field(args2 + ".keyword").size(i);
    // Attach the sub-aggregation
    agg1.subAggregation(agg2);
    // Add the aggregation to the search
    sourceBuilder.aggregation(agg1);
    // Build the request
    request.source(sourceBuilder);
    // Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    Aggregations aggregations = response.getAggregations();
    // Read the first-level groups
    Terms terms1 = aggregations.get(args1);
    for (Terms.Bucket bucket1 : terms1.getBuckets()) {
        // Read the groups inside this group
        Terms terms2 = bucket1.getAggregations().get(args2);
        Map<String, Long> map1 = new HashMap<>();
        for (Terms.Bucket bucket2 : terms2.getBuckets()) {
            map1.put(bucket2.getKeyAsString(), bucket2.getDocCount());
        }
        // bucket1.getKeyAsString(): the value of the first-level group
        // map1: second-level group value -> document count
        map.put(bucket1.getKeyAsString(), map1);
    }
    return map;
}
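To make the shape of the returned map concrete, here is a minimal in-memory sketch of the same two-level grouping, written with the JDK only. It does not call Elasticsearch. Each document is reduced to a pair of field values, and the class name TwoLevelGrouping and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TwoLevelGrouping {
    // Each String[] holds {first field value, second field value} for one document.
    // Result: first value -> (second value -> number of documents with that pair).
    static Map<String, Map<String, Long>> group(List<String[]> docs) {
        Map<String, Map<String, Long>> result = new TreeMap<>();
        for (String[] doc : docs) {
            result.computeIfAbsent(doc[0], k -> new TreeMap<>())
                  .merge(doc[1], 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> docs = Arrays.asList(
                new String[] {"a", "x"},
                new String[] {"a", "x"},
                new String[] {"a", "y"},
                new String[] {"b", "x"});
        System.out.println(group(docs)); // prints {a={x=2, y=1}, b={x=1}}
    }
}
```

The inner counts play the role of the document count of each second-level bucket. Unlike this sketch, a terms aggregation only returns as many buckets per level as its size allows.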
Nested aggregations with the Elasticsearch Java API
public static final String MAPPING = ""
    + "{\n"
    + "  \"properties\": {\n"
    + "    \"itemAttributes\": {\n"
    + "      \"type\": \"nested\",\n"
    + "      \"properties\": {\n"
    + "        \"specId\": {\n"
    + "          \"type\": \"long\"\n"
    + "        },\n"
    + "        \"value\": {\n"
    + "          \"type\": \"text\",\n"
    + "          \"analyzer\": \"ngram_analyzer\",\n"
    + "          \"fields\": {\n"
    + "            \"value_keyword\": {\n"
    + "              \"type\": \"keyword\"\n"
    + "            }\n"
    + "          }\n"
    + "        },\n"
    + "        \"key\": {\n"
    + "          \"type\": \"long\"\n"
    + "        },\n"
    + "        \"ip\": {\n"
    + "          \"type\": \"long\"\n"
    + "        }\n"
    + "      }\n"
    + "    }\n"
    + "  }\n"
    + "}";
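In the ES mapping above, the itemAttributes field is a nested data structure.
The grouping and aggregation shown earlier therefore do not apply directly to the properties of the itemAttributes object; one more layer of handling is needed.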
// Aggregating on a nested field requires a NestedAggregationBuilder; "nestedAgg" is its name
NestedAggregationBuilder nestedAggregationBuilder =
    AggregationBuilders.nested("nestedAgg", "itemAttributes");
// Create a grouping for each property of the nested object.
// Note: Java's ^ is XOR, not a power operator, so the size is written as Integer.MAX_VALUE.
AggregationBuilder specIdAgg =
    AggregationBuilders.terms("fieldSpecId").field("itemAttributes.specId").size(Integer.MAX_VALUE);
// value is mapped as text, so group on its keyword sub-field from the mapping
AggregationBuilder valueAgg =
    AggregationBuilders.terms("fieldValue").field("itemAttributes.value.value_keyword").size(Integer.MAX_VALUE);
AggregationBuilder keyAgg =
    AggregationBuilders.terms("fieldKey").field("itemAttributes.key").size(Integer.MAX_VALUE);
AggregationBuilder ipAgg =
    AggregationBuilders.terms("fieldIp").field("itemAttributes.ip").size(Integer.MAX_VALUE);
// Put the aggregations into nestedAggregationBuilder. Because the specId, value and ip
// groupings depend on one another, chain them level by level, ending with a sum over key.
nestedAggregationBuilder.subAggregation(
    specIdAgg.subAggregation(valueAgg.subAggregation(ipAgg.subAggregation(
        AggregationBuilders.sum("keyAgg").field("itemAttributes.key")))));
searchSourceBuilder.query(boolQueryBuilder).size(0);
searchSourceBuilder.aggregation(nestedAggregationBuilder);
searchRequest.source(searchSourceBuilder);
// Note: searchSourceBuilder.aggregation(nestedAggregationBuilder) is what defines how the aggregations relate.
// searchSourceBuilder
//     .aggregation(specIdAgg)
//     .aggregation(valueAgg)
//     .aggregation(ipAgg)
//     .aggregation(keyAgg);
// behaves completely differently: each aggregation groups the data on its own, with no relation between them.
try {
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    Nested nested = searchResponse.getAggregations().get("nestedAgg");
    // First-level grouping, looked up by name. The Terms interface is used instead of a cast to
    // ParsedStringTerms, because specId and ip are long fields and come back as long terms.
    Terms specIdTerms = nested.getAggregations().get("fieldSpecId");
    for (Terms.Bucket specIdBucket : specIdTerms.getBuckets()) {
        // Value of the first-level group
        String specId = specIdBucket.getKeyAsString();
        // Second-level grouping
        Terms valueTerms = specIdBucket.getAggregations().get("fieldValue");
        for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
            // Value of the second-level group
            String value = valueBucket.getKeyAsString();
            // Third-level grouping
            Terms ipTerms = valueBucket.getAggregations().get("fieldIp");
            for (Terms.Bucket ipBucket : ipTerms.getBuckets()) {
                // Value of the third-level group
                String ip = ipBucket.getKeyAsString();
                // Fourth level: the sum aggregation
                Sum keySum = ipBucket.getAggregations().get("keyAgg");
                // Value of the sum
                int count = (int) keySum.getValue();
                System.out.print(specId);
                System.out.print(value);
                System.out.print(ip);
                System.out.print(count);
            }
        }
    }
} catch (IOException e) {
    log.error("Failed to parse data", e);
}
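The note about chained versus side-by-side aggregations can be illustrated without Elasticsearch. The sketch below is an in-memory analogy using the JDK only, cut down to two grouping levels. Attr stands in for one itemAttributes object, and all class names, method names and sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ChainedVsSibling {
    // Stand-in for one nested itemAttributes object.
    static final class Attr {
        final long specId;
        final String value;
        final long key;

        Attr(long specId, String value, long key) {
            this.specId = specId;
            this.value = value;
            this.key = key;
        }
    }

    // Chained: the sum of key for every (specId, value) combination.
    static Map<Long, Map<String, Long>> chained(List<Attr> attrs) {
        Map<Long, Map<String, Long>> out = new TreeMap<>();
        for (Attr a : attrs) {
            out.computeIfAbsent(a.specId, k -> new TreeMap<>()).merge(a.value, a.key, Long::sum);
        }
        return out;
    }

    // Side by side: specId is counted on its own, ignoring value.
    static Map<Long, Long> bySpecId(List<Attr> attrs) {
        Map<Long, Long> out = new TreeMap<>();
        for (Attr a : attrs) {
            out.merge(a.specId, 1L, Long::sum);
        }
        return out;
    }

    // Side by side: value is counted on its own, ignoring specId.
    static Map<String, Long> byValue(List<Attr> attrs) {
        Map<String, Long> out = new TreeMap<>();
        for (Attr a : attrs) {
            out.merge(a.value, 1L, Long::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Attr> attrs = Arrays.asList(
                new Attr(1, "red", 10), new Attr(1, "red", 5),
                new Attr(1, "blue", 2), new Attr(2, "red", 7));
        System.out.println(chained(attrs));  // prints {1={blue=2, red=15}, 2={red=7}}
        System.out.println(bySpecId(attrs)); // prints {1=3, 2=1}
        System.out.println(byValue(attrs));  // prints {blue=1, red=3}
    }
}
```

The chained result can answer "what is the total for specId 1 with value red"; the two side-by-side results cannot, because the link between specId and value is lost once each is grouped separately.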