当前位置:   article > 正文

分布式搜索引擎(elasticsearch) 3/3章_bucketorder

bucketorder

目录

数据聚合:

DSL实现Bucket聚合:

RestAPI实现聚合(与上面DSL方法一样)

DSL实现度量(Metrics)和管道(pipeline)聚合:

RestAPI实现聚合(与上面DSL方法一样)

自动补全

拼音分词器

自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器))

现在想根据词来分拼音

自定义分词器最终结果

因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用

 语法:

自动补全查询

数据同步:

数据同步思路分析

方案一:同步调用

 方案二:异步通知

方案三:监听binlog

实现elasticsearch与数据库数据同步(异步通知)

引入依赖

配置mq地址

声明常量

声明exchange、queue、RoutingKey(有基于注解和基于bean)

在admin中的增、删、改业务中完成消息发送

在demo中完成消息监听,并更新elasticsearch中数据

es集群:

搭建ES集群(创建三台es服务成一个集群)

搭建es集群

集群状态监控

创建索引库(分片)

查看分片效果

集群节点角色

 集群脑裂问题

集群分布式存储

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

说明:

新增文档流程:

​编辑

集群分布式查询

集群故障转移


数据聚合:

聚合常见的有三类:

        1.桶(Bucket)聚合:用来对文档做分组
                TermAggregation:按照文档字段值分组
                Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组

        2.度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
                    avg:求平均值
                    max:求最大值
                    min:求最小值
                    stats:同时求max、min、avg、sum等

         3.管道(pipeline)聚合:其它聚合的结果为基础做聚合

注意:参与聚合的字段类型必须是 keyword 数值 日期 布尔

DSL实现Bucket聚合:

统计价格小于200的品牌(brand)并且按照统计的文档数量升序有几种

GET /hotel/_search
{

  "query": { //条件查询
    "range": {
      "price": {
        "lte": 200 // 只对200元以下的文档聚合
      }
    }
  },      
  "size": 0,  // 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": { // 定义聚合
    "brandAgg": { //给聚合起个名字
      "terms": { // 聚合的类型,按照品牌值聚合,所以选择term
        "field": "brand", // 参与聚合的字段
        "size": 20 // 希望获取的聚合结果数量

        "order": [  //聚合结果排序 按照_count降序 多条件排序 单条件 把中括号去掉即可

                { "_count": "asc"}, // 按照_count升序(聚合会统计Bucket内的文档数量,记为_count)

                { "_key": "desc"},
        ],
      }
    }
  }
}

RestAPI实现聚合(与上面DSL方法一样)

  1. // 1.准备请求
  2. SearchRequest request = new SearchRequest("hotel");
  3. //条件
  4. request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合
  5. //排序条件
  6. BucketOrder bucketOrder = BucketOrder.count(false); //这里的count方法中true表示升序排列,false代表降序排列
  7. BucketOrder keyOrder = BucketOrder.key(false);
  8. List<BucketOrder> bucketOrders=new ArrayList<>();
  9. bucketOrders.add(bucketOrder);
  10. bucketOrders.add(keyOrder);
  11. //聚合
  12. request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果
  13. request.source().aggregation(
  14. AggregationBuilders
  15. .terms("brandAgg") //聚合的类型,brandAgg聚合名字
  16. .field("brand") //参与聚合的字段
  17. .size(20)
  18. .order(bucketOrders)); //希望获取的聚合结果数量
  19. // 3.发送请求
  20. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  21. // 4.解析响应 .......

总结;

aggs代表聚合,与query同级,此时query的作用是?
        限定聚合的的文档范围
聚合必须的三要素:
        聚合名称
        聚合类型
        聚合字段
聚合可配置属性有:
        size:指定聚合结果数量
        order:指定聚合结果排序方式
        field:指定聚合字段

DSL实现度量(Metrics)和管道(pipeline)聚合:

要求获取每个品牌的用户评分(score)的min、max、avg等值

