当前位置:   article > 正文

微服务-实用篇

微服务-实用篇

一、微服务治理

1.微服务远程调用

  • springCloud提供了RestTemplate,可以发起远程http协议的调用

  • 使用

    • 注入bean

          @Bean
          public RestTemplate restTemplate() {
              return new RestTemplate();
          }
      
      • 1
      • 2
      • 3
      • 4
    • restTemplate使用

      @RestController
      @RequestMapping("order")
      public class OrderController {
      
         @Autowired
         private OrderService orderService;
      
         @Autowired
         private RestTemplate restTemplate;
      
         private static final String UserBaseApiURL = "http://localhost:8081/user/";
      
         @GetMapping("{orderId}")
         public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
             // 根据id查询订单并返回
             Order order = orderService.queryOrderById(orderId);
             // 远程调用查询用户
             User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class);
             order.setUser(user);
             return order;
         }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

2.Eureka注册中心

  • Eureka的作用:
    • 消费者该如何获取服务提供者具体信息?

      • 服务提供者启动时向eureka注册自己的信息
      • eureka保存这些信息
      • 消费者根据服务名称向eureka拉取提供者信息
    • 如果有多个服务提供者,消费者该如何选择?

      • 服务消费者利用负载均衡算法,从服务列表中挑选一个
    • 消费者如何感知服务提供者的健康状态?

      • 服务者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
      • eureka会更新记录服务列表信息,心跳不正常会被剔除
      • 消费者就可以拉取到最新的信息
  • 搭建EurekaServer服务
    • 创建项目引入依赖

      <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
      </dependency>
      
      • 1
      • 2
      • 3
      • 4
    • 编写application.yml配置文件

      server:
        port: 10086
      spring:
        application:
          name: eureka-server # 服务名
      eureka:
        client:
          service-url: # 服务地址
            defaultZone: http://127.0.0.1:10086/eureka/
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 在启动类上开启自动装配

      @EnableEurekaServer // 开启eureka自动装配
      @SpringBootApplication
      public class EurekaApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(EurekaApplication.class , args);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
  • Client服务注册
    • 引入eureka-client依赖

      <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
      </dependency>
      
      • 1
      • 2
      • 3
      • 4
    • 在application.yml中配置eureka地址

      spring:
        datasource:
          url: jdbc:mysql://localhost:3306/cloud_order?useSSL=false
          username: root
          password: 123456
          driver-class-name: com.mysql.jdbc.Driver
        application:
          name: orderservice # 服务名
      eureka:
        client:
          service-url: # 服务地址
            defaultZone: http://127.0.0.1:10086/eureka/
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
  • 服务发现
    • 给RestTemplate添加@LoadBalanced注解

       @Bean
          @LoadBalanced
          public RestTemplate restTemplate() {
              return new RestTemplate();
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 用服务提供者的服务名称远程调用

      @RestController
      @RequestMapping("order")
      public class OrderController {
      
         @Autowired
         private OrderService orderService;
      
         @Autowired
         private RestTemplate restTemplate;
      
         private static final String UserBaseApiURL = "http://userservice/user/";
      
         @GetMapping("{orderId}")
         public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
             // 根据id查询订单并返回
             Order order = orderService.queryOrderById(orderId);
             // 远程调用查询用户
             User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class);
             order.setUser(user);
             return order;
         }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
  • Ribbon负载均衡策略配置
      1. 代码方式,注入IRule实例bean

            @Bean
            public IRule randomIRule() {
                return new RandomRule();
            }
        
        • 1
        • 2
        • 3
        • 4
    • 2.配置文件方式: 在application.yml文件中添加配置项

    userservice:
     ribbon:
      NFLoadBalancerRuleClassName: com.netfix.loadbalancer.RandomRule
    
    • 1
    • 2
    • 3
  • Ribbon配置饥饿加载
    • ribbon默认采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长

    • 在application.yml中开启饥饿加载

      ribbon:
        eager-load:
          enabled: true # 开启饥饿加载
          clients: 
           - userservice # 指定对userservice这个服务饥饿加载
      
      • 1
      • 2
      • 3
      • 4
      • 5

