赞
踩
桶(bucket):
桶的作用是按照某种方式对数据进行分组,每一组数据在ES中称为一个桶。ES中提供的划分桶的方式有很多:
- 日期阶梯分组(date_histogram):例如给定阶梯为周,会自动每周分为一组
- 数值阶梯分组(histogram):与日期类似,需要知道分组的间隔(interval)
- 词条内容分组(terms):词条内容完全匹配的为一组
- 数值和日期的范围分组(range):指定开始和结束,然后按段分组

度量(metrics):
分组完成以后,我们一般会对组中的数据进行聚合运算,例如求平均值、最大值、最小值、求和等,这些在ES中称为度量。
常见度量聚合方式:Avg(求平均值)、Max(求最大值)、Min(求最小值)、Sum(求和)、Stats(同时求 count、min、max、avg、sum)。
统计价格在500元以内的酒店有哪几种品牌,此时可以根据酒店品牌的名称做聚合。
GET /hotel/_search
{
  "query": {                  // 搜索条件
    "range": {
      "price": {
        "lte": 500
      }
    }
  },
  "size": 0,                  // 不查询具体的数据
  "aggs": {                   // 声明这是一个聚合查询,是aggregations的缩写
    "brandAgg": {             // 给这次聚合起一个名字,可任意指定
      "terms": {              // 聚合的类型,这里选择terms,是根据词条内容(这里是品牌)划分
        "field": "brand",     // 按照哪个字段分组
        "size": 10,           // 显示多少条聚合结果
        "order": {
          "_count": "asc"     // 分组之后可以根据数量排序
        }
      }
    }
  }
}
结果:
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    // 省略
  },
  "hits" : {
    "total" : {
      "value" : 95,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]              // 不显示搜索的数据结果
  },
  "aggregations" : {
    "brandAgg" : {            // 桶的名称
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 78,
      "buckets" : [
        {
          "key" : "万丽",
          "doc_count" : 1
        },
        {
          "key" : "万怡",
          "doc_count" : 1
        }
        // ...省略
      ]
    }
  }
}
前面的例子告诉我们每个桶里面的文档数量,这很有用。但通常,我们的应用需要提供更复杂的文档度量。例如,每种品牌酒店的平均评分是多少?
因此,我们需要告诉ES使用哪个字段、使用何种度量方式进行运算。这些信息要嵌套在桶内,度量的运算会基于桶内的文档进行。
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0,                        // 不查询具体的数据
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",           // 按照哪个字段分组
        "order": {
          "scoreAgg.avg": "desc"    // 根据指定的统计项排序
        }
      },
      "aggs": {                     // 是brandAgg聚合的子聚合,也就是分组后对每组分别计算
        "scoreAgg": {               // 聚合名称
          "stats": {                // 聚合类型,这里stats可以计算min、max、avg等
            "field": "score"        // 聚合字段,这里是score
          }
        }
      }
    }
  }
}
结果:
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : { },
  "hits" : { },
  "aggregations" : {
    "brandAgg" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 111,
      "buckets" : [
        {
          "key" : "万丽",
          "doc_count" : 2,
          "scoreAgg" : {
            "count" : 2,
            "min" : 46.0,
            "max" : 47.0,
            "avg" : 46.5,
            "sum" : 93.0
          }
        },
        {
          "key" : "凯悦",
          "doc_count" : 8,
          "scoreAgg" : {
            "count" : 8,
            "min" : 45.0,
            "max" : 47.0,
            "avg" : 46.25,
            "sum" : 370.0
          }
        }
        // ...省略
      ]
    }
  }
}
构建测试类AggsTest
//聚合为桶 @Test public void aggs() throws IOException { //1. 创建请求对象 SearchRequest request = new SearchRequest("hotel"); //2. 设置条件 request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件 request.source().size(0);//不显示查询结果 request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand") .size(10).order(BucketOrder.count(true)));//集合为桶 //3. 发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4. 处理结果 //4-1获取聚合结果 Terms brandTerm = response.getAggregations().get("brandAgg"); //4-2 获取结果中的桶 for (Terms.Bucket bucket : brandTerm.getBuckets()) { System.out.println(bucket.getKey() + ":" + bucket.getDocCount()); } }
//聚合为桶, 桶内度量 @Test public void aggs2() throws IOException { //1. 创建请求对象 SearchRequest request = new SearchRequest("hotel"); //2. 设置条件 request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件 request.source().size(0);//不显示查询结果 request.source().aggregation( AggregationBuilders.terms("brandAgg").field("brand").size(10).order(BucketOrder.count(true))//集合为桶 .subAggregation(AggregationBuilders.stats("scoreAgg").field("score"))//按照评分统计 ); //3. 发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //4. 处理结果 //4-1获取聚合结果 Terms brandTerm = response.getAggregations().get("brandAgg"); //4-2 获取结果中的桶 for (Terms.Bucket bucket : brandTerm.getBuckets()) { ParsedStats stats = (ParsedStats) bucket.getAggregations().getAsMap().get("scoreAgg");//获取统计结果 System.out.println(bucket.getKey() + ":" + bucket.getDocCount()+":"+stats.getAvg()); } }
统计结果以Map结构返回(键为brand、city、starName,值为对应的可选项列表):
Controller
//统计
@PostMapping("/hotel/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams requestParams) throws IOException {
return hotelService.filters(requestParams);
}
ServiceImpl
//统计结果 @Override public Map<String, List<String>> filters(RequestParams requestParams) throws IOException { //1. 构建查询请求 SearchRequest request = new SearchRequest("hotel"); //2. 设置查询条件 //2-1 创建复合查询 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); //2-2 获取搜索关键字, 设置为查询条件 String key = requestParams.getKey(); if (StringUtils.isEmpty(key)) { boolQuery.must(QueryBuilders.matchAllQuery()); } else { boolQuery.must(QueryBuilders.matchQuery("all", key)); } //2-3 获取城市、星级、品牌、价格,使用过滤语法筛选 // 城市 if (StrUtil.isNotEmpty(requestParams.getCity())) { boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity())); } // 星级 if (StrUtil.isNotEmpty(requestParams.getStarName())) { boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName())); } // 品牌 if (StrUtil.isNotEmpty(requestParams.getBrand())) { boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand())); } // 价格 if (requestParams.getMinPrice() != null && requestParams.getMaxPrice() != null) { boolQuery.filter(QueryBuilders.rangeQuery("price").gte(requestParams.getMinPrice()).lte(requestParams.getMaxPrice())); } request.source().query(boolQuery);//设置查询条件 request.source().size(0);//不查询信息 //3. 设置统计分析 request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(50)); request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(50)); request.source().aggregation(AggregationBuilders.terms("starNameAgg").field("starName").size(50)); //4. 发起请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //5. 
处理统计结果 Aggregations aggregations = response.getAggregations(); List<String> brandAgg = ((Terms) response.getAggregations().get("brandAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList()); List<String> cityAgg = ((Terms) response.getAggregations().get("cityAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList()); List<String> starNameAgg = ((Terms) response.getAggregations().get("starNameAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList()); Map<String, List<String>> map = new HashMap<>(); map.put("brand",brandAgg); map.put("city",cityAgg); map.put("starName",starNameAgg); return map; }
① 使用docker容器安装mq
docker run \
-v mq-plugins:/plugins \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
② 登录管理界面
浏览器访问:http://192.168.149.128:15672/
账号: guest
密码: guest
③ 配置虚拟主机、用户
例如:
账号: hotel
密码: hotel
主机: /hotel
① 引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
② 配置文件
spring:
  rabbitmq:
    host: 192.168.149.128
    port: 5672
    virtual-host: /hotel
    username: hotel
    password: hotel
③ 声明队列交换机名称
/**
 * Names of the RabbitMQ exchange, queues and routing keys used to propagate hotel changes to the
 * Elasticsearch "hotel" index.
 *
 * <p>This is a constants holder only: it is {@code final} and cannot be instantiated.
 */
public final class MqConstants {

    /** Topic exchange that hotel change messages are published to. */
    public static final String HOTEL_EXCHANGE = "hotel.topic";

    /** Queue that receives insert and update notifications. */
    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /** Queue that receives delete notifications. */
    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /** Routing key for inserts and updates. */
    public static final String HOTEL_INSERT_KEY = "hotel.insert";

    /** Routing key for deletes. */
    public static final String HOTEL_DELETE_KEY = "hotel.delete";

    private MqConstants() {
        // Not instantiable: constants only.
    }
}
④ 创建交换机和队列
@Configuration public class MqConfiguration { @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstants.HOTEL_EXCHANGE); } // 声明第1个队列 @Bean public Queue insertQueue() { return new Queue(MqConstants.HOTEL_INSERT_QUEUE); } // 队列1绑定交换机 @Bean public Binding bindingInsertQueue(TopicExchange topicExchange, Queue insertQueue) { return BindingBuilder.bind(insertQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY); } // 声明第2个队列 @Bean public Queue deleteQueue() { return new Queue(MqConstants.HOTEL_DELETE_QUEUE); } // 队列2绑定交换机 @Bean public Binding bindingDeleteQueue(TopicExchange topicExchange, Queue deleteQueue) { return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY); } }
@Component public class HotelListener { @Autowired private RestHighLevelClient client; @Autowired private HotelMapper hotelMapper; // 新增、修改 @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) public void insertOrUpdateHotel(Long id) throws IOException { // 1.创建request IndexRequest request = new IndexRequest("hotel").id(id.toString()); // 2.准备DSL Hotel hotel = hotelMapper.selectById(id); HotelDoc hotelDoc = new HotelDoc(hotel); request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 client.index(request, RequestOptions.DEFAULT); System.out.println("es同步新增或更新:" + id); } // 删除 @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) public void deleteHotel(Long id) throws IOException { // 1.创建request DeleteRequest request = new DeleteRequest("hotel", id.toString()); // 2.发送请求 client.delete(request, RequestOptions.DEFAULT); System.out.println("es同步删除:" + id); } }
ES运行需要调整Linux内核参数(vm.max_map_count),修改/etc/sysctl.conf文件:
vim /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
创建集群
编写一个docker-compose.yml文件,内容如下:
version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9202:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9203:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local
networks:
  elastic:
    driver: bridge
在docker-compose.yml目录下执行下面命令,运行集群
docker-compose up -d
创建索引库
使用head插件创建索引库,分片设置为3,每个分片设置1个副本
查看分片效果
回到首页,即可查看索引库分片效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。