当前位置:   article > 正文

Elasticsearch 桶聚合(基于 QueryBuilder / AggregationBuilders)

querybuilder 桶聚合

Elasticsearch 桶聚合(bucket aggregation)的目的是数据分组:先按指定条件把文档分到多个桶(组)中,再对每个桶分别进行统计。

1、在按时间聚合(date_histogram)的基础上,再对下一层字段进行聚合(嵌套子聚合)
  1. String dateHistogram = "dateHistogram";
  2. DateHistogramAggregationBuilder aggregationBuilder = AggregationBuilders.dateHistogram(dateHistogram).field(dateAggregateField).calendarInterval(dateHistogramInterval);
  3. // 嵌套子聚合,根据网站来源聚合
  4. String subAggregation = "subAggregation";
  5. aggregationBuilder.subAggregation(AggregationBuilders.terms(subAggregation).field(subAggregateField));

其中 `terms(...)` 与 `dateHistogram(...)` 的参数是聚合的名称(自定义,之后通过 `getAggregations().get(名称)` 从查询结果中取回对应的聚合结果);`field(...)` 指定实际参与聚合的 ES 索引字段。

2、先按条件查询,再对命中的文档进行聚合
  1. private SearchResponse search(String indexName, QueryBuilder queryBuilder, String[] includeFields, String[] excludeFields,
  2. SortBuilder sortBuilder, Integer from, Integer size, AggregationBuilder aggregationBuilder) {
  3. SearchRequest request = new SearchRequest(indexName);
  4. // 构建请求参数
  5. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  6. if (Objects.nonNull(queryBuilder)) {
  7. sourceBuilder.query(queryBuilder);
  8. }
  9. if (Objects.nonNull(includeFields) || Objects.nonNull(excludeFields)) {
  10. sourceBuilder.fetchSource(includeFields, excludeFields);
  11. }
  12. if (Objects.nonNull(sortBuilder)) {
  13. sourceBuilder.sort(sortBuilder);
  14. }
  15. if (Objects.nonNull(from)) {
  16. sourceBuilder.from(from);
  17. }
  18. if (Objects.nonNull(size)) {
  19. sourceBuilder.size(size);
  20. }
  21. if (Objects.nonNull(aggregationBuilder)) {
  22. sourceBuilder.aggregation(aggregationBuilder);
  23. }
  24. request.source(sourceBuilder);
  25. if (debugLog) {
  26. log.info("DSL: {}", sourceBuilder.toString());
  27. }
  28. // 执行查询
  29. try {
  30. return elasticSearchClient.search(request, RequestOptions.DEFAULT);
  31. } catch (IOException e) {
  32. log.error("ES 查询失败 --> indexName: {}, ", indexName, e);
  33. throw new BusinessException(ResultCode.INTERNAL_SERVER_ERROR);
  34. }
  35. }

3、处理聚合后的数据
  1. // 处理日期数据
  2. Histogram terms = searchResponse.getAggregations().get(dateHistogram);
  3. Map<String, Map<String, Long>> responseMap = new TreeMap<>();
  4. for (Histogram.Bucket bucket : terms.getBuckets()) {
  5. Terms subTerms = bucket.getAggregations().get(subAggregation);
  6. if (Objects.isNull(subTerms) || CollectionUtil.isEmpty(subTerms.getBuckets())) {
  7. continue;
  8. }
  9. // 处理横轴数据
  10. Map<String, Long> dataMap = responseMap.computeIfAbsent(bucket.getKeyAsString(), key -> new LinkedHashMap<>());
  11. // 各个维度数据汇总
  12. for (Terms.Bucket subBucket : subTerms.getBuckets()) {
  13. String title = titleFunction.apply(subBucket);
  14. if (StrUtil.isNotBlank(title)) {
  15. dataMap.put(title, subBucket.getDocCount());
  16. }
  17. }
  18. }

调用如下:

  1. // 处理纵轴数据
  2. Function<Terms.Bucket, String> dataFunction = (bucket) -> PolarityTypeEnum.matchTitleByCode(Integer.valueOf(bucket.getKeyAsString()));
  3. // 构建查询条件
  4. QueryBuilder queryBuilder = buildQueryBuilder(request);
  5. // 根据日期及情感倾向聚合
  6. Map<String, Map<String, Long>> dateByQuery = esAggregateService.aggregationDateByQuery(esConditionService.getIndex(request.getChannelId()), queryBuilder, EsIndexFieldConstant.FIELD_PUBTIME, DateHistogramInterval.DAY, EsIndexFieldConstant.FIELD_POLARITY, dataFunction);
  7. if (CollectionUtil.isEmpty(dateByQuery)) {
  8. return null;
  9. }

完整示例(将上述步骤封装为一个方法)

  1. public Map<String, Map<String, Long>> aggregationDateByQuery(String indexName, QueryBuilder queryBuilder, String dateAggregateField, DateHistogramInterval dateHistogramInterval, String subAggregateField, Function<Terms.Bucket, String> titleFunction) {
  2. // 创建aggregation,根据发布时间按天聚合
  3. String dateHistogram = "dateHistogram";
  4. DateHistogramAggregationBuilder aggregationBuilder = AggregationBuilders.dateHistogram(dateHistogram).field(dateAggregateField).calendarInterval(dateHistogramInterval);
  5. String subAggregation = "subAggregation";
  6. aggregationBuilder.subAggregation(AggregationBuilders.terms(subAggregation).field(subAggregateField));
  7. // 根据日期分组聚合查询
  8. SearchResponse searchResponse = aggregationByQuery(indexName, queryBuilder, aggregationBuilder);
  9. // 处理日期数据
  10. Histogram terms = searchResponse.getAggregations().get(dateHistogram);
  11. Map<String, Map<String, Long>> responseMap = new TreeMap<>();
  12. for (Histogram.Bucket bucket : terms.getBuckets()) {
  13. Terms subTerms = bucket.getAggregations().get(subAggregation);
  14. if (Objects.isNull(subTerms) || CollectionUtil.isEmpty(subTerms.getBuckets())) {
  15. continue;
  16. }
  17. // 处理横轴数据
  18. Map<String, Long> dataMap = responseMap.computeIfAbsent(bucket.getKeyAsString(), key -> new LinkedHashMap<>());
  19. // 各个维度数据汇总
  20. for (Terms.Bucket subBucket : subTerms.getBuckets()) {
  21. String title = titleFunction.apply(subBucket);
  22. if (StrUtil.isNotBlank(title)) {
  23. dataMap.put(title, subBucket.getDocCount());
  24. }
  25. }
  26. }
  27. return responseMap;
  28. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/889837
推荐阅读
相关标签
  

闽ICP备14008679号