GET /hotel/_search
{
  "size": 0, 
  "aggs": {//第一次聚合 根据品牌聚合
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { // 是brandAgg聚合的子聚合,也就是分组后对每组分别计算(管道)
        "score_stats": { // 聚合名称
          "stats": { // 聚合类型,这里stats可以计算min、max、avg等(度量)
            "field": "score" // 聚合字段,这里是score
          }
        }
      }
    }
  }
}

RestAPI实现聚合(与上面DSL方法一样)

  1. // 1.准备请求
  2. SearchRequest request = new SearchRequest("hotel");
  3. //条件
  4. request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合
  5. //度量(Metrics)和管道(pipeline)聚合
  6. StatsAggregationBuilder score = AggregationBuilders.stats("score");
  7. //聚合
  8. request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果
  9. request.source().aggregation(
  10. AggregationBuilders
  11. .terms("brandAgg") //聚合的类型,brandAgg聚合名字
  12. .field("brand") //参与聚合的字段
  13. .size(20) //希望获取的聚合结果数量
  14. .subAggregation(score));
  15. // 3.发送请求
  16. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  17. // 4.解析响应 .......

自动补全

拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件  地址

安装方式与IK分词器一样,分三步:

1 解压

2 上传到虚拟机中,elasticsearch的plugin目录

3 重启elasticsearch 测试

测试

POST /_analyze
{
  "text": "如家酒店",
  "analyzer": "pinyin"
}

结果就是 每一个字都有拼音  不是词

自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器))

现在想根据词来分拼音

elasticsearch中分词器(analyzer)的组成包含三部分

character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

自定义分词器最终结果

因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用

创建倒排索引时:

搜索时,用户搜索“狮子”:

 语法:

PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { // 自定义分词器
        "my_analyzer": {  // 分词器名称
          "tokenizer": "ik_max_word",
          "filter": "py"  //下面filter定义的py
        }
      },
      "filter": { // 自定义tokenizer filter  配置
        "py": { // 过滤器名称
          "type": "pinyin", // 过滤器类型,这里是pinyin
          "keep_full_pinyin": false, //不单个字分词
          "keep_joined_full_pinyin": true, //全拼
          "keep_original": true, //保留中文
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },

"mappings": { //创建倒排索引时用my_analyzer分词器;字段在搜索时用ik_smart分词器
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",//创建
        "search_analyzer": "ik_smart"  //搜索
      }
    }
  }
}

自动补全查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回

为了提高补全查询的效率,对于文档中字段的类型有一些约束:

        参与补全查询的字段必须是completion类型

        字段的内容一般是用来补全的多个词条形成的数组

例子:

// 创建索引库

PUT test
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}

// 示例数据
POST test/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test/_doc
{
  "title": ["Nintendo", "switch"]
}

// 自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": {//取名
      "text": "s", // 关键字
      "completion": {
        "field": "title", // 补全查询的字段
        "skip_duplicates": true, // 跳过重复的
        "size": 10 // 获取前10条结果
      }
    }
  }
}

