赞
踩
2023年最新笔记,全文约 3 万字,蕴含 Spring Cloud 常用组件 Nacos、OpenFeign、Seata、Sentinel 等
什么是Spring Cloud?
Spring Cloud是一系列框架的有序集合,是一种基于微服务的分布式架构技术。它利用 Spring Boot 的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用 Spring Boot 的开发风格做到一键启动和部署,从而提供了良好的开箱即用体验。
主流的架构方式:
微服务架构特征:
总体方向:高内聚、低耦合
常见微服务技术对比:
Spring Cloud 版本说明
大版本说明:
小版本说明:
其余版本信息说明
【其他注意点】:
构建 Spring Cloud 父工程
创建 Maven 项目,选择一个较为简单的架构模式(方便后面删除)
将父工程中除了.pom
文件的其余文件全部删除
在父工程的pom 文件中修改或新增<packaging>pom</packaging>
,代表这是父工程,其他工程项目可继承于它。
<packaging>pom</packaging>
粘贴下列pom配置:
<dependencyManagement>
:只声明依赖,不实现引入,子项目需要显示声明使用的依赖<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> </parent> <!-- 广泛使用的 lombok --> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> <!-- 定义版本号 --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR8</spring-cloud.version> <mysql.version>5.1.47</mysql.version> <mybatis.version>2.1.1</mybatis.version> </properties> <dependencyManagement> <dependencies> <!-- springCloud --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!--nacos的管理依赖--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!-- mysql驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <!--mybatis--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.version}</version> </dependency> </dependencies> </dependencyManagement>
构建 Spring Cloud 子工程
父类显式声明子类,子类标明继承自父类
<modules>
<module>子类1</module>
<module>子类2</module>
</modules>
<!--标明继承自父类-->
<parent>
<artifactId>springcloud_test</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
【强制性】凡是微服务,一般都需要有端口号与名称(程序名称将作为服务Id
,用于与其他服务分辨)
server:
port: 8001
spring:
application:
name: payment8001
返回结果定义(通常结构)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
private Integer code;
private String message;
private T data;
}
RestTemplate
类简介:
RestTemplate 是 Spring 提供的用于访问 Restful 风格服务的客户端模版工具集,其提供了多种便捷访问远程 Http 服务的方法,作用类似 Java 原生的 HttpClient
。
Spring Cloud 初体验:
服务之间通过暴露接口、HTTP 请求实现沟通。
自行配置Spring对象 RestTemplate
并注入,发送 GET 与 POST 请求使用 .getForObject()
、.postForObject()
@Configuration
public class CommonConfig {
@Bean
RestTemplate getRsetTemplate(){
return new RestTemplate();
}
}
NetFlix Eureka,注册中心
简介:
Eureka 拥有 3 个角色
Eureka 实现原理
简单实现(单机版)
@EnableEurekaServer
。【服务端】:服务端一般不需要将自己注册成微服务
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
@EnableEurekaServer
server:
port: 10086
spring:
application:
name: MyServer
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka
# 不向 eureka server 注册自己与获取服务列表
register-with-eureka: false
fetch-registry: false
【客户端】
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
spring:
application:
name: user_service
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka
利用 RestTemplate
向其他微服务发送请求。在编写 URL 路径时,通过指定其他微服务的应用名即spring.application.name
来调用其服务(如http://userservice/
),注册中心将充当 DNS 为各微服务提供解析服务,从而使我们不用像之前一样编写 IP 或域名硬编码的形式(如http://127.0.0.1:8080/
)。
// 子微服务使用其他微服务,并实现负载均衡
@Bean
@LoadBalanced
public RestTemplate rest() {
return new RestTemplate();
}
String url="http://userservice/user/"+order.getUserId();
NetFlix Ribbon,负载均衡
简介:
Ribbon 默认使用【轮询算法】
下面是 Ribbon 中实现的各种算法简介,IRule
是顶层接口,下面是具体的实现类。
简单实现:
由于 Ribbon 与 Eureka 都是由 NetFlix 公司开发,且 Ribbon 常用于与 Eureka 组合实现负载均衡,所以当我们引入 spring-cloud-starter-eureka
依赖时也会默认引入 Ribbon 依赖,无需重复引入。
<!-- 下面是 spring-cloud-starter-eureka 的依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
我们要做的只是通过简单配置更改 Ribbon 的【负载均衡】模式,有 2 种办法:
全局生效:因为 Ribbon 的所有模式都基于IRule
接口,所以可以通过改变其注入的 Bean 实现。
@Bean
public IRule randomRule(){
// 随机模式
return new RandomRule();
}
局部生效:仅对所调用的某微服务生效
某微服务名称:
ribbon:
NFLoadBalancerRuLeClassName: com.netfLix.Loadbalancer.RandomRule
微服务名称即:所要调用的微服务名称
另外,由于 Ribbon 默认采用**【懒汉模式】,即第一次请求链接时才会获取“可用的微服务列表”,这将造成一定的体验损耗,我们可以将其更改成【饿汉模式】**。
ribbon:
eager-load:
enable: true
# 客户端在启动时,就会去请求这些名称的“微服务表”
clients:
- userservice
- vipservice
阿里 Nacos,Eureka的替代品
注册中心(服务发现中心)、配置管理。
Nacos /nɑ:kəʊs/ ,Dynamic Naming and Configuration Service(动态域名命名和配置服务)
首字母简称,一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台,Nacos 致力于发现、配置和管理微服务。
Nacos 使用 Java 编写,如果本地 JDK 环境配置不对,会出现一系列不明所以的报错。
Nacos是一个内部微服务组件,需要在可信的内部网络中运行,并非面向公网环境的产品,不可暴露在公网环境,强烈不建议部署在公共网络环境。Nacos提供了简单的鉴权实现,是为防止业务错用的弱鉴权体系,而不是防止恶意攻击的强鉴权体系。
Nacos 架构
空串
代表公共命名空间public
。DEFAULT_GROUP
,作项目区分,用来区分相同开发环境下的不同项目(如测试环境下的电商项目、测试环境下的培训机构项目)例如在某命名空间下(如测试环境的命名空间),有众多分组(项目),每个项目又有一些服务(服务可以说是最小可用单位),服务又会归属于不同集群(提升可用性与性能)。
整合 Spring Cloud 配置说明:
discovery
:服务发现中心config
:配置中心当 Nacos 没有整合 OpenFeign 时,默认使用的是 RestTemplate ,此时如果需要实现“负载均衡”策略,则:
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
负载均衡方式默认为轮询
简介:
手动模式:
解压并启动(此处为单机模式)
单机模式
集群模式
# 单击模式启动
./startup.sh -m standalone
# 关闭
./shutdown.sh
Docker模式
未挂载配置目录与日志目录
docker run \
--name myNacos \
-e MODE=standalone \
--env NACOS_AUTH_ENABLE=true \
-p 8848:8848 \
-d \
nacos/nacos-server
挂载已有的配置目录与日志目录:提前将 Nacos/conf/
目录文件拷贝至/tmp/nacos/conf/
docker run \
--name myNacos \
-e MODE=standalone \
--env NACOS_AUTH_ENABLE=true \
-v /tmp/nacos/conf/:/home/nacos/conf/ \
-v /tmp/nacos/logs/:/home/nacos/logs/ \
-p 8848:8848 \
-d \
nacos/nacos-server
挂载新的的配置目录与日志目录:
docker run \
--name myNacos \
-e MODE=standalone \
--env NACOS_AUTH_ENABLE=true \
-v nacosConf:/home/nacos/conf/ \
-v nacosLogs:/home/nacos/logs/ \
-p 8848:8848 \
-d \
nacos/nacos-server
docker inspect mq | grep volume
开启服务器鉴权
按照官方文档配置启动,默认是不需要登录的,这样会导致配置中心对外直接暴露。而启用鉴权之后,需要在使用用户名和密码登录之后,才能正常使用nacos。(所以 Nacos 才推荐不要把自身放在“外网”中)
配置/conf/application.properties
文件
nacos.core.auth.enabled=true
如此一来,Client 端便需要配置 nacos 的账号密码才能登录。
**注意:**鉴权开关是修改之后立马生效的,不需要重启服务端。
安装之后
/logs/start.out
日志来查看启动详情。http://127.0.0.1:8848/nacos
登录 Nacos,默认账号密码均为 nacos。Spring项目引入 Nacos 依赖
父工程(这是必备的)
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
子工程
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
配置 Nacos 地址
在未开启“鉴权模式”时,可以不配置
username
与password
spring:
cloud:
nacos:
server-addr: localhost:8848
username: nacos
password: nacos
**注意:**Nacos 包不可与 Eureka 包同时导入同一工程,否则产生冲突Bean multiple
。
命名空间使实例之间【相互隔离】,看不到彼此,这可以用作正式环境与测试环境的区分。当 Nacos 启动时会默认使用全局唯一命名空间public
。
步骤:
spring:
cloud:
nacos:
server-addr: http://localhost:8848
discovery:
cluster-name: HZ
namespace: 53a68426-7e6c-4e09-83e3-57a87f116980 # 声明命名空间
服务分级模型在相同“命名空间”的前提下,Nacos 利用服务分级存储模型来提高【容灾率】,例如:
集群默认为DEFAULT_GROUP
,更改如下:
spring:
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: HZ # 例如:HZ代表杭州、SH表示上海
某微服务名称:
ribbon:
NFLoadBalancerRuLeClassName: com.alibaba.cloud.ribbon.NacosRule
Nacos可以通过【网页控制台】为实例设置权重,范围从0~1
,值越大越容易被访问,设置为0
则完全不会被访问,这可以用作“灰度升级”。
注意:必须是相同集群下拥有多个相同实例时,才可配置权重。
监测实例的健康状态
Nacos拥有临时监测(被动)、非临时监测(主动)
Eureka只有临时监测
临时监测(默认、被动检测):
非临时监测(主动检测)
false
。配置非临时检测:
spriing:
cloud:
nacos:
server-addr: http://localhost:8848
discovery:
cluster-name: HZ
namespace: 53a68426-7e6c-4e09-83e3-57a87f116980
# ephemeral,短暂的
ephemeral: fasle
【非临时监测】的另外一个作用:设置保护阈值,防止产生服务雪崩效应
Nacos中可以针对具体的实例设置一个保护阈值,值为0-1之间的浮点类型。本质上,保护阈值是⼀个⽐例值(当前服务健康实例数/当前服务总实例数)。
⼀般情况下(临时监测),服务消费者要从Nacos获取可用实例有健康/不健康状态之分。Nacos在返回实例时,只会返回健康实例。
但在高并发、大流量场景会存在⼀定的问题。比如,服务A有100个实例,98个实例都处于不健康状态,如果Nacos只返回这两个健康实例的话,流量洪峰的到来可能会直接打垮这两个服务,进一步产生雪崩效应。保护阈值存在的意义在于当服务A健康实例数/总实例数 < 保护阈值时,说明健康的实例不多了,保护阈值会被触发(状态true)。
Nacos会把该服务所有的实例信息(健康的+不健康的)全部提供给消费者,消费者可能访问到不健康的实例,请求失败,但这样也⽐造成雪崩要好。牺牲了⼀些请求(将请求分流到不健康的实例),保证了整个系统的可⽤。
实现“统一配置”与“热更新”
简介:
使用 Nacos 可以实现实例的统一配置与配置热更新(即当配置被修改时,主动推送并实现热更新、不重启)
应该将固定不变配置写入服务本身的application.yml
,易于变化的配置则写入 Nacos 配置文件。
应用 Nacos 统一配置流程图
声明:一个服务如果以 nacos 作为配置中心,应该先拉取 nacos 中管理的配置,然后与本地的配置文件比如 application.yml 中的配置合并,最后作为项目的完整配置,启动项目。
实现原理:Spring 中bootstrap.yml
文件的启动优先级高于application.yml
,我们可以将 Nacos 配置写入其中(注意单词有两个t
)。
【共同配置】
在Nacos情境下,微服务在启动时会从 Nacos 读取2个配置文件,按优先级为:
配置名称-环境.yaml
:userservice-dev.yaml
配置环境.yaml
:userservice.yaml
而且无论如何都会读取到第二个配置环境,所以我们可以将微服务相同的配置再放入第二种配置环境中。
【统一配置】:
服务名称-环境.yaml
,在其中编写易于变化的配置。nacos-config
依赖。bootstrap.yml
文件,这些配置决定了微程序会去读取哪一个Nacos配置文件。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
spring:
application:
name: userservice
profiles:
active: dev # 环境
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
config:
file-extension: yaml # 文件后缀名
username: nacos
password: nacos
discovery:
ephemeral: false
**【热更新】**实现:
在【统一配置】的基础上,代码中有两种方式可以实现热更新:
@RefreshScope
+ @Value (${属性key})
注解@ConfigurationProperties
两种方式不存在优劣,只是在形式作用上有些许差别,如果只想绑定少量属性方式一、否则方式二。
@RestController
@RequestMapping("users")
// 热更新
@RefreshScope
public class TestController {
@Value("${pattern.dataformat}")
String dataformat;
@GetMapping("/a")
String get(){
return dataformat;
}
}
@Data
@Component
@ConfigurationProperties(prefix = "pattern")
public class CommonConfig {
String dataformat;
}
// 后面使用 @Autowired 注入使用
在这一步,小坑特别多
将官方内嵌的小型数据库
Derby
替换为MySQL
Nacos 默认将数据存储在内嵌数据库 Derby
中,该数据库不属于生产可用的数据库,官方推荐的最佳实践是使用带有主从的高可用数据库集群,例如MySQL
(而且目前只支持 MySQL )。
简单实现(单机版,下节集群部署):
nacos
(其实命名什么也无所谓,后面要用到)nacos
中运行数据库文件/conf/mysql-schema.sql
建表。application.properties
:打开配置文件,将注释解除、然后添加数据库信息。namespaceControllerV2
,然而真正的报错信息却隐藏在控制台末尾的一小行文字,最终发现是之前后台的 Nacos 进程未完全关闭(残留),完全关闭之后再次尝试重启,成功。ps -ef |grep nacos
# 单机重启
./startup.sh -m standalone
其余注意点:
No DataSource set
。利用上节的数据持久化知识( MySQL 数据库),将3 台 Nacos 绑定同步相同的数据源,便可以做到同时更新。
实现步骤:
nacos
,导入/conf/mysql-schema.sql
。/conf/application.properties
配置文件,添加数据库 MySQL 配置。cluster.conf.example
重命名为 cluster.conf
,添加集群机器信息。/conf/
目录上传至服务器,复制 3 份/tmp/nacos/conf1
、/tmp/nacos/conf2
、/tmp/nacos/conf3
。# 清理时用
docker rm -f $(docker ps -a)
175.178.20.191:8845
175.178.20.191:8846
175.178.20.191:8847
cp -r /conf/ /tmp/nacos/conf1 /tmp/nacos/conf2 /tmp/nacos/conf3
docker run -d \
--env NACOS_AUTH_ENABLE=true \
-v /tmp/nacos/conf1/:/home/nacos/conf/ \
-v /tmp/nacos/logs1/:/home/nacos/logs/ \
-p 8845:8848 \
--name nacos1 \
nacos/nacos-server
docker run -d \
--env NACOS_AUTH_ENABLE=true \
-v /tmp/nacos/conf2/:/home/nacos/conf/ \
-v /tmp/nacos/logs2/:/home/nacos/logs/ \
-p 8846:8848 \
--name nacos2 \
nacos/nacos-server
docker run -d \
--env NACOS_AUTH_ENABLE=true \
-v /tmp/nacos/conf3/:/home/nacos/conf/ \
-v /tmp/nacos/logs3/:/home/nacos/logs/ \
-p 8847:8848 \
--name nacos3 \
nacos/nacos-server
结果:腾讯云 2G2核 同时开启 3 个 Nacos,2 个成功,第 3 个失败(CPU飙满),总体算部署成功。
右菜单栏,步骤:
共享配置 shared-configs
扩展配置 extension-config
简介:
日常开发中,多个模块可能会有很多共用的配置,比如数据库连接信息、Redis/RabbitMQ 连接信息、监控配置等等。那么此时我们就希望可以加载多个配置,或者多个项目共享同一个配置。
两者除了优先级不同之外没有其他任何区别,都⽀持三个属性,:
data-id
group
:默认 DEFAULT_GROUP。refresh
: 在配置变更时,应用内是否支持动态刷新。简单范例:
spring: application: name: nacos-config-multi main: allow-bean-definition-overriding: true cloud: nacos: username: ${nacos.username} password: ${nacos.password} config: server-addr: ${nacos.server-addr} namespace: ${nacos.namespace} # 共享配置 shared-configs: - data-id: swagger-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true # 扩展配置,优先级大于shared-configs (在之后加载) extension-configs: - data-id: content-service-${spring.profiles.active}.yaml group: eat-plus-project refresh: true - data-id: dataId group: eat-plus-project refresh: true
基本思想:影响的范围越小,优先级越高。
远端 > 本地
带有profiles > 不带
配置中心(远端) > 命令行参数 > 本地application.yaml > 本地bootstrap.yaml
存在 3 种配置文件大类,优先级从上至下:
远端
服务名-环境.yaml
服务名.yaml
扩展配置.yaml
共享配置.yaml
命令行参数
本地
application.properties
application.yaml
bootstrap.yaml
bootstrap.yaml优于application.yaml执行,application.yaml优于application.properties执行,但是后执行的会覆盖前执行的配置,所以在本地越先执行的优先级越低。
声明式的 Web HTTP 服务客户端,替代原生 RestTemplate
与 Nacos 组合使用时,Nacos提供“域名”的解析服务
简介:
OpenFeign是一个声明式的Web服务客户端,使得编写Web服务客户端变得非常容易,只需要创建一个接口,然后在上面添加注解,便可以通过接口来调用服务端的服务。
OpenFeign 遵循 RPC 协议,即 Remote Procedure Call Protocol,远程调用协议。
历史上存在过 Feign
(由 NetFlix 公司开发),SpringCloud组件中的一个轻量级RESTful的HTTP服务客户端,也是SpringCloud中的第一代负载均衡客户端。
OpenFeign
是SpringCloud自己研发的,在Feign的基础上支持了Spring MVC的注解,如@RequesMapping等,是SpringCloud中的第二代负载均衡客户端。
与 Ribbon 的关系:
OpenFeign默认将Ribbon作为负载均衡器,直接内置了 Ribbon。在导入OpenFeign 依赖后无需专门导入Ribbon 依赖。所以说,当我们需要更改 OpenFeign 的负载均衡策略时,其实就是需要修改 Ribbon 的策略,直接按照 Ribbon 的策略配置方式就行配置(即分为两种方式:全局与局部)。
步骤:
引入依赖
主类添加@EnableFeignClients
注解,声明使用 Feign。
使用注解@FeignClient()
编写具体的 FeignClient 接口。
@Autowired
注入对应 FeignClient 并使用。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
@EnableFeignClients
@FeignClient("userservice")
public interface UserClient { // 此处是接口不是普通类
@GetMapping("/user/{id}")
User findById(@PathVariable Long id); // 注意需标注成“路径参数”
}
@Autowired
UserClient userClient;
FeignClient接口定义说明(5大定义),以上面举例:
其余配置
# 开启 Gzip 压缩
feign:
compression:
request:
enabled: true
min-request-size: 2048
mime-types: text/xml, application/xml, application/json
response:
enabled: true
useGzipDecoder: true
【自定义配置】
一般我们可能只需要配置“日志级别”就好了。
可以使用两种方式配置日志级别,一般使用None(默认,不打印)
或Basic
,避免控制台打印过多信息。
feign:
client:
config:
default: # 全局生效
logger-level: full
feign:
client:
config:
userservice: # 局部(指定微服务)生效
logger-level: full
# Basic 级别打印的日志
[UserClient#findById] ---> GET http://userservice/user/2 HTTP/1.1
[UserClient#findById] <--- HTTP/1.1 200 (537ms)
优化Feign
Feign 底层的客户端实现有 3 种
使用连接池可以复用连接(避免在连接时多次产生3次握手4次挥手),更改为OKHttp使用步骤如下:
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
</dependency>
feign:
okhttp:
enabled: true
Feign最佳实践:
jar
包,使用依赖的方式进行导入。(之前)
(现在)
Spring Gateway
简介
网关的作用:
Spring Cloud网关类型
实现:
本质:创建单一Spring程序用于 Gateway 实现路由转发,单一 jar 包。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
下面定义了 2 个路由规则
server: port: 10080 spring: application: name: gateway cloud: nacos: server-addr: http://localhost:8848 discovery: cluster-name: HZ gateway: routes: - id: user-service # 路由标识,全局唯一 uri: lb://userservice # 路由的地址,lb:load balanced 负载均衡 predicates: - Path=/user/** # 路由断言,如果路径以 /user/ 开头则符合 default-filters: - AddRequestHeader=Content-type,text/html # 添加请求头 - id: order-service uri: lb://orderservice predicates: - Path=/order/** default-filters: - AddRequestHeader=Content-type,text/html
网关路由的【配置项】包括:
http
与 lb
两种类型。11种基本的 Predicate 类型(上面范例使用了 Path )
3种过滤器
过滤器的31种细分类
种类过多,需要时查看官方文档即可(只要查看名字就能得知该过滤器的作用 )
过滤器的优先级说明:
这里稍有点乱
简单案例
# 默认过滤(也对全局生效,默认此) default-filters,对所有路由生效
spring:
cloud:
gateway:
routes:
- id: user-service # 路由标识,全局唯一
uri: lb://userservice # 路由的地址,lb:load balanced
predicates:
- Path=/user/** # 路由断言,如果路径以 /user/ 开头则符合
default-filters:
- AddRequestHeader=Content-type,text/html # 添加请求头
spring:
cloud:
gateway:
routes:
- id: user-service # 路由标识,全局唯一
uri: lb://userservice # 路由的地址,lb:load balanced
predicates:
- Path=/user/** # 路由断言,如果路径以 /user/ 开头则符合
filters:
- AddRequestHeader=Content-type,text/html # 添加请求头
GlobalFilter
接口并重写 filter() 方法,此处注意:
@Order(-1)
表示优先级,值越低优先级越高,允许负值。exchange参数
属于 Spring WebFlux 组件中的知识,它用来获取请求与响应两者,但是例如获取出来的请求request不是 servlet 的静态技术,而是属于 WebFlux 的动态技术,即ServerHttpRequest
(注意是以 Server 开头而不是 Servlet )。chain参数
用来生成成功时的返回值Mono<Void>
exchange
设置失败的响应码,如401 Forbidden 并返回给客户端。Mono<Void>
是什么暂时不用管CORS跨域处理
Spring Boot 也可以实现跨域处理,并不一定要依赖于 Spring Gateway
禁止跨域是浏览器的策略,后端之间互相调用接口不存在跨域。
允许浏览器跨域一般需要配置的 5 大选项,并在 yml 文件中配置,如下:
Options
请求,得到确认后在指定的有效期内不会重发Options请求,节约资源)这章暂时跳过,具体内容查看:Docker笔记
MQ:Message Queue 消息队列
我们在大多数情况下使用【同步通信】,因为对时效性的要求较强
【同步通信】:
优点:时效性强、可以立即得到结果
缺点:业务之间耦合度高、性能和吞吐能力低、存在额外的资源消耗与级联失败的情况。
【异步通信】:
优点:耦合度高、吞吐量能力强、故障隔离、流量削峰
缺点:对消息中间件的可靠性、安全性、吞吐能力有严重的依赖,业务架构复杂,没有明显的流程线、难以追踪管理
4 种不同形式的 MQ 产品:
消息一旦消费完就会被删除,RabbitMQ 没有消息回溯功能
docker 版本安装:
docker pull rabbitmq
docker run -it \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=123 \
-v mq-plugins:/plugins \
--name=mq \
-hostname=mq \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq
下面代码全部在容器内操作:
rabbitmq-plugins enable rabbitmq_management
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
# 退出并重启容器
exit
docker restart mq
15672
: web 界面访问端口,需要进入容器内手动开启5672
:具体的通信端口user
,密码为123
plugins
:RabbitMQ插件目录,提供后续插件安装接口常见的5种消息模型
【注意】:
RabbitMQ实现流程:
Spring 简化原生代码
简介:
Spring-amqp
是接口,具体的实现有spring-rabbit
(即RabbitMQ)等。BasicQueue实现:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 10.211.55.4
port: 5672
virtual-host: / # 配置虚拟主机名(不同的虚拟主机之间存在分割,无法互相访问)
username: user
password: 123
发送消息:使用RabbitTemplate
模板类
@Autowired
RabbitTemplate template;
@Test
public void sendMessage(){
String queueName="simple.queue";
Object message="你好MQ!";
template.convertAndSend(queueName,message);
}
接收消息:使用@RabbitListener
注解
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listener(String msg){
System.out.println("【接收到消息】:"+msg);
}
}
WorkQueue实现:
即多个接收队列,提高队列接收的速度。
注意这里存在:“贪心的消费者”(消息预取),即消费者会优先获取消息,(不管当下能不能立即执行),此时需要设置消费预取上限,例如设为1
,即一次一次的取。
spring:
rabbitmq:
host: 10.211.55.4
port: 5672
virtual-host: / # 配置虚拟主机名(不同的虚拟主机之间存在分割,无法互相访问)
username: user
password: 123
listener:
direct:
prefetch: 1 # 消息预取数量限制为 1 ,默认为无限、即不作限制
publish/subscribe实现:
广播Fanout,交换机将消息转发至所有队列
先将队列与 Exchange 交换机建立绑定关系,然后 publisher
向交换机发送消息,交换机自动将消息转发至各队列,subscribe
向队列请求消息。队列与交换机之间的绑定有两种形式:代码实现、注解实现,这里使用 代码实现 ,后续使用 注解实现 形成对比。
代码实现:建立交换机与队列之间的绑定关系
// 声明(创建)交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("myExchange"); } // 声明(创建)队列 @Bean public Queue fanoutQueueOne(){ return new Queue("myQueue.one"); } // 绑定交换机与队列 @Bean public Binding binding(Queue fanoutQueueOne,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueueOne).to(fanoutExchange); } // 以相同的方式声明第二个队列...
接收消息(代码几乎不变)
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "myQueue.one")
public void listener1(String msg){
System.out.println("【 1 接收到消息】:"+msg);
}
@RabbitListener(queues = "myQueue.two")
public void listener2(String msg){
System.out.println("【 2 接收到消息】:"+msg);
}
}
发送消息
@Autowired
RabbitTemplate template;
@Test
public void sendMessage(){
String exchangeName="myExchange";
Object message="你好MQ!";
template.convertAndSend(exchangeName,"",message); // 中间参数为routingkey,下节使用
}
}
Routing实现:
交换机根据规则 routingkey
将消息路由至指定队列(对暗号),消息发送者在发送消息时指定 routingkey
,队列在建立时绑定 routingkey
(可以绑定多个key) ,符合则接收。
注解实现:在接收消息时,顺便建立交换机与队列之间的绑定关系(注解里面套注解,第一次见)
@Component
public class SpringRabbitListener {
// 第一个
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “direct.queueOne”),
exchange = @Exchange(name = “myEx”,type = ExchangeTypes.DIRECT),
key = {“red”,“blue”}
))
public void listener1(String msg){
System.out.println(“【 1 接收到消息】:”+msg);
}
// 第二个
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queueTow"),
exchange = @Exchange(name = "myEx",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listener2(String msg){
System.out.println("【 2 接收到消息】:"+msg);
}
}
发送消息
```java
@Autowired
RabbitTemplate template;
@Test
public void sendMessage(){
String exchangeName="myEx";
Object message="你好MQ!";
// 第二个参数 routingkey 指定发送的“规则”
template.convertAndSend(exchangeName,"yellow",message);
}
}
Topics实现:
Topic 与 Direct 类似,区别在于 routingKey 必须是多个单词的列表,以.
分割,并且支持通配符#
与*
。
@Component public class SpringRabbitListener { // 注意要将交换机类型修改为Topic:type = ExchangeTypes.TOPIC @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topics.queueOne"), exchange = @Exchange(name = "myExchangeTwo",type = ExchangeTypes.TOPIC), key = {"China.#","#.news"} )) public void listener1(String msg){ System.out.println("【 1 接收到消息】:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topics.queueTow"), exchange = @Exchange(name = "myExchangeTwo",type = ExchangeTypes.TOPIC), key = {"America.#","#.news"} )) public void listener2(String msg){ System.out.println("【 2 接收到消息】:"+msg); } }
@Autowired
RabbitTemplate template;
@Test
public void sendMessage(){
String exchangeName="myExchangeTwo";
Object message="你好MQ!";
template.convertAndSend(exchangeName,"China.news",message);
}
在这里我们将替换 Spring 默认提供的消息转换器,以提高性能。
为什么要替换呢?
因为Spring默认的消息处理接口是org.springframework.amqp.support.converter.MessageConverter
,默认实现为:SimpleMessageConverter
,且基于 JDK 的 ObjectOutputStream
实现序列化,这种序列化方式在处理对象的时候会将对象编码并且经过Base64编码,不仅会占用更多的内存空间,而且会导致性能下降。
解决方法即采用 JSON
格式,例如引入Jackson
依赖并实现:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.14.1</version>
</dependency>
@Configuration
public class CommonConfig {
// 更换消息转换器
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
当然,以上配置在消息【发送者】与【接收者】之间都需要配置,后续发送什么类型的消息,就使用什么类型接收(这点需十分注意,我在第一次编写时就忘记了修改消息的接收类型导致 Converter error)。
Elasticsearch,基于 Java 实现的分布式搜索:中文官网
Elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量的数据中快速找到所需内容。
具体功能:内容搜索、日志统计与分析、系统监控等。
Elasticsearch对内存的消耗特别大,少于512MB直接启动失败。
注意以下安装的所有软件版本需与 Elasticsearch 保持一致
Elasticsearch结合 Kibana、Logstash、Beats,被称为「elastic stack」(也就是ELK),被广泛运用在日志数据分析、实时监控等领域。
Elasticsearch基于 Lucene,Lucene既是一个 Java 语言的搜索引擎类库,也是Apache公司的顶级项目之一。
Elasticsearch中,文档数据会以JSON
格式存储,即全部文本字段都需添加双引号。
以 MySQL 为例,与 Elasticsearch 作对比
两者优势互补,不能替代
Elasticsearch使用**【倒排索引】**(“优先耗费时间建立新表,后续以空间换时间实现搜索”)。
新老数据库概念对应关系
Elasticsearch查询语句为DSL语句(JSON
格式),使用HTTP
发送请求。
应用领域:
安装 Elasticsearch 与 Kibana(提供工具方便编写DSL语句)
两者安装包大小都在
1GB
左右,且运行时所占内存也较大,推荐使用docker安装。
建立docker网络:Elasticsearch与Kibana必须处在同一个网络之中,并且此时两者可以通过docker服务名来建立连接。
docker network create es-net
安装Elasticsearch
docker pull elasticsearch:7.17.7
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
elasticsearch:7.17.7
访问http://ip:9200
能看到下列信息说明部署成功。
当启动不成功时,查看日志排错
docker logs -f es
安装Kibana
docker pull kibana:7.17.7
docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=es-net \
-p 5601:5601 \
kibana:7.17.7
注意,Kibana启动较慢,可以使用docker logs -f 服务名
查看其日志。
访问http://ip:5601/
当显示下列内容时表示成功。
左边菜单栏
→ Management
→ Dev Tools
工具,后续用它来编写 DSL 操作。Elasticsearch默认的分词器对中文分词兼容性极差,只能“按字依次分词”
IK分词器,专为Elasticsearch中文分词打造
离线安装(推荐):
ik
。ik
目录上传到该文件夹Dev Tools
测试分词效果 docker volume inspect es-plugins
docker restart es
# 查看es日志
docker logs -f es
POST /_analyze
{
"text": "这是一段中文句子,请分词",
"analyzer": "ik_smart"
}
在线安装(服务器 GitHub 访问速度较慢,不推荐):
# 1、进入容器内部
docker exec -it elasticsearch /bin/bash
# 2、在线下载并安装
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip
# 3、退出
exit
# 4、重启容器
docker restart elasticsearch
IK分词器的 2 种模式
ik_smart
:智能(最少)拆分
ik_max_word
:重复(最细)切分
自定义字典(2种形式)
要自定义词库,只需要到ik/config/IKAnalyzer.cfg.xml
中新增配置,并在配置文件的当前目录新建.dic
字典,以行
为分割属于相关词语,然后重启Elasticsearch容器即可(可以使用 Kibana 的Dev Tools
进行测试)。
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!-- 扩展词库,myDict.dic 是文件名 -->
<entry key="ext_dict">myDict.dic</entry>
<!-- 停止词库,myStopwords.dic 是文件名 -->
<entry key="ext_stopwords">myStopwords.dic</entry>
</properties>
docker restart es
POST /_analyze
{
"text": "这是一段超长的词语,腾讯你好",
"analyzer": "ik_smart"
}
表,使用映射(约束)定义规则
Mapping映射规则:
true
(其实许多字段并不需要创建索引)注意:索引库无数组概念,但允许某字段有多个值,例如下面的字段类型应为integer
。
"score": [60,39,77,99]
创建规则 以及 案例
PUT /索引库名称 { "mappings": { "properties": { "字段名1":{ "type": "text", "analyzer": "ik_smart" }, "字段名2":{ "type": "keyword", "index": false }, "字段名3":{ "properties": { "子字段名1":{ "type":"keyword" }, "子字段名2":{ "type":"keyword" }}}}}}
PUT /mytable { "mappings": { "properties": { "info":{ "type": "text", "analyzer": "ik_smart" }, "email":{ "type": "keyword", "index": false }, "name":{ "properties": { "firstName":{ "type":"keyword" }, "LastName":{ "type":"keyword" }}}}}}
查询、删除、修改
首先声明:【索引库】和【Mapping】一旦创建就无法修改,但是可以(只能)添加新的字段,这是因为当索引库创建时 Elasticsearch 就会去创建倒排索引,如果允许修改索引库可能引起无法预知的错误,所以 Elasticsearch 在这点上比 MySQL 更加彻底,直接禁止修改。
查询:
GET /索引库名
删除
DELETE /索引库名
修改(新增)索引库
PUT /索引库名/_mapping
{
"properties":{
"新增的字段名":{
"type":"integer",
"index":false
}
}
}
数据:新增、查询、删除、修改
新增文档:
文档id
:类似 MySQL 主键,推荐手动添加(例如1
),如果未添加则会自动生成较长的随机 id 代替
POST /索引库名/_doc/文档id
{
"字段名1":{
"firstName":"张",
"LastName":"三"
},
"字段名2":18,
"字段名3":"123@qq.com",
"字段名4":"程序猿"
}
查询
GET /索引库名/_doc/文档id
GET /索引库名/_search
删除
DELETE /索引库名/_doc/文档id
修改:修改文档这里有 2 种方式
PUT
+ _doc
,先完全删除旧文档、然后用新文档替代。POST
+ _update
,在旧文档的基础上进行修改。PUT 索引库名/_doc/文档id
{
"字段1":"值1",
"字段2":"值2"
}
POST 索引库名/_update/文档id
{
"doc":{
"字段":"新的值"
}
}
Java 操作 Elasticsearch
简介:
ES官方提供了多种不同语言的客户端(包)用来操作ES。这些客户端的本质就是先组装DSL语句,然后通过 HTTP 请求发送给 ES。
建立索引库的【步骤】:
例如:
以下案例为 MySQL 建表语句,经过分析发现,发现酒店名称需要分词并建立索引,酒店品牌不需要分词但需要索引,酒店经纬度不需要建立索引,酒店价格、评分等需要建立索引以方便排序。
{ "mappings": { "properties": { "id": { "type": "keyword" }, "name": { "type": "text", "analyzer": "ik_max_word", "copy_to": "all" }, "address": { "type": "keyword", "index": false }, "price": { "type": "integer" }, "score": { "type": "integer" }, "brand": { "type": "keyword", "copy_to": "all" }, "city": { "type": "keyword" }, "starName": { "type": "keyword" }, "business": { "type": "keyword", "copy_to": "all" }, "pic": { "type": "keyword", "index": false }, "location": { "type": "geo_point" }, "all": { "type": "text", "analyzer": "ik_max_word" } } } }
多字段搜索
字段拷贝:既想要实现多字段搜索,又想要效率最快
字段拷贝可以使用 copy_to
属性将当前字段拷贝到指定字段,示例:
"all": {
"type": "text",
"analyzer": "ik_max_word"
}
"brand":{
"type": "keyword",
"copy_to":"all"
}
"name":{
"type": "keyword",
"copy_to":"all"
}
all
成功包含brand
于name
,以后搜索时只需要指定all
即可。
另外,虽然名叫“字段拷贝”,但是其实并不会真正的拷贝多份造成存储空间冗余。
初始化 Java RestClient
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.17.7</elasticsearch.version>
</properties>
RestHighLevelClient client=new RestHighLevelClient(RestClient.builder(
HttpHost.create("175.178.20.191:9200")
));
或者将以上对象注册成Bean
@Configuration
public class CommonConfig {
@Bean
RestHighLevelClient rest() {
return new RestHighLevelClient(RestClient.builder(
HttpHost.create("175.178.20.191:9200")
));
}
}
下面所有操作都是建立在初始化RestClient的基础上。
建立索引库
CreateIndexRequest request = new CreateIndexRequest("hotel");
request.source(MAPPING_TEMPLATE, XContentType.JSON);
client.indices().create(request, RequestOptions.DEFAULT);
删除索引库
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
client.indices().delete(request, RequestOptions.DEFAULT);
判断索引库是否存在
GetIndexRequest request= new GetIndexRequest("hotel");
Boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println(exists);
导入文档数据
IndexRequest
Mybatis Plus + BulkRequest
// 注意在这可以赋予【id】
IndexRequest request = new IndexRequest("hotel").id("1");
// 利用 fastJSON 反序列化对象,生成 JSON 字符串
Hotel hotel = new Hotel();
hotel.setId(1L);
hotel.setName("张三");
hotel.setAddress("北京");
request.source(JSON.toJSONString(hotel),XContentType.JSON);
client.index(request,RequestOptions.DEFAULT);
List<Hotel> list = hotelService.list();
BulkRequest bulkRequest = new BulkRequest();
for (Hotel hotel:list){
HotelDoc hotelDoc = new HotelDoc(hotel);
bulkRequest.add(new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc),XContentType.JSON));
}
client.bulk(bulkRequest,RequestOptions.DEFAULT);
获取文档数据
根据 id
GetRequest request = new GetRequest("hotel").id("1");
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
String json = getResponse.getSourceAsString();
System.out.println(json);
此处为什么使用 getResponse.getSourceAsString()
如此形式获取 JSON 字符串?
因为我们在调用get()
方法时,底层实际上使用的是GET /hotel/_doc/1
,这种请求会返回一串json字符串
,但是此时我们想要的数据却保存在_source
结构体中。
GET /hotel/_doc/1
更新文档数据
UpdateRequest request = new UpdateRequest("hotel","1");
request.doc(
"age","18",
"name","李四"
);
client.update(request,RequestOptions.DEFAULT);
删除文档数据
// 10086 为文档id
DeleteRequest request = new DeleteRequest("hotel","10086");
client.delete(request,RequestOptions.DEFAULT);
文档操作总结:
__Request
,即IndexRequest
、BulkRequest
、GetRequest
、UpdateRequest
、DeleteRequest
RestHighLevelClient.__()
方法,即index()
、bulk()
、get()
、update()
、delete()
DSL 是基于 JSON 格式的查询方式
常见的查询方式
id s
根据 id 进行查询【注意事项】
查询的基本语法
GET /索引名称/_search
{
"query": {
"查询类型": {
"查询条件":"条件值"
}
}
}
查询所有:match_all
GET /hotel/_search
{
"query": {
"match_all": {}
}
}
全文检索
以下两种方式查询结果一样。在前面时,我们定义 all 字段为拷贝字段,这里虽然两种方式的查询结果一样,但是推荐使用拷贝字段all,因为效率高;而在另一种查询方式中,联合查询的字段越多,性能越低。
常用于“搜索框”搜索
GET /hotel/_search
{
"query": {
"match": {
"all": "外滩如家"
}
}
}
GET /hotel/_search
{
"query": {
"multi_match": {
"query": "外滩如家",
"fields": ["brand","name","business"]
}
}
}
精确查询
关键字不会分词,查询出来的结果也要与关键字完全匹配
city=="上海"
GET /hotel/_search
{
"query": {
"term": {
"city": {
"value": "上海"
}
}
}
}
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 100,
"lte": 2000
}
}
}
}
地理查询
可用作“打车”、“附近的人”等功能
geo_bounding_box
:画矩形。查询值落在矩形内的所有文档。get_distance
:画圆形。以点开始作半径查询,查询距离你多少米的人。常用于“附近的人”。GET /hotel/_search { "query": { "geo_bounding_box":{ "location":{ "top_left":{ "lat":31.1, "lon":121.5 }, "bottom_right":{ "lat":30.9, "lon":121.7 } } } } }
GET /hotel/_search
{
"query": {
"geo_distance":{
"distance":"15km",
"location":"31.21,121.5"
}
}
}
复合查询
_socre
字段表示得分
原始查询条件、过滤条件、算分函数、加权模式
排序、分页、高亮
简介:
_score
进行排序,可以排序的字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等。order by
思想一致。_score
就显得无意义,因此此时无得分,score始终为0。简单案例:sort
声明
GET /hotel/_search
{
"query": {
"match_all": {}
},
// sort与query同级且为数组形式,意味着可以有多种排序定义
"sort": [
{
"price": {
"order": "desc" // 排序字段和排序方式,AES与DESC
}
}
]
}
简写版本
"sort": [
{
"price": "desc"
}
]
地理位置排序:距离案例,结果单位为km
(有点智能)
"sort": [
{
"_geo_distance": {
"location": {
"lat": "18.57",
"lon": "109.70"
},
"order": "asc",
"unit": "km"
}
}
]
简介:使用from
与size
标签。
案例
GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 100,
"size": 20
}
注意事项:【深度分页】限制
from
与size
标签相加不能大于10000
,否则报错。这是由于Elasticsearch使用倒排索引所产生的限制(倒排索引本身并不适合分页),一般也不会超过10000,但是如果有需求,官方也推荐了两种解决方式如下:
after search
:
scroll
:
原理简介:
<em></em>
注意:
<em></em>
match
而不能为match_all
,因为后一种方式并无关键字!简单实现
GET /hotel/_search { "query": { "match": { "all": "如家" } }, "highlight": { "fields": { "name": { // ES默认搜索字段应该与搜索字段一致,如果不一致需要将:require_field_match=false // 这里查询字段为:all,高亮字段为:name "require_field_match": "false", "pre_tags": "<strong>", "post_tags": "</strong>" } } } }
搜索结果展现形式:新增highlight
字段,高亮后的字段将放在里面,_source
中的原内容并不会被改变,这点需十分注意!
此处 ES 将 API 封装的比较完善,(不同于前面)无需硬编码。
基本查询步骤:
SearchRuquest
对象Request.source().___query()
,塞入QueryBuilder
构建查询条件。注意:
Request.source()
API接口,掌握了该接口就掌握了本节简单实现:matchAllQuery()
SearchRequest request = new SearchRequest("hotel"); // .QueryBuilder中包含绝大多数查询方式 request.source().query( QueryBuilders.matchAllQuery() ); // 发送请求,得到响应数据,获取响应数据(JSON)并解析 SearchResponse response = client.search(request, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); System.out.println("查询到的文档数:"+hits.getTotalHits().value); // 遍历查询到的数据(有分页,默认10条) for (SearchHit hit:hits.getHits()){ String json = hit.getSourceAsString(); System.out.println(json); }
【结果解析】示例图
单字段查询
QueryBuilders.matchQuery("all","如家")
多字段查询
QueryBuilders.multiMatchQuery("如家","name","brand")
精确查询
QueryBuilders.termQuery("brand","如家")
QueryBuilders.rangeQuery("price").gt(100).lte(1000)
复合查询
较复杂
// 建立复合查询“构建器”
BoolQueryBuilder boolQuery = new BoolQueryBuilder();
// request组装复合查询
boolQuery.must(QueryBuilders.termQuery("brand","如家"));
boolQuery.filter(QueryBuilders.rangeQuery("price").gt(100));
request.source().query(boolQuery);
// 同以往:发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
QueryBuilders
选项一览(还有更多没有展示出来)
普通排序
request.source().sort("price",SortOrder.ASC);
距离排序
分页
request.source()
.query(QueryBuilders.matchAllQuery())
.from(56)
.size(20);
高亮
request.source()
.query(QueryBuilders.matchQuery("all", "如家"))
// requireFieldMatch 表示是否与查询字段匹配
.highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
【高亮结果】解析
搜索、分页、条件过滤、附近、广告置顶
示意图
搜索框功能实现(核心代码)
终极案例
除 附近功能 之外均实现。
@Override public PageResult pageResult(RequestParams params) throws IOException { SearchRequest request = new SearchRequest("hotel"); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // 获取搜索关键词 if (params.getKey() == null || "".equals(params.getKey())) { boolQuery.must(QueryBuilders.matchAllQuery()); } else { boolQuery.must(QueryBuilders.matchQuery("all", params.getKey())); } String brand = params.getBrand(); if (StringUtils.isNotBlank(brand)) { boolQuery.filter(QueryBuilders.termQuery("brand", brand)); } // 1.3.城市 String city = params.getCity(); if (StringUtils.isNotBlank(city)) { boolQuery.filter(QueryBuilders.termQuery("city", city)); } // 1.4.星级 String starName = params.getStarName(); if (StringUtils.isNotBlank(starName)) { boolQuery.filter(QueryBuilders.termQuery("starName", starName)); } // 1.5.价格范围 Integer minPrice = params.getMinPrice(); Integer maxPrice = params.getMaxPrice(); if (minPrice != null && maxPrice != null) { maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice; boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice)); } // 2.算分函数查询 FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery( boolQuery, // 原始查询,boolQuery new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组 new FunctionScoreQueryBuilder.FilterFunctionBuilder( QueryBuilders.termQuery("isAD", true), // 过滤条件 ScoreFunctionBuilders.weightFactorFunction(100) // 算分函数 ) } ); // 设置查询条件 request.source() .query(functionScoreQuery) .from((params.getPage() - 1) * params.getSize()) .size(params.getSize()); // 向 ES 发送请求并获得结果、解析 SearchResponse response = client.search(request, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); List<HotelDoc> hotelDocs = new ArrayList<>(); for (SearchHit hit : hits.getHits()) { String json = hit.getSourceAsString(); hotelDocs.add(JSON.parseObject(json, HotelDoc.class)); } return new PageResult(hits.getTotalHits().value, hotelDocs); }
数据聚合、自动补全、同步、集群
aggregations,聚合
简介:聚合可以实现对文档数据的统计、分析、运算,常见的 3 种类型为
==【注意】==参与聚合的字段类型必须为:
聚合必备的【三要素】
聚合可配置的属性
桶聚合Bucket案例
附加对统计结果
_count
进行排序
GET /hotel/_search
{
"size": 0, // 令文档查询数为0,避免干扰
"aggs": { // 定义聚合
"myAggs": { // 给聚合起个名字
"terms": { // 聚合的类型
"field": "brand", // 对“brand”字段进行聚合,这里选择精确查询 term 模式
"size": 20, // 聚合的结果也会进行分页,这里为20
"order": { // 对聚合结果根据 _count 进行排序
"_count": "desc"
}
}
}
}
}
度量聚合Metric案例
GET /hotel/_search { "size": 0, "aggs": { "myAggs": { "terms": { "field": "brand", "size": 20, "order": { "_count": "desc" } }, // 【桶聚合】内套【度量聚合】,score字段为文档自带,这里对其求 stats 操作 "aggs": { "myScoreAggs": { "stats": { "field": "score" } } } } } }
【重要】:
默认情况下,Bucket聚合将会对索引库中的所有文档做聚合,当索引库很大时这无疑会很消耗性能,我们可以通过添加query条件限制要聚合的文档范围。
即先 query 后 bucket,先查询再聚合。
GET /hotel/_search { "query": { "range": { "price": { "lte": 300 // 只对 300 元以下的酒店作聚合 } } }, "size": 0, "aggs": { "myAggs": { "terms": { "field": "brand", "size": 20, } } } }
标准实现流程:
准备 Request
准备 DSL
设置 Size==0
聚合语句
发出请求
解析结果
聚合代码映射:依次对照
结果解析:获取 Buckets 数据
安装拼音分词器pinyin
(步骤与 IK 分词器一致)
py
后上传至 ES 的 plugin 目录
docker restart es
POST /_analyze
{
"text": "这是一段超长的词语,腾讯你好",
"analyzer": "pinyin"
}
pinyin
分词器分词说明:ES分词器组成说明(3部分)
如何自定义分词器?
在创建索引库时,于 settings 中声明(同时可指定 character filters、tokenizer、tokenizer filter)。
**自定义分词器有什么用?**首先软件中可引入多种开源分词器,我想组合这些分词器(例如:分别在 3 各不同阶段使用不同分词器)以达到最佳效果。意即如果我自定义分词器,直接使用开源分词器也是可以的,只不过在这里我想自定义。
PUT /test { "settings": { "analysis": { "analyzer": { // 自定义分词器 "my_analyzer":{ // 分词器名称 "tokenizer":"ik_max_word", // 2 使用 ik_max_word "filter":"py" // 3 使用使用 py ( py 在下面定义) } }, "filter": { // 自定义 tokenizer filter 过滤器 "py": { // 过滤器名称,下面为属性,具体参考 pinyin 官网文档 "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, // 字段定义,即“建表语句” "mappings": { "properties": { "name":{ "type": "text", "analyzer": "my_analyzer", // 插入数据时,使用【自定义分词器】,即 pinyin +ik "search_analyzer": "ik_smart" // 搜索时不应该使用 pinyin ,只需单独使用 ik } } } }
插入数据并测试
POST /test/_doc/1 { "id": 1, "name": "狮子" } POST /test/_doc/2 { "id": 2, "name": "虱子" } // 查询 1 GET /test/_search { "query": { "match": { "name": "狮子" } } } // 查询 2 GET /test/_search { "query": { "match": { "name": "shizi" } } }
使用【拼音分词器】时应该注意的问题:
为避免搜索到多音字情况,我们应该采取 2 套策略:
建立酒店索引库(新增自动补全字段 suggestion )
DELETE /hotel // 酒店数据索引库 PUT /hotel { "settings": { "analysis": { "analyzer": { // 全文检索 "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, // 自动补全 "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } }
更改HotelDoc.java
:新增suggestion字段,类型为 List<String>
@Data @NoArgsConstructor @AllArgsConstructor public class HotelDoc { private Long id; private String name; private String address; private Integer price; private Integer score; private String brand; private String city; private String starName; private String business; private String location; private String pic; private Boolean isAD; private List<String> suggestion; public HotelDoc(Hotel hotel) { this.id = hotel.getId(); this.name = hotel.getName(); this.address = hotel.getAddress(); this.price = hotel.getPrice(); this.score = hotel.getScore(); this.brand = hotel.getBrand(); this.city = hotel.getCity(); this.starName = hotel.getStarName(); this.business = hotel.getBusiness(); this.location = hotel.getLatitude() + ", " + hotel.getLongitude(); this.pic = hotel.getPic(); this.suggestion = Arrays.asList(this.brand, this.business); } }
导入数据
@Test
void importData() throws IOException {
List<Hotel> list = hotelService.list();
BulkRequest bulkRequest = new BulkRequest();
for (Hotel hotel:list){
HotelDoc hotelDoc = new HotelDoc(hotel);
bulkRequest.add(new IndexRequest("hotel")
.id(hotelDoc.getId().toString())
.source(JSON.toJSONString(hotelDoc),XContentType.JSON));
}
client.bulk(bulkRequest,RequestOptions.DEFAULT);
}
DSL测试自动补全功能
GET /hotel/_search
{
"suggest": {
"suggestions": {
// 关键词
"text": "sd",
"completion": {
"field": "suggestion",
// 跳过重复字符
"skip_duplicates":true,
"size":10
}
}
}
}
准备请求,解析结果
SearchRequest request = new SearchRequest("hotel"); request.source().suggest(new SuggestBuilder().addSuggestion( "mySuggestion", SuggestBuilders .completionSuggestion("suggestion") // 字段名 .prefix("sd") .skipDuplicates(true) .size(10) )); // 发送请求,获得结果并解析 SearchResponse response = client.search(request, RequestOptions.DEFAULT); CompletionSuggestion suggestion = response.getSuggest().getSuggestion("mySuggestion"); for ( CompletionSuggestion.Entry.Option option:suggestion.getOptions()){ String text = option.getText().string(); System.out.println(text); }
@GetMapping("suggestion") List<String> suggestion(@RequestParam("key") String prefix) throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source().suggest(new SuggestBuilder().addSuggestion( "mySuggestion", SuggestBuilders .completionSuggestion("suggestion") // 字段名 .prefix(prefix) .skipDuplicates(true) .size(10) )); SearchResponse response = client.search(request, RequestOptions.DEFAULT); CompletionSuggestion suggestion = response.getSuggest().getSuggestion("mySuggestion"); List<String > suggestions=new ArrayList<>(); for ( CompletionSuggestion.Entry.Option option:suggestion.getOptions()){ String text = option.getText().string(); suggestions.add(text); } System.out.println(suggestions.size()); return suggestions; }
ES 的数据来源于 MySQL ,当 MySQL 数据发生改变时,ES也要跟着变化
情境:ES 和 MySQL 分别来自不同的微服务。
3 种不同方案的同步方式:
实现的简单步骤流程,具体步骤请点击
准备 2 个项目
暂时跳过,待到后面有机会应用时自然会访问此章节
单机的 ES 面临 2 个问题:
解决方式:
微服务保护 + 面试三板斧:分布式事务、分布式缓存、分布式消息
阿里 Sentinel,相比 Nginx 更加细粒度
流量控制、隔离降级、授权规则、规则持久化
简介:
8719
,控制台端口自定义,注意两个端口是不一样的东西。什么是雪崩问题?如何解决?
在微服务之间相互调用时,因为个别微服务发生故障而引起整条链路都发生故障的情况。
故障后纠错:超时处理、线程隔离、降级熔断(失败达到一定比例次数时暂停访问)。
故障前预防:流量控制,使用 Sentinel 哨兵模式限制业务访问的QPS,避免服务因流量突增而故障。
常见的服务保护技术对比
基本概念
资源
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。
只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
规则
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。
流控降级与容错标准
Rule = target + strategy +fallbackAction
安装步骤
指定控制台端口为 8090(程序端口依旧为 8719),账号 Sentinel,密码123456.
java -Dserver.port=8090 \
-Dsentinel.dashboard.auth.username=sentinel \
-Dsentinel.dashboard.auth.password=123456 \
-jar sentinel-dashboard-1.8.6.jar
代码配置
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8090
简介:
即项目内的调用链路,链路中被监控的一个接口就是一个资源。
默认情况下 sentinel 会监控 SpringMVC 的每一个端点( Controller),因此每一个端点就是调用链中的一个资源,我们可对资源进行如下 4 种操作:
流控:流量控制
降级:熔断降级
热点:热点参数限流
授权:授权规则
【注意】
Sentinel 默认只会将 Controller 中的方法标记为“资源”,如果要标记其他方法(例如 Sevice),则要:
@SentinelResource
注解标记方法spring:
cloud:
sentinel:
web-context-unify: false
@SentinelResource("名称")
public void queryGoods(){
// err 会打印出“红色标记”,更容易辨认
System.err.println("查询商品")
}
Sentinel提供了 3 种限流模式
【关联限流】配置
利用 update 限制 query,即 update 更新请求具有更高的优先级。
失败返回数据
1/3
),可以避免冷启动时高并发导致的服务宕机,超过阈值拒绝新的请求。相较于 流控规则 ,热点参数限流规则 更细力度。
简介:对参数相同的请求进行限流。
配置选项
【注意】
Sentinel 的热点限流规则对只属于 Spring MVC 的资源无效,要想生效则必须标识@SentinelResource
注解。
@SentinelResource("hot")
@GetMapping("/hot")
public String hot(){
return "hot榜单";
}
案例
hot
资源的 0 号参数(第一个参数)做统计,相同参数值的请求每秒不能超过5次。101
的请求,阈值应为10。“流控 与 热点”基于
QPS
进行的限流整合 Feign 可以通过
线程数
来进行限流
简介:
虽然限流可以尽量避免因高并发而引起的服务故障,但服务还是会因为其他原因而故障。我们要将这些故障控制在一定的范围内、避免雪崩,就要靠线程隔离和熔断降级。不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。
Feign整合Sentinel步骤
反馈类
,实现泛型接口FallbackFactory<T>
并注册成 Bean 对象。反馈类
feign:
sentinel:
enabled: true
@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
@Override
public UserClient create(Throwable throwable) {
return new UserClient() {
@Override
public User findById(Long id) {
log.error("记录失败信息", throwable);
// 发生异常时,返回空对象于前端处理
return new User();
}
};
}
}
@Configuration
public class DefaultFeignConfiguration {
@Bean
public UserClientFallbackFactory factory ( ){
return new UserClientFallbackFactory();
}
}
通过对 Feign 的配置控制后,可以在 Sentinel 中设置限制最大
并发线程数
实现限流
上步补充:
给 FeignClient 编写失败后的降级逻辑可以继承自 2 个接口:
==【线程隔离】==简介
2 种方式。
==【熔断降级】==简介
解决雪崩问题的重要手段,由断路器统计服务的异常比例、慢请求比例,如果超过阈值则会进行熔断(即拒绝服务),一段时间后断路器会再次统计服务异常比例,如果服务良好则恢复正常。
熔断降级,一共由 3 个阶段,分别为:
【断路器】的 3 种熔断策略:
只对 Linux 系统有效,保护系统
简介:
实现:
@Component
public class HeaderOriginParser implements RequestOriginParser {
@Override
public String parseOrigin(HttpServletRequest request) {
// cipher 为网关中自定义的请求头 key
String cipher = request.getHeader("cipher");
if (StringUtils.isEmpty(cipher)){
return "blank";
}
return cipher;
}
}
spring:
cloud:
gateway:
default-filters:
# 网关每次发送请求都会默认携带的请求头 key-value
- AddRequestHeader=cipher,myPassword
实现效果:
http://localhost:8088/order/101
http://localhost:10010/order/101?authorization=admin
简介
在前面我们可以观察到无论是被限流、熔断降级、授权拒绝,被请求的微服务总是会返回相同的响应数据Blocked by Sentinel (flow limiting)
,这对于用户来说并不友好,我们可针对不同的场景定义不同的响应内容。
可定义的异常类型
实现方式:自定义 ___BlockHandler
并实现 BlockExceptionHandler
接口,返回不同内容。
@Component public class SentinelExceptionHandler implements BlockExceptionHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception { String msg = "未知异常"; int status = 429; // 自适配异常类型,返回不同内容 if (e instanceof FlowException) { msg = "请求限流"; } else if (e instanceof ParamFlowException) { msg = "请求热点参数限流"; } else if (e instanceof DegradeException) { msg = "请求降级"; } else if (e instanceof AuthorityException) { msg = "未授权"; status = 401; } response.setContentType("application/json;charset=utf-8"); response.setStatus(status); response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}"); } }
待后续寻找最佳方案并实现,官网教程
简介:
Sentinel默认将配置保存在内存中,重启数据丢失。
数据的 3 种控制台管理模式:
【注意点】
在以上 3 种方式种,Push 模式无疑是最好的,但是阿里在开源 Sentinel 的时候并没有附带此模式,而是将其作为商业版(云服务)进行兜售,所以如果我们不想要付费,并且想要实现 Push 模式的规则持久化,则需要自己改写并编译 Sentinel 程序,实现起来相当复杂。
Sentinel整合原有项目非常简单,只需要引入依赖,然后进行简单的 yml 配置即可,但是需要注意 Sentinel 所兼容的 SpringBoot 版本问题(SpringBoot版本太新时,需要降级)。
但是没有持久化就意味着服务终归还是不稳定!!!
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2021.1</version>
</dependency>
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8090
# 存在一些“循环依赖”的情况,下面配置允许忽略这种情况(否则报错)
main:
allow-circular-references: true
事务基本要求:ACID,原子性、一致性、隔离性、持久性。
本章解决问题:
基于微服务的分布式事务。在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务。
CAP定理
1998年,加州大学的计算机科学家 Eric Brewer 提出,分布式系统有 3 个指标:
于是 CAP 定理的内容为:
我们之前搭建的 Elasticsearch 集群属于 CP,不属于 AP。
BASE理论
解决CAP存在的问题
解决的方式:
分布式事务解决方案,http://seata.io/zh-cn/
每个微服务都需配置 Seata,略微繁琐
简介:
Seata 事务中有 3 个重要角色
Seata 就是 TC(作为TC,搭建成功后我们不需要访问它,这是给 TM 和 RM 访问的),企业中需要搭建集群。
Seata提供了 4 种不同的分布式事务处理方案:
修改conf目录下的application.yml
文件
1.4.2版本为
registry.conf
server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${user.home}/logs/seata # 若无以下配置则注释 # extend: # logstash-appender: # destination: 127.0.0.1:4560 # kafka-appender: # bootstrap-servers: 127.0.0.1:9092 # topic: logback_to_logstash console: user: username: seata password: seata seata: # 配置中心 config: type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: "" # 命名空间为空,默认 public group: SEATA_GROUP username: nacos password: nacos data-id: seataServer.properties # 注册中心 registry: type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: "" group: DEFAULT_GROUP username: nacos password: nacos cluster: SH # SH表示上海 # 已经配置了 nacos 作为配置中心,所以这里 store 与 server 不配置 # store: # support: file 、 db 、 redis # mode: file # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
新建数据库 seata
,在此基础上新增两张表branch_table
与global_table
作事务管理。
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; DROP TABLE IF EXISTS `branch_table`; CREATE TABLE `branch_table` ( `branch_id` bigint(20) NOT NULL, `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `status` tinyint(4) NULL DEFAULT NULL, `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime(6) NULL DEFAULT NULL, `gmt_modified` datetime(6) NULL DEFAULT NULL, PRIMARY KEY (`branch_id`) USING BTREE, INDEX `idx_xid`(`xid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; DROP TABLE IF EXISTS `global_table`; CREATE TABLE `global_table` ( `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `status` tinyint(4) NOT NULL, `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `timeout` int(11) NULL DEFAULT NULL, `begin_time` bigint(20) NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime NULL DEFAULT NULL, `gmt_modified` datetime NULL DEFAULT NULL, PRIMARY KEY (`xid`) USING BTREE, INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE, INDEX `idx_transaction_id`(`transaction_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; SET FOREIGN_KEY_CHECKS = 1;
Nacos 新建配置文件
为 Seata 集群作准备
Nacos创建seataServer.properties
配置文件,修改 MySQL 数据库信息,其余配置默认。
# 数据存储方式,db代表数据库 store.mode=db store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=数据库密码 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 # 事务、日志等配置 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 # 客户端与服务端传输方式 transport.serialization=seata transport.compressor=none # 关闭metrics功能,提高性能 metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
启动:
Linux 选择.sh
,Windows 选择.bat
。
另外注意这里可能会报nohup: /Library/Internet: No such file or directory
错误,原因是JDK路径查找失败,解决方式见我的另一篇博客。
./bin/seata-server.sh
查看启动日志:/seata/logs/start.out
判断是否启动成功。
微服务中引入依赖并配置连接
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <!-- 排除旧版本 Seata ,引入 1.6.1 版本 --> <exclusions> <exclusion> <artifactId>seata-spring-boot-starter</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.6.1</version> </dependency>
nacos服务名称必须包括 4 部分,而且每个微服务都必须配置这些信息,微服务将根据这些信息去注册 Seata。
namespace
group
serviceName
cluster
seata:
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: ""
group: DEFAULT_GROUP
application: seata-server # TC 在 Nacos 中的名称
tx-service-group: seata-demo
service:
vgroup-mapping:
seata-demo: SH
分阶段事务模式,几乎所有的主流数据库都对其提供了支持。
示意图:
优点:
缺点:
实现步骤
@GlobalTransaction
seata:
data-source-proxy-mode: XA # 使用 XA 模式
@Override
@GlobalTransactional
public void create(Order order) {
// 创建订单
// 扣用户余额
// 扣库存
}
【补充说明】data-source-proxy-mode
配置的作用:
设置数据源代理模式,Seata 通过劫持数据源data-source
来实现分布式事务的管理,配置后所有事务都将由 Seata 托管。
默认
同样是分阶段事务模式,弥补了 XA 模式中资源锁定周期过长的缺陷,但同时也牺牲了一些安全性。
示意图:允许先成功,后续使用 undo log
进行回滚。
【脏读问题】:
这里阐述比较复杂,总之 AT 模式就是牺牲一定的安全性换来效率
由于各事务在一定程度上存在“独立性”,所以 AT 模式存在“脏读”现象。
AT 模式新增**【全局锁】**用来防止数据脏读,当数据遇到同时 update 请求时,全局锁会限制另一方的提交,直到原来的一方释放全局锁,此时 AT 模式相当于退化为 XA 模式。
但是全局锁只能作用于 Seata 事务,也就是说对非 Seata 管理的事务无效,在这种情况下依旧会产生“脏读”现象(无法解决)。幸运的是,Seata 能察觉这种现象的产生并抛出异常,我们可以捕获这种异常并编写代码发送邮件告知服务管理者。
当数据没有发生“脏读”问题时,AT模式效率较高,原因如下:
事务分布式提交,突破“木桶效应”限制。
// 例如在下面字段中,当 name 被某事务支配时,money字段并不受影响
{"id":1,"name":"张三","money":100}
【 XA模式 与 AT模式 总结】
AT模式牺牲的只是一些比较小的安全性(sava 与 update 属于“小概率”操作),换来的是极大的效率提升,在业务sava 与 update 次数较少且安全性要求不高的数据库,应优先使用AT模式。
实现步骤
lock_table
:导入到与 TC(即 Seata 服务端)相关联的数据库undo_log
:导入到与微服务相关的数据库(也就是在每个相关的微服务数据库中都需要导入undo_log
表)application.yml
配置文件,声明为使用 AT 模式(其实默认模式)。DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`transaction_id` bigint(20) NULL DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`gmt_create` datetime NULL DEFAULT NULL,
`gmt_modified` datetime NULL DEFAULT NULL,
PRIMARY KEY (`row_key`) USING BTREE,
INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;
seata:
data-source-proxy-mode: AT
TCC模式效率很高,但过于复杂
具体案例见:链接
简介
需编写代码分别实现 3 个阶段
示意图:
优点:
缺点:
【名词解释】
举例
TCC模式的“简化版”,牺牲了一定的安全性,存在数据“脏读”风险
Saga模式在实际中很少被运用
简介:
Saga模式是 Seata 提供的长事务解决方案,具体分为两个阶段:
优点:
缺点:
单点 Redis 存在问题(附解决方案):
下面将根据以上 4 个问题实现解决方案。
Docker 安装 Redis
redis.conf
(必须设置密码,防止漏洞攻击)与存放目录mkdir -p /myredis/conf/
vim /myredis/conf/redis.conf
requirepass 密码
docker run -d\
-v /myredis/conf:/usr/local/etc/redis \
--name myredis \
-p 6379:6379 \
redis \
redis-server /usr/local/etc/redis/redis.conf
redis-cli -h 175.178.20.191 -p 6379
# 回车后
auth 密码
Redis Database Backup file:Redis数据备份文件,也叫“Redis数据快照”。简单来说就是把内存中的所有数据都记录到磁盘中,当发生故障重启时,从磁盘读取快照恢复数据。
快照文件称为 RDB 文件,默认保存在当前运行目录,我们由两种生成方式:
另外:Redis在停机时默认会自动执行一次 RDB。
存在 3 种消息丢失类型
【注意】:
在确认机制发送消息时,需要给每个消息设置全局唯一的 id,用以区分不同的消息,避免 ack 冲突。
简介
RabbitMQ 提供了 publisher confirm
机制来避免消息在发送到 MQ 过程中丢失。即消息在发送到 MQ 后,会返回结果给发送方,表示消息投递状态。有两种结果:
publisher-confirm
,发送者确认
ack
nack
publisher-return
,发送者回执
ack
及路由失败原因实现方式:
publisher-confirm
与publisher-return
响应代码编写配置文件
spring:
rabbitmq:
host: 175.178.20.191
port: 5672
username: user
password: 123
virtual-host: /
# 下面配置为本节新增 RabbitMQ 配置
template:
mandatory: true
publisher-confirm-type: correlated
publisher-returns: true
配置说明:
template.mandatory
:定义消息路由失败时的策略,true
表示调用 ReturnCallback ;false
表示丢弃消息。publisher-confirm-type
:开启 publisher-confirm ,支持两种类型:
publish-returns
:开启 publish-return 功能,定义 ReturnCallback【测试时使用】自建消息队列并绑定路由
自建队列
【代码编写】
ReturnCallback
编写:每个 RabbitTemplete 只能配置一个 ReturnCallback(而 RabbitTemplete 也是全局唯一的),因此我们可以利用ApplicationContextAware进行配置(方式多样,唯一即可)。@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取 RabbitTemplate
RabbitTemplate rt = applicationContext.getBean(RabbitTemplate.class);
// 设置 ReturnCallback:失败记录日志
// 【注意】:这里可编写消息重发代码,或邮件通知管理员
rt.setReturnCallback((msg, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, msg);
});
}
}
ConfirmCallback
:每次发送消息时携带(可配置多个),维护其全局唯一 ID 。
@Test public void postMsg() { String msg = "Hello RabbitMQ!"; // 定义异步回调 CorrelationData,并赋予全局唯一 ID(UUID,作辨识) CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString()); // 类似 AJAX,3种结果 correlation.getFuture().addCallback( result -> { if (result.isAck()) { log.debug("消息发送成功投递到交换机,ID:{}", correlation.getId()); } else { log.error("消息投递到交换机失败,ID:{},原因:{}", correlation.getId(), result.getReason()); } }, ex -> { log.error("消息发送失败,ID:{},原因:{}", correlation.getId(), ex.getMessage()); } ); // 发送消息时附加上异步回调 correlation 的定义 rabbitTemplate.convertAndSend("amq.direct", "simple", msg, correlation); // 这里是 Test 测试环境,休眠 2s 等待消息的回执 // 否则 MQ 会收不到消息回执,而认为消息投递到交换机失败 Thread.sleep(2000); }
测试
成功
提供错误的路由地址
rabbitTemplate.convertAndSend("error.amq.direct", "simple", msg, correlation);
提供错误的队列名
rabbitTemplate.convertAndSend("amq.direct", "error.simple", msg, correlation);
SpringAMQP 规范定义路由、队列以及消息的创建默认都是
durable 持久化
的。
简介:MQ 默认内存存储消息,开启持久化功能可以避免缓存在 MQ 中的消息丢失。
类型分类
durable
:持久化transient
:暂时性的持久化(代码创建版)
以下都是默认持久化的,如果需要暂时性的配置再更改即可
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否删除
return new DirectExchange("simple.direct",true,false);
// return new DirectExchange("simple.direct",false,false);
}
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
// return QueueBuilder.nonDurable("simple.queue").build();
}
Message message = MessageBuilder.withBody("msg".getBytes("UTF-8"))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
// .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(message);
持久化(图形界面版):
Persistent(代号2)
如何查看某条消息是否属于“持久化消息”?
RabbitMQ 支持消费者确认机制,即消费者在处理完信息后向MQ发送回执,MQ收到回执之后才会正式删除该消息。
SpringAMQP 可以配置 3 种【确认模式】:
消息接收方中配置
spring:
rabbitmq:
host: 175.178.20.191
port: 5672
username: user
password: 123
virtual-host: /
listener:
simple:
prefetch: 1
# 配置消息确认机制
acknowledge-mode: auto
【注意】:当确认模式为 “auto” (而且就是这种方式),没有配置【失败重试模式】时,生产者的消息会一直处于“悬挂”状态(即每次都没有被真正消费),消费者会无限循环的从生产者获取消息,造成严重的资源空转浪费。
【失败重试模式】设置:
注意
initial-interval
规定的是第一次立即读取失败后的等待时间,并不是第一次读取前的等待时间!需理。
spring: rabbitmq: host: 175.178.20.191 port: 5672 username: user password: 123 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: auto # 设置【失败重试模式】 retry: enabled: true initial-interval: 1000 # 第一次立即读取,第二次(即初始等待时长)为 1s multiplier: 2 # 下次等待时长倍数,下次等待时长=上次等待时长 * 等待时长倍数 max-attempts: 3 # 最大重试次数 max-interval: 60000 # 最大等待时间间隔(我这可不设) stateless: true # true表无状态(默认),false表有状态(业务包含事务时需设)
在上述设置情境,消息一旦达到重试次数的限制后,即被丢弃。然而有时候我们却并不想直接把消息丢弃,而是想把它保存下来(例如用日志的形式),这时候就需要更改**【消费者失败信息处理策略】**,我们有 3 种形式:
实际就是覆盖 Spring 默认的 Bean:
MessageRecoverer(是接口)
我们以实现第 3 种方式为例:
代码创建负责处理“已经死亡的信息”的交换机、队列,并绑定两者
@Configuration public class CommonConfig { @Bean public DirectExchange errorExchange() { return new DirectExchange("error.direct"); } @Bean public Queue errorQueue() { return new Queue("error.queue", true); } @Bean public Binding errorBinding() { return BindingBuilder.bind(errorQueue()) .to(errorExchange()) .with("error"); } }
定义RepublishMessageRecoverer
(即覆盖 Spring 默认的 Bean):
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageRecoverer RepublishMessageRecoverer() {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
概念理解:
dead-letter-exchange
属性,并且指定了一个交换机,那么队列中的所有死信就会投递到这个交换机中,而这个交换机也被成为“死信交换机”。【死信交换机】与上节的【消费者失败信息处理策略】区别:
Time To Live,存活时间(默认未设置)
如果一个队列中的消息在 TTL 结束时仍未被消费,则会变为“死信”,具体可以分为 2 种情况:
**注意:**如果两者同时设置了,则以【时间短的】为准!
应用
简单实现:设置超时时间 与 延迟消费者对消息的接收
@Bean public DirectExchange ttlExchange() { return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue() { return QueueBuilder.durable("ttl.queue") .ttl(10000) // 指定超时后转发的“死信交换机”与其 routingKey .deadLetterExchange("dl.direct") .deadLetterRoutingKey("dl") .build(); } @Bean public Binding ttlBinding() { return BindingBuilder.bind(ttlQueue()) .to(ttlExchange()) .with("ttl"); }
@RabbitListener(bindings = @QueueBinding(
value = @Queue("dl.queue"),
exchange = @Exchange("dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.queue 的延迟消息:{}",msg);
}
@Test
public void testTTLMsg( ) throws UnsupportedEncodingException {
Message msg =MessageBuilder
.withBody("hello ttl".getBytes("UTF-8"))
.setExpiration("5000")
.build();
// 消息 ID,需要封装到 CorrelationData 中
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct","ttl",msg,correlation);
}
上节利用 TTL 结合死信交换机的方式虽然能实现消息的延迟接收,但是我们可以有更加简便的办法。
延迟队列的使用场景:
“延迟插件”原理:
对官方原生的路由 Exchange 做了功能升级,衍生出 DelayExchange ,其会将接收到的消息暂存在内存中直至“过期”(而官方的 Exchange 是无法存储消息的),过期后将消息投递到队列中。
安装“延迟队列”插件
前提:安装 RabbitMQ 时需创建“配置”插件目录容器卷
插件全称:
rabbitmq_delayed_message_exchange-3.11.1.ez
DelayExchange
插件,点击 release 进入 GitHub 下载docker volume inspect 容器卷名
docker exec -it 容器名 bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker restart 容器名
插件安装成功之后,我们就可以在发送消息时直接指定消息的延迟时间,而无需其他繁杂配置。
【延迟队列实现】
x-delay
并附上时间数值即可。向 DelayExchange 发送消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue("delay.queue"),
// delayed 属性为 true
exchange = @Exchange(name = "delay.direct",delayed ="true" ),
key = "delay"
))
public void delayQueue(String msg){
log.info("接收到 dl.queue 的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange("delay.direct")
.delayed() // 设置则属性为 true
.build();
}
// 这里代码只负责创建,后续自行绑定队列
发送消息样板:添加请求头x-delay : 时间(单位毫秒)
即可。
Message msg =MessageBuilder
.withBody("消息体.getBytes("UTF-8"))
.setHeader("x-delay",5000) // 设置延迟时间
.build();
// 消息 ID,需要封装到 CorrelationData 中
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",msg,correlation);
消息堆积问题:
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直至达到上限;这时最早接收到的消息很有可能就会成为“死信”。
解决消息堆积的 3 种思路:
惰性队列
从 RabbitMQ 3.6.0 开始,新增 Lazy Queues (惰性队列)概念。
设置惰性队列的 2 种方式:
x-queue-mode
属性为lazy
@RabbitListener(bindings = @QueueBinding(
// 设置为“惰性队列”
value = @Queue(name="dl.queue",
arguments=@Argument(name="x-queue-mode",value="lazy")),
exchange = @Exchange("dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.queue 的延迟消息:{}",msg);
}
@Beam
public void lazyQueue(){
return QueueBuilder.durable("lazy.queue").lazy().build(;)
}
queue-mode
为lazy
# 正则表达式匹配,--apply-to queues 令所有匹配的队列属性值均修改
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
伪集群搭建,后续完善
注意!集群是指的是
Queue队列
集群!
RabbitMQ由 Erland语言(面向并发)
编写,天然支持集群模式,传统的 RabbitMQ 支持 2种集群模式:
镜像集群虽然支持集群,但是主从同步并不是强一致的,在某些情况下可能存在数据丢失的风险。因此官方在 RabbitMQ 3.8 版本之后推出了新的集群模式仲裁队列来代替镜像集群,其底层使用 Raft 协议确保主从数据一致。
简介:Classis Cluster,普通集群、经典集群。
特性
下面为 黑马程序员 提供的“在相同 Docker 环境”的伪集群搭建教程
我们先来看普通模式集群,我们的计划部署3节点的mq集群:
主机名 | 控制台端口 | amqp通信端口 |
---|---|---|
mq1 | 8081 —> 15672 | 8071 —> 5672 |
mq2 | 8082 —> 15672 | 8072 —> 5672 |
mq3 | 8083 —> 15672 | 8073 —> 5672 |
集群中的节点标示默认都是:rabbit@[hostname]
,因此以上三个节点的名称分别为:
rabbit@mq1
rabbit@mq2
rabbit@mq3
获取cookie
RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。
要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。
每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。
我们先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
可以看到cookie值如下:
FXZMCVGLBIXZCDEMMVZQ
接下来,停止并删除当前的mq容器,我们重新搭建集群。
docker rm -f mq
准备集群配置
在/tmp目录新建一个配置文件 rabbitmq.conf:
vim /tmp/rabbitmq.conf
文件内容如下:
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
再创建一个文件,记录cookie
# 写入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > /tmp/.erlang.cookie
# 修改cookie文件的权限
chmod 600 .erlang.cookie
echo "LNFBFJDGJUGVBTXDJJYE" > .erlang.cookie
准备三个目录,mq1、mq2、mq3:
cd /tmp
# 创建目录
mkdir mq1 mq2 mq3
然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:
# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
启动集群
创建一个网络:
docker network create mq-net
运行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management
访问网页,搭建成功
选择节点添加队列
特性:
总结如下:
镜像模式的配置
镜像模式的配置有3种模式:
ha-mode | ha-params | 效果 |
---|---|---|
准确模式exactly | 队列的副本量count | 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。 |
all | (none) | 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。 |
nodes | node names | 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。 |
这里我们以rabbitmqctl命令作为案例来讲解配置语法。
语法示例:
exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
rabbitmqctl set_policy
:固定写法ha-two
:策略名称,自定义"^two\."
:匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.
开头的队列名称'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
: 策略内容
"ha-mode":"exactly"
:策略模式,此处是exactly模式,指定副本数量"ha-params":2
:策略参数,这里是2,就是副本数量为2,1主1镜像"ha-sync-mode":"automatic"
:同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销all模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
ha-all
:策略名称,自定义"^all\."
:匹配所有以all.
开头的队列名'{"ha-mode":"all"}'
:策略内容
"ha-mode":"all"
:策略模式,此处是all模式,即所有节点都会称为镜像节点nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
rabbitmqctl set_policy
:固定写法ha-nodes
:策略名称,自定义"^nodes\."
:匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.
开头的队列名称'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
: 策略内容
"ha-mode":"nodes"
:策略模式,此处是nodes模式"ha-params":["rabbit@mq1", "rabbit@mq2"]
:策略参数,这里指定副本所在节点名称测试
我们使用exactly模式的镜像,因为集群节点数量为3,因此镜像数量就设置为2.
运行下面的命令:
docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
下面,我们创建一个新的队列:
Quorum:仲裁,3.8版本之后出现的功能,约定大于配置,目的在于取代镜像集群。
默认count值为5,即 1主4从
。
添加仲裁队列
在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。
在任意控制台查看队列:
可以看到,仲裁队列的 + 2字样。代表这个队列有2个镜像节点。
因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3.
加入集群
1)启动一个新的MQ容器:
docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management
2)进入容器控制台:
docker exec -it mq4 bash
3)停止mq进程
rabbitmqctl stop_app
4)重置RabbitMQ中的数据:
rabbitmqctl reset
5)加入mq1:
rabbitmqctl join_cluster rabbit@mq1
6)再次启动mq进程
rabbitmqctl start_app
增加仲裁队列副本
我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:
docker exec -it mq1 bash
执行命令:
rabbitmq-queues quorum_status "quorum.queue"
结果:
现在,我们让mq4也加入进来:
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"
结果:
再次查看:
rabbitmq-queues quorum_status "quorum.queue"
查看控制台,发现quorum.queue的镜像数量也从原来的 +2 变成了 +3:
简介:
应用实例:
开源产品
英文官网:https://min.io|中文官网:https://www.minio.org.cn/
MinIO 是什么?
MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口(亚马逊云的 S3 API 接口协议是在全球范围内达到共识的对象存储协议,是世界范围内认可的标准),非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。我国的企业通常使用阿里云、腾讯云提供的 OSS 云端存储服务,不过对于一些敏感信息,可以“自建 OSS ”。
它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大 5TB,兼容 Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。
特点:
纠错码 erasure code
和校验和 checksum
保护数据免受硬件故障和数据损坏,即使丢失一半数量(N/2)的硬盘,仍然可以恢复数据。read-after-write
和 list-after-write
一致性模型纠错码 erasure code 简介
纠删码是一种恢复丢失和损坏数据的数学算法,传输过程中发生错误后能在收端自行发现或纠正的码。为使一种码具有检错或纠错能力,须对原码字增加多余的码元。
Minio 将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这 8 块硬盘上。
使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。在上图中,当丢失3个硬盘时,依旧可读可写;当丢失4个硬盘时,只能读不能写;当丢失数量大于一半即 4 个硬盘时,数据无法恢复。
纠错码 erasure code 工作流程
当数据对象在MinIO集群中进行存储时,先进行纠删分片,后打散存储在各硬盘上。具体为:
MinIO 恢复过程:
删除一个目录,稍等片刻删除的目录自动恢复。
一些思想:
Docker安装
docker run \
-p 9000:9000 \
-p 9090:9090 \
-d --restart=always \
-e "MINIO_ACCESS_KEY=minioadmin" \
-e "MINIO_SECRET_KEY=minioadmin" \
-v /home/minio/data:/data \
-v /home/minio/config:/root/.minio \
--name minio \
minio/minio:latest \
server /data --console-address ":9090" -address ":9000"
端口9000
/9090
,账号密码默认minioadmin
。
使用纠错码(8份):存储的文件被拆分,平均存储在 8 份硬盘。
实测存储 274KB
,占用硬盘:67*8+4*16=600KB
(其中 4*16 表示目录所占用的存储,即最小块 4KB * 16),当然这里只是大致估算,并不代表最终准确值,有印象即可。
docker run -d \ -p 9001:9001 \ -p 9091:9091 \ -e "MINIO_ACCESS_KEY=minioadmin" \ -e "MINIO_SECRET_KEY=minioadmin" \ -v /home/minio/data1:/data1 \ -v /home/minio/data2:/data2 \ -v /home/minio/data3:/data3 \ -v /home/minio/data4:/data4 \ -v /home/minio/data5:/data5 \ -v /home/minio/data6:/data6 \ -v /home/minio/data7:/data7 \ -v /home/minio/data8:/data8 \ -v /home/minio/config:/root/.minio \ --name minio1 \ minio/minio server /data1 /data2 /data3 /data4 /data5 /data6 /data7 /data8 \ --console-address ":9091" -address ":9001"
tree -h \
/home/minio/data1 \
/home/minio/data2 \
/home/minio/data3 \
/home/minio/data4 \
/home/minio/data5 \
/home/minio/data6 \
/home/minio/data7 \
/home/minio/data8
设置账号密码及权限
AccessKey
与SecretKey
为用户创建AccessKey
与SecretKey
(相当于受限的账号密码),用以在其他客户端中声明使用。
下载保存,备用
MinIO 使用桶来组织对象,桶类似于文件系统中的文件夹或目录,其中每个桶可以容纳任意数量的对象。
赋予桶 public
权限、上传文件,然后可以通过网址来访问文件,如:
http:// 域名:端口 / 桶名 / 文件名
http://127.0.0.1:9000/testbucket/1.jpeg
MinIO官方提供了许多语言的 SDK
引入依赖
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.1</version>
</dependency>
上传文件
可将 MinioClient 配置成 Bean 对象
创建 MinioClient 对象需要提供accessKey
和secretKey
(由具有读写权限的账户创建)
// 创建 MinioClient 对象 MinioClient minioClient = MinioClient.builder() .endpoint("http://175.178.20.191:9000") .credentials("HlaV03Fck1XuwE4X", "Sp5CeqEVtasxcgkJ5ZhPJPsFoRknUlSS") .build(); // 如果桶不存在则创建 String bucket = "testbucket"; if (!minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucket).build())) { minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucket).build()); } // 声明上传文件(可定义多层文件夹) UploadObjectArgs uploadObject = UploadObjectArgs.builder() .bucket(bucket) .object("finename/credentials.json") .filename("credentials.json") .build(); // 正式上传文件 minioClient.uploadObject(uploadObject); System.out.println("上传成功~");
查询文件及下载
GetObjectArgs getObject = GetObjectArgs.builder()
.bucket(bucket)
.object("1.jpeg")
.build();
// 判断文件是否存在及创建输出流
InputStream input = minioClient.getObject(getObject);
FileOutputStream output = new FileOutputStream(getObject.object());
// 存在则下载
IOUtils.copy(input, output);
// 关闭输出流
output.close();
System.out.println("下载成功");
tips:输入流与输出流之间,可以使用 Spring 工具类 IOUtils
进行“快捷拷贝”
IOUtils.copy( source , target );
删除文件
minioClient.removeObject(
RemoveObjectArgs.builder().bucket(bucket).object("credentials.json").build()
);
编写:通用的 Service 层文件传输接口
暂时搭建失败!
简介:
分布式 MinIO 能够将多块硬盘(可以不在同一台机器上)组成一个对象存储 服务,分布式Minio里所有的节点必须拥有相同的access秘钥和secret秘钥才能建立联接,即accessKey
和secretKey
一样。
分布式MinIO可以通过 Docker Compose
或者 Swarm mode
进行部署。这两者之间的主要区别是 Compose 只实现单主机多容器部署(测试环境),而 Swarm 模式能实现多主机多容器部署(生产环境)。
集群原理:
MinIO分布式集群是指在多个服务器节点均部署MinIO服务,并将其组建为分布式存储集群,对外提供标准S3接口以进行统一访问。MinIO采用去中心化无共享架构,各节点间为对等关系,连接至任一节点均可实现对集群的访问,我们可以使用 Nginx 对节点进行轮询。
实战(搭建失败):
2台机器、4个硬盘,硬盘序号一致
175.178.20.191 minio1
47.94.55.73 minio2
第一台机器
docker run -d \
-p 9000:9000 \
-p 9090:9090 \
--net=host \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
-v /home/minio/data1:/data1 \
-v /home/minio/data2:/data2 \
-v /home/minio/data3:/data3 \
-v /home/minio/data4:/data4 \
-v /home/minio/config:/root/.minio \
--name minio \
minio/minio \
server http://minio{1...2}/data{1...4} \
--console-address ":9090" -address ":9000"
第二台机器
docker run -d \
-p 9001:9001 \
-p 9091:9091 \
--net=host \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
-v /home/minio/data1:/data1 \
-v /home/minio/data2:/data2 \
-v /home/minio/config:/root/.minio \
--name minio2 \
minio/minio \
server http://minio{1...2}/data{1...4} \
--console-address ":9091" -address ":9001"
测试
docker rm -f $(docker ps -a)
docker run -d --name minio \
-p 9000:9000 \
-p 9001:9001 \
--restart=always --net=host \
-e MINIO_ACCESS_KEY=minio \
-e d=minio123 \
-v /data/config:/root/.minio \
-v /data/data1:/data1 \
-v /data/data2:/data2 \
-v /data/data3:/data3 \
-v /data/data4:/data4 \
minio/minio server http://minio{1...2}/data{1...4} \
--console-address ":9001"
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。