赞
踩
Elasticsearch 桶聚合的目的是对数据分组:先将数据按指定的条件分成多个组(桶),再对每一个组分别进行统计。
- String dateHistogram = "dateHistogram";
- DateHistogramAggregationBuilder aggregationBuilder = AggregationBuilders.dateHistogram(dateHistogram).field(dateAggregateField).calendarInterval(dateHistogramInterval);
- // Nested sub-aggregation: a terms aggregation on the sub-aggregate field (website source in the original example)
- String subAggregation = "subAggregation";
- aggregationBuilder.subAggregation(AggregationBuilders.terms(subAggregation).field(subAggregateField));
其中 terms(...) 的参数是该聚合的名称(之后用这个名称从响应中取出聚合结果),field(...) 指定实际参与聚合的字段(即 ES 索引中的字段)。
- private SearchResponse search(String indexName, QueryBuilder queryBuilder, String[] includeFields, String[] excludeFields,
- SortBuilder sortBuilder, Integer from, Integer size, AggregationBuilder aggregationBuilder) {
- SearchRequest request = new SearchRequest(indexName);
- // 构建请求参数
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- if (Objects.nonNull(queryBuilder)) {
- sourceBuilder.query(queryBuilder);
- }
- if (Objects.nonNull(includeFields) || Objects.nonNull(excludeFields)) {
- sourceBuilder.fetchSource(includeFields, excludeFields);
- }
- if (Objects.nonNull(sortBuilder)) {
- sourceBuilder.sort(sortBuilder);
- }
- if (Objects.nonNull(from)) {
- sourceBuilder.from(from);
- }
- if (Objects.nonNull(size)) {
- sourceBuilder.size(size);
- }
- if (Objects.nonNull(aggregationBuilder)) {
- sourceBuilder.aggregation(aggregationBuilder);
- }
-
- request.source(sourceBuilder);
- if (debugLog) {
- log.info("DSL: {}", sourceBuilder.toString());
- }
-
- // 执行查询
- try {
- return elasticSearchClient.search(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- log.error("ES 查询失败 --> indexName: {}, ", indexName, e);
- throw new BusinessException(ResultCode.INTERNAL_SERVER_ERROR);
- }
- }

- // Process the date-histogram results
- Histogram terms = searchResponse.getAggregations().get(dateHistogram);
- Map<String, Map<String, Long>> responseMap = new TreeMap<>();
- for (Histogram.Bucket bucket : terms.getBuckets()) {
- Terms subTerms = bucket.getAggregations().get(subAggregation);
- if (Objects.isNull(subTerms) || CollectionUtil.isEmpty(subTerms.getBuckets())) {
- continue;
- }
- // X-axis data: one entry per date bucket, keyed by the bucket's key string
- Map<String, Long> dataMap = responseMap.computeIfAbsent(bucket.getKeyAsString(), key -> new LinkedHashMap<>());
- // Collect the doc count of every dimension (sub-bucket) under its title; blank titles are skipped
- for (Terms.Bucket subBucket : subTerms.getBuckets()) {
- String title = titleFunction.apply(subBucket);
- if (StrUtil.isNotBlank(title)) {
- dataMap.put(title, subBucket.getDocCount());
- }
- }
- }

调用如下:
- // Y-axis data: parse each terms-bucket key as an integer code and look up its title via PolarityTypeEnum
- Function<Terms.Bucket, String> dataFunction = (bucket) -> PolarityTypeEnum.matchTitleByCode(Integer.valueOf(bucket.getKeyAsString()));
-
- // Build the query conditions
- QueryBuilder queryBuilder = buildQueryBuilder(request);
-
- // Aggregate by date (one bucket per day) and, within each day, by sentiment polarity
- Map<String, Map<String, Long>> dateByQuery = esAggregateService.aggregationDateByQuery(esConditionService.getIndex(request.getChannelId()), queryBuilder, EsIndexFieldConstant.FIELD_PUBTIME, DateHistogramInterval.DAY, EsIndexFieldConstant.FIELD_POLARITY, dataFunction);
- if (CollectionUtil.isEmpty(dateByQuery)) {
- return null;
- }
示例
- public Map<String, Map<String, Long>> aggregationDateByQuery(String indexName, QueryBuilder queryBuilder, String dateAggregateField, DateHistogramInterval dateHistogramInterval, String subAggregateField, Function<Terms.Bucket, String> titleFunction) {
- // 创建aggregation,根据发布时间按天聚合
- String dateHistogram = "dateHistogram";
- DateHistogramAggregationBuilder aggregationBuilder = AggregationBuilders.dateHistogram(dateHistogram).field(dateAggregateField).calendarInterval(dateHistogramInterval);
-
- String subAggregation = "subAggregation";
- aggregationBuilder.subAggregation(AggregationBuilders.terms(subAggregation).field(subAggregateField));
-
- // 根据日期分组聚合查询
- SearchResponse searchResponse = aggregationByQuery(indexName, queryBuilder, aggregationBuilder);
-
- // 处理日期数据
- Histogram terms = searchResponse.getAggregations().get(dateHistogram);
- Map<String, Map<String, Long>> responseMap = new TreeMap<>();
- for (Histogram.Bucket bucket : terms.getBuckets()) {
- Terms subTerms = bucket.getAggregations().get(subAggregation);
- if (Objects.isNull(subTerms) || CollectionUtil.isEmpty(subTerms.getBuckets())) {
- continue;
- }
- // 处理横轴数据
- Map<String, Long> dataMap = responseMap.computeIfAbsent(bucket.getKeyAsString(), key -> new LinkedHashMap<>());
- // 各个维度数据汇总
- for (Terms.Bucket subBucket : subTerms.getBuckets()) {
- String title = titleFunction.apply(subBucket);
- if (StrUtil.isNotBlank(title)) {
- dataMap.put(title, subBucket.getDocCount());
- }
- }
- }
-
- return responseMap;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。