RestAPI自动补全查询(与上面DSL查询结果一样

  1. String key="s";
  2. // 1.准备请求
  3. SearchRequest request = new SearchRequest("test");
  4. // 2.请求参数
  5. request.source().suggest(new SuggestBuilder()
  6. .addSuggestion(
  7. "title_suggest",//取名
  8. SuggestBuilders
  9. .completionSuggestion("title")//补全查询的字段
  10. .size(10)  // 获取前10条结果
  11. .skipDuplicates(true) // 跳过重复的
  12. .prefix(key)//关键字
  13. ));
  14. // 3.发出请求
  15. SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  16. // 4.解析.......

数据同步:

数据同步思路分析

方案一:同步调用

优点:实现简单,粗暴   

缺点:业务耦合度高

 方案二:异步通知

优点:低耦合,实现难度一般

缺点:依赖mq的可靠性

方案三:监听binlog

优点:完全解除服务间耦合

缺点:开启binlog增加数据库负担、实现复杂度高

实现elasticsearch与数据库数据同步(异步通知)

引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置mq地址

spring:
  rabbitmq:
    host: 192.168.150.101
    port: 5672
    username: admmin
    password: 123456
    virtual-host: /          #虚拟主机

声明常量

  1. public class HotelMqConstants {
  2. public static final String EXCHANGE_NAME = "hotel.topic"; //交换机名称
  3. public static final String INSERT_QUEUE_NAME = "hotel.insert.queue"; //新增队列
  4. public static final String DELETE_QUEUE_NAME = "hotel.delete.queue"; //删除队列
  5. public static final String INSERT_KEY = "hotel.insert"; //新增RoutingKey
  6. public static final String DELETE_KEY = "hotel.delete"; //删除RoutingKey
  7. }

声明exchange、queue、RoutingKey(有基于注解和基于bean)

基于注解(在demo方声明)

  1. @Component
  2. public class HotelListener {
  3. @Autowired
  4. private IHotelService hotelService;//对es的增 删操作
  5. @RabbitListener(bindings = @QueueBinding(
  6. value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
  7. exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
  8. key = HotelMqConstants.INSERT_KEY
  9. ))
  10. public void listenHotelInsert(Long hotelId){
  11. // 新增
  12. hotelService.saveById(hotelId);
  13. }
  14. @RabbitListener(bindings = @QueueBinding(
  15. value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
  16. exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
  17. key = HotelMqConstants.DELETE_KEY
  18. ))
  19. public void listenHotelDelete(Long hotelId){
  20. // 删除
  21. hotelService.deleteById(hotelId);
  22. }
  23. }

基于bean(在demo方声明)

  1. @Configuration
  2. public class MqConfig {
  3. @Bean
  4. public TopicExchange topicExchange(){//定义交换机
  5. return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); //交换机名字 是否持久化 是否自动删除
  6. }
  7. @Bean
  8. public Queue insertQueue(){//定义新增或修改队列
  9. return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);//队列名字 持久化
  10. }
  11. @Bean
  12. public Queue deleteQueue(){//定义删除队列
  13. return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
  14. }
  15. @Bean
  16. public Binding insertQueueBinding(){//定义绑定关系
  17. return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);
  18. }
  19. @Bean
  20. public Binding deleteQueueBinding(){//定义绑定关系
  21. return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);
  22. }
  23. }

在admin中的增、删、改业务中完成消息发送

  1. @RestController
  2. @RequestMapping("hotel")
  3. public class HotelController {
  4. @Autowired
  5. private IHotelService hotelService; //对数据库的接口
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostMapping
  9. public void saveHotel(@RequestBody Hotel hotel){
  10. // 新增酒店
  11. hotelService.save(hotel);
  12. // 发送MQ消息
  13. rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
  14. }
  15. @PutMapping()
  16. public void updateById(@RequestBody Hotel hotel){
  17. if (hotel.getId() == null) {
  18. throw new InvalidParameterException("id不能为空");
  19. }
  20. hotelService.updateById(hotel);
  21. // 发送MQ消息
  22. rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
  23. }
  24. @DeleteMapping("/{id}")
  25. public void deleteById(@PathVariable("id") Long id) {
  26. hotelService.removeById(id);
  27. // 发送MQ消息
  28. rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
  29. }
  30. }

