赞
踩
java 使用RestHighLevelClient 查询es聚合数据
本文介绍如何在 Java 中使用 RestHighLevelClient 向 ES 发起聚合查询。难点在于嵌套聚合参数的拼接:只要把查询参数(query)和聚合参数(aggs)分别拼接好,就可以直接发起请求。
相关依赖引用和参数类代码,请参见《使用RestHighLevelClient 请求ES数据》。
- import org.apache.commons.collections4.ListUtils;
- import org.apache.commons.collections4.MapUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.commons.lang3.math.NumberUtils;
- import org.elasticsearch.script.Script;
- import org.elasticsearch.search.aggregations.AggregationBuilder;
- import org.elasticsearch.search.aggregations.AggregationBuilders;
- import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
- import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
- import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
- import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
- import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
- import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.joda.time.DateTimeZone;
-
- import java.util.List;
- import java.util.Map;
-
- public class StrTermNextScript {
-
- /**
- * 字符型的拼接
- * 第一层用 searchSourceBuilder 加
- */
- public static void joinEsStrAggs(Map<String,Object> aggsMap, SearchSourceBuilder searchSourceBuilder) {
- Map<String,Object> aggsNameMap = (Map<String, Object>) aggsMap.get("aggs");
- AggregationBuilder aggregationBuilder;
-
- for (Map.Entry name : aggsNameMap.entrySet()) {
- String aggName = name.getKey().toString();
-
- Map<String,Object> valueMap = (Map<String, Object>) name.getValue();
- for (Map.Entry value : valueMap.entrySet()) {
- String aggType = value.getKey().toString();
- Map<String,Object> fieldMap = (Map<String, Object>) value.getValue();
- aggregationBuilder = setAggsBuilders(aggName,aggType,fieldMap);
- if("terms".equals(aggType)){
- Map<String,Object> innerAggsMap = (Map<String, Object>) valueMap.get("aggs");
- if(MapUtils.isNotEmpty(innerAggsMap)){
- joinSecEsStrAggs(innerAggsMap, aggregationBuilder);
- }
- }
- if(aggregationBuilder != null){
- searchSourceBuilder.aggregation(aggregationBuilder);
- }
- }
- }
-
- }
-
-
- /**
- * 字符型的拼接
- * 第二层用 aggregationBuilder 的 subAggregation 方法添加后面的内容
- */
- private static void joinSecEsStrAggs(Map<String,Object> aggsNameMap, AggregationBuilder allAggregationBuilder) {
-
- AggregationBuilder aggregationBuilder;
- for (Map.Entry name : aggsNameMap.entrySet()) {
- String aggName = name.getKey().toString();
-
- Map<String,Object> valueMap = (Map<String, Object>) name.getValue();
- for (Map.Entry value : valueMap.entrySet()) {
- String aggType = value.getKey().toString();
- Map<String,Object> fieldMap = (Map<String, Object>) value.getValue();
- aggregationBuilder = setAggsBuilders(aggName,aggType,fieldMap);
- if("terms".equals(aggType)){
- Map<String,Object> innerAggsMap = (Map<String, Object>) valueMap.get("aggs");
- if(MapUtils.isNotEmpty(innerAggsMap)){
- joinSecEsStrAggs(innerAggsMap, aggregationBuilder);
- }
- }
- if("bucket_script".equals(aggType)){
- BucketScriptPipelineAggregationBuilder bucketScript = getBucketScript(aggName,fieldMap);
- if(aggregationBuilder != null){
- aggregationBuilder.subAggregation(bucketScript);
- }else{
- allAggregationBuilder.subAggregation(bucketScript);
- }
- }
- if(aggregationBuilder != null){
- allAggregationBuilder.subAggregation(aggregationBuilder);
- }
-
- }
-
- }
-
-
- }
-
- private static BucketScriptPipelineAggregationBuilder getBucketScript(String aggName, Map<String, Object> aggParam){
- // 这边还得把之前的添加到一块
- Map<String, String> bucketsPathHashMap = (Map<String, String>) aggParam.get("buckets_path");
- return PipelineAggregatorBuilders.bucketScript(aggName, bucketsPathHashMap,
- new Script(MapUtils.getString(aggParam,"script"))).format("#.##");
- }
-
- /**
- * 根据聚合类型 获取对应的聚合builder
- * @param aggName 聚合名称
- * @param aggType 聚合类型
- * @param aggParam 聚合参数
- * @return AggregationBuilder
- */
- private static AggregationBuilder setAggsBuilders(String aggName, String aggType, Map<String, Object> aggParam){
-
- String fieldValue = MapUtils.getString(aggParam, "field");
-
- AggregationBuilder aggregationBuilder = null;
- if("terms".equals(aggType)){
- int size = MapUtils.getIntValue(aggParam, "size");
- if(size == NumberUtils.INTEGER_ZERO){
- size = 10;
- }
- aggregationBuilder = AggregationBuilders.terms(aggName).field(fieldValue).size(size);
- }else if("sum".equals(aggType)){
- aggregationBuilder = AggregationBuilders.sum(aggName).field(fieldValue);
- }else if("avg".equals(aggType)){
- aggregationBuilder = AggregationBuilders.avg(aggName).field(fieldValue);
- }else if("max".equals(aggType)){
- aggregationBuilder = AggregationBuilders.max(aggName).field(fieldValue);
- }else if("min".equals(aggType)){
- aggregationBuilder = AggregationBuilders.min(aggName).field(fieldValue);
- }else if("value_count".equals(aggType)){
- aggregationBuilder = AggregationBuilders.count(aggName).field(fieldValue);
- }else if ("cardinality".equals(aggType)) {// 去重之后总数
- aggregationBuilder = AggregationBuilders.cardinality(aggName).field(fieldValue);
- } else if ("histogram".equals(aggType)) {// 直方图
- Double interval = MapUtils.getDouble(aggParam, "interval");
- Long minDocCount = MapUtils.getLong(aggParam, "min_doc_count");
- aggregationBuilder = AggregationBuilders.histogram(aggName).field(fieldValue).interval(interval)
- .minDocCount(minDocCount);
- } else if ("date_histogram".equals(aggType)) {// 日期直方图
- String interval = MapUtils.getString(aggParam, "interval");
- String format = MapUtils.getString(aggParam, "format");
- Long minDocCount = MapUtils.getLong(aggParam, "min_doc_count");
- DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram(aggName)
- .field(fieldValue).dateHistogramInterval(new DateHistogramInterval(interval))
- .minDocCount(minDocCount).timeZone(DateTimeZone.forOffsetHours(8));
- if (StringUtils.isNotEmpty(format)) {
- dateHistogramAggregationBuilder.format(format);
- }
- aggregationBuilder = dateHistogramAggregationBuilder;
- } else if ("range".equals(aggType)) {// 范围
- String fieleName = MapUtils.getString(aggParam, "field");
- List<Map<String, Object>> ranges = (List<Map<String, Object>>) aggParam.get("ranges");
- RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range(aggName).field(fieleName);
- ListUtils.emptyIfNull(ranges).forEach(e -> {
- Double from = MapUtils.getDouble(e, "from");
- Double to = MapUtils.getDouble(e, "to");
- rangeAggregationBuilder.addRange(from, to);
- });
-
- aggregationBuilder = rangeAggregationBuilder;
- } else if ("date_range".equals(aggType)) {// 日期范围
- String fieleName = MapUtils.getString(aggParam, "field");
- String format = MapUtils.getString(aggParam, "format");
- List<Map<String, Object>> ranges = (List<Map<String, Object>>) aggParam.get("ranges");
- DateRangeAggregationBuilder dateRangeAggregationBuilder = AggregationBuilders.dateRange(aggName)
- .field(fieleName).timeZone(DateTimeZone.forOffsetHours(8));
-
- if (StringUtils.isNotEmpty(format)) {
- dateRangeAggregationBuilder.format(format);
- }
- ListUtils.emptyIfNull(ranges).forEach(e -> {
- String from = MapUtils.getString(e, "from");
- String to = MapUtils.getString(e, "to");
- dateRangeAggregationBuilder.addRange(from, to);
- });
- aggregationBuilder = dateRangeAggregationBuilder;
- }
- return aggregationBuilder;
- }
-
- }
下面再贴出参数处理(解析查询表达式、构造并发送请求)部分的代码:
- public void getEsAggs(EsQuery esQuery) {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- SearchRequest searchRequest = new SearchRequest();
- // 查询的索引
- searchRequest.indices(esQuery.getSourceIndex());
-
- String sourceExpr = esQuery.getSourceExpr();
- String queryStr = "";
- JSONObject searchJson = JSONObject.parseObject(sourceExpr);
- // 参数,带query 和 aggs的, 只有aggs的
- WrapperQueryBuilder wrapperQueryBuilder = null;
-
- if (!StringUtils.isEmpty(sourceExpr)) {
- // 查询参数
- if (sourceExpr.contains("query")) {// 带有query的查询
- queryStr = searchJson.getJSONObject("query").toJSONString();
- }
- if (StringUtils.isNotEmpty(queryStr)) {
- wrapperQueryBuilder = QueryBuilders.wrapperQuery(queryStr);
- }
-
- // 聚合参数
- JSONObject aggs = searchJson.getJSONObject("aggregations");
- StrTermNextScript.joinEsStrAggs(aggs, searchSourceBuilder);
- }
-
- if (wrapperQueryBuilder != null) {
- searchSourceBuilder.query(wrapperQueryBuilder);
- }
- searchSourceBuilder.size(0);
- searchRequest.source(searchSourceBuilder);
- System.out.println("aggExpr " + searchSourceBuilder.toString());
-
- try {
- SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- logger.info(searchResponse.toString());
- Aggregations aggregations = searchResponse.getAggregations();
- List<Aggregation> aggregationList = aggregations.asList();
-
- for (Aggregation aggregation : aggregationList) {
- logger.info(JSON.toJSONString(aggregation));
- Terms histogram = (Terms) aggregation;
- for (Terms.Bucket bucket : histogram.getBuckets()) {
- logger.info(bucket.getKeyAsString());
- logger.info("" + bucket.getDocCount());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
注意:做聚合查询时,将 size 设置为 0,这样响应中只返回聚合结果,不返回命中的文档。
ES 聚合请求包括查询(query)部分和聚合(aggs)部分。查询部分只需处理查询条件本身,分页、排序等参数无需处理。聚合部分的参数拼接好之后,剩下的工作就是从响应中取值;而对返回值的处理,还需要再花一些时间。
下一篇: 《平铺es聚合数据》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。