赞
踩
之前写过一篇用ES查询数据并进行分组聚合统计的文章:
复杂聚合分组统计实现
由于ES服务已经升级到8.0.0,而这里的查询需要先按时间(格式化为日期)分组、再对每组求平均值(avg),所以客户端相关依赖统一使用7.17.4版本。7.17.x是High Level REST Client的最后一个系列,官方文档说明开启API兼容模式后可以用它访问8.x服务端。
<!-- Elasticsearch client stack, all pinned to 7.17.4. Both client artifacts exclude the
     org.elasticsearch:elasticsearch artifact, which is then declared explicitly at 7.17.4 in the
     last dependency - presumably so that no other managed version (e.g. from a parent POM / BOM)
     can be resolved for it. NOTE(review): confirm against the project's dependency management. -->
<!-- Low-level REST client (HTTP transport used by the high-level client). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.17.4</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- High-level REST client (RestHighLevelClient, query/aggregation builders). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.4</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Explicit version of the artifact excluded above, kept in step with the clients. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.17.4</version>
</dependency>
目前我们只有单个ES节点,下面这个配置类就够用了。它的作用其实就是创建RestHighLevelClient,并将其注册为Spring Bean交给Spring管理,供业务代码注入使用。
/** * ES配置类 * @author zwmac */ @Configuration @Data public class ElasticSearchConfig { @Value("${es.host}") private String host; @Value("${es.port}") private int port; @Value("${es.username}") private String loginName; @Value("${es.password}") private String password; private RestHighLevelClient client; @Bean public RestHighLevelClient client() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(loginName, password)); HttpHost[] httpHostArray = new HttpHost[1]; httpHostArray[0] = new HttpHost(host, port); RestClientBuilder restClientBuilder = RestClient.builder(httpHostArray) .setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.disableAuthCaching(); return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder .setConnectTimeout(60000) .setSocketTimeout(150000)); client = new RestHighLevelClient( restClientBuilder ); return client; } }
@Resource private RestHighLevelClient restHighLevelClient; /** * 查询温湿度24小时平均值 * @param deviceCode 设备编码 * @param startTime 开始时间 * @param endTime 结束时间 * @param humName 湿度字段名 * @param tempName 温度字段名 * @return 温湿度24小时平均值 */ private TreeMap<String, Map<String, Double>> queryTempHumDayAvg(String deviceCode, Date startTime, Date endTime, String humName, String tempName) { TreeMap<String, Map<String, Double>> treeMap = new TreeMap<>(); //ES查询 String index = EsCalendar.getDeviceFlowIndex(startTime, endTime); SearchRequest searchRequest = new SearchRequest(index); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //忽略不可用索引,允许索引不不存在,通配符表达式将扩展为打开的索引 searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false)); String timeFmt = "yyyy-MM-dd"; // 组装ES请求数据 String startTimeStr = DateUtil.format(startTime, DatePattern.NORM_DATETIME_PATTERN); String endTimeStr = DateUtil.format(endTime, DatePattern.NORM_DATETIME_PATTERN); QueryBuilder rangeQuery = QueryBuilders.rangeQuery("createTime").lte(endTimeStr).gte(startTimeStr); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); // 必须为deviceCode boolQueryBuilder.must(QueryBuilders.termQuery("deviceCode", deviceCode)); rangeQuery = QueryBuilders.boolQuery().must(rangeQuery).must(boolQueryBuilder); QueryBuilder boolQuery = QueryBuilders.boolQuery().must(rangeQuery); searchSourceBuilder.query(boolQuery).size(0); //平均值 温度 //String tempName = "temp_avg"; String tempAvgName = tempName + "_avg"; String tempFactorName = "data." + tempName; AvgAggregationBuilder tempAvgAggregationBuilder = AggregationBuilders.avg(tempAvgName).field(tempFactorName); //平均值 湿度 //String humName = "hygrometer_avg"; String humAvgName = humName + "_avg"; String humFactorName = "data." 
+ humName; AvgAggregationBuilder humAvgAggregationBuilder = AggregationBuilders.avg(humAvgName).field(humFactorName); String createTimeGroup = "createTimeGroup"; DateHistogramAggregationBuilder aggregation = AggregationBuilders.dateHistogram(createTimeGroup) .field("createTime").fixedInterval(DateHistogramInterval.DAY) .format(timeFmt) //过滤掉count为0的数据 .minDocCount(1).subAggregation(tempAvgAggregationBuilder).subAggregation(humAvgAggregationBuilder); //分组条件 searchSourceBuilder.aggregation(aggregation); searchRequest.source(searchSourceBuilder); // 按照因子列表查询 searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = null; Map<String, Map<String, Double>> mp = new HashMap<>(); try { log.info("方法getCabinetTempHum24HourAvg查询ES请求数据:" + searchRequest); searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); log.info("方法getCabinetTempHum24HourAvg查询ES响应数据:" + searchResponse.toString()); Aggregations aggregations = searchResponse.getAggregations(); if (aggregations != null) { //组织出参数 aggregations.forEach(agg -> { ParsedDateHistogram parsedDateHistogram = (ParsedDateHistogram) agg; List buckets = parsedDateHistogram.getBuckets(); if (CollectionUtil.isNotEmpty(buckets)) { buckets.forEach(bucket -> { ParsedDateHistogram.ParsedBucket timeGroupTerm = (ParsedDateHistogram.ParsedBucket) bucket; String timeStr = timeGroupTerm.getKeyAsString(); Aggregations subAggregations = timeGroupTerm.getAggregations(); if (subAggregations != null) { Map<String, Double> tempHumMap = new HashMap<>(); Map<String, Aggregation> subAggMap = subAggregations.asMap(); if (subAggMap != null) { Aggregation tempAgg = subAggMap.get(tempAvgName); if (tempAgg != null) { ParsedAvg tempAggPdh = (ParsedAvg) tempAgg; tempHumMap.put(tempName, tempAggPdh.getValue()); } Aggregation humAgg = subAggMap.get(humAvgName); if (humAgg != null) { ParsedAvg humAggPdh = (ParsedAvg) humAgg; tempHumMap.put(humName, humAggPdh.getValue()); } } mp.put(timeStr, tempHumMap); } }); } }); } 
//数据补全 List<DateTime> dateTimeList = DateUtil.rangeToList(startTime, DateUtil.offsetHour(endTime, -1), DateField.HOUR_OF_DAY); if (CollectionUtil.isNotEmpty(dateTimeList)) { String finTempName = "temp_avg"; String finHumName = "hum_avg"; dateTimeList.forEach(dateTime -> { String timeStr = DateUtil.format(dateTime, timeFmt); Map<String, Double> finTempHumMap = new HashMap<>(); Map<String, Double> tempHumMap = mp.get(timeStr); if (tempHumMap == null) { finTempHumMap.put(finTempName, 0.0); finTempHumMap.put(finHumName, 0.0); } else { Double tempAvg = tempHumMap.get(tempName); Double humAvg = tempHumMap.get(humName); finTempHumMap.put(finTempName, tempAvg); finTempHumMap.put(finHumName, humAvg); } treeMap.put(timeStr, finTempHumMap); }); } } catch (Exception e) { log.error("方法countByEs查询ES异常", e); } return treeMap; }
关键点注意:
QueryBuilders.rangeQuery传入的时间字符串格式需要是yyyy-MM-dd HH:mm:ss(应与createTime字段mapping中配置的日期format一致),否则ES会因无法解析日期而报错;也可以通过rangeQuery(...).format(...)显式指定格式。
这里按天对时间分组(date_histogram聚合),使用的是DateHistogramAggregationBuilder,并通过format("yyyy-MM-dd")把每个桶的key格式化为日期字符串。
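注意:被废弃的并不是calendarInterval。从ES 7.2起,被废弃的是原来的interval参数,它被拆分为calendarInterval(日历间隔,如1d、1M,会考虑时区和夏令时)和fixedInterval(固定时长间隔,如24h;fixed的1d就是固定的24小时)。按天分组两者都可以用,在不设置time_zone(即UTC)时结果相同。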
先分组再聚合:avg聚合要通过subAggregation挂在date_histogram聚合下面作为子聚合,这样每个日期桶内会各自计算平均值。
查询返回后需要逐层解析结果:先从Aggregations中取出ParsedDateHistogram,再遍历parsedDateHistogram.getBuckets(),在每个桶中用getKeyAsString()取日期,并从子聚合中取出ParsedAvg的平均值。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。