在demo中完成消息监听,并更新elasticsearch中数据

  1. 注解方法
  2. @Component
  3. public class HotelListener {
  4. @Autowired
  5. private IHotelService hotelService;//对es的增 删操作
  6. @RabbitListener(bindings = @QueueBinding(
  7. value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
  8. exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
  9. key = HotelMqConstants.INSERT_KEY
  10. ))
  11. public void listenHotelInsert(Long hotelId){
  12. // 新增
  13. hotelService.saveById(hotelId);
  14. }
  15. @RabbitListener(bindings = @QueueBinding(
  16. value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
  17. exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
  18. key = HotelMqConstants.DELETE_KEY
  19. ))
  20. public void listenHotelDelete(Long hotelId){
  21. // 删除
  22. hotelService.deleteById(hotelId);
  23. }
  24. }
  25. //--------------------------------------------------------------
  26. bean方式
  27. @Component
  28. public class HotelListener {
  29. @Autowired
  30. private IHotelService hotelService;//对es的增 删操作
  31. @RabbitListener(queues=HotelMqConstants.INSERT_QUEUE_NAME)
  32. public void listenHotelInsert(Long hotelId){
  33. // 新增
  34. hotelService.saveById(hotelId);
  35. }
  36. @RabbitListener(queues=HotelMqConstants.DELETE_QUEUE_NAME)
  37. public void listenHotelDelete(Long hotelId){
  38. // 删除
  39. hotelService.deleteById(hotelId);
  40. }
  41. }
  42. //--------------------------------------------------------------
  43. @Slf4j
  44. @Service
  45. public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
  46. @Autowired
  47. private RestHighLevelClient restHighLevelClient;
  48. @Override
  49. public void deleteById(Long hotelId) {
  50. try {
  51. // 1.创建request
  52. DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
  53. // 2.发送请求
  54. restHighLevelClient.delete(request, RequestOptions.DEFAULT);
  55. } catch (IOException e) {
  56. throw new RuntimeException("删除酒店数据失败", e);
  57. }
  58. }
  59. @Override
  60. public void saveById(Long hotelId) {
  61. try {
  62. // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
  63. Hotel hotel = getById(hotelId);
  64. // 转换
  65. HotelDoc hotelDoc = new HotelDoc(hotel);
  66. // 1.创建Request
  67. IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
  68. // 2.准备参数
  69. request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
  70. // 3.发送请求
  71. restHighLevelClient.index(request, RequestOptions.DEFAULT);
  72. } catch (IOException e) {
  73. throw new RuntimeException("新增酒店数据失败", e);
  74. }
  75. }
  76. }

es集群:

搭建ES集群(创建三台es服务成一个集群)

部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

搭建es集群

首先编写一个docker-compose文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1  //镜像
    container_name: es01    //容器名称
    environment:        //环境变量
      - node.name=es01    //节点名称
      - cluster.name=es-docker-cluster //集群名称 集群名称一样 es就会自动组装成一个集群(其它节点必须一致)
      - discovery.seed_hosts=es02,es03//另外两个节点的Ip地址(因为用的docker docker可以通过容器名互联)
      - cluster.initial_master_nodes=es01,es02,es03 //初始化的主节点 主从 这三个节点里选举
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"    //堆内存大小
    volumes:    //数据卷
      - data01:/usr/share/elasticsearch/data
    ports:        //docker映射
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

使用cerebro来监控es集群状态,官方网址:GitHub - lmenezes/cerebro

解压即可使用,非常方便。

解压好的目录如下:

进入对应的bin目录: 

 双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

 输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。 

创建索引库(分片)

注意:每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。

利用kibana的DevTools创建索引库

1)在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}

2)或利用cerebro创建索引库

 填写索引库信息:

点击右下角的create按钮即可

查看分片效果

回到首页,即可查看索引库分片效果:

集群节点角色

节点类型

配置参数

默认值

节点职责

master eligible

node.master

true

备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求

data

node.data

true

数据节点:存储数据、搜索、聚合、CRUD

ingest

node.ingest

true

数据存储之前的预处理

coordinating

上面3个参数都为false则为coordinating节点

路由请求到其它节点

合并其它节点处理的结果,返回给用户

默认情况下节点同时具备这四种角色

elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。(三台coordinating节点  五台data节点  三台备选主节点)

 集群脑裂问题

例:

  有A B C 三台es服务组成集群 A为主 当因为B C因为网络阻塞与A无法通信时 但A又能去客户端通信,这时B C就会在他们中选举一个主节点 这时集群里就有了两个主节点

解决方案:

  为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

集群分布式存储

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

shard = hash(_routing) % number_of_shards

说明:

_routing默认是文档的id

算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

新增文档流程:

集群分布式查询

elasticsearch的查询分成两个阶段

        scatter phase:分散阶段,coordinating node会把请求分发到每一个分片

        gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结                                   果集返回给用户

集群故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

 当node1宕机后:星星代表主节点  红色代表宕机

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/961106
推荐阅读
相关标签
  

闽ICP备14008679号