3.nacos注册中心

  • 使用nacos注册中心服务
    • 引入依赖

                  <!--    nacos的管理依赖        -->
                  <dependency>
                      <groupId>com.alibaba.cloud</groupId>
                      <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                      <version>2.2.5.RELEASE</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
      
      <!--   nacos客户端依赖包     -->
              <dependency>
                  <groupId>com.alibaba.cloud</groupId>
                  <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
              </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    • 修改application.yml文件nacos配置

      spring:
        application:
          name: orderservice # 服务名
        cloud:
          nacos:
            server-addr: localhost:8848 # nacos端口号
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
  • nacos区域负载均衡
    • nacos默认负载均衡策略为服务轮询,一个服务往往会在多地部署多个实例,相同区域内的服务之间相互调用时长消耗更短,因此区域内优先调用比较合理,nacos提供了这样的负载均衡策略,不过区域内的策略为随机策略,当区域内没有可用服务时再访问其他区域的可用服务。

    • application.yml配置区域

      spring:
        application:
          name: userservice # 服务名
        cloud:
          nacos:
            server-addr: localhost:8848
            discovery:
              cluster-name: HZ # 集群名称
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 为Ribbon配置区域优先策略

      userservice:
        ribbon:
          NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
      
      • 1
      • 2
      • 3
  • nacos环境隔离-namespace
    • 不同的时期有不同的环境,比如开发时需要开发环境,namespace就可以进行环境隔离,不同环境之间的服务无法调用。

    • application.yaml 配置namespace

      spring:
        application:
          name: orderservice # 服务名
        cloud:
          nacos:
            server-addr: localhost:8848
            discovery:
              cluster-name: HZ # 集群名称
              namespace: fd3445b3-8e01-446a-91d5-03ab8b5a7205 # 环境id(从nacos控制台查看)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
  • Nacos和Eureka的对比
    • 共同点

      • 都支持服务注册和服务拉取
      • 都支持服务提供者心跳方式做健康检测
    • 区别

      • Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动健康监测。

        spring:
          application:
            name: orderservice # 服务名
          cloud:
            nacos:
              server-addr: localhost:8848
              discovery:
                cluster-name: HZ # 集群名称
                ephemeral: false # 非临时实例
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
      • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除。

      • Nacos支持服务列表变更的消息推送模式,服务列表更新及时。

      • Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP模式。

  • nacos配置管理
    • 配置管理步骤
      • 在控制台添加配置文件

        外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

      • 添加依赖

                <!--   nacos配置管理依赖     -->
                <dependency>
                    <groupId>com.alibaba.cloud</groupId>
                    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
                </dependency>
        
        • 1
        • 2
        • 3
        • 4
        • 5
      • 在bootstrap.yml文件中配置

        spring:
          application:
            name: userservice
          profiles:
            active: dev # 开发环境
          cloud:
            nacos:
              server-addr: localhost:8848
              config:
                file-extension: yaml # 文件后缀
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
      • 使用@Value注解注入

          @Value("${pattern.format}")
          private String format;
        
        • 1
        • 2
    • 配置热更新
      • 通过@Value方式注入的配置属性需要在类上添加@RefreshScope注解即可实现配置热更新

        @Slf4j
        @RestController
        @RequestMapping("/user")
        @RefreshScope // 配置热更新注解
        public class UserController {
        
            @Autowired
            private UserService userService;
        	// 配置属性注入
            @Value("${pattern.format}")
            private String format;
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
      • 通过@ConfigurationProperties方式注入的属性自动热更新

        @Component
        @ConfigurationProperties(prefix = "pattern")
        @Data
        public class PatternProperties {
            private String format;
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
    • 多环境配置共享
      • 微服务会从nacos读取的配置文件:
        • [服务名]-[spring.proflle.active].yaml
        • [服务名].yaml,默认配置,多环境共享
      • 优先级
        • [服务名]-[环境].yaml > [服务名].yaml > 本地配置

4.http客户端Feign

  • Feigin的使用步骤
    • 引入依赖

              <!--   fegin依赖     -->
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-starter-openfeign</artifactId>
              </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 启动类添加@EnableFeignClients注解

      @MapperScan("cn.itcast.order.mapper")
      @SpringBootApplication
      @EnableFeignClients
      public class OrderApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(OrderApplication.class, args);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 编写FeignClient接口

      
      @FeignClient("userservice")
      public interface UserClient {
      
          @GetMapping("user/{id}")
          User findById(@PathVariable("id") Long id);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • 使用FeignClient中定义的方法代替RestTemplate

      @RestController
      @RequestMapping("order")
      public class OrderController {
      
         @Autowired
         private OrderService orderService;
      
         @Autowired
         private UserClient userClient;
      
         @GetMapping("{orderId}")
         public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
             // 根据id查询订单并返回
             Order order = orderService.queryOrderById(orderId);
             // 远程调用查询用户
             User user = userClient.findById(order.getUserId());
             order.setUser(user);
             return order;
         }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
  • Feign的日志配置
    • 方式一是配置文件

      feign:
        client:
          config:
            default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置
              logger-level: FULL # 日志级别
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 方式二是JAVA代码配置类

      public class FeignClientConfiguration {
          @Bean
          public Logger.Level feignLogLevel(){
              return Logger.Level.FULL;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      @MapperScan("cn.itcast.order.mapper")
      @SpringBootApplication
      // 在启动类上添加的配置类属于全局配置
      
      @EnableFeignClients(defaultConfiguration = FeignAutoConfiguration.class)
      public class OrderApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(OrderApplication.class, args);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      // 如果需要对某一服务进行配置在服务接口上添加即可
      @FeignClient(value = "userservice" , configuration = FeignClientConfiguration.class)
      public interface UserClient {
      
          @GetMapping("user/{id}")
          User findById(@PathVariable("id") Long id);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
  • Feign的性能优化
    • 引入依赖

              <dependency>
                  <groupId>io.github.openfeign</groupId>
                  <artifactId>feign-httpclient</artifactId>
              </dependency>
      
      • 1
      • 2
      • 3
      • 4
    • 配置连接池

      feign:
        client:
          config:
            default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置
              logger-level: FULL # 日志级别
        httpclient:
          enabled: true # 开启feign对HttpClient的支持
          max-connections: 200 # 最大连接数
          max-connections-per-route: 50 # 每个路径的最大连接数
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

5.统一网关Gateway

  • 作用
    • 对用户请求做身份验证,权限认证
    • 将用户请求路由到微服务,并实现负载均衡
    • 将用户请求做限流
  • 搭建网关服务
    • 创建新的module,引入SpringCloudGateWay的依赖和服务发现依赖

          <dependencies>
              <!--    nacos服务发现依赖    -->
              <dependency>
                  <groupId>com.alibaba.cloud</groupId>
                  <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
              </dependency>
              <!--   网关gateway依赖     -->
              <dependency>
                  <groupId>org.springframework.cloud</groupId>
                  <artifactId>spring-cloud-starter-gateway</artifactId>
              </dependency>
          </dependencies>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • 编写路由配置及nacos地址

      server:
        port: 10010
      spring:
        application:
          name: gateway
        cloud:
          nacos:
            server-addr: localhost:80 # nacos地址
          gateway:
            routes: # 网关路由配置
              - id: user-service # 路由id 自定义
                uri: lb://userservice # 路由的目标地址 lb是负载均衡
                predicates: # 路由断言
                  - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
              - id: order-service
                uri: lb://orderservice
                predicates: # 路由断言
                  - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
  • 路由的过滤器配置
    • 为某个服务添加过滤器

      spring:
        application:
          name: gateway
        cloud:
          nacos:
            server-addr: localhost:8848 # nacos地址
          gateway:
            routes: # 网关路由配置
              - id: user-service # 路由id 自定义
                uri: lb://userservice # 路由的目标地址 lb是负载均衡
                predicates: # 路由断言
                  - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
              - id: order-service
                uri: lb://orderservice
                predicates: # 路由断言
                  - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
                filters:
                  - AddRequestHeader=color, blue # 局部过滤器
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • 添加默认过滤器(全局)

      spring:
        application:
          name: gateway
        cloud:
          nacos:
            server-addr: localhost:8848 # nacos地址
          gateway:
            routes: # 网关路由配置
              - id: user-service # 路由id 自定义
                uri: lb://userservice # 路由的目标地址 lb是负载均衡
                predicates: # 路由断言
                  - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
              - id: order-service
                uri: lb://orderservice
                predicates: # 路由断言
                  - Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
            default-filters:
              - AddRequestHeader=color, blue # 全局过滤器
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
  • 全局过滤器
    @Order(1) // 过滤器等级 越低优先值越高
    @Component
    public class AuthorizeFilter implements GlobalFilter {
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            ServerHttpRequest request = exchange.getRequest();
            HttpHeaders headers = request.getHeaders();
            String token = headers.getFirst("token");
            if (token != null && token.equals("abc")) {
                return chain.filter(exchange);
            }
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
  • 过滤器链执行顺序
    • 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
    • GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定Order值,由我们自己指定
    • 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
    • 当过滤器的order值一样时,会按照default>路由过滤器>GlobalFilter的循序执行

二、异步通信

1.什么是AMQP?

  • 应用层消息通信的一种协议,与语言和平台无关

2.部署RabbitMQ

RabbitMQ部署指南.md

3.SpingAMQP如何发送消息

  • 引入AMQP的Starter依赖

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
  • 配置RabbitMQ地址

    spring:
      rabbitmq:
        addresses: 192.168.88.101 # 地址名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast
        password: 123321
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 新建测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAMQPTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            String queueName = "simple.queue";
            String message = "hello , Spring amqp!";
            rabbitTemplate.convertAndSend(queueName , message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

4.SpingAMQP如何接收消息

  • 配置地址

    spring:
      rabbitmq:
        addresses: 192.168.88.101 # 地址名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast
        password: 123321
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 新建类

    @Component
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue") // 在启动前要确保该队列存在!
        public void listenSimpleQueue(String msg) {
            System.out.println("消费者1接收到消息 = " + msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

5.WorkQueue模型

  • 多个模型绑定到一个队列,用一个消息会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

    logging:
      pattern:
        dateformat: MM-dd HH:mm:ss:SSS
    spring:
      rabbitmq:
        addresses: 192.168.88.101 # 地址名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast
        password: 123321
        listener:
          simple:
            prefetch: 1 # 修改消费者提前把握的最大数量
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 示例:

    • 设置发布50条消息

          @Test
          public void testSimpleWorkQueue() {
              String queueName = "simple.queue";
              for (int i = 0; i < 50; i++) {
                  String message = "hello , Spring amqp! - " + (i + 1);
                  rabbitTemplate.convertAndSend(queueName , message);
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 两个接收者

      @Component
      public class SpringRabbitListener {
      
          @RabbitListener(queues = "simple.queue")
          public void listenSimpleQueue(String msg) {
              System.out.println("消费者1接收到消息 = " + msg);
          }
      
          @RabbitListener(queues = "simple.queue")
          public void listenSimpleQueue2(String msg) throws InterruptedException {
              System.err.println("消费者2接收到消息 = " + msg);
              Thread.sleep(500);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

6.交换机

  • 交换机的作用是什么?
    • 接收publisher发送的消息
    • 将消息按照规则路由到与之绑定的队列
    • 不能缓存消息,路由失败,消息丢失
    • FanoutExChange的会将消息路由到每个绑定的队列

7.FanoutExchange

  • 特点:

    • 会将交换机接收的消息转发给绑定的所有队列,所有与之监听的消费者全部都会收到消息
  • 编写配置类

    @Configuration
    public class FanoutConfig {
        // 声明FanoutExchange交换机
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("itcast.fanout");
        }
    
        // 声明第一个队列
        @Bean
        public Queue queue1() {
            return new Queue("fanout.queue1");
        }
    
        // 绑定队列1和交换机
        @Bean
        public Binding bindingQueue1(Queue queue1 , FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue1).to(fanoutExchange);
        }
    
        // 声明第二个队列
        @Bean
        public Queue queue2() {
            return new Queue("fanout.queue2");
        }
    
        // 绑定队列2和交换机
        @Bean
        public Binding bindingQueue2(Queue queue2 , FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue2).to(fanoutExchange);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 编写消费者代码

        @RabbitListener(queues = "fanout.queue1")
        public void listenFanoutQueue1(String msg) {
            System.out.println("消费者1接收到消息 = " + msg);
        }
    
        @RabbitListener(queues = "fanout.queue2")
        public void listenFanoutQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2接收到消息 = " + msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

8.DirectExchange

  • 特点:

    • 该交换机需要设置key值,当该交换机收到消息时,交换机会转发给指定key值的队列。存在多个相同的key值时,则群发
  • 案例

    • 编写接收者,绑定交换机(注解方式)

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue( name = "direct.queue1"),
                  exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT),
                  key = {"red" , "blue" }
          ))
          public void listenDirectQueue1(String msg) {
              System.out.println("消费者1接收到消息 = " + msg);
          }
      
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue( name = "direct.queue2"),
                  exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT),
                  key = {"red" , "pink" }
          ))
          public void listenDirectQueue2(String msg) throws InterruptedException {
              System.err.println("消费者2接收到消息 = " + msg);
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • 编写发布者

          @Test
          public void testDirectQueue() {
              String queueName = "simple.queue";
              String message = "hello , Spring amqp!";
              // 交换机name
              String exchangeName = "itcast.direct";
              rabbitTemplate.convertAndSend(exchangeName , "red" , message);
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

9.TopicExchange

  • 特点

    • TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
    • Queue与Exchange指定BindingKey时可以使用通配符:
      • #:代指0个或多个单词
      • *:代指一个单词
  • 示例:

    • 编写接收者

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue( name = "topic.queue1"),
                  exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC),
                  key = {"china.*" }
          ))
          public void listenTopicQueue1(String msg) {
              System.out.println("消费者1接收到消息 = " + msg);
          }
      
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue( name = "topic.queue2"),
                  exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC),
                  key = {"china.*" }
          ))
          public void listenTopicQueue2(String msg) {
              System.err.println("消费者2接收到消息 = " + msg);
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • 编写发送者

          @Test
          public void testTopicQueue() {
              String queueName = "simple.queue";
              String message = "hello , Spring amqp!";
              // 交换机name
              String exchangeName = "itcast.topic";
              rabbitTemplate.convertAndSend(exchangeName , "china.news" , message);
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

10.消息转换器

  • RabbieMQ是可以传递java对象的,通过MessageConverter实现,但是默认是JDk的序列化,最好为其配置JSON对象转化器,注意发布与接收方必须使用相同的MessageConverter

  • 案例

    • 消息发布方

      • 引入JackSon依赖

        <dependency>
           <groupId>com.fasterxml.jackson.dataformat</groupId>
           <artifactId>jackson-dataformat-xml</artifactId>
           <version>2.16.1</version>
        </dependency>
        
        • 1
        • 2
        • 3
        • 4
        • 5
      • 注入Bean

            @Bean // 注入json消息转化器
            public MessageConverter messageConverter() {
                return new Jackson2JsonMessageConverter();
            }
        
        • 1
        • 2
        • 3
        • 4
      • 发布消息

            @Test
            public void ObjectQueue() {
                String queueName = "object.queue";
        
                HashMap<String, String> hashMap = new HashMap<>();
                hashMap.put("name" , "小强");
                hashMap.put("age" , "18");
        
                rabbitTemplate.convertAndSend(queueName , hashMap);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
    • 消息接收方

      • 引入JackSon依赖

        <dependency>
           <groupId>com.fasterxml.jackson.dataformat</groupId>
           <artifactId>jackson-dataformat-xml</artifactId>
           <version>2.16.1</version>
        </dependency>
        
        • 1
        • 2
        • 3
        • 4
        • 5
      • 注入Bean

            @Bean // 注入json消息转化器
            public MessageConverter messageConverter() {
                return new Jackson2JsonMessageConverter();
            }
        
        • 1
        • 2
        • 3
        • 4
      • 定义队列

            @Bean
            public Queue ObjectQueue() {
                return new Queue("object.queue");
            }
        
        • 1
        • 2
        • 3
        • 4
      • 接收消息

            @RabbitListener(queues = "object.queue")
            public void listenObjectQueue2(HashMap<String , Object> msg) {
                System.err.println("消费者接收到消息 = " + msg);
            }
        
        • 1
        • 2
        • 3
        • 4

三、分布式搜索

1.初识elasticsearch

  • 什么是elasticsearch?

    • 一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能
  • 什么是elastic stack (ELK)?

    • 是指以elasticsearch为核心的技术栈,包括beats、logstash、kibana、elasticsearch

2.安装elastic

安装elasticsearch.md

3.索引库的操作

  • mapping属性

    • type:字段数据类型,常见的简单类型有:
      • 字符串:text(可分词文本)、keyword(精确值,例如:品牌、国家)
      • 数值:long、integer、short、byte、double、float
      • 布尔:boolean
      • 日期:date
      • 对象:object
    • index:是否创建索引,默认为true
    • analyzer:使用哪种分词器
    • properties:该字段的子字段
  • 索引库操作

    • 创建索引库

      PUT /索引库名

      # 创建索引库
      PUT /heima
      {
        "mappings": {
          "properties": {
            "info": {
              "type": "text",
              "analyzer": "ik_smart"
            },
            "email" : {
              "type": "keyword",
              "index": false
            },
            "name" : {
              "properties": {
                "firstName" : {
                  "type" : "keyword",
                  "index" : false
                },
                "lastName" : {
                  "type" : "keyword",
                  "index" : false
                }
              }
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
    • 查看索引库

      GET /索引库名

      # 查询索引库
      GET /heima
      
      • 1
      • 2
    • 删除索引库

      DELETE /索引库名

      # 删除索引库
      DELETE /黑马
      
      • 1
      • 2
    • 添加字段

      PUT /索引库名/_mapping

      # 添加新字段
      PUT /heima/_mapping
      {
        "properties": {
          "color" : {
            "type" : "keyword",
            "index" : false
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

4.文档操作

  • 添加文档

    • 模板

      POST /索引库名/_doc/文档id
      {
          "字段1" : "值1",
          "字段2" : "值2",
          "字段3" : {
              "子属性1" : "值3",
              "子属性2" : "值4",
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 示例

      # 添加文档
      POST /heima/_doc/1
      {
        "info" : "尚硅谷,让天下没有学完的技术!",
        "email" : "1482939313@qq.com",
        "name" : {
          "firstName" : "尚",
          "lastName" : "硅谷"
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
  • 查看文档

    • 模板

      GET /索引库名/_doc/文档id
      
      • 1
    • 示例

      GET /heima/_doc/1
      
      • 1
  • 删除文档

    • 模板

      DELETE /索引库名/_doc/文档id
      
      • 1
    • 示例

      DELETE /heima/_doc/1
      
      • 1
  • 修改文档

    • 方式一:全量修改

      • 特点

        id存在则修改,不存在则创建

      • 模板

        PUT /索引库名/_doc/文档id
        {
        	"字段1" : "值1",
            "字段2" : "值2",
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
    • 方式二:局部修改

      • 模板

        POST /索引库名/_update/文档id
        {
        	"doc" : {
        		"字段1" : "值1",
            	"字段2" : "值2",
        	}
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7

5.RestClient

  • RestClient的初步使用
    • 引入依赖

      <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>elasticsearch-rest-high-level-client</artifactId>
         <version>7.12.1</version>
      </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      <!-- 注意版本控制 -->
          <properties>
              <java.version>1.8</java.version>
              <elasticsearch.version>7.12.1</elasticsearch.version>
          </properties>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 初始化RestHighLevelClient

      package cn.itcast.hotel;
      
      @SpringBootTest
      class HotelDemoApplicationTests {
      
          private RestHighLevelClient client;
          @BeforeEach
          void setUp() {
              this.client = new RestHighLevelClient(RestClient.builder(
                      HttpHost.create("http://192.168.88.101:9200")
              ));
          }
      
          @AfterEach
          void tearDown() throws IOException {
              this.client.close();
          }
      
          @Test
          void contextLoads() {
              System.out.println(this.client);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
  • RestClient增删改查
    • 操作索引库

      • 创建索引库

            // 新建索引
            @Test
            public void addIndex() throws IOException {
                // 创建Request对象
                CreateIndexRequest request = new CreateIndexRequest("hotel");
                // 准备请求参数 RestClientConstant.HOTELTEMPLATE 为新建索引的json结构
                request.source(RestClientConstant.HOTELTEMPLATE , XContentType.JSON);
                // 发送请求
                client.indices().create(request , RequestOptions.DEFAULT);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
      • 删除和判断索引库

            // 删除索引
            @Test
            public void deleteIndex() throws IOException {
                // 创建Request对象
                DeleteIndexRequest request = new DeleteIndexRequest("hotel");
                // 发送请求
                client.indices().delete(request , RequestOptions.DEFAULT);
            }
        
            // 判断索引库是否存在
            @Test
            public void existsIndex() throws IOException {
                // 创建Request对象
                GetIndexRequest request = new GetIndexRequest("hotel");
                // 发送请求
                boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
                System.out.println( exists ? "索引库存在" : "索引库不存在");
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
    • 操作文档

      • 新增文档

            // 新增文档
            @Test
            public void addDocument() throws IOException {
                // 获取信息
                Hotel hotel = hotelService.getById(36934L);
                HotelDoc hotelDoc = new HotelDoc(hotel);
                // 创建Request对象 并设置id
                IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
                // 设置请求
                request.source(JSON.toJSONString(hotelDoc) , XContentType.JSON);
                // 发送请求
                client.index(request , RequestOptions.DEFAULT);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
      • 查询文档

            // 新增文档
            @Test
            public void findDocument() throws IOException {
                // 创建Request对象 并设置id
                GetRequest request = new GetRequest("hotel").id("36934");
                // 发送请求
                GetResponse response = client.get(request, RequestOptions.DEFAULT);
                String json = response.getSourceAsString();
                HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
                System.out.println("hotelDoc = " + hotelDoc);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
      • 更新文档

            // 更新文档
            @Test
            public void updateDocument() throws IOException {
                // 创建Request对象 并设置id
                UpdateRequest request = new UpdateRequest("hotel", "36934");
                // 设置更新信息 每两个参数为一对 key,value
                request.doc(
                        "city" , "美国"
                );
                // 发送请求
                client.update(request , RequestOptions.DEFAULT);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
      • 删除文档

            // 删除文档
            @Test
            public void deleteDocument() throws IOException {
                // 创建Request对象 并设置id
                DeleteRequest req = new DeleteRequest("hotel", "36934");
                // 发送请求
                client.delete(req , RequestOptions.DEFAULT);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
      • 批量操作

            // 批量操作
            @Test
            public void bulkDocument() throws IOException {
                // 获取数据
                List<Hotel> hotelList = hotelService.list();
        
                // 创建Request对象
                BulkRequest bulkRequest = new BulkRequest();
        
                // 写入
                hotelList.stream().forEach( item -> {
                    HotelDoc hotelDoc = new HotelDoc(item);
                    bulkRequest.add(new IndexRequest("hotel")
                                    .id(item.getId().toString()).source(JSON.toJSONString(hotelDoc) , XContentType.JSON));
                        }
                );
                
                // 发送请求
                client.bulk(bulkRequest , RequestOptions.DEFAULT);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20

6.elasticsearch搜索功能

  • DSL查询语法
    • 基本语法

      GET /indexName/_search
      {
          "query" : {
              "查询类型" : {
                  "查询条件" : "条件值"
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 查询所有

      GET /hotel/_search
      {
          "query" : {
              “match_all” : {
                  
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 全文检索查询

      • 会对查询text进行分词,查询倒排索引

      • match和multi_match的区别:

        match查询单字段,multi_match根据多个字段查询,参与查询字段越多,查询性能越差

      # match查询
      GET /hotel/_search
      {
        "query": {
          "match": {
            "all": "上海如家"
          }
        }
      }
      
      # multi_match查询
      GET /hotel/_search
      {
        "query": {
          "multi_match": {
            "query": "上海如家",
            "fields": ["brand" , "name" , "business"]
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • 精确查询

      # term查询
      GET /hotel/_search
      {
        "query": {
          "term": {
            "brand": {
              "value": "如家"
            } J
          }
        }
      }
      
      # range查询 范围
      GET /hotel/_search
      {
        "query": {
          "range": {
            "price": {
              "gte": 400,
              "lte": 500
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    • 地理查询

      • geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档

        # geo_bounding_box 边界框查询
        GET /hotel/_search
        {
          "query": {
            "geo_bounding_box" : {
              "location" : {
                "top_left" : {
                  "lat" : 31.1,
                  "lon" : 121.5
                },
                "bottom_right" : {
                  "lat" : 30.9,
                  "lon" : 121.7
                }
              }
            }
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
      • geo_distance:查询到指定中心点小于某个距离值的所有文档

        # geo_distance 距离中心点查询
        GET /hotel/_search
        {
          "query": {
            "geo_distance" : {
              "distance" : "15km",
              "location" : "31.21,121.5"
            }
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
    • 复合查询

      • function score query 可以修改文档的相关性算分(query socre),根据新得到的算分排序。

      • function score query定义的三要素

        • 过滤条件:哪些文档要加分
        • 算分函数:如何计算function score
        • 加权方式:funcation score 与 query score如何运算
        # function sorce 
        GET /hotel/_search
        {
          "query": {
            "function_score": {
              "query": {
                "match": {
                  "all": "上海"
                }
              },
              "functions": [
                {
                  "filter": {
                    "term": {
                      "brand": "万豪"
                    }
                  },
                  "weight": 10
                }
              ],
              "boost_mode": "multiply"
            }
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
      • 复合查询Boolean Query是一个或多个查询子句的组合。子查询的组合方式有:

        • must:必须匹配每个子查询,类似“与”
        • should:选择性匹配子查询,类似“或”
        • must_not:必须不匹配,不参与算分,类似“非”
        • filter:必须匹配,不参与算分
        # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店
        # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店
        GET /hotel/_search
        {
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "name": "如家"
                  }
                }
              ],
              "must_not": [
                {
                  "range": {
                    "price": {
                      "gt": 400
                    }
                  }
                }
              ],
              "filter": [
                {
                  "geo_distance": {
                    "distance": "10km",
                    "location": {
                      "lat": 31.21,
                      "lon": 121.5
                    }
                  }
                }
              ]
            }
          }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
    • 排序

      • elasticsearch默认根据相关度算分,也可以指定

      • 简单类型

        GET /hotel/_search
        {
          "query": {
            "match": {
              "all": "北京"
            }
          },
          "sort": [
            {
              "price": {
                "order": "desc"
              }
            }
          ]
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
      • 地理坐标

        GET /hotel/_search
        {
          "query": {
            "match": {
              "all": "上海"
            }
          },
          "sort": [
            {
              "_geo_distance": {
                "location": "31.21,121.5",
                "order": "asc",
                "unit": "km"
              }
            }     
          ]
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
    • 分页

      • elasticsearch默认只返回top10的数据

        GET /hotel/_search
        {
          "query": {
            "match_all": {}
          },
          "from": 10, // 分页开始页
          "size": 10  // 分页数量
        } 
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
    • 高亮

      GET /hotel/_search
      {
        "query": {
          "match": {
            "all": "如家"
          }
        },
        "highlight": {
          "fields": {
            "name": {
              "require_field_match": "false"
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15

7.RestClient查询文档

    @Test
    public void query() throws IOException {
        // 创建request对象
        SearchRequest searchRequest = new SearchRequest("hotel");
        // 构造DSL语句
//        searchRequest.source().query(QueryBuilders.matchAllQuery());
        searchRequest.source().query(QueryBuilders.matchQuery("all" , "上海"))
                .sort("price" , SortOrder.ASC)
                .highlighter(new HighlightBuilder().field("name").requireFieldMatch(false))
                .from(0)
                .size(50);
        // 发送请求
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("查询结果总条数为:" + response.getHits().getTotalHits() + "条");
        System.out.println("高亮部分为:" + response.getHits().getHits()[0].getHighlightFields());
        List<HotelDoc> hotelDocList = parseRestClintResponse(response, HotelDoc.class);
        hotelDocList.stream().forEach(System.out :: println);
    }

    private <T> List<T> parseRestClintResponse(SearchResponse response , Class<T> clazz) {
        SearchHits hits = response.getHits();
        ArrayList<T> list = new ArrayList<>();
        SearchHit[] hitsArray = hits.getHits();
        Arrays.stream(hitsArray).forEach( item -> list.add(JSON.parseObject(item.getSourceAsString() , clazz)));
        return list;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

8.聚合

  • 什么是聚合?

    • 聚合可以实现对文档数据的统计、分析、运算
  • 聚合的分类

    • 桶聚合:用来对文档作分组
      • TermAggregation:按照文档字段值分组
      • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
    • 度量聚合:用以计算一些值,比如:最大值、最小值、avg等
      • Avg:平均值
      • Max:最大值
      • Min:最小值
      • Stats:同时求Avg、Max、Min、sum等
    • 管道聚合:其他聚合的结果为基础做聚合
  • 示例:

    • DSL实现Bucket聚合

      GET /hotel/_search
      {
        "query": {
          "range": {
            "price": {
              "lte": 500
            }
          }
        },
        "size" : 0,
        "aggs": {
          "brandAgg": {
            "terms": {
              "field": "brand",
              "size": 100,
              "order": {
                "_count": "asc"
              }
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    • DSL实现Metrics

      GET /hotel/_search
      {
        "size": 0,
        "aggs": {
          "brandAggs": {
            "terms": {
              "field": "brand",
              "size": 30, 
              "order": {
                "score_aggs.avg": "desc"
              }
            },
            "aggs": {
              "score_aggs": {
                "stats": {
                  "field": "score"
                }
              }
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

9.RestClient实现聚合

    /**
     * 聚合查询
     */
    @Test
    public void queryAggregation() throws IOException {
        SearchRequest request = new SearchRequest("hotel");

        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreStats.avg" , false))
                .subAggregation(AggregationBuilders.stats("scoreStats")
                        .field("score")));

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Aggregations aggregations = response.getAggregations();
        Terms brandAgg = aggregations.get("brandAgg");
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        buckets.stream().forEach( item -> {
            System.out.print(item.getKeyAsString());
            Aggregations aggregations1 = item.getAggregations();

            List<Aggregation> aggregations2 = aggregations1.asList();
            Aggregation aggregation = aggregations2.get(0);
            // 解析出aggregation内的内容
            if (aggregation instanceof Stats) {
                Stats stats = (Stats) aggregation;
                double avgScore = stats.getAvg();
                double minScore = stats.getMin();
                double maxScore = stats.getMax();
                long count = stats.getCount();

                System.out.println("平均分:" + avgScore + ", 最低分:" + minScore + ", 最高分:" + maxScore + ", 总数:" + count);
            }
                }
        );

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

10.自动补全

  • 自定义分词器

    // 酒店数据索引库
    PUT /hotel
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "text_anlyzer": {
              "tokenizer": "ik_max_word",
              "filter": "py"
            },
            "completion_analyzer": {
              "tokenizer": "keyword",
              "filter": "py"
            }
          },
          "filter": {
            "py": {
              "type": "pinyin",
              "keep_full_pinyin": false,
              "keep_joined_full_pinyin": true,
              "keep_original": true,
              "limit_first_letter_length": 16,
              "remove_duplicated_term": true,
              "none_chinese_pinyin_tokenize": false
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "id":{
            "type": "keyword"
          },
          "name":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart",
            "copy_to": "all"
          },
          "address":{
            "type": "keyword",
            "index": false
          },
          "price":{
            "type": "integer"
          },
          "score":{
            "type": "integer"
          },
          "brand":{
            "type": "keyword",
            "copy_to": "all"
          },
          "city":{
            "type": "keyword"
          },
          "starName":{
            "type": "keyword"
          },
          "business":{
            "type": "keyword",
            "copy_to": "all"
          },
          "location":{
            "type": "geo_point"
          },
          "pic":{
            "type": "keyword",
            "index": false
          },
          "all":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart"
          },
          "suggestion":{
              "type": "completion",
              "analyzer": "completion_analyzer"
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    // 自动补全查询
    POST /hotel/_search
    {
      "suggest": {
        "suggestions": {
          "text": "sd", 
          "completion": {
            "field": "suggestion", 
            "skip_duplicates": true, 
            "size": 10 
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

11.RestClient实现自动补全

    /**
     * 自动补全
     */
    @Test
    public void autoComplete() throws IOException {
        SearchRequest request = new SearchRequest("hotel");

        request.source()
                .suggest(new SuggestBuilder().addSuggestion(
                        "suggestions",
                        SuggestBuilders
                                .completionSuggestion("suggestion")
                                .size(10)
                                .prefix("sd")
                        ));


        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        Suggest suggest = response.getSuggest();
        CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
        List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
        options.stream().forEach(item -> {
            System.out.println(item.getHit());
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

12.数据同步

  • 数据同步的几种实现方式

    • 同步调用:实现简单,粗暴、业务耦合度高
    • 异步通知:低耦合,实现难度一般、依赖mp的可靠性
    • 监听binlog:完全解除服务间耦合、开始binlog增加数据库负担、实现复杂度高
  • 利用MQ实现mysql与elasticsearch数据同步

    • 定义常量类

      public class MqConstant {
          // 交换机
          public static final String HOTEL_EXCHANGE_TOPIC = "hotel.exchange.topic";
      
          // update队列
          public static final String HOTEL_UPDATE_QUEUE = "hotel.update";
      
          // delete队列
          public static final String HOTEL_DELETE_QUEUE = "hotel.delete";
      
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    • 在消费者微服务中声明exchange、queue、RoutingKey(rabbitMQ懒加载,只有消费者在监听,不监听不会创建交换机队列等)

      @Configuration
      public class MqConfig {
      
          // 交换机
          @Bean
          public TopicExchange topicExchange() {
              return new TopicExchange(MqConstant.HOTEL_EXCHANGE_TOPIC , true , false);
          }
      
          // 更新队列
          @Bean(name = "updateQueue")
          public Queue updateQueue(){
              return new Queue(MqConstant.HOTEL_UPDATE_QUEUE , true);
          }
      
          // 删除队列
          @Bean(name = "deleteQueue")
          public Queue deleteQueue(){
              return new Queue(MqConstant.HOTEL_DELETE_QUEUE , true);
          }
      
          // 绑定队列交换机
          @Bean
          public Binding bindingUpdateQueue(@Qualifier("updateQueue") Queue updateQueue , TopicExchange topicExchange){
              return BindingBuilder.bind(updateQueue).to(topicExchange).with(MqConstant.HOTEL_UPDATE_QUEUE);
          }
      
          // 绑定队列交换机
          @Bean
          public Binding bindingDeleteQueue(@Qualifier("deleteQueue") Queue deleteQueue , TopicExchange topicExchange){
              return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstant.HOTEL_DELETE_QUEUE);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    • 生产者发布

      @RestController
      @RequestMapping("hotel")
      @CrossOrigin("*")
      public class HotelController {
      
          @Autowired
          private IHotelService hotelService;
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @PutMapping()
          public void updateById(@RequestBody Hotel hotel){
              if (hotel.getId() == null) {
                  throw new InvalidParameterException("id不能为空");
              }
              hotelService.updateById(hotel);
              rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId());
          }
      
          @DeleteMapping("/{id}")
          public void deleteById(@PathVariable("id") Long id) {
              hotelService.removeById(id);
              rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
    • 消费者监听

      @Component
      public class HotelListener {
      
          @Autowired
          private IHotelService hotelService;
      
          // 更新监听
          @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE)
          public void updateListen(Long id) {
              System.out.println("更新es文档...");
              hotelService.updateEs(id);
          }
      
          // 删除监听
          @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)
          public void deleteListen(Long id) {
              System.out.println("删除es文档...");
              hotelService.deleteEs(id);
          }
      
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

13.ES集群

  • 各节点的职责
    • master eligible节点的作用是什么?
      • 参与集群选主
      • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
    • data节点的作用是什么?
      • 数据CRUD
    • coordinator节点的作用是什么?
      • 路由请求到其他节点
      • 合并查询到的结果,返回用户
  • 分布式新增如何确定分配分片?
    • coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
  • 分布式查询
    • 分散阶段:coordinator node将查询请求分发给不同分片
    • 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
  • ES集群的故障转移
    • 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移

nt.HOTEL_DELETE_QUEUE);
}
}

```
  • 1
  • 生产者发布

    @RestController
    @RequestMapping("hotel")
    @CrossOrigin("*")
    public class HotelController {
    
        @Autowired
        private IHotelService hotelService;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PutMapping()
        public void updateById(@RequestBody Hotel hotel){
            if (hotel.getId() == null) {
                throw new InvalidParameterException("id不能为空");
            }
            hotelService.updateById(hotel);
            rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId());
        }
    
        @DeleteMapping("/{id}")
        public void deleteById(@PathVariable("id") Long id) {
            hotelService.removeById(id);
            rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
  • 消费者监听

    @Component
    public class HotelListener {
    
        @Autowired
        private IHotelService hotelService;
    
        // 更新监听
        @RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE)
        public void updateListen(Long id) {
            System.out.println("更新es文档...");
            hotelService.updateEs(id);
        }
    
        // 删除监听
        @RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)
        public void deleteListen(Long id) {
            System.out.println("删除es文档...");
            hotelService.deleteEs(id);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

13.ES集群

  • 各节点的职责
    • master eligible节点的作用是什么?
      • 参与集群选主
      • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
    • data节点的作用是什么?
      • 数据CRUD
    • coordinator节点的作用是什么?
      • 路由请求到其他节点
      • 合并查询到的结果,返回用户
  • 分布式新增如何确定分配分片?
    • coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
  • 分布式查询
    • 分散阶段:coordinator node将查询请求分发给不同分片
    • 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
  • ES集群的故障转移
    • 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/151261?site
推荐阅读
相关标签
  

闽ICP备14008679号