赞
踩
聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:
注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。
# 使用DSL实现聚合 # 1.bucket桶聚合 + 限定聚合范围 # 例:根据酒店品牌名做聚合(并且限定价格不高于200的),并按照结果的升序排序,显示前5个品牌 GET /hotel/_search { "query": { "range": { "price": { "lte": 200 } } }, "size": 0, //设置size为0,结果中不包含文档,只包含聚合结果 "aggs": { // 定义聚合 "brandAgg": { // 定义聚合名 "terms": { // 聚合类型,按照品牌名聚合,所以选择term "field": "brand", // 参与聚合字段 "order": { "_count": "asc" //指定排序规则 升序 }, "size": 20 //希望获得聚合结果数 } } } } # 2.Metrics聚合 # 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序) GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20, "order": { "score_stats.avg": "desc" } }, "aggs": { //子聚合 "score_stats": { //子聚合名 "stats": { //聚合类型,stats可以计算min、max、avg等 "field": "score" //聚合字段 } } } } } }
/** * 桶bucket聚合 */ @Test void testAgg() throws IOException { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 // 2.1.size request.source().size(0); // 2.2.聚合 request.source().aggregation( AggregationBuilders.terms("brandAgg").field("brand").size(20)); // 3.发出请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析结果 Aggregations aggregations = response.getAggregations(); // 4.1.根据聚合名称,获取聚合结果 Terms brandAgg = aggregations.get("brandAgg"); // 4.2.获取buckets List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); // 4.3.遍历 for (Terms.Bucket bucket : buckets) { String brandName = bucket.getKeyAsString(); System.out.println("brandName = " + brandName); long docCount = bucket.getDocCount(); System.out.println("docCount = " + docCount); } }
@Slf4j @Service public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService { @Autowired private RestHighLevelClient restHighLevelClient; @Override public Map<String, List<String>> filters() { try { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 // 2.1.size request.source().size(0); // 2.2.聚合 buildAggregation(request); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析结果 Map<String, List<String>> result = new HashMap<>(); Aggregations aggregations = response.getAggregations(); // 4.1.根据品牌名称,获取聚合结果 List<String> brandList = getAggByName(aggregations, "brandAgg"); // 放入map result.put("品牌",brandList); // 4.2.根据城市名称,获取聚合结果 List<String> cityList = getAggByName(aggregations, "cityAgg"); // 放入map result.put("城市",cityList); // 4.3.根据星级名称,获取聚合结果 List<String> starList = getAggByName(aggregations, "starAgg"); // 放入map result.put("星级",starList); return result; } catch (IOException e) { throw new RuntimeException(e); } } public List<String> getAggByName(Aggregations aggregations, String aggName) { // 4.1.根据聚合名称,获取聚合结果 Terms brandAgg = aggregations.get(aggName); // 4.2.获取buckets List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); // 4.3.遍历 List<String> brandList = new ArrayList<>(); for (Terms.Bucket bucket : buckets) { String key = bucket.getKeyAsString(); brandList.add(key); } return brandList; } public void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(100)); request.source().aggregation(AggregationBuilders .terms("cityAgg") .field("city") .size(100)); request.source().aggregation(AggregationBuilders .terms("starAgg") .field("starName") .size(100)); } }
测试
@SpringBootTest
public class HotelDemoApplicationTest {
@Autowired
private IHotelService hotelService;
@Test
void contextLoads(){
Map<String, List<String>> filters = hotelService.filters();
System.out.println(filters);
}
}
结果:
自动补全如下图所示:
要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:
演示:
# 自定义拼音分词器 PUT /test { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "name": { "type": "text", "analyzer": "my_analyzer", "search_analyzer": "ik_smart" } } } } POST /test/_doc/1 { "id": 1, "name": "狮子" } POST /test/_doc/2 { "id": 2, "name": "虱子" } GET /test/_search { "query": { "match": { "name": "掉入狮子笼咋办" } } }
注意:
拼音分词器通常在创建索引库时使用,搜索时使用普通分词器即可
查询语法如下
// 自动补全查询
POST /test/_search
{
"suggest": {
"title_suggest": { // 自定义补全查询名称
"text": "s", // 关键字
"completion": {
"field": "title", // 补全字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
演示:
# 2.自动补全 # 2.1 创建一个 自动补全的索引库 属性有title DELETE /test PUT test { "mappings": { "properties": { "title":{ "type": "completion" } } } } # 2.2 插入示例数据 POST test/_doc { "title": ["Sony", "WH-1000XM3"] } POST test/_doc { "title": ["SK-II", "PITERA"] } POST test/_doc { "title": ["Nintendo", "switch"] } # 2.3 自动补全查询 # 例:输入一个关键字s,看自动补全的结果 # 结果:"SK-II"、"Sony"和"switch" POST /test/_search { "suggest": { "title_suggest": { "text": "s", "completion": { "field": "title", "skip_duplicates": true, "size": 10 } } } }
结果:
1.修改索引库结构
# 酒店数据索引库 GET /hotel/_mapping DELETE /hotel PUT /hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } } # 自动补全查询 GET /hotel/_search { "suggest": { "mySuggestion": { "text": "shang", "completion": { "field": "suggestion", "skip_duplicates": true, "size": 10 } } } }
2.修改HotelDoc
@Data @NoArgsConstructor public class HotelDoc { private Long id; private String name; private String address; private Integer price; private Integer score; private String brand; private String city; private String starName; private String business; private String location; private String pic; private Object distance; //新加加字段"距离":酒店距你选择位置的距离 private Boolean isAD; //新加加字段"标记":给你置顶的酒店添加一个标记 private List<String> suggestion;//新加该字段用于自动补全 public HotelDoc(Hotel hotel) { this.id = hotel.getId(); this.name = hotel.getName(); this.address = hotel.getAddress(); this.price = hotel.getPrice(); this.score = hotel.getScore(); this.brand = hotel.getBrand(); this.city = hotel.getCity(); this.starName = hotel.getStarName(); this.business = hotel.getBusiness(); this.location = hotel.getLatitude() + ", " + hotel.getLongitude(); this.pic = hotel.getPic(); // 自动补全字段的处理 this.suggestion = new ArrayList<>(); // 添加品牌、城市 this.suggestion.add(this.brand); this.suggestion.add(this.city); // 判断商圈是否包含/ if (this.business.contains("/")) { // business有多个值,需要切割 String[] arr = this.business.split("/"); // business的每个值都要加入到suggestion中 Collections.addAll(this.suggestion, arr); }else{ this.suggestion.add(this.business); } } }
3.【重新导入数据,不演示,参见之前的批量导入文档功能】查询结果
/** * 自动补全查询 */ @Test void testSuggest() throws IOException { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 request.source().suggest(new SuggestBuilder().addSuggestion( "hotelSuggest", SuggestBuilders .completionSuggestion("suggestion") .size(10) .skipDuplicates(true) .prefix("s") )); // 3.发出请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析结果 Suggest suggest = response.getSuggest(); // 4.1.根据补全查询名称,获取补全结果 CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest"); // 4.2.获取options for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果 String str = option.getText().toString(); System.out.println(str); } }
Mapper层
@RestController @RequestMapping("hotel") public class HotelController { @Autowired private IHotelService hotelService; @PostMapping("list") public PageResult search(@RequestBody RequestParams params) { return hotelService.search(params); } @PostMapping("filters") public Map<String, List<String>> getFilters(@RequestBody RequestParams params) { return hotelService.filters(params); } @GetMapping("suggestion") public List<String> getSuggestion(@RequestParam("key") String key) { return hotelService.getSuggestion(key); } }
Service层
@Slf4j @Service public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService { @Autowired private RestHighLevelClient restHighLevelClient; /** * 自动补全查询 */ @Override public List<String> getSuggestion(String key) { try { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 request.source().suggest(new SuggestBuilder().addSuggestion( "hotelSuggest", SuggestBuilders .completionSuggestion("suggestion") .size(10) .skipDuplicates(true) .prefix(key) )); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析结果 Suggest suggest = response.getSuggest(); // 4.1.根据补全查询名称,获取补全结果 CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest"); // 4.2.获取options List<String> result = new ArrayList<>(); for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果 String str = option.getText().toString(); result.add(str); } return result; } catch (IOException e) { throw new RuntimeException(e); } } }
结果演示
由于增和改都相当于插入,所以共用一个队列;删除占用一个队列。
一、对消费者hotel-demo的操作
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server: port: 8089 spring: datasource: url: jdbc:mysql://mysql:3306/heima?useSSL=false username: root password: 123 driver-class-name: com.mysql.jdbc.Driver rabbitmq: host: 192.168.150.101 port: 5672 username: itcast password: 123321 virtual-host: / logging: level: cn.itcast: debug pattern: dateformat: HH:mm:ss:SSS mybatis-plus: configuration: map-underscore-to-camel-case: true type-aliases-package: cn.itcast.hotel.pojo
public class HotelMqConstants {
// 交换机名称
public static final String EXCHANGE_NAME = "hotel.topic";
// 新增修改队列
public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
// 删除队列
public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
// 新增修改的RoutingKey
public static final String INSERT_KEY = "hotel.insert";
// 删除的RoutingKey
public static final String DELETE_KEY = "hotel.delete";
}
@Component public class HotelListener { @Autowired private IHotelService hotelService; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.INSERT_KEY )) public void listenHotelInsert(Long hotelId){ // 新增 hotelService.saveById(hotelId); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME), exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = HotelMqConstants.DELETE_KEY )) public void listenHotelDelete(Long hotelId){ // 删除 hotelService.deleteById(hotelId); } }
【bean方式】
@Configuration public class MqConfig { @Bean public TopicExchange topicExchange(){ return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); } @Bean public Queue insertQueue(){ return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true); } @Bean public Queue deleteQueue(){ return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true); } @Bean public Binding insertQueueBinding(){ return BindingBuilder .bind(insertQueue()) .to(topicExchange()) .with(HotelMqConstants.INSERT_KEY); } @Bean public Binding deleteQueueBinding(){ return BindingBuilder .bind(deleteQueue()) .to(topicExchange()) .with(HotelMqConstants.DELETE_KEY); } }
@Slf4j @Service public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService { @Autowired private RestHighLevelClient restHighLevelClient; /** * 搜索框查询 */ @Override public PageResult search(RequestParams params) { try { // 1.准备Request SearchRequest request = new SearchRequest("hotel"); // 2.准备请求参数 // 2.1.多条件查询和过滤 buildBasicQuery(params, request); // 2.2.分页 int page = params.getPage(); int size = params.getSize(); request.source().from((page - 1) * size).size(size); /** * 2.3.距离排序 */ String location = params.getLocation(); if (StringUtils.isNotBlank(location)) {// 不为空则查询 request.source().sort(SortBuilders .geoDistanceSort("location", new GeoPoint(location)) .order(SortOrder.ASC) .unit(DistanceUnit.KILOMETERS) ); } // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 return handleResponse(response); } catch (IOException e) { throw new RuntimeException("搜索数据失败", e); } } /** * 复合查询 */ private void buildBasicQuery(RequestParams params, SearchRequest request) { // 1.准备Boolean复合查询 BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); /** * 1.查询关键字 * must参与 算分 */ // 1.1.关键字搜索,match查询,放到must中 String key = params.getKey(); if (StringUtils.isNotBlank(key)) { // 不为空,根据关键字查询 boolQuery.must(QueryBuilders.matchQuery("all", key)); } else { // 为空,查询所有 boolQuery.must(QueryBuilders.matchAllQuery()); } /** * 2.条件过滤:多条件复合查询 * 根据 “品牌 城市 星级 价格范围” 过滤数据 * filter不参与 算分 */ // 1.2.品牌 String brand = params.getBrand(); if (StringUtils.isNotBlank(brand)) { // 不为空则查询 boolQuery.filter(QueryBuilders.termQuery("brand", brand)); } // 1.3.城市 String city = params.getCity(); if (StringUtils.isNotBlank(city)) {// 不为空则查询 boolQuery.filter(QueryBuilders.termQuery("city", city)); } // 1.4.星级 String starName = params.getStarName(); if (StringUtils.isNotBlank(starName)) {// 不为空则查询 boolQuery.filter(QueryBuilders.termQuery("starName", starName)); } // 1.5.价格范围 Integer minPrice = params.getMinPrice(); Integer maxPrice = params.getMaxPrice(); if (minPrice != null && maxPrice != null) {// 不为空则查询 maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice; boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice)); } /** * 3.算分函数查询 * 置顶功能:给你置顶的酒店添加一个标记,并按其算分 */ FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery( boolQuery, // 原始查询,boolQuery new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组 new FunctionScoreQueryBuilder.FilterFunctionBuilder( QueryBuilders.termQuery("isAD", true), // 过滤条件 ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数 ) } ); /** * 4.设置查询条件 */ request.source().query(functionScoreQuery); } /** * 结果解析 */ private PageResult handleResponse(SearchResponse response) { SearchHits searchHits = response.getHits(); // 4.1.总条数 long total = searchHits.getTotalHits().value; // 4.2.获取文档数组 SearchHit[] hits = searchHits.getHits(); // 4.3.遍历 List<HotelDoc> hotels = new ArrayList<>(hits.length); for (SearchHit hit : hits) { // 4.4.获取source String json = hit.getSourceAsString(); // 4.5.反序列化,非高亮的 HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class); // 4.6.处理高亮结果 // 1)获取高亮map Map<String, HighlightField> map = hit.getHighlightFields(); if (map != null && !map.isEmpty()) { // 2)根据字段名,获取高亮结果 HighlightField highlightField = map.get("name"); if (highlightField != null) { // 3)获取高亮结果字符串数组中的第1个元素 String hName = highlightField.getFragments()[0].toString(); // 4)把高亮结果放到HotelDoc中 hotelDoc.setName(hName); } } // 4.8.排序信息 Object[] sortValues = hit.getSortValues(); // 获取排序结果 if (sortValues.length > 0) { /** * 由于该程序是根据距离[酒店距你选择位置的距离]进行排序,所以排序结果为距离 */ hotelDoc.setDistance(sortValues[0]); } // 4.9.放入集合 hotels.add(hotelDoc); } return new PageResult(total, hotels); } /** * 多条件聚合 */ @Override public Map<String, List<String>> filters(RequestParams params) { try { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 // 2.1.query查询信息 buildBasicQuery(params, request); // 2.2.size request.source().size(0); // 2.3.聚合 buildAggregation(request); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析结果 Map<String, List<String>> result = new HashMap<>(); Aggregations aggregations = response.getAggregations(); // 4.1.根据品牌名称,获取聚合结果 List<String> brandList = getAggByName(aggregations, "brandAgg"); // 放入map result.put("品牌",brandList); // 4.2.根据城市名称,获取聚合结果 List<String> cityList = getAggByName(aggregations, "cityAgg"); // 放入map result.put("城市",cityList); // 4.3.根据星级名称,获取聚合结果 List<String> starList = getAggByName(aggregations, "starAgg"); // 放入map result.put("星级",starList); return result; } catch (IOException e) { throw new RuntimeException(e); } } public List<String> getAggByName(Aggregations aggregations, String aggName) { // 4.1.根据聚合名称,获取聚合结果 Terms brandAgg = aggregations.get(aggName); // 4.2.获取buckets List<? extends Terms.Bucket> buckets = brandAgg.getBuckets(); // 4.3.遍历 List<String> brandList = new ArrayList<>(); for (Terms.Bucket bucket : buckets) { String key = bucket.getKeyAsString(); brandList.add(key); } return brandList; } public void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders .terms("brandAgg") .field("brand") .size(100)); request.source().aggregation(AggregationBuilders .terms("cityAgg") .field("city") .size(100)); request.source().aggregation(AggregationBuilders .terms("starAgg") .field("starName") .size(100)); } /** * 自动补全查询 */ @Override public List<String> getSuggestion(String key) { try { // 1.准备请求 SearchRequest request = new SearchRequest("hotel"); // 2.请求参数 request.source().suggest(new SuggestBuilder().addSuggestion( "hotelSuggest", SuggestBuilders .completionSuggestion("suggestion") .size(10) .skipDuplicates(true) .prefix(key) )); // 3.发出请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析结果 Suggest suggest = response.getSuggest(); // 4.1.根据补全查询名称,获取补全结果 CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest"); // 4.2.获取options List<String> result = new ArrayList<>(); for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果 String str = option.getText().toString(); result.add(str); } return result; } catch (IOException e) { throw new RuntimeException(e); } } @Override public void deleteById(Long hotelId) { try { // 1.创建request DeleteRequest request = new DeleteRequest("hotel", hotelId.toString()); // 2.发送请求 restHighLevelClient.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("删除酒店数据失败", e); } } @Override public void saveById(Long hotelId) { try { // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查) Hotel hotel = getById(hotelId); // 转换 HotelDoc hotelDoc = new HotelDoc(hotel); // 1.创建Request IndexRequest request = new IndexRequest("hotel").id(hotelId.toString()); // 2.准备参数 request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); // 3.发送请求 restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException("新增酒店数据失败", e); } } }
二、对发送者hotel-admin的操作
@RestController @RequestMapping("hotel") public class HotelController { @Autowired private IHotelService hotelService; // 注入发送消息的api @Autowired private RabbitTemplate rabbitTemplate; /** * 根据id查询 */ @GetMapping("/{id}") public Hotel queryById(@PathVariable("id") Long id){ return hotelService.getById(id); } /** * 查询当前页内容 */ @GetMapping("/list") public PageResult hotelList( @RequestParam(value = "page", defaultValue = "1") Integer page, @RequestParam(value = "size", defaultValue = "1") Integer size ){ Page<Hotel> result = hotelService.page(new Page<>(page, size)); return new PageResult(result.getTotal(), result.getRecords()); } /** * 新增,并发送给mq消息 */ @PostMapping public void saveHotel(@RequestBody Hotel hotel){ // 新增酒店 hotelService.save(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } /** * 修改,并发送给mq消息 */ @PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId()); } /** * 删除,并发送给mq消息 */ @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); // 发送MQ消息 rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id); } }
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
>> 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
>> 单点故障问题:将分片数据在不同节点备份(replica )
我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
version: '2.2' services: es01: image: elasticsearch:7.12.1 container_name: es01 environment: - node.name=es01 - cluster.name=es-docker-cluster - discovery.seed_hosts=es02,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data01:/usr/share/elasticsearch/data ports: - 9200:9200 networks: - elastic es02: image: elasticsearch:7.12.1 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.12.1 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local networks: elastic: driver: bridge
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
通过docker-compose启动集群:
docker-compose up -d
kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。
这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro
课前资料已经提供了安装包:
解压即可使用,非常方便。
解压好的目录如下:
进入对应的bin目录:
双击其中的cerebro.bat文件即可启动服务。
访问http://localhost:9000 即可进入管理界面:
输入你的elasticsearch的任意节点的地址和端口,点击connect即可:
绿色的条,代表集群处于绿色(健康状态)。
1)利用kibana的DevTools创建索引库
在DevTools中输入指令:
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
// mapping映射定义 ...
}
}
}
2)利用cerebro创建索引库
利用cerebro还可以创建索引库:
填写索引库信息:
点击右下角的create按钮:
回到首页,即可查看索引库分片效果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。