赞
踩
目录
DSL实现度量(Metrics)和管道(pipeline)聚合:
自定义分词器 (创建索引库时,通过settings来配置自定义的analyzer(分词器))
因为拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用
声明exchange、queue、RoutingKey(有基于注解和基于bean)
在demo中完成消息监听,并更新elasticsearch中数据
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
聚合常见的有三类:
1.桶(Bucket)聚合:用来对文档做分组
TermAggregation:按照文档字段值分组
Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组2.度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
avg:求平均值
max:求最大值
min:求最小值
stats:同时求max、min、avg、sum等3.管道(pipeline)聚合:其它聚合的结果为基础做聚合
注意:参与聚合的字段类型必须是 keyword 数值 日期 布尔
统计价格小于200的品牌(brand)并且按照统计的文档数量升序有几种
GET /hotel/_search
{"query": { //条件查询
"range": {
"price": {
"lte": 200 // 只对200元以下的文档聚合
}
}
},
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量"order": [ //聚合结果排序 按照_count降序 多条件排序 单条件 把中括号去掉即可
{ "_count": "asc"}, // 按照_count升序(聚合会统计Bucket内的文档数量,记为_count)
{ "_key": "desc"},
],
}
}
}
}
- // 1.准备请求
- SearchRequest request = new SearchRequest("hotel");
- //条件
- request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合
- //排序条件
- BucketOrder bucketOrder = BucketOrder.count(false); //这里的count方法中true表示升序排列,false代表降序排列
- BucketOrder keyOrder = BucketOrder.key(false);
- List<BucketOrder> bucketOrders=new ArrayList<>();
- bucketOrders.add(bucketOrder);
- bucketOrders.add(keyOrder);
- //聚合
- request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果
- request.source().aggregation(
- AggregationBuilders
- .terms("brandAgg") //聚合的类型,brandAgg聚合名字
- .field("brand") //参与聚合的字段
- .size(20)
- .order(bucketOrders)); //希望获取的聚合结果数量
-
- // 3.发送请求
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- // 4.解析响应 .......
总结;
aggs代表聚合,与query同级,此时query的作用是?
限定聚合的的文档范围
聚合必须的三要素:
聚合名称
聚合类型
聚合字段
聚合可配置属性有:
size:指定聚合结果数量
order:指定聚合结果排序方式
field:指定聚合字段
要求获取每个品牌的用户评分(score)的min、max、avg等值
GET /hotel/_search
{
"size": 0,
"aggs": {//第一次聚合 根据品牌聚合
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brandAgg聚合的子聚合,也就是分组后对每组分别计算(管道)
"score_stats": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min、max、avg等(度量)
"field": "score" // 聚合字段,这里是score
}
}
}
}
}
}
- // 1.准备请求
- SearchRequest request = new SearchRequest("hotel");
- //条件
- request.source().query(QueryBuilders.rangeQuery("price").lte(200));//只对200元以下的文档聚合
- //度量(Metrics)和管道(pipeline)聚合
- StatsAggregationBuilder score = AggregationBuilders.stats("score");
- //聚合
- request.source().size(0); // 设置size为0,结果中不包含文档,只包含聚合结果
- request.source().aggregation(
- AggregationBuilders
- .terms("brandAgg") //聚合的类型,brandAgg聚合名字
- .field("brand") //参与聚合的字段
- .size(20) //希望获取的聚合结果数量
- .subAggregation(score));
-
- // 3.发送请求
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- // 4.解析响应 .......
要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件 地址
安装方式与IK分词器一样,分三步:
1 解压
2 上传到虚拟机中,elasticsearch的plugin目录
3 重启elasticsearch 测试
测试
POST /_analyze
{
"text": "如家酒店",
"analyzer": "pinyin"
}结果就是 每一个字都有拼音 不是词
elasticsearch中分词器(analyzer)的组成包含三部分:
character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
创建倒排索引时:
搜索时,用户搜索“狮子”:
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
"tokenizer": "ik_max_word",
"filter": "py" //下面filter定义的py
}
},
"filter": { // 自定义tokenizer filter 配置
"py": { // 过滤器名称
"type": "pinyin", // 过滤器类型,这里是pinyin
"keep_full_pinyin": false, //不单个字分词
"keep_joined_full_pinyin": true, //全拼
"keep_original": true, //保留中文
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},"mappings": { //创建倒排索引时用my_analyzer分词器;字段在搜索时用ik_smart分词器
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",//创建
"search_analyzer": "ik_smart" //搜索
}
}
}
}
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回
为了提高补全查询的效率,对于文档中字段的类型有一些约束:
参与补全查询的字段必须是completion类型
字段的内容一般是用来补全的多个词条形成的数组
例子:
// 创建索引库
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}// 示例数据
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}// 自动补全查询
GET /test/_search
{
"suggest": {
"title_suggest": {//取名
"text": "s", // 关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
RestAPI自动补全查询(与上面DSL查询结果一样)
- String key="s";
- // 1.准备请求
- SearchRequest request = new SearchRequest("test");
- // 2.请求参数
- request.source().suggest(new SuggestBuilder()
- .addSuggestion(
- "title_suggest",//取名
- SuggestBuilders
- .completionSuggestion("title")//补全查询的字段
- .size(10) // 获取前10条结果
- .skipDuplicates(true) // 跳过重复的
- .prefix(key)//关键字
- ));
- // 3.发出请求
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- // 4.解析.......
优点:实现简单,粗暴
缺点:业务耦合度高
优点:低耦合,实现难度一般
缺点:依赖mq的可靠性
优点:完全解除服务间耦合
缺点:开启binlog增加数据库负担、实现复杂度高
<!--amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring: rabbitmq: host: 192.168.150.101 port: 5672 username: admmin password: 123456 virtual-host: / #虚拟主机
- public class HotelMqConstants {
- public static final String EXCHANGE_NAME = "hotel.topic"; //交换机名称
- public static final String INSERT_QUEUE_NAME = "hotel.insert.queue"; //新增队列
- public static final String DELETE_QUEUE_NAME = "hotel.delete.queue"; //删除队列
- public static final String INSERT_KEY = "hotel.insert"; //新增RoutingKey
- public static final String DELETE_KEY = "hotel.delete"; //删除RoutingKey
- }
基于注解(在demo方声明)
- @Component
- public class HotelListener {
-
- @Autowired
- private IHotelService hotelService;//对es的增 删操作
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
- exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
- key = HotelMqConstants.INSERT_KEY
- ))
- public void listenHotelInsert(Long hotelId){
- // 新增
- hotelService.saveById(hotelId);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
- exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
- key = HotelMqConstants.DELETE_KEY
- ))
- public void listenHotelDelete(Long hotelId){
- // 删除
- hotelService.deleteById(hotelId);
- }
- }
基于bean(在demo方声明)
- @Configuration
- public class MqConfig {
-
- @Bean
- public TopicExchange topicExchange(){//定义交换机
- return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false); //交换机名字 是否持久化 是否自动删除
- }
- @Bean
- public Queue insertQueue(){//定义新增或修改队列
- return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);//队列名字 持久化
- }
- @Bean
- public Queue deleteQueue(){//定义删除队列
- return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
- }
- @Bean
- public Binding insertQueueBinding(){//定义绑定关系
- return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);
- }
- @Bean
- public Binding deleteQueueBinding(){//定义绑定关系
- return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);
- }
- }
- @RestController
- @RequestMapping("hotel")
- public class HotelController {
-
- @Autowired
- private IHotelService hotelService; //对数据库的接口
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
-
- @PostMapping
- public void saveHotel(@RequestBody Hotel hotel){
- // 新增酒店
- hotelService.save(hotel);
- // 发送MQ消息
- rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
- }
-
- @PutMapping()
- public void updateById(@RequestBody Hotel hotel){
- if (hotel.getId() == null) {
- throw new InvalidParameterException("id不能为空");
- }
- hotelService.updateById(hotel);
-
- // 发送MQ消息
- rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
- }
-
- @DeleteMapping("/{id}")
- public void deleteById(@PathVariable("id") Long id) {
- hotelService.removeById(id);
-
- // 发送MQ消息
- rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
- }
- }
- 注解方法
- @Component
- public class HotelListener {
-
- @Autowired
- private IHotelService hotelService;//对es的增 删操作
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
- exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
- key = HotelMqConstants.INSERT_KEY
- ))
- public void listenHotelInsert(Long hotelId){
- // 新增
- hotelService.saveById(hotelId);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
- exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
- key = HotelMqConstants.DELETE_KEY
- ))
- public void listenHotelDelete(Long hotelId){
- // 删除
- hotelService.deleteById(hotelId);
- }
- }
-
-
- //--------------------------------------------------------------
-
- bean方式
- @Component
- public class HotelListener {
-
- @Autowired
- private IHotelService hotelService;//对es的增 删操作
-
- @RabbitListener(queues=HotelMqConstants.INSERT_QUEUE_NAME)
- public void listenHotelInsert(Long hotelId){
- // 新增
- hotelService.saveById(hotelId);
- }
-
- @RabbitListener(queues=HotelMqConstants.DELETE_QUEUE_NAME)
- public void listenHotelDelete(Long hotelId){
- // 删除
- hotelService.deleteById(hotelId);
- }
- }
-
-
- //--------------------------------------------------------------
-
- @Slf4j
- @Service
- public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
-
- @Autowired
- private RestHighLevelClient restHighLevelClient;
-
-
- @Override
- public void deleteById(Long hotelId) {
- try {
- // 1.创建request
- DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
- // 2.发送请求
- restHighLevelClient.delete(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- throw new RuntimeException("删除酒店数据失败", e);
- }
- }
-
- @Override
- public void saveById(Long hotelId) {
- try {
- // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
- Hotel hotel = getById(hotelId);
- // 转换
- HotelDoc hotelDoc = new HotelDoc(hotel);
-
- // 1.创建Request
- IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
- // 2.准备参数
- request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
- // 3.发送请求
- restHighLevelClient.index(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- throw new RuntimeException("新增酒店数据失败", e);
- }
- }
-
-
- }
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1 //镜像
container_name: es01 //容器名称
environment: //环境变量
- node.name=es01 //节点名称
- cluster.name=es-docker-cluster //集群名称 集群名称一样 es就会自动组装成一个集群(其它节点必须一致)
- discovery.seed_hosts=es02,es03//另外两个节点的Ip地址(因为用的docker docker可以通过容器名互联)
- cluster.initial_master_nodes=es01,es02,es03 //初始化的主节点 主从 这三个节点里选举
- "ES_JAVA_OPTS=-Xms512m -Xmx512m" //堆内存大小
volumes: //数据卷
- data01:/usr/share/elasticsearch/data
ports: //docker映射
- 9200:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9202:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: localnetworks:
elastic:
driver: bridge
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
通过docker-compose启动集群:
docker-compose up -d
使用cerebro来监控es集群状态,官方网址:GitHub - lmenezes/cerebro
解压即可使用,非常方便。
解压好的目录如下:
进入对应的bin目录:
双击其中的cerebro.bat文件即可启动服务。
访问http://localhost:9000 即可进入管理界面:
输入你的elasticsearch的任意节点的地址和端口,点击connect即可:
绿色的条,代表集群处于绿色(健康状态)。
注意:每个索引库的分片数量、副本数量都是在创建索引库时指定的,并且分片数量一旦设置以后无法修改。
利用kibana的DevTools创建索引库
1)在DevTools中输入指令:
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
// mapping映射定义 ...
}
}
}
2)或利用cerebro创建索引库
填写索引库信息:
点击右下角的create按钮即可
回到首页,即可查看索引库分片效果:
节点类型 | 配置参数 | 默认值 | 节点职责 |
master eligible | node.master | true | 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求 |
data | node.data | true | 数据节点:存储数据、搜索、聚合、CRUD |
ingest | node.ingest | true | 数据存储之前的预处理 |
coordinating | 上面3个参数都为false则为coordinating节点 | 无 | 路由请求到其它节点 合并其它节点处理的结果,返回给用户 |
默认情况下节点同时具备这四种角色
elasticsearch中的每个节点角色都有自己不同的职责,因此建议集群部署时,每个节点都有独立的角色。(三台coordinating节点 五台data节点 三台备选主节点)
例:
有A B C 三台es服务组成集群 A为主 当因为B C因为网络阻塞与A无法通信时 但A又能去客户端通信,这时B C就会在他们中选举一个主节点 这时集群里就有了两个主节点
解决方案:
为了避免脑裂,需要要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
shard = hash(_routing) % number_of_shards
_routing默认是文档的id
算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
elasticsearch的查询分成两个阶段
scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结 果集返回给用户
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
当node1宕机后:星星代表主节点 红色代表宕机
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。