赞
踩
1. 互联网项目与传统项目
互联网项目架构的特点:
传统项目和互联网项目的不同:
用户体验:
大型互联网项目架构的目标:
衡量网站的性能指标:
响应时间:指执行一个请求从开始到最后收到响应数据所花费的总体时间。
并发数:指系统同时能处理的请求数量。
并发连接数:每秒钟服务器连接的总 TCP 数量(客户端向服务器发起请求,并建立了 TCP 连接)。
请求数:也称为 QPS(Query Per Second),即每秒的请求数。
并发用户数:单位时间内有多少用户。
吞吐量:指单位时间内系统能处理的请求数量(通常以 QPS 或 TPS 来衡量)。
一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程(即客户机在发送请求时开始计时,收到服务器响应后结束计时,以此来计算使用的时间和完成的事务个数)。
1 个页面的 1 次访问,只会形成 1 个 TPS;但 1 次页面请求,可能产生多次对服务器的请求,就会有多个 QPS 。
通常:QPS >= 并发连接数 >= TPS
2. 程序三高
1)高并发
高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一。当多个进程或线程同时(或着说在同一段时间内)访问同一资源时会产生并发问题,因此需要通过专门的设计来保证系统能够同时(并发)正确处理多个请求。
2)高性能
简单地说,高性能(High Performance)就是指程序处理速度快、耗能少。与性能相关的一些指标如下:
高并发和高性能是紧密相关的,提高应用的性能,可以提高系统的并发能力。
应用性能优化时,对于计算密集型和 I/O 密集型还是有很大差别,需要分开来考虑。
水平扩展(Scale Out):只要增加服务器数量,就能线性扩充系统性能。通常增加服务器资源(CPU、内存、服务器数量),大部分时候是可以提高应用的并发能力和性能 (前提是应用能够支持多任务并行计算和多服务器分布式计算才行)。但水平扩展对系统架构设计是有要求的,难点在于:如何在架构各层进行可水平扩展的设计。
3)高可用
高可用性(High Availability)通常用来描述一个系统经过专门的设计,从而减少停工时间,保证服务的持续可用。
如高可用性集群就是保证业务连续性的有效解决方案。
1. 提高服务器性能(单机)
硬件/系统级别的解决方案:
应用级别的解决方案:
随着业务的不断增加,服务器性能很快又到达瓶颈。不管是提升单机硬件性能,还是提升单机架构性能,都有一个致命的不足:单机性能总是有极限的。所以互联网系统架构对高性能的解决方案还是水平扩展。
2. 增加服务器数量(DNS 负载均衡)
DNS(Domain Name System,域名系统),因特网上作为域名和 IP 地址相互映射的一个分布式数据库,能够使用户更方便地访问互联网,而不用去记住能够被机器直接读取的 IP 数串。通过主机域名得到该域名对应的 IP 地址的过程叫做域名解析。DNS 协议运行在 UDP 协议之上,使用端口号 53。
循环复用 DNS 是一个普遍使用的在 Web 服务器上负载均衡的解决方案。
- http://www.company.cn : 192.168.1.100
- 192.168.1.101
- 192.168.1.102
弊端:循环复用 DNS 将传入的 IP 请求映射到定义的一系列循环形式的服务器。一旦其中的服务器发生故障,循环复用 DNS 会继续把请求发送到这个故障服务器,直到把该服务器从 DNS 中移走为止。在这之前,许多用户必须等到 DNS 连接超时以后才能成功地访问目标网站(正常运行的服务器)。
3. 负载均衡
由于现有系统的各个核心部分随着业务量、访问量和数据流量的快速增长,其处理能力和计算强度也需要相应地增大,使得单一的服务器设备根本无法承担。在此情况下,如果扔掉现有设备去做大量的硬件升级,这样将造成现有资源的浪费,而且如果再面临下一次业务量的提升时,这又将导致再一次硬件升级的高额成本投入,甚至性能再卓越的设备也不能满足当前业务量增长的需求。
针对此情况而衍生出来的一种廉价、有效、透明的方法以扩展现有网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性的技术就是负载均衡(Load Balance)。
负载均衡的功能总结:
负载均衡种类:
负载均衡——主流的软件解决方案:
1)Apache + JK
Apache 是世界使用排名第一的 Web 服务器软件。它可以运行在几乎所有广泛使用的计算机平台上,由于其跨平台和安全性被广泛使用,是最流行的 Web 服务器端软件。
JK 则是 apache 提供的一款为解决大量请求而分流处理的开源插件。
2)Nginx
Nginx 是一款轻量级的反向代理服务器,由俄罗斯的程序设计师 Igor Sysoev(伊戈尔·西索夫)所开发,供俄国大型的入口网站及搜索引擎 Rambler(漫步者)使用。
Nginx 特点是占有内存少,并发能力强,事实上 Nginx 的并发能力确实在同类型的网页服务器中表现较好,中国大陆使用 Nginx 的网站用户有腾讯、新浪、网易等。
优点:
Nginx 配置
修改 nginx.conf 的配置项:
- server {
- listen 80;
- server_name nginx-01.itcast.cn; # nginx 服务器的主机名
- # 反向代理的配置
- location / {
- root html;
- proxy_pass http://192.168.0.21:8080; # 代理走向的目标服务器(tomcat)
- }
- }
动态资源:
- location ~ .*\.(jsp|do|action)$ {
- proxy_pass http://tomcat-01.itcast.cn:8080;
- }
静态资源:
- location ~ .*\.(html|js|css|gif|jpg|jpeg|png)$ {
- expires 3d;
- }
在 http 配置项中,跟在 upstream 后面的名字可以随意取,但是要和 location 下 proxy_pass http:// 后的组号保持一致。
- http {
- upstream tomcats {
- server shizhan02:8080 weight=1; # weight 表示轮询权重
- server shizhan03:8080 weight=1;
- server shizhan04:8080 weight=1;
- }
-
- # 动态资源配置
- location ~ .*\.(jsp|do|action) {
- proxy_pass http://tomcats; #tomcats是后端服务器组的逻辑组号
- }
- }
3)Keepalived
Keepalived 是一个基于 VRRP 协议来实现的 WEB 服务高可用方案,可以利用其来避免单点故障。
Keepalived 主要用作 Web 服务器的健康状态检查,以及负载均衡主服务器和备服务器之间 Failover(失效转移)的实现。
Keepalived 通常部署在 2 台服务器上,分为一主(Master)一备(Backup),但是对外表现为一个虚拟 IP。Keepalived 可以对本机上的进程进行检测,一旦 Master 检测出某个进程出现问题,就会将自己切换成 Backup 状态,然后通知另外一个节点切换成 Master 状态。
4)LVS
LVS 的英文全称是 Linux Virtual Server,即 Linux 虚拟服务器,是一个虚拟的服务器集群系统,本项目在1998年5月由章文嵩博士成立,是中国国内最早出现的开源软件之一。在 Linux 内核 2.6 中,它已经成为内核的一部分,在此之前的内核版本则需要重新编译内核。
优点:
缺点:
LVS 对比 Nginx:
为什么说 LVS 几乎无流量产生?
LVS 总共有三种代理方式:
NAT(网络地址映射)工作原理图:
所有的连接都要经过 LVS,所以这种负载方式是有流量限制的,具体能撑起多大的流量跟机器有关。
IP Tunneling(IP 隧道):
我们可以发现 LVS 只对 Request 连接进行了负载,而 Response 直接通过 Real Server 发送到 Client(LVS 通过修改源/目标 IP 实现)。但是,Request 的时候应该也有流量啊,为什么说它没有流量产生呢?等介绍完第三种负载策略,我们放在后面一起讨论。
Direct Routing(直接路由):
我们发现它的工作原理图和 IP Tunneling 很像:Request 通过 LVS 进行转发,然后 Real Server 直接将 Response 发送给 Client。
只是有一点不同,图中说了 LVS 和 Real Server 需要在同一个网段上(Must be in a physical segment)。
总结:
负载均衡解决方案示意图:
注意上图中的三角链路:由 LVS 转发的用户请求,再经 Nginx 转发 Real Server 处理完请求后,直接通过 Nginx 将响应返回给用户。
CDN:全称是 Content Delivery Network,即内容分发网络,也称为内容传送网络。CDN 是构建在现有网络基础之上的智能虚拟网络,依靠部署在各地的边缘服务器,通过中心平台的负载均衡、内容分发、调度等功能模块,使用户就近获取所需内容,降低网络拥塞,提高用户访问响应速度和命中率。CDN的关键技术主要有内容存储和分发技术。
CDN 的基本原理是广泛采用各种缓存服务器,将这些缓存服务器分布到用户访问相对集中的地区或网络中,在用户访问网站时,利用全局负载技术将用户的访问指向距离最近的工作正常的缓存服务器上,由缓存服务器直接响应用户请求。
CDN 的基本思路是尽可能避开互联网上有可能影响数据传输速度和稳定性的瓶颈和环节,使内容传输的更快、更稳定。通过在网络各处放置节点服务器所构成的在现有的互联网基础之上的一层智能虚拟网络,CDN 系统能够实时地根据网络流量和各节点的连接、负载状况以及到用户的距离和响应时间等综合信息,将用户的请求重新导向离用户最近的服务节点上。其目的是使用户可就近取得所需内容,解决 Internet 网络拥挤的状况,提高用户访问网站的响应速度。
4. 数据库解决方案
1)主从复制、读写分离
Mysql:
Oracle:
2)分库分表
当访问用户越来越多,写请求暴涨,对于上面的单 Master 节点肯定扛不住,那么该怎么办呢?多加几个 Master?不行,这样会带来更多的数据不一致的问题,且增加系统的复杂度。那该怎么办?就只能对库表进行拆分了。
常见的拆分类型有垂直拆分和水平拆分。
以拼夕夕电商系统为例,一般有订单表、用户表、支付表、商品表、商家表等,最初这些表都在一个数据库里。后来随着砍一刀带来的海量用户,拼夕夕后台扛不住了!于是紧急从阿狸粑粑那里挖来了几个 P8、P9 大佬对系统进行重构。
由于垂直分库已经按照业务关联切分到了最小粒度,但数据量仍然非常大,于是 P9 大佬开始水平分库,比如可以把订单库分为订单 1 库、订单 2 库、订单 3 库……那么如何决定某个订单放在哪个订单库呢?可以考虑对主键通过哈希算法计算放在哪个库。
分完库,单表数据量任然很大,查询起来非常慢,P9 大佬决定按日或者按月将订单分表,叫做日表、月表。
分库分表同时会带来一些问题,比如平时单库单表使用的主键自增特性将作废,因为某个分区库表生成的主键无法保证全局唯一,这就需要引入全局 UUID 服务了。
经过一番大刀阔斧的重构,拼夕夕恢复了往日的活力,大家又可以愉快的在上面互相砍一刀了。
在代码层面实现分库分表逻辑:
3)分布式
使用分布式/分库分表的中间件:
使用分布式数据库:
1. 集群与分布式
2. 架构演进
Dubbo 是 SOA 时代的产物,SpringCloud 是微服务时代的产物。
1)单体架构
优点:
缺点:
2)垂直架构
垂直架构:垂直架构是指将单体架构中的多个模块拆分为多个独立的项目。形成多个独立的单体架构。
优点:
缺点:
3)分布式架构
分布式架构:指在垂直架构的基础上,将公共业务模块抽取出来,作为独立的服务供其他调用者消费,以实现服务的共享和重用(底层通过 RPC 实现)。
RPC(Remote Procedure Call):远程过程调用。有非常多的协议和技术来都实现了 RPC,比如:HTTP REST 风格、Java RMI 规范、WebService SOAP 协议、Hession 等等。
优点:
缺点:
4)SOA 架构
SOA(Service- Oriented Architecture,面向服务的架构):是一个组件模型(设计理念),它将应用程序拆分成不同的功能单元(称为服务),并通过定义良好的接口和契约将各服务联系起来。
ESB(Enterparise Servce Bus):企业服务总线,服务中介。主要是提供了服务与服务之间的交互。ESB 包含的功能如:负载均衡、流量控制、加密处理、服务监控、异常处理、监控告急等。
5)微服务架构
微服务架构:是在 SOA 上做的升华,微服务架构强调的一个重点是“业务需要彻底的组件化和服务化”,即原有的单个业务系统会拆分为多个可以独立开发、设计、运行的小应用,这些小应用之间通过服务完成交互和集成。
微服务架构 = 80% 的 SOA 服务架构思想 + 100% 的组件化架构思想 + 80% 的领域建模思想
特点:
1. 微服务架构
传统架构(单体架构)的劣势:
2. 微服务设计演化
1)模块化
将单个大型系统,解耦拆分成各子模块系统。
2)服务化
3)数据拆分/分布式事务化
4)单元化
继续将系统架构进行分层,且下图中每一列实时链路分别属于所在地的数据中心,假如此时用户从 A 地(如北京市)跑到了 B 地(如深圳市)使用服务,那么两地间的用户数据同步及跨地访问服务将产生效率问题。
策略:如下图所示,用户每一次访问服务仅使用所在地的实时链路。这样虽然牺牲了用户在短暂逗留地区的访问效率,但保证了用户在主要居住地的服务/数据访问效率。
如某用户主要居住在北京市,平时在查看个人历史数据时访问效率快,而在其他区域(如在深圳市)查看个人历史数据时则可能需要等待多转几个圈的加载时间。
5)大型微服务系统架构
3. 第二代的微服务架构:服务网格
服务网格是一种将服务治理能力(比如:服务路由、安全、可观测性、限流熔断等)下沉到基础设施的架构。让业务更专注于业务开发,而由微服务架构引入的问题,交给基础设施去解决。
第一代微服务的常见架构(SpringCloud/Dubbo)如下图所示:
在黄色的容器内有服务 A、服务 B。A 和 B 都包含自己的业务逻辑,如果想要 A 调用 B,同时试图对这个服务进行治理,通常会在业务的内部集成一个 SDK,来实现服务发现、负载均衡、服务路由、重试、熔断限流等功能。
但是,这个架构存在三个主要问题:
ServiceMesh(服务网格)解决了当前架构的哪些痛点:
这种架构实现了服务治理技术和业务逻辑的解耦,是云原生时代微服务治理技术的发展方向,也得到了越来越多的公司的关注。
传统方式:On-Premise(本地部署)
IaaS(Infrastructure as a Service):基础设施即服务
PaaS(Platform as a Service):平台即服务
SaaS(Software as a Service):软件即服务
ZooKeeper(简称 zk)是一个分布式的、开源的(分布式)应用程序的协调服务。
ZooKeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
ZooKeeper 翻译过来就是“动物园管理员”,它是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员。
ZooKeeper 提供的主要功能包括:
ZooKeeper 是用 Java 写的,它运行在 JVM 之上,因此需要安装 JDK7 或更高版本。
1. 下载安装
1)下载
2)安装
- # 打开 opt 目录
- cd /opt
- # 创建 zooKeeper 目录
- mkdir zooKeeper
- # 将 zookeeper 安装包移动到 /opt/zookeeper
- mv apache-zookeeper-3.5.6-bin.tar.gz /opt/zookeeper/
- # 将 tar 包解压到 /opt/zookeeper 目录下
- tar -zxvf apache-ZooKeeper-3.5.6-bin.tar.gz
2. 配置
配置 zoo.cfg:
- # 进入到 conf 目录
- cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/
- # 拷贝并重命名
- cp zoo_sample.cfg zoo.cfg
-
- cd /opt/zooKeeper/
- # 创建ZooKeeper存储目录
- mkdir zkdata
- # 修改zoo.cfg 配置项
- vim /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/zoo.cfg
- # 修改数据存储目录:dataDir=/opt/zookeeper/zkdata
3. 启停
- cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/bin/
- # 启动
- ./zkServer.sh start
- # 停止
- ./zkServer.sh stop
- # 查看状态
- ./zkServer.sh status
如下图所示:ZooKeeper 成功启动。
如下图所示:ZooKeeper 启动成功(standalone 表示 zk 没有搭建集群,现在是单节点)。
1. 数据模型
ZooKeeper 是一个树形目录服务,其数据模型和 Unix 的文件系统目录树很类似,拥有一个层次化结构。
这里面的每一个节点都被称为“ZNode”,每个节点上都会保存自己的数据和节点信息。
节点可以拥有子节点,同时也允许少量(1 MB)数据存储在该节点之下。
节点可以分为四大类:
2. 服务端常用命令
- # 启动 ZooKeeper 服务
- ./zkServer.sh start
- # 查看 ZooKeeper 服务状态
- ./zkServer.sh status
- # 停止 ZooKeeper 服务
- ./zkServer.sh stop
- # 重启 ZooKeeper 服务
- ./zkServer.sh restart
3. 客户端常用命令
CRUD 命令:
- # 连接 ZooKeeper 服务端
- # 不写ip:port则默认连本地2181
- ./zkCli.sh –server ip:port
-
- # 断开连接
- quit
-
- # 查看命令帮助
- help
-
- # 显示指定目录下节点
- ls 目录
-
- # 创建节点
- create /节点path value
-
- # 获取节点值
- get /节点path
-
- # 设置节点值
- set /节点path value
-
- # 删除单个节点
- delete /节点path
-
- # 删除带有子节点的节点
- deleteall /节点path
创建临时有序节点:
- # 创建临时节点
- create -e /节点path value
-
- # 创建顺序节点
- create -s /节点path value
-
- # 查询节点详细信息
- ls –s /节点path
节点详细信息说明:
1. Curator 介绍
2. 常用 API
Curator 的 Maven 依赖:
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.0.0</version>
- </dependency>
1. 建立连接
- // 重试策略
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
-
- // 连接方式一:newClient
- /**
- * @param connectString 连接信息字符串:zk server 地址和端口,如集群 "192.168.149.135:2181, 192.168.149.136:2181"
- * @param sessionTimeoutMs 会话超时时间(单位ms)
- * @param connectionTimeoutMs 连接超时时间(单位ms)
- * @param retryPolicy 重试策略
- */
- CuratorFramework client1 = CuratorFrameworkFactory.newClient(
- "192.168.149.135:2181",
- 60 * 1000,
- 15 * 1000,
- retryPolicy);
-
- // 连接方式二:builder
- CuratorFramework client2 = CuratorFrameworkFactory.builder().
- connectString("192.168.200.130:2181").
- sessionTimeoutMs(60 * 1000).
- connectionTimeoutMs(15 * 1000).
- retryPolicy(retryPolicy).
- namespace("node1"). // 命名空间:之后的操作均以node1为根节点
- build();
-
- // 开启连接
- client1.start();
- client2.start();
-
- // 关闭连接
- client1.close();
- client2.close();
2. 创建节点
- // 1. 创建节点(无数据):默认将当前客户端的ip作为数据存储
- String path1 = client.create().forPath("/app1");
- System.out.println(path1); // /app1
-
- // 2. 创建节点且带有数据
- String path2 = client.create().forPath("/app2", "hehe".getBytes());
- System.out.println(path2); // /app2
-
- // 3. 设置节点的类型(默认类型:持久化)
- String path3 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); // 临时节点
- System.out.println(path3); // /app3
-
- // 4. 创建多级节点
- //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
- String path4 = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
- System.out.println(path4); // /app4/p1
3. 查询节点
get
查询数据:getData().forPath()ls
查询子节点:getChildren().forPath()ls -s
查询节点状态信息:getData().storingStatIn(状态对象).forPath()- // 1. 查询节点数据:get
- byte[] data = client.getData().forPath("/app1");
- System.out.println(new String(data)); // 192.168.56.1
-
- // 2. 查询子节点:ls
- List<String> path = client.getChildren().forPath("/");
- System.out.println(path); // [dubbo, zookeeper, app2, app1, app4]
-
- // 3. 查询节点状态信息:ls -s
- Stat status = new Stat();
- System.out.println(status); // 0,0,0,0,0,0,0,0,0,0,0
- client.getData().storingStatIn(status).forPath("/app1");
- System.out.println(status); // 109,109,1641077872296,1641077872296,0,0,0,0,12,0,109
4. 修改节点数据
- // 1. 直接修改节点数据
- client.setData().forPath("/app1", "abc".getBytes());
-
- // 2. 根据节点版本修改其数据
- Stat status = new Stat();
- byte[] path = client.getData().storingStatIn(status).forPath("/app1");
- // version 需要事先查询,目的是为了让其他客户端或者线程不干扰自己的修改操作
- int version = status.getVersion();
- client.setData().withVersion(version).forPath("/app1", "efg".getBytes());
5. 删除节点
- // 1. 删除单个节点
- client.delete().forPath("/app1");
-
- // 2. 删除带有子节点的节点
- client.delete().deletingChildrenIfNeeded().forPath("/app2");
-
- // 3. 必须成功的删除
- client.delete().guaranteed().forPath("/app3");
-
- // 4. 回调
- client.delete().guaranteed().inBackground(
- new BackgroundCallback() {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
- System.out.println("我被删除了~");
- System.out.println(event);
- }
- }
- ).forPath("/app4");
6. Watch(事件监听)
ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去。该机制是 ZooKeeper 实现分布式协调服务的重要特性。
ZooKeeper 中引入了 Watcher 机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象。当一个对象自身状态变化时,会通知所有订阅者。
ZooKeeper 原生支持通过注册 Watcher 来进行事件监听,但是其使用并不是特别方便(需要开发人员自己反复注册 Watcher,比较繁琐)。
Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。
ZooKeeper 提供了三种 Watcher:
7. NodeCache
NodeCache:仅监听某个节点自身(不包括子节点)。
- // 1. 创建NodeCache对象
- final NodeCache nodeCache = new NodeCache(client,"/app4");
-
- // 2. 注册监听
- nodeCache.getListenable().addListener(
- new NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- System.out.println("节点变化了~");
- // 获取修改节点后的数据
- byte[] data = nodeCache.getCurrentData().getData();
- System.out.println(new String(data));
- }
- }
- );
-
- // 3. 开启监听(设置为true,则在开启监听时加载缓冲数据)
- nodeCache.start(true);
8. PathChildrenCache
PathChildrenCache:监听某个节点的所有子节点(不包括父节点本身)。
- // 1.创建监听对象
- PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app4", true);
-
- // 2. 绑定监听器
- pathChildrenCache.getListenable().addListener(
- new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- System.out.println("子节点变化了~");
- System.out.println(event);
- // 监听子节点的数据变更,并且拿到变更后的数据
- // 1.获取类型
- PathChildrenCacheEvent.Type type = event.getType();
- // 2.判断类型是否是update
- if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
- System.out.println("子节点数据修改了~");
- byte[] data = event.getData().getData();
- System.out.println(new String(data));
- }
- }
- }
- );
-
- // 3. 开启监听器
- pathChildrenCache.start();
9. TreeCache
TreeCache:监听某个节点自己和所有子节点们。
- // 1. 创建监听器
- TreeCache treeCache = new TreeCache(client, "/app2");
-
- // 2. 注册监听
- treeCache.getListenable().addListener(
- new TreeCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
- System.out.println("节点变化了");
- System.out.println(event);
- }
- }
- );
-
- // 3. 开启
- treeCache.start();
1. 分布式锁简介
跨机器的进程之间的数据同步
问题——这就是分布式锁。2. ZooKeeper 分布式锁原理
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
客户端获取锁时,在 lock(自定义)节点下创建临时顺序节点。
然后获取 lock 下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
如果发现自己创建的节点并非 lock 所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
如果发现比自己小的那个节点被删除,则客户端的 Watcher 会收到相应通知,此时再次判断自己创建的节点是否是 lock 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
3. 分布式锁案例:模拟售票
在 Curator 中有五种锁方案:
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- import java.util.concurrent.TimeUnit;
-
- // 测试分布式锁
- public class ZkDemoTest {
-
- public static void main(String[] args) {
- TicketServer ticketServer = new TicketServer();
- Thread t1 = new Thread(ticketServer, "携程");
- Thread t2 = new Thread(ticketServer, "飞猪");
- t1.run();
- t2.run();
- }
- }
-
- // 分布式锁实现
- class TicketServer implements Runnable {
-
- // 票库存
- private int tickets = 10;
-
- // zk的锁对象
- private InterProcessMutex lock ;
-
- // 在构造方法中连接zk
- public TicketServer() {
- // 重试策略
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString("192.168.3.244:2181")
- .sessionTimeoutMs(60 * 1000)
- .connectionTimeoutMs(15 * 1000)
- .retryPolicy(retryPolicy)
- .build();
- // 开启连接
- client.start();
- lock = new InterProcessMutex(client, "/lock");
- }
-
- // 售票
- @Override
- public void run() {
- while (tickets>0) {
- // 获取锁
- try {
- // 获取锁的频率:每3秒获取一次
- // 当3秒获取不到时会有报错信息,但会继续重试
- lock.acquire(3, TimeUnit.SECONDS);
- if(tickets > 0) {
- System.out.println(Thread.currentThread()+": "+tickets);
- Thread.sleep(100);
- tickets--;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- // 释放锁
- try {
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
1. ZK 集群角色
在 ZooKeeper 集群中有三种角色:
Leader(领导者)
:
Follower(跟随者)
:
Observer(观察者)
:
2. 集群规划
Leader 选举机制:
Serverid(服务器 ID):比如有三台服务器,编号分别是 1、2、3,那么编号越大,在选择算法中的权重越大。
Zxid(数据 ID):服务器中存放的最大数据 ID。值越大说明数据越新,那么在选举算法中数据越新,权重越大。
在 Leader 选举的过程中,如果某台 ZooKeeper 获得了超过半数的选票,那么此 ZooKeeper 就可以成为 Leader 了。
3. 集群搭建
1)搭建要求
真实的集群是需要部署在不同的服务器上的,这里示例搭建伪集群,也就是把所有的服务都搭建在一台虚拟机上,用端口进行区分,即搭建一个 3 个节点的 ZooKeeper 集群(伪集群)。
2)准备工作
- mkdir /usr/local/zookeeper-cluster
- cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-1
- cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-2
- cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-3
- mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
- mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
- mkdir /usr/local/zookeeper-cluster/zookeeper-3/data
-
- mv /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
- mv /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
- mv /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
- vi /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
-
- clientPort=2181
- dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
-
- vi /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
-
- clientPort=2182
- dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data
-
- vi /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
-
- clientPort=2183
- dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
3)配置集群
- echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
- echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
- echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid
- vi /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
- vi /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
- vi /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
-
- # 集群服务器 IP 列表如下
- server.1=192.168.149.135:2881:3881
- server.2=192.168.149.135:2882:3882
- server.3=192.168.149.135:2883:3883
- # 说明:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
4)启动集群
启动集群就是分别启动每个实例。
- # 按序启动
- /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
- /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
- /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
启动后我们查询一下每个实例的运行状态:
- # Mode 为 follower 表示是跟随者(从)
- /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
- # Mode 为 leader 表示是领导者(主),因为超过半数
- /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
- # 跟随者(从)
- /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
4. 集群异常
Dubbo 是阿里巴巴公司开源的一个高性能、轻量级的 Java RPC 框架。
致力于提供高性能和透明化的 RPC 远程服务调用方案,以及 SOA 服务治理方案。
Dubbo 架构:
节点角色说明:
项目地址:JavaDemo: 小示例 - Gitee.com
案例说明:Dubbo 作为一个 RPC 框架,其最核心的功能就是要实现跨网络的远程调用。本示例就是要创建两个应用,一个作为服务的提供方,一个作为服务的消费方。通过 Dubbo 来实现服务消费方远程调用服务提供方的方法。
1. 公共接口模块
- public interface UserService {
-
- public String sayHello();
- }
2. 服务提供者模块
- package com.service.impl;
-
- import com.service.UserService;
- import org.apache.dubbo.config.annotation.Service;
-
- // Spring的@Service:将该类的对象创建出来,放到Spring的IOC容器中(bean定义)
- @Service // dubbo的@Service:将该类提供的方法(服务)对外发布。将访问的ip、端口、路径等信息注册到注册中心中
- public class UserServiceImpl implements UserService {
-
- public String sayHello() {
- return "welcome to dubbo ~";
- }
- }
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd
- http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
-
- <!-- 扫描spring相关注解 -->
- <!-- <context:component-scan base-package="com.service" />-->
-
- <!-- dubbo的配置 -->
- <!-- 1.配置项目的名称(唯一) -->
- <dubbo:application name="dubbo_service"/>
- <!-- 2.配置注册中心的地址 -->
- <dubbo:registry address="zookeeper://192.168.3.244:2181"/>
- <!-- 3.扫描dubbo相关注解 -->
- <dubbo:annotation package="com.service.impl" />
- </beans>
3. 服务消费者模块
- package com.controller;
-
- import com.service.UserService;
- import org.apache.dubbo.config.annotation.Reference;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/user")
- public class UserController {
-
- // @Autowired // spring的本地注入
- @Reference // dubbo的远程注入
- /*
- 1. 从注册中心(zookeeper)获取userService的访问url
- 2. 进行远程调用RPC
- 3. 将结果封装为一个代理对象。给变量赋值
- */
- private UserService userService;
-
- @RequestMapping("/sayHello")
- public String sayHello() {
- return userService.sayHello();
- }
-
- }
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
- http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd
- http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
-
- <mvc:annotation-driven/>
- <!-- 扫描spring相关注解 -->
- <context:component-scan base-package="com.controller"/>
-
- <!-- dubbo的配置 -->
- <!-- 1.配置项目的名称(唯一) -->
- <dubbo:application name="dubbo_service">
- <!-- qos是dubbo的监听服务,本例的service和controller部署在单机中,因此会出现端口(55555)冲突 -->
- <dubbo:parameter key="qos.port" value="33333"/>
- <!-- <dubbo:parameter key="dubbo.port" value="20881"/>-->
- </dubbo:application>
- <!-- 2.配置注册中心的地址 -->
- <dubbo:registry address="zookeeper://192.168.3.244:2181"/>
- <!-- 3.扫描dubbo相关注解 -->
- <dubbo:annotation package="com.controller" />
-
- </beans>
测试:
分别启动服务提供者和消费者的服务(Tomcat),访问消费者的地址 http://localhost:8000/user/sayHello ,查看浏览器输出结果:
1. Dubbo-admin 简介
2. Dubbo-admin 安装
1)环境准备
2)下载 Dubbo-Admin
3)解压并修改配置文件
- # 修改zookeeper的实际IP和端口
- # 注册中心
- admin.registry.address=zookeeper://192.168.149.135:2181
- # 配置中心
- admin.config-center=zookeeper://192.168.149.135:2181
- # 元数据中心
- admin.metadata-report.address=zookeeper://192.168.149.135:2181
4)打包项目
在 dubbo-admin-develop 目录执行打包命令:
- # 需等待约5分钟
- mvn clean package -Dmaven.test.skip=true
5)启动后端服务
- # 在 dubbo-Admin-develop\dubbo-admin-distribution\target 目录
- # 执行以下命令启动 dubbo-admin(dubbo-admin 后台由 SpringBoot 构建)
- java -jar .\dubbo-admin-0.1.jar
6)启动前端服务
- # 在 dubbo-admin-ui 目录下执行命令
- npm run dev
7)访问
1. 简单使用
搜索(服务生产者提供的)服务:
查看服务详情:
注意:
- <!-- 元数据配置 -->
- <dubbo:metadata-report address="zookeeper://192.168.149.135:2181" />
核心功能:
2. 序列化
3. 地址缓存
当注册中心挂了,服务是否可以正常访问?
4. 超时
- // 使用 timeout 属性配置超时时间,默认值1000,单位毫秒
- @Service(timeout=3000)
5. 重试
- // 通过retries属性来设置重试次数。默认为2次
- @Service(timeout=3000, retries=3) // 尝试连接4次(1+3)
6. 多版本
灰度发布:当出现新功能时,会让一部分用户先使用新功能,用户反馈没问题时,再将所有用户迁移到新功能。
Dubbo 中使用 version 属性来设置和调用同一个接口的不同版本。
- // 生产者配置
- @Service(version = "v1.0") // 版本一
- public class UserServiceImp11 implements UserService {...}
-
- @Service(version = "v2.0") // 版本二
- public class UserServiceImp12 implements UserService {...}
-
- // 消费者配置
- @Reference(version = "v2.0") // 远程注入版本二
- private UserService userService;
7. 负载均衡
Dubbo 的 4 种负载均衡策略:
- // 生产者 1 配置
- @Service(weight = 100)
- public class UserServiceImp1 implements UserService {...}
- // 生产者 2 配置
- @Service(weight = 200)
- public class UserServiceImp1 implements UserService {...}
-
- // 消费者配置
- // @Reference(loadbalance = "roundrobin")
- // @Reference(loadbalance = "leastactive")
- // @Reference(loadbalance = "consistenthash")
- @Reference(loadbalance = "random") // 按权重随机(默认)
- private UserService userService;
8. 集群容错
Dubbo 集群容错策略:
- // 消费者配置
- @Reference(cluster = "failover") // 远程注入
- private UserService userService;
9. 服务降级
服务降级:当服务器压力剧增的情况下,根据实际业务情况及流量,对一些不太重要的服务(如上图中的广告服务和日志服务)和页面有策略性地不处理或换种简单的方式处理,从而释放服务器资源以保证核心服务(如上图中的支付服务)正常运作或高效运作。
Dubbo 服务降级方式:
mock = force:return null
:表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务在不可用时对调用方的影响。
mock = fail:return null
:表示消费方对该服务的方法调用在失败后,再返回 null 值且不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
- // 消费者配置
- @Reference(mock="force :return null") // 不再调用userService的服务
- private UserService userService;
随着互联网行业的发展,对服务的要求也越来越高,服务架构也从单体架构逐渐演变为现在流行的微服务架构。
1. 单体架构
单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。
优点:
缺点:
2. 分布式架构
分布式架构:根据业务功能对系统做拆分,每个业务功能模块作为独立项目开发,称为一个服务
优点:
缺点:
分布式架构虽然降低了服务耦合,但是服务拆分时也有很多问题需要思考:
因此,人们需要制定一套行之有效的标准来约束分布式架构。
3. 微服务架构
微服务的架构特征:
单一职责
:微服务拆分粒度更小,每一个服务都对应唯一的业务能力,做到单一职责。自治
:团队独立、技术独立、数据独立,独立部署和交付。面向服务
:服务提供统一标准的接口,与语言和技术无关。隔离性强
:服务调用做好隔离、容错、降级,避免出现级联问题。微服务的上述特性其实是在给分布式架构制定一个标准,进一步降低服务之间的耦合度,提供服务的独立性和灵活性,做到高内聚,低耦合。
服务拆分原则:
可以认为微服务是一种经过良好架构设计的分布式架构方案
。
但方案该怎么落地?选用什么样的技术栈?全球的互联网公司都在积极尝试自己的微服务落地方案,其中在 Java 领域最引人注目的就是 SpringCloud 提供的方案了。
Spring是JavaEE的一个轻量级开发框架,主营IoC和AOP,集成JDBC、ORM、MVC等功能便于开发。
Spring Boot是基于Spring,提供开箱即用的积木式组件,目的是提升开发效率。
Spring Cloud顾名思义是跟云相关的,云程序实际上就是指分布式应用程序,所以Spring Cloud就是为了让分布式应用程序编写更方便,更容易而提供的一组基础设施,它的核心是Spring框架,利用Spring Boot的自动配置,力图实现最简化的分布式应用程序开发。
Spring Cloud包含了一大堆技术组件,既有开源社区开发的组件,也有商业公司开发的组件,既有持续更新迭代的组件,也有即将退役不再维护的组件。
SpringCloud 常见组件:
注意,由于 SpringCloud 底层是依赖 SpringBoot 的,因此两者有如下的版本兼容关系:
微服务这种方案需要技术框架来落地,全球的互联网公司都在积极尝试自己的微服务落地技术。在国内最知名的就是 SpringCloud 和阿里巴巴的 Dubbo。
企业需求:
Eureka 是 Netflix 公司开源的一个服务注册与发现的组件。
• Eureka 和其他 Netflix 公司的服务组件(例如负载均衡、熔断器、网关等) 一起,被 SpringCloud 社区整合为 Spring-Cloud-Netflix 模块。
• Eureka 包含两个组件:Eureka Server(注册中心)和 Eureka Client(服务提供者、服务消费者)。
搭建步骤:
1. 父工程
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.example</groupId>
- <artifactId>eureka-parent</artifactId>
- <packaging>pom</packaging>
- <version>1.0-SNAPSHOT</version>
- <modules>
- <module>eureka-provider</module>
- <module>eureka-consumer</module>
- <module>eureka-server</module>
- </modules>
-
- <parent>
- <!-- spring boot 环境 -->
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.0.RELEASE</version>
- </parent>
-
- <dependencies>
- <!-- 简化POJO -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>
-
- <!-- Spring Cloud 依赖管理 -->
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- <!--spring cloud 版本-->
- <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
- </properties>
-
- </project>
2. Eureka Server
pom.xml:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eureka-parent</artifactId>
- <groupId>org.example</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>eureka-server</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- eureka-server -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
- </dependency>
- </dependencies>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- </properties>
-
- </project>
启动类:
- package com;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
-
- @SpringBootApplication
- @EnableEurekaServer // 启用 EurekaServer(不声明也是默认启动)
- public class EurekaApp {
-
- public static void main(String[] args) {
- SpringApplication.run(EurekaApp.class, args);
- }
- }
application.yml:
- server:
- port: 8761
-
- # eureka 配置:共分为四部分配置
- # 1. dashboard: eureka 的 web 控制台配置
- # 2. server: eureka 的服务端配置
- # 3. client: eureka 的客户端配置
- # 4. instance: eureka 的实例配置
-
- eureka:
- instance:
- hostname: localhost # 主机名
- client:
- service-url:
- defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka # eureka 服务端地址,将来客户端使用该地址和 eureka 进行通信
- register-with-eureka: false # 是否将自己的路径注册到 eureka 上(eureka server 需要;eureka provider client 不需要)。默认:true
- fetch-registry: false # 是否需要从 eureka 中抓取路径(eureka server 不需要;eureka consumer client 需要)。默认:true
3. Eureka 控制台
运行启动类后,访问 localhost:8761 即可进入Eureka 控制台:
4. 服务提供者
pom.xml:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eureka-parent</artifactId>
- <groupId>org.example</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>eureka-provider</artifactId>
-
- <dependencies>
- <!-- spring boot web -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- eureka-client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- </dependencies>
-
- </project>
启动类:
- package com;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
-
- /**
- * 启动类
- */
- @EnableEurekaClient // 该注解在SpringCloud新版本中可以省略
- @SpringBootApplication
- public class ProviderApp {
- public static void main(String[] args) {
- SpringApplication.run(ProviderApp.class, args);
- }
- }
application.yml:
- server:
- port: 8000
-
- eureka:
- instance:
- hostname: localhost # 主机名
- client:
- service-url:
- defaultZone: http://localhost:8761/eureka # eureka 服务端地址,将来客户端使用该地址和 eureka 进行通信
- spring:
- application:
- name: eureka-provider # 设置当前应用的名称。这会在 eureka 控制台中的 Application 显示,且需要使用该名称来获取路径
domain:
- package com.domain;
-
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class Goods {
-
- private int id;
- private String name;
- private double price;
- private int skuNum;
-
- @Override
- public String toString() {
- return "Goods{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", price=" + price +
- ", skuNum=" + skuNum +
- '}';
- }
- }
controller:
- package com.controller;
-
- import com.domain.Goods;
- import com.service.GoodsService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/goods")
- public class GoodsController {
-
- @Autowired
- private GoodsService goodsService;
-
- @GetMapping("findOne/{id}")
- public Goods findGoods(@PathVariable("id") int id){
- return goodsService.findOne(id);
- }
- }
service:
- package com.service;
-
- import com.dao.GoodsDao;
- import com.domain.Goods;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- @Service
- public class GoodsService {
-
- @Autowired
- private GoodsDao goodsDao;
-
- public Goods findOne(int id){
- return goodsDao.findOne(id);
- }
- }
dao:
- package com.dao;
-
- import com.domain.Goods;
- import org.springframework.stereotype.Repository;
-
- @Repository
- public class GoodsDao {
-
- public static Goods findOne(int id){
- Goods phone = new Goods(1, "华为P10", 6999.00, 20);
- return phone;
- }
-
- }
5. 服务调用者
pom.xml:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eureka-parent</artifactId>
- <groupId>org.example</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>eureka-consumer</artifactId>
-
- <dependencies>
- <!-- spring boot web -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- eureka-client -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- </dependencies>
-
- </project>
启动类:
- package com;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
- import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
-
- @EnableDiscoveryClient // 激活 DiscoveryClient
- @EnableEurekaClient
- @SpringBootApplication
- public class ConsumerApp {
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApp.class, args);
- }
- }
application.yml:
- server:
- port: 8001
-
- eureka:
- instance:
- hostname: localhost # 主机名
- client:
- service-url:
- defaultZone: http://localhost:8761/eureka # eureka服务端地址,将来客户端使用该地址和eureka进行通信
- spring:
- application:
- name: eureka-consumer # 设置当前应用的名称。这会在eureka中Application显示,且需要使用该名称来获取路径
RestTemplateConfig:
- package com.config;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.client.RestTemplate;
-
- @Configuration
- public class RestTemplateConfig {
-
- @Bean
- public RestTemplate restTemplate() {
- return new RestTemplate();
- }
- }
domain:
- package com.domain;
-
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class Goods {
-
- private int id;
- private String name;
- private double price;
- private int skuNum;
-
- @Override
- public String toString() {
- return "Goods{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", price=" + price +
- ", skuNum=" + skuNum +
- '}';
- }
- }
controller:
- package com.controller;
-
- import com.domain.Goods;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.client.ServiceInstance;
- import org.springframework.cloud.client.discovery.DiscoveryClient;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.client.RestTemplate;
-
- import java.util.List;
-
- /**
- * 服务调用方
- */
- @RestController
- @RequestMapping("/order")
- public class OrderController {
-
- @Autowired
- private RestTemplate restTemplate;
-
- @Autowired
- private DiscoveryClient discoveryClient;
-
- @GetMapping("/goods/{id}")
- public Goods findOrderByGoodsId(@PathVariable("id") int id) {
- /*
- 动态从 Eureka Server 中获取 provider 的 ip 和端口
- 1. 注入 DiscoveryClient 对象.激活
- 2. 调用方法
- */
- // 演示 discoveryClient 使用
- List<ServiceInstance> instances = discoveryClient.getInstances("EUREKA-PROVIDER"); // 服务提供者所配置的实例名称(不区分大小写)
- // 判断集合是否有数据
- if(instances == null || instances.size() == 0){
- // 集合没有数据
- return null;
- }
- ServiceInstance instance = instances.get(0);
- String host = instance.getHost(); // 获取ip
- int port = instance.getPort(); // 获取端口
- /*
- 远程调用 Goods 服务中的 findOne 接口:使用 RestTemplate
- 1. 定义Bean restTemplate
- 2. 注入Bean
- 3. 调用方法
- */
- String url = String.format("http://%s:%d/goods/findOne/%d", host, port, id);
- Goods goods = restTemplate.getForObject(url, Goods.class);
- return goods;
- }
- }
运行效果:
访问 http://localhost:8001/order/goods/1 ,返回:{"id":1,"name":"华为P10","price":6999.0,"skuNum":20}
1. instance 相关属性
Eureka Instance 的配置信息全部保存在org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean 配置类里,实际上它是 com.netflix.appinfo.EurekaInstanceConfig 的实现类,替代了 netflix 的 com.netflix.appinfo.CloudInstanceConfig 的默认实现。
Eureka Instance 的配置信息全部以eureka.instance.xxx
的格式配置。
常用配置:
2. server 相关属性
Eureka Server 注册中心端的配置是对注册中心的特性配置。Eureka Server 的配置全部在 org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean 里,实际上它是 com.netflix.eureka.EurekaServerConfig 的实现类,替代了 netflix 的默认实现。
Eureka Server 的配置全部以eureka.server.xxx
的格式进行配置。
常用配置:
注意:上述配置一般在生产环境保持默认即可(生产环境配置原则是尽量减少环境变化),在开发或测试则可以为了方便而修改配置。
搭建示例:
1. Eureka Server 搭建
修改本地 host 文件:
1)eureka-server-1
application.yml:
- server:
- port: 8761
-
- eureka:
- instance:
- hostname: eureka-server1 # 主机名
- client:
- service-url:
- defaultZone: http://eureka-server2:8762/eureka
- register-with-eureka: true # 是否将自己的路径注册到 eureka 上
- fetch-registry: true # 是否需要从 eureka 中抓取路径
-
- spring:
- application:
- name: eureka-server-ha
2)eureka-server-2
application.yml:
- server:
- port: 8762
-
- eureka:
- instance:
- hostname: eureka-server2 # 主机名
- client:
- service-url:
- defaultZone: http://eureka-server1:8761/eureka
-
- register-with-eureka: true # 是否将自己的路径 注册到 eureka 上
- fetch-registry: true # 是否需要从 eureka 中抓取路径
- spring:
- application:
- name: eureka-server-ha
2. Eureka Client 配置
分别修改服务提供者和服务消费者配置文件中的注册服务地址:
- ...
- eureka:
- client:
- service-url:
- defaultZone: http://eureka-server1:8761/eureka,http://eureka-server2:8762/eureka # eureka 服务端地址
- ...
干掉其中一台 server,验证 client 仍能正常访问。
Ribbon 是 Netflix 提供的一个基于 Http 和 TCP 的客户端负载均衡工具,且已集成在 Eureka 依赖中。
实现原理:SpringCloud Ribbon 的底层采用了一个拦截器,拦截了 RestTemplate 发出的请求,对地址做了修改。
在服务调用者的 RestTemplate 配置类上添加注解:
- @Configuration
- public class RestTemplateConfig {
-
- @Bean
- @LoadBalanced // 开启客户端负载均衡(默认轮询策略)
- public RestTemplate restTemplate(){
- return new RestTemplate();
- }
- }
在调用时指定服务名:
- package com.controller;
-
- import com.domain.Goods;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.client.RestTemplate;
-
- /**
- * 服务调用方
- */
- @RestController
- @RequestMapping("/order")
- public class OrderController {
-
- @Autowired
- private RestTemplate restTemplate;
-
- @GetMapping("/goods/{id}")
- public Goods findOrderByGoodsId(@PathVariable("id") int id) {
-
- String url = String.format("http://eureka-provider/goods/findOne/%d", id);
- Goods goods = restTemplate.getForObject(url, Goods.class);
- return goods;
- }
- }
负载均衡策略:
使用负载均衡:
方式一:使用 bean 的方式。
- package com.config;
-
- import com.netflix.loadbalancer.IRule;
- import com.netflix.loadbalancer.RandomRule;
- import org.springframework.context.annotation.Bean;
-
- public class MyRule {
-
- @Bean
- public IRule rule() {
- return new RandomRule(); // 随机策略
- }
-
- }
- package com;
-
- import com.config.MyRule;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
- import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
- import org.springframework.cloud.netflix.ribbon.RibbonClient;
-
- @EnableDiscoveryClient // 激活DiscoveryClient
- @EnableEurekaClient
- @SpringBootApplication
- @RibbonClient(name="eureka-provider", configuration= MyRule.class) // 指定服务提供方并配置负载均衡策略
- public class ConsumerApp {
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApp.class, args);
- }
- }
方式二:使用配置文件。
- server:
- port: 9000
-
- eureka:
- instance:
- hostname: localhost
- client:
- service-url:
- defaultZone: http://localhost:8761/eureka
-
- spring:
- application:
- name: eureka-consumer
-
- # 设置 Ribbon 的负载均衡策略:随机策略
- EUREKA-PROVIDER:
- ribbon:
- NFloadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule.RandomRule
Ribbon 默认是采用懒加载,即第一次访问时才会去创建 LoadBalanceClient,请求时间会很长。而饥饿加载则会在项目启动时创建,达到降低第一次访问的耗时。
可以通过下面配置开启饥饿加载:
- ribbon:
- eager-load:
- enabled: true
- clients: userservice
RestTemplate 方式调用存在的问题:
- String url = "http://userservice/user/" + order.getUserId();
- User user = restTemplate.getForObject(url, User.class);
解决方案:Feign
Feign 是一个声明式的 HTTP 客户端,它用了基于接口的注解方式,可以很方便地实现客户端配置,其作用就是帮助我们优雅的实现 HTTP 请求的发送,解决上面提到的问题。
Feign 底层依赖于 Ribbon 实现负载均衡和远程调用。
Feign 的使用步骤:
1. 消费端引入 Feign 依赖
- <!--feign-->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-openfeign</artifactId>
- </dependency>
2. 编写 Feign 调用接口
- package com.feign;
-
- import com.domain.Goods;
- import org.springframework.cloud.openfeign.FeignClient;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
-
- /**
- *
- * feign声明式接口:用于发起远程调用
- *
- * 1. 定义接口
- * 2. 接口上添加注解 @FeignClient,并设置 value 属性为服务提供者的应用名称
- * 3. 编写调用接口,接口的声明规则和提供方接口保持一致(返回值和方法名可自定义)
- * 4. 注入该接口对象,调用接口方法完成远程调用(自动拼接value与接口URI)
- */
- @FeignClient(value="userservice")
- public interface UserFeignClient {
-
- @GetMapping("/user/{id}")
- public User findGoodsById(@PathVariable("id") int id);
-
- }
主要是基于 SpringMVC 的注解来声明远程调用的信息,比如:
3. Controller 服务调用
- package com.controller;
-
- import com.domain.Goods;
- import com.feign.GoodsFeignClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.client.RestTemplate;
-
- /**
- * 服务调用方
- */
- @RestController
- @RequestMapping("/order")
- public class OrderController {
-
- // @Autowired
- // private RestTemplate restTemplate;
-
- @Autowired
- private GoodsFeignClient goodsFeignClient; // IDEA 提示报错也无需理会
-
- @GetMapping("/goods/{id}")
- public Goods findOrderByGoodsId(@PathVariable("id") int id) {
-
- // String url = String.format("http://eureka-provider/goods/findOne/%d", id);
- // Goods goods = restTemplate.getForObject(url, Goods.class);
- Goods goods = goodsFeignClient.findGoodsById(id);
- return goods;
- }
- }
4. 启动类添加 Feign 注解
- package com;
-
- import com.config.MyRule;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
- import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
- import org.springframework.cloud.netflix.ribbon.RibbonClient;
- import org.springframework.cloud.openfeign.EnableFeignClients;
-
- @SpringBootApplication
- @EnableFeignClients // 开启 Feign 功能
- public class ConsumerApp {
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApp.class, args);
- }
- }
Feign 可以运行自定义配置来覆盖默认配置,可以修改的配置如下:
一般我们需要配置的就是日志级别。
配置 Feign 日志有两种方式:
1)配置文件方式
- feign:
- client:
- config:
- default: # 这里用default,则表示全局配置
- loggerLevel: FULL # 日志级别
- feign:
- client:
- config:
- userservice: # 这里用服务名称,则是针对某个微服务的配置
- loggerLevel: FULL # 日志级别
2)java 代码方式:需要先声明一个 Bean
- public class FeignClientConfiguration {
- @Bean
- public Logger.Level feignLogLevel(){
- return Logger.Level.BASIC;
- }
- }
而后如果是全局配置,则把它放到 @EnableFeignClients 这个注解中:
@EnableFeignClients(defaultConfiguration=FeignClientConfiguration.class)
如果是局部配置,则把它放到 @FeignClient 这个注解中:
@FeignClient(value="userservice", configuration=FeignClientConfiguration.class)
Feign 底层的客户端实现:
因此优化 Feign 的性能主要包括:
连接池配置步骤如下:
- <!-- httpClient的依赖 -->
- <dependency>
- <groupId>io.github.openfeign</groupId>
- <artifactId>feign-httpclient</artifactId>
- </dependency>
- feign:
- client:
- config:
- default: # default:全局配置
- loggerLevel: BASIC # 日志级别:BASIC 就是基本的请求和响应信息
- httpclient:
- enabled: true # 开启 feign 对 HttpClient 的支持
- max-connections: 200 # 最大的连接数
- max-connections-per-route: 50 # 每个路径的最大连接数
方式一(继承):给消费者的 FeignClient 和提供者的 Controller 定义统一的父接口作为标准。
存在问题:
方式二(抽取):将 FeignClient 抽取为独立模块(项目),并且把接口有关的 POJO、默认的 Feign 配置都放到这个模块中,提供给所有消费者使用。
注意:当定义的 FeignClient 不在 SpringBootApplication 的扫描包范围时,这些 FeignClient 无法使用。有两种方式解决:
- // 方式一:指定 FeignClient 所在包
- @EnableFeignClients(basePackages="com.feign.clients")
-
- // 方式二:指定 FeignClient 字节码
- @EnableFeignClients(clients={UserClient.class})
Nacos 是阿里巴巴的产品,现在是 SpringCloud 中的一个组件,相比 Eureka 功能更加丰富,在国内受欢迎程度较高。
1. Windows 安装
1)下载解压包
在 Nacos 的 GitHub 页面,提供有下载链接,可以下载编译好的 Nacos 服务端或者源代码:
GitHub 的 Release 下载页:Releases · alibaba/nacos · GitHub
2)解压
3)端口配置
Nacos 的默认端口是 8848,如果你电脑上的其它进程占用了 8848 端口,请先尝试关闭该进程。
如果无法关闭占用 8848 端口的进程,也可以进入 nacos 的 conf 目录,修改配置文件(application.properties)中的端口:
4)启动
进入 bin 目录,以单机模式启动:startup.cmd -m standalone
5)访问 nacos 控制台
在浏览器访问 http://127.0.0.1:8848/nacos ,使用默认的账号和密码(都是 nacos)进行登录。
2. Linux 安装
Linux 或者 Mac 安装方式与 Windows 类似。
注意 Nacos 依赖于 JDK 运行,所以 Linux 上也需要先安装 JDK。
启动命令:sh startup.sh -m standalone
由于 Nacos 是 SpringCloudAlibaba 的组件,而 SpringCloudAlibaba 也遵循 SpringCloud 中定义的服务注册、服务发现规范。因此使用 Nacos 和使用 Eureka 对于微服务来说,并没有太大区别。主要差异在于依赖不同和服务地址不同。
Nacos 客户端实现(服务注册或发现)步骤:
工程依赖:
PS:注释掉 eureka 的依赖。
父工程依赖:
- <dependencyManagement>
- <dependencies>
- <!-- spring-cloud-alibaba 管理依赖 -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-alibaba-dependencies</artifactId>
- <version>2.2.5.RELEASE</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <!-- nacos 客户端依赖 -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- </dependency>
配置 nacos 注册中心地址:
- spring:
- cloud:
- nacos:
- server-addr: localhost:8848
运行:
Nacos服务分级存储模型:
1. 集群配置
服务跨集群调用问题:
服务集群属性配置:
- spring:
- cloud:
- nacos:
- server-addr: localhost:8848 # nacos 服务端地址
- discovery:
- cluster-name: HZ # 配置集群名称,也就是机房位置(例如:HZ,杭州)
2. 集群负载均衡策略
实现步骤:
修改 user-service(服务调用方)集群属性配置,达到以下的效果:
修改 order-service(服务提供方)中的 application.yml,设置集群为 HZ:
- spring:
- cloud:
- nacos:
- server-addr: localhost:8848 # nacos 服务端地址
- discovery:
- cluster-name: HZ # 配置集群名称,也就是机房位置(HZ,杭州)
在 order-service 中设置负载均衡的 IRule 为 NacosRule,这个规则优先会寻找与自己同集群的服务:
- userservice:
- ribbon:
- NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则
运行效果:
3. 加权负载均衡
实际部署中会出现这样的场景:服务器设备性能有差异,部分实例所在机器性能较好,另一些较差,而我们希望性能好的机器承担更多的用户请求。
为此,Nacos 提供了权重配置来控制访问频率:权重越大则访问频率越高。
实现步骤:
在 Nacos 控制台可以设置实例的权重值,首先选中实例后面的编辑按钮:
Nacos 中服务存储和数据存储的最外层都是一个名为 namespace 的东西,用来做最外层隔离。
1. 创建 namespace:
在 Nacos 控制台可以创建 namespace,用来隔离不同环境:
2. 修改服务的 namespace
- spring:
- datasource:
- url: jdbc:mysql://localhost:3306/heima?useSSL=false
- username: root
- password: 123
- driver-class-name: com.mysql.jdbc.Driver
- cloud:
- nacos:
- server-addr: localhost:8848
- discovery:
- cluster-name: SH # 上海
- namespace: 492a7d5d-237b-46a1-a99a-fa8e98e4b0f9 # 命名空间,填ID
1. 临时实例和非临时实例
服务注册到 Nacos 时,可以选择注册为临时或非临时实例(通过下面的配置来设置):
- spring:
- cloud:
- nacos:
- discovery:
- ephemeral: false # 设置为非临时实例
临时实例宕机时,会从 nacos 的服务列表中剔除,而非临时实例则不会。
2. Nacos VS Eureka
Nacos 与 Eureka 的共同点:
Nacos 与 Eureka 的区别:
将配置交给 Nacos 管理的步骤:
具体操作:
1. 在 Nacos 中添加配置信息
2. 在弹出表单中填写配置信息
3. 配置获取的步骤如下
4. 引入 Nacos 的配置管理客户端依赖
- <!-- nacos配置管理依赖 -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
- </dependency>
5. 在 resource 目录添加一个 bootstrap.yml 文件,这个文件是引导文件,优先级高于 application.yml
- spring:
- application:
- name: userservice # 服务名称
- profiles:
- active: dev # 开发环境,这里是dev
- cloud:
- nacos:
- server-addr: localhost:8848 # Nacos地址
- config:
- file-extension: yaml # 文件后缀名
6. 测试:将(Nacos 配置内容中的)pattern.dateformat 这个属性注入到 UserController 中
- @RestController
- @RequestMapping("/user")
- public class UserController {
-
- // 注入nacos中的配置属性
- @Value("${pattern.dateformat}")
- private String dateformat;
-
- // 编写controller,通过日期格式化器来格式化现在时间并返回
- @GetMapping("now")
- public String now(){
- return LocalDate.now().format(
- DateTimeFormatter.ofPattern(dateformat, Locale.CHINA)
- );
- }
- // ... 略
- }
Nacos 配置更改后,微服务可以实现热更新,两种方式如下:
注意事项:
方式一:在 @Value 注入的变量所在类上添加注解 @RefreshScope
方式二:使用 @ConfigurationProperties 注解
- @Component
- @Data
- @ConfigurationProperties(prefix="pattern")
- public class PatternProperties {
- private String dateformat;
- }
1. 多环境配置共享
微服务会从 nacos 读取的配置文件:
[服务名]-[spring.profile.active].yaml
:环境配置(例如 userservice-dev.yaml)。[服务名].yaml
:默认配置,多环境共享(例如 userservice.yaml)。[服务名].yaml
这个文件一定会加载,因此多环境共享配置可以写入这个文件。配置加载优先级:
2. 多服务配置共享
不同服务之间共享配置文件的两种方式:
方式一:通过 shared-configs 指定
- spring:
- application:
- name: userservice # 服务名称
- profiles:
- active: dev # 环境
- cloud:
- nacos:
- server-addr: localhost:8848 # Nacos 地址
- config:
- file-extension: yaml # 文件后缀名
- shared-configs: # 多微服务间共享的配置列表
- - dataId: common.yaml # 要共享的配置文件 id
方式二:通过 extension-configs 指定
- spring:
- application:
- name: userservice # 服务名称
- profiles:
- active: dev # 环境
- cloud:
- nacos:
- server-addr: localhost:8848 # Nacos 地址
- config:
- file-extension: yaml # 文件后缀名
- extends-configs: # 多微服务间共享的配置列表
- - dataId: extend.yaml # 要共享的配置文件 id
多种配置的优先级:
网关功能:
网关的技术实现:
在 SpringCloud 中网关的实现包括两种:
Zuul 是基于 Servlet 的实现,属于阻塞式编程;而 SpringCloudGateway 则是基于 Spring5 中提供的 WebFlux,属于响应式编程的实现,具备更好的性能。
网关搭建步骤:
路由配置包括:
创建新的模块(项目),引入 SpringCloudGateway 的依赖和 nacos 的服务发现依赖:
- <!-- 网关依赖 -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-gateway</artifactId>
- </dependency>
- <!-- nacos服务发现依赖 -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- </dependency>
编写路由配置及 nacos 地址:
- server:
- port: 10010 # 网关端口
- spring:
- application:
- name: gateway # 服务名称
- cloud:
- nacos:
- server-addr: localhost:8848 # nacos地址
- gateway:
- routes: # 网关路由配置
- - id: user-service # 路由id,自定义,只要唯一即可
- # uri: http://127.0.0.1:8081 # 路由的目标地址。http 表示固定地址
- uri: lb://userservice # 路由的目标地址。"lb"表示负载均衡,后面跟服务名称
- predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
- - Path=/user/** # 这个是按照路径匹配,只要以 /user/ 开头就符合要求
请求效果:
Route Predicate Factory(路由断言工厂),网关路由可以配置的内容包括:
Route Predicate Factory:
Path=/user/**
表示按照路径匹配(以 /user 开头则表示断言成功)Spring 提供了 11 种基本的 Predicate 工厂:
GatewayFilter 是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理:
Spring 提供了 31 种不同的路由过滤器工厂(GatewayFilterFactory)。例如:
我们的目标是以Spring Cloud为基础,从零开始搭建一个7x24小时运行的证券交易所。
除了Spring Cloud外,通常项目还需要依赖数据库、消息系统、缓存等各种组件。我们选择组件的原则是通用性高,使用广泛,因此,数据库选择MySQL 8.x,消息系统选择Kafka 3.x,缓存系统选择Redis 6.x。
由于我们的项目是一个7x24小时运行的证券交易系统,因此,我们简单分析一下业务系统的特点:
为了简化设计,我们把项目需求限定如下:
项目名称暂定为Warp Exchange,采用GPL v3授权协议。项目最终完成后,效果如下:
对一个系统来说,建立一个简单可靠的模型,不但能大大简化系统的设计,而且能以较少的代码实现一个稳定运行的系统,最大限度地减少各种难以预测的错误。
我们来看证券交易系统的业务模型。
对于证券交易系统来说,其输入是所有交易员发送的买卖订单。系统接收到订单后,内部经过定序,再由撮合引擎进行买卖撮合,最后对成交的订单进行清算,买卖双方交换Base和Quote资产,即完成了交易。
在撮合成交的过程中,系统还需要根据成交价格、成交数量以及成交时间,对成交数据进行聚合,以便交易员能直观地以K线图的方式看到历史交易数据,因此,行情系统也是证券交易系统的一部分。此外,推送系统负责将行情、订单成交等事件推送给客户端。
最后,证券交易系统还需要给交易员提供一个操作界面,通常是Web或手机App。UI系统将在内部调用API,因此,API才是整个系统下单和撤单的唯一入口。
整个系统从逻辑上可以划分为如下模块:
以上各模块关系如下:
其中,交易引擎作为最核心的模块,我们需要仔细考虑如何设计一个简单可靠,且模块化程度较高的子系统。对证券交易系统来说,交易引擎内部可划分为:
交易引擎是一个以事件驱动为核心的系统,它的输入是定序后的一个个事件,输出则是撮合结果、市场行情等数据。交易引擎内部各模块关系如下:
经过这样的模块化设计,一个证券交易系统就具备了基本的雏型。
1. 项目骨架搭建
对于Warp Exchange项目,我们以Maven为构建工具,把每个模块作为一个Maven的项目管理,并抽取出公共逻辑放入common模块,结构如下:
为了简化版本和依赖管理,我们用parent模块管理最基础的pom.xml,其他模块直接从parent继承,能大大简化各自的pom.xml。parent模块pom.xml内容如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd"
- >
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.itranswarp.exchange</groupId>
- <artifactId>parent</artifactId>
- <version>1.0</version>
- <packaging>pom</packaging>
-
- <!-- 继承自SpringBoot Starter Parent -->
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <!-- SpringBoot版本 -->
- <version>2.7.1</version>
- </parent>
-
- <properties>
- <!-- 项目版本 -->
- <project.version>1.0</project.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
- <!-- Java编译和运行版本 -->
- <maven.compiler.source>17</maven.compiler.source>
- <maven.compiler.target>17</maven.compiler.target>
- <java.version>17</java.version>
-
- <!-- 定义第三方组件的版本 -->
- <pebble.version>3.1.5</pebble.version>
- <springcloud.version>2021.0.3</springcloud.version>
- <springdoc.version>1.6.9</springdoc.version>
- <vertx.version>4.3.1</vertx.version>
- </properties>
-
- <!-- 引入SpringCloud依赖 -->
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${springcloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <!-- 共享的依赖管理 -->
- <dependencies>
- <!-- 依赖JUnit5 -->
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-params</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- 依赖SpringTest -->
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <pluginManagement>
- <plugins>
- <!-- 引入创建可执行Jar的插件 -->
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- </project>
上述pom.xml中,除了写死的Spring Boot版本、Java运行版本、项目版本外,其他引入的版本均以<xxx.version>1.23</xxx.version>的形式定义,以便后续可以用${xxx.version}引用版本号,避免了同一个组件出现多个写死的版本定义。
对其他业务模块,引入parent的pom.xml可大大简化配置。以ui模块为例,其pom.xml如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd"
- >
- <modelVersion>4.0.0</modelVersion>
-
- <!-- 指定Parent -->
- <parent>
- <groupId>com.itranswarp.exchange</groupId>
- <artifactId>parent</artifactId>
- <version>1.0</version>
- <!-- Parent POM的相对路径 -->
- <relativePath>../parent/pom.xml</relativePath>
- </parent>
-
- <!-- 当前模块名称 -->
- <artifactId>ui</artifactId>
-
- <dependencies>
- <!-- 依赖SpringCloud Config客户端 -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-config</artifactId>
- </dependency>
-
- <!-- 依赖SpringBoot Actuator -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <!-- 依赖Common模块 -->
- <dependency>
- <groupId>com.itranswarp.exchange</groupId>
- <artifactId>common</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- 依赖第三方模块 -->
- <dependency>
- <groupId>io.pebbletemplates</groupId>
- <artifactId>pebble-spring-boot-starter</artifactId>
- <version>${pebble.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <!-- 指定输出文件名 -->
- <finalName>${project.artifactId}</finalName>
- <!-- 创建SpringBoot可执行jar -->
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
因为我们在parent的pom.xml中引入了Spring Cloud的依赖管理,因此,无需指定相关组件的版本。只有我们自己编写的组件和未在Spring Boot和Spring Cloud中引入的组件,才需要指定版本。
最后,我们还需要一个build模块,把所有模块放到一起编译。建立build文件夹并创建pom.xml如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/maven-v4_0_0.xsd"
- >
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.itranswarp.exchange</groupId>
- <artifactId>build</artifactId>
- <version>1.0</version>
- <packaging>pom</packaging>
- <name>Warp Exchange</name>
-
- <!-- 按相对路径列出所有模块 -->
- <modules>
- <module>../common</module>
- <module>../config</module>
- <module>../parent</module>
- <module>../push</module>
- <module>../quotation</module>
- <module>../trading-api</module>
- <module>../trading-engine</module>
- <module>../trading-sequencer</module>
- <module>../ui</module>
- </modules>
- </project>
我们还需要创建目录config-repo来存储Spring Cloud Config服务器端的配置文件。
最后,将所有模块导入IDE,可正常开发、编译、运行。如果要在命令行模式下运行,进入build文件夹使用Maven编译即可:
warpexchange $ cd build && mvn clean package
2. 本地开发环境
在本地开发时,我们需要经常调试代码。除了安装JDK,选择一个IDE外,我们还需要在本地运行MySQL、Redis、Kafka,以及Kafka依赖的ZooKeeper服务。
考虑到手动安装各个服务在不同操作系统下的差异,以及初始化数据非常麻烦,我们使用Docker Desktop来运行这些基础服务,需要在build目录下编写一个docker-compose.yml文件定义我们要运行的所有服务:
- version: "3"
- services:
- zookeeper:
- image: bitnami/zookeeper:3.5
- container_name: zookeeper
- ports:
- - "2181:2181"
- environment:
- - ALLOW_ANONYMOUS_LOGIN=yes
- volumes:
- - "./docker/zookeeper-data:/bitnami"
-
- kafka:
- image: bitnami/kafka:3.0
- container_name: kafka
- ports:
- - "9092:9092"
- depends_on:
- - zookeeper
- environment:
- - KAFKA_BROKER_ID=1
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- - ALLOW_PLAINTEXT_LISTENER=yes
- volumes:
- - "./docker/kafka-data:/bitnami"
-
- redis:
- image: redis:6.2
- container_name: redis
- ports:
- - "6379:6379"
- volumes:
- - "./docker/redis-data:/data"
-
- mysql:
- image: mysql:8
- container_name: mysql
- ports:
- - "3306:3306"
- command: --default-authentication-plugin=mysql_native_password
- environment:
- - MYSQL_ROOT_PASSWORD=password
- volumes:
- - "./sql/schema.sql:/docker-entrypoint-initdb.d/1-schema.sql:ro"
- - "./docker/mysql-data:/var/lib/mysql"
在上述docker-compose.yml文件中,我们定义了MySQL、Redis、Kafka以及Kafka依赖的ZooKeeper服务,各服务均暴露标准端口,且MySQL的root口令设置为password,第一次启动MySQL时,使用sql/schema.sql文件初始化数据库表结构。所有数据盘均挂载到build目录下的docker目录。
在build目录下运行docker-compose up -d即可启动容器:
- build $ docker-compose up -d
- Creating network "build_default" with the default driver
- Creating zookeeper ... done
- Creating mysql ... done
- Creating redis ... done
- Creating kafka ... done
在Docker Desktop中可看到运行状态:
如果要删除开发环境的所有数据,首先停止运行Docker容器进程并删除,然后删除build目录下的docker目录,重新运行docker-compose即可。
3. Spring Cloud Config
Spring Cloud Config是Spring Cloud的一个子项目,它的主要目的是解决多个Spring Boot应用启动时,应该如何读取配置文件的问题。
对于单体应用,即一个独立的Spring Boot应用,我们会把配置写在application.yml文件中。如果配置需要针对多个环境,可以用---分隔并标注好环境:
- # application.yml
- # 通用配置:
- spring:
- datasource:
- url: jdbc:mysql://localhost/test
-
- ---
-
- # test profile:
- spring:
- config:
- activate:
- on-profile: test
- datasource:
- url: jdbc:mysql://172.16.0.100/test
这种配置方式针对单个Spring Boot应用是可行的,但是,针对分布式应用,有多个Spring Boot应用需要启动时,分散在各个应用中的配置既不便于管理,也不便于复用相同的配置。
Spring Cloud Config提供了一个通用的分布式应用的配置解决方案。它把配置分为两部分:
我们先来看看如何搭建一个Spring Cloud Config Server,即配置服务器。
首先,在config模块中引入spring-cloud-config-server依赖:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-config-server</artifactId>
- </dependency>
然后,编写一个ConfigApplication入口,标注@EnableConfigServer:
- @EnableConfigServer
- @SpringBootApplication
- public class ConfigApplication {
- public static void main(String[] args) {
- SpringApplication.run(ConfigApplication.class, args);
- }
- }
最后,在application.yml中设置如何搜索配置。Spring Cloud Config支持多种配置方式,包括从本地文件、Git仓库、数据库等多个地方读取配置。这里我们选择以本地文件的方式读取配置文件,这也是最简单的一种配置方式:
- # 配置服务器的端口,通常设置为8888:
- server:
- port: 8888
-
- spring:
- application:
- name: config-server
- profiles:
- # 从文件读取配置时,Config Server激活的profile必须设定为native:
- active: native
- cloud:
- config:
- server:
- native:
- # 设置配置文件的搜索路径:
- search-locations: file:./config-repo, file:../config-repo, file:../../config-repo
在config-repo目录下,存放的就是一系列配置文件:
至此,配置服务器就完成了,直接运行ConfigApplication即可启动配置服务器。在开发过程中,保持配置服务器在后台运行即可。
接下来,对于每个负责业务的Spring Boot应用,我们需要从Spring Cloud Config Server读取配置。读取配置并不是说本地零配置,还是需要一点基础配置信息。以ui项目为例,编写application.yml如下:
- spring:
- application:
- # 设置app名称:
- name: ui
- config:
- # 导入Config Server地址:
- import: configserver:${CONFIG_SERVER:http://localhost:8888}
上述默认的Config Server配置为http://localhost:8888,也可以通过环境变量指定Config Server的地址。
下一步是在ui模块的pom.xml中添加依赖:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-config</artifactId>
- </dependency>
接下来正常启动UIApplication,该应用就会自动从Config Server读取配置。由于我们指定了应用的名称是ui,且默认的profile是default,因此,Config Server将返回以下4个配置文件:
前面的配置文件优先级较高,后面的配置文件优先级较低。如果出现相同的配置项,则在优先级高的配置生效。
我们可以在浏览器访问http://localhost:8888/ui/default看到Config Server返回的配置,它是一个JSON文件:
- {
- "name": "ui",
- "profiles": [
- "default"
- ],
- "label": null,
- "version": null,
- "state": null,
- "propertySources": [
- {
- "name": "file:../config-repo/ui-default.yml",
- "source": {...}
- },
- {
- "name": "file:../config-repo/application-default.yml",
- "source": {...}
- },
- {
- "name": "file:../config-repo/ui.yml",
- "source": {...}
- },
- {
- "name": "file:../config-repo/application.yml",
- "source": {...}
- }
- ]
- }
如果我们启动UIApplication时传入SPRING_PROFILES_ACTIVE=test,将profile设置为test,则Config Server返回的文件如下:
可以通过http://localhost:8888/ui/test查看返回的配置。由于文件ui-test.yml不存在,因此,实际配置由3个文件合并而成。
我们可以很容易地看到,一个Spring Boot应用在启动时,首先要设置自己的name并导入Config Server的URL,再根据当前活动的profile,由Config Server返回多个配置文件:
其中,{name}-{xxx}.yml是针对某个应用+某个profile的特定配置,{name}.yml是针对某个应用+所有profile的配置,application-{profile}.yml是针对某个profile的全局配置,application.yml是所有应用的全局配置。搭配各种配置文件就可以灵活组合配置。一般来说,全局默认的配置放在application.yml中,例如数据库连接:
- spring:
- datasource:
- url: jdbc:mysql://localhost/test
这样保证了默认连接到本地数据库,在生产环境中会直接报错而不是连接到错误的数据库。
在生产环境,例如profile设置为prod,则可以将数据库连接写在application-prod.yml中,使得所有生产环境的应用读取到的数据库连接是一致的:
- spring:
- datasource:
- url: jdbc:mysql://172.16.0.100/prod_db
某个应用自己特定的配置则应当放到{name}.yml和{name}-{profile}.yml中。
在设置好各个配置文件后,应当通过浏览器检查Config Server返回的配置是否符合预期。
Spring Cloud Config还支持配置多个profile,以及从加密的配置源读取配置等。如果遇到更复杂的需求,可参考Spring Cloud Config的文档。
4. 环境变量
需要特别注意,在config-repo的配置文件里,使用的环境变量,不是Config Server的环境变量,而是具体某个Spring Boot应用的环境变量。
我们举个例子:假定ui.yml定义如下:
- server:
- port: ${APP_PORT:8000}
当UIApplication启动时,它获得的配置为server.port=${APP_PORT:8000}。Config Server不会替换任何环境变量,而是将它们原封不动地返回给UIApplication,由UIApplication根据自己的环境变量解析后获得最终配置。如果我们启动UIApplication时传入环境变量:
$ java -DAPP_PORT=7000 -jar ui.jar
则UIApplication最终读取的配置server.port为7000。
可见,使用Spring Cloud Config时,读取配置文件步骤如下:
环境变量通常用于配置一些敏感信息,如数据库连接口令,它们不适合明文写在config-repo的配置文件里。
5. 常见错误
启动一个Spring Boot应用时,如果出现Unable to load config data错误:
- java.lang.IllegalStateException: Unable to load config data from 'configserver:http://localhost:8888'
- at org.springframework.boot.context.config.StandardConfigDataLocationResolver.getReferences
- at ...
需要检查是否在pom.xml中引入了spring-cloud-starter-config,因为没有引入该依赖时,应用无法解析本地配置的import: configserver:xxx。
如果在启动一个Spring Boot应用时,Config Server没有运行,通常错误信息是因为没有读取到配置导致无法创建某个Bean。
这里我们以Spring Boot为基础,并通过Maven的模块化配置搭建了项目的基本结构,依赖的基础组件通过Docker Desktop运行并初始化数据。对于多个服务组成的分布式应用来说,使用Spring Cloud Config可满足应用的配置需求。
1. 交易引擎
一个完整的交易引擎包括资产系统、订单系统、撮合引擎和清算系统。
资产系统不仅记录了每个用户的所有资产,而且还要根据业务随时冻结和解冻用户资产。例如,下买单时,根据买入价格和买入数量,计算需要冻结的USD,然后对用户的可用USD进行冻结。
订单系统跟踪所有用户的所有订单。
撮合引擎是交易引擎中最重要的一个组件,它根据价格优先、时间优先的原则,对买卖订单进行匹配,匹配成功则成交,匹配不成功则放入订单簿等待后续成交。
清算系统则是处理来自撮合引擎的撮合结果。
最后,把上述几个组件组合起来,我们就得到了一个完善的交易引擎。
我们观察交易引擎的输入,它是一系列确定的订单序列,而交易引擎的输出则是成交信息。与此同时,交易引擎本身是一个确定性的状态机,它的内部状态包括订单集、资产表和订单簿。每当一个新的订单请求被输入后,状态机即更新状态,然后输出成交信息。
注意到交易引擎在任何一个时刻的状态都是确定的,在一个确定的状态下,继续给定一个确定的订单请求,下一个状态也是确定的,即:
交易引擎当前状态是Sn,则下一个输入On+1会将其状态更新为Sn+1。
因此,对于一组给定的输入订单集合[O1, O2, O3, ...],交易引擎每次内部状态的更新和输出都是完全确定的,与时间无关。
我们换句话说,就是给定一组订单输入的集合,让一个具有初始状态的交易引擎去执行,获得的结果集为[R1, R2, R3, ...],把同样的一组订单输入集合让另一个具有初始状态的交易引擎去执行,获得的结果集完全相同。
因此,要实现交易引擎的集群,可以同时运行多个交易引擎的实例,然后对每个实例输入相同的订单请求序列,就会得到完全相同的一组输出:
可见,交易引擎是一个事件驱动的状态机。
实现交易引擎有多种方式,例如,把资产、订单等放入数据库,基于数据库事务来保证交易完整性,这种方式的缺点就是速度非常慢,TPS很低。
也可以把全部组件放在内存中,这样能轻松实现一个高性能的交易引擎,但内存的易失性会导致宕机重启后丢失交易信息,因此,基于内存的交易引擎必须要解决数据的持久化问题。
在Warp Exchange项目中,我们将实现一个完全基于内存的交易引擎。
2. 资产系统
在交易系统中,用户资产是指用户以各种方式将USD、BTC充入交易所后的余额。本节我们来实现一个用户资产系统。
用户在买入BTC时,需要花费USD,而卖出BTC后,获得USD。当用户下单买入时,系统会先冻结对应的USD金额;当用户下单卖出时,系统会先冻结对应的BTC。之所以需要有冻结这一操作,是因为判断能否下单成功,是根据用户的可用资产判断。每下一个新的订单,就会有一部分可用资产被冻结,因此,用户资产本质上是一个由用户ID和资产ID标识的二维表:
用户ID | 资产ID | 可用 | 冻结 |
---|---|---|---|
101 | USD | 8900.3 | 1200 |
101 | BTC | 500 | 0 |
102 | USD | 12800 | 0 |
103 | BTC | 0 | 50 |
上述二维表有一个缺陷,就是对账很困难,因为缺少了一个关键的负债账户。对任何一个资产管理系统来说,要时刻保证整个系统的资产负债表为零。
对交易所来说,用户拥有的USD和BTC就是交易所的系统负债,只需引入一个负债账户,记录所有用户权益,就可以保证整个系统的资产负债表为零。假设负债账户以ID为1的系统用户表示,则用户资产表如下:
用户ID | 资产ID | 可用 | 冻结 |
---|---|---|---|
1 | USD | -23400.3 | 0 |
1 | BTC | -550 | 0 |
101 | USD | 8900.3 | 1200 |
101 | BTC | 500 | 0 |
102 | USD | 12800 | 0 |
103 | BTC | 0 | 50 |
引入了负债账户后,我们就可以定义资产的数据结构了。
在数据库中,上述表结构就是资产表的结构,将用户ID和资产ID标记为联合主键即可。
但是在内存中,我们怎么定义资产结构呢?
可以使用一个两层的ConcurrentMap定义如下:
- // 用户ID -> (资产ID -> Asset)
- ConcurrentMap<Long, ConcurrentMap<AssetEnum, Asset>> userAssets = new ConcurrentHashMap<>();
第一层Map的Key是用户ID,第二层Map的Key是资产ID,这样就可以用Asset结构表示资产:
- public class Asset {
- // 可用余额:
- BigDecimal available;
- // 冻结余额:
- BigDecimal frozen;
-
- public Assets() {
- this(BigDecimal.ZERO, BigDecimal.ZERO);
- }
-
- public Assets(BigDecimal available, BigDecimal frozen) {
- this.available = available;
- this.frozen = frozen;
- }
- }
下一步,我们在AssetService上定义对用户资产的操作。实际上,所有资产操作只有一种操作,即转账。转账类型可用Transfer定义为枚举类:
- public enum Transfer {
- // 可用转可用:
- AVAILABLE_TO_AVAILABLE,
- // 可用转冻结:
- AVAILABLE_TO_FROZEN,
- // 冻结转可用:
- FROZEN_TO_AVAILABLE;
- }
转账操作只需要一个tryTransfer()方法,实现如下:
- public boolean tryTransfer(Transfer type, Long fromUser, Long toUser, AssetEnum assetId, BigDecimal amount, boolean checkBalance) {
- // 转账金额不能为负:
- if (amount.signum() < 0) {
- throw new IllegalArgumentException("Negative amount");
- }
- // 获取源用户资产:
- Asset fromAsset = getAsset(fromUser, assetId);
- if (fromAsset == null) {
- // 资产不存在时初始化用户资产:
- fromAsset = initAssets(fromUser, assetId);
- }
- // 获取目标用户资产:
- Asset toAsset = getAsset(toUser, assetId);
- if (toAsset == null) {
- // 资产不存在时初始化用户资产:
- toAsset = initAssets(toUser, assetId);
- }
- return switch (type) {
- case AVAILABLE_TO_AVAILABLE -> {
- // 需要检查余额且余额不足:
- if (checkBalance && fromAsset.available.compareTo(amount) < 0) {
- // 转账失败:
- yield false;
- }
- // 源用户的可用资产减少:
- fromAsset.available = fromAsset.available.subtract(amount);
- // 目标用户的可用资产增加:
- toAsset.available = toAsset.available.add(amount);
- // 返回成功:
- yield true;
- }
- // 从可用转至冻结:
- case AVAILABLE_TO_FROZEN -> {
- if (checkBalance && fromAsset.available.compareTo(amount) < 0) {
- yield false;
- }
- fromAsset.available = fromAsset.available.subtract(amount);
- toAsset.frozen = toAsset.frozen.add(amount);
- yield true;
- }
- // 从冻结转至可用:
- case FROZEN_TO_AVAILABLE -> {
- if (checkBalance && fromAsset.frozen.compareTo(amount) < 0) {
- yield false;
- }
- fromAsset.frozen = fromAsset.frozen.subtract(amount);
- toAsset.available = toAsset.available.add(amount);
- yield true;
- }
- default -> {
- throw new IllegalArgumentException("invalid type: " + type);
- }
- };
- }
除了用户存入资产时,需要调用tryTransfer()并且不检查余额,因为此操作是从系统负债账户向用户转账,其他常规转账操作均需要检查余额:
- public void transfer(Transfer type, Long fromUser, Long toUser, AssetEnum assetId, BigDecimal amount) {
- if (!tryTransfer(type, fromUser, toUser, assetId, amount, true)) {
- throw new RuntimeException("Transfer failed");
- }
- }
冻结操作可在tryTransfer()基础上封装一个方法:
- public boolean tryFreeze(Long userId, AssetEnum assetId, BigDecimal amount) {
- return tryTransfer(Transfer.AVAILABLE_TO_FROZEN, userId, userId, assetId, amount, true);
- }
解冻操作实际上也是在tryTransfer()基础上封装:
- public void unfreeze(Long userId, AssetEnum assetId, BigDecimal amount) {
- if (!tryTransfer(Transfer.FROZEN_TO_AVAILABLE, userId, userId, assetId, amount, true)) {
- throw new RuntimeException("Unfreeze failed");
- }
- }
可以编写一个AssetServiceTest,测试各种转账操作:
- public class AssetServiceTest {
- @Test
- void tryTransfer() {
- // TODO...
- }
- }
并验证在任意操作后,所有用户资产的各余额总和为0。
最后有些疑问:
为什么不使用数据库?
因为我们要实现的交易引擎是100%全内存交易引擎,因此所有用户资产均存放在内存中,无需访问数据库。
为什么要使用ConcurrentMap?
使用ConcurrentMap并不是为了让多线程并发写入,因为AssetService中并没有任何同步锁。对AssetService进行写操作必须是单线程,不支持多线程调用tryTransfer()。
但是读取Asset支持多线程并发读取,这也是使用ConcurrentMap的原因。如果改成HashMap,根据不同JDK版本的实现不同,多线程读取HashMap可能造成死循环(注意这不是HashMap的bug),必须引入同步机制。
如何扩展以支持更多的资产类型?
我们在AssetEnum中以枚举方式定义了USD和BTC两种资产,如果要扩展到更多资产类型,可以以整型ID作为资产ID,同时需要管理一个资产ID到资产名称的映射,这样可以在业务需要的时候更改资产名称。
这里实现了一个基于内存的高性能的用户资产系统,其核心只有一个tryTransfer()转账方法,业务逻辑非常简单。
3. 订单系统
订单系统的目的是为了管理所有的活动订单,并给每个新订单一个递增的序列号。由于在创建订单时需要冻结用户资产,因此,我们定义的OrderService会引用AssetService:
- public class OrderService {
- // 引用AssetService:
- final AssetService assetService;
-
- public OrderService(@Autowired AssetService assetService) {
- this.assetService = assetService;
- }
- }
一个订单由订单ID唯一标识,此外,订单包含以下重要字段:
一个订单被成功创建后,它后续由撮合引擎处理时,只有unfilledQuantity和status会发生变化,其他属性均为只读,不会改变。
当订单状态变为完全成交、部分取消、完全取消时,订单就已经处理完成。处理完成的订单从订单系统中删除,并写入数据库永久变为历史订单。用户查询活动订单时,需要读取订单系统,用户查询历史订单时,只需从数据库查询,就与订单系统无关了。
我们定义OrderEntity如下:
- public class OrderEntity {
- // 订单ID / 定序ID / 用户ID:
- public Long id;
- public long sequenceId;
- public Long userId;
-
- // 价格 / 方向 / 状态:
- public BigDecimal price;
- public Direction direction;
- public OrderStatus status;
-
- // 订单数量 / 未成交数量:
- public BigDecimal quantity;
- public BigDecimal unfilledQuantity;
-
- // 创建和更新时间:
- public long createdAt;
- public long updatedAt;
- }
处于简化设计的缘故,该对象既作为订单系统的订单对象,也作为数据库映射实体。
根据业务需要,订单系统需要支持:
因此,OrderService需要用两个Map存储活动订单:
- public class OrderService {
- // 跟踪所有活动订单: Order ID => OrderEntity
- final ConcurrentMap<Long, OrderEntity> activeOrders = new ConcurrentHashMap<>();
-
- // 跟踪用户活动订单: User ID => Map(Order ID => OrderEntity)
- final ConcurrentMap<Long, ConcurrentMap<Long, OrderEntity>> userOrders = new ConcurrentHashMap<>();
添加一个新的Order时,需要同时更新activeOrders和userOrders。同理,删除一个Order时,需要同时从activeOrders和userOrders中删除。
我们先编写创建订单的方法:
- /**
- * 创建订单,失败返回null:
- */
- public OrderEntity createOrder(long sequenceId, long ts, Long orderId, Long userId, Direction direction, BigDecimal price, BigDecimal quantity) {
- switch (direction) {
- case BUY -> {
- // 买入,需冻结USD:
- if (!assetService.tryFreeze(userId, AssetEnum.USD, price.multiply(quantity))) {
- return null;
- }
- }
- case SELL -> {
- // 卖出,需冻结BTC:
- if (!assetService.tryFreeze(userId, AssetEnum.BTC, quantity)) {
- return null;
- }
- }
- default -> throw new IllegalArgumentException("Invalid direction.");
- }
- // 实例化Order:
- OrderEntity order = new OrderEntity();
- order.id = orderId;
- order.sequenceId = sequenceId;
- order.userId = userId;
- order.direction = direction;
- order.price = price;
- order.quantity = quantity;
- order.unfilledQuantity = quantity;
- order.createdAt = order.updatedAt = ts;
- // 添加到ActiveOrders:
- this.activeOrders.put(order.id, order);
- // 添加到UserOrders:
- ConcurrentMap<Long, OrderEntity> uOrders = this.userOrders.get(userId);
- if (uOrders == null) {
- uOrders = new ConcurrentHashMap<>();
- this.userOrders.put(userId, uOrders);
- }
- uOrders.put(order.id, order);
- return order;
- }
后续在清算过程中,如果发现一个Order已经完成或取消后,需要调用删除方法将活动订单从OrderService中删除:
- public void removeOrder(Long orderId) {
- // 从ActiveOrders中删除:
- OrderEntity removed = this.activeOrders.remove(orderId);
- if (removed == null) {
- throw new IllegalArgumentException("Order not found by orderId in active orders: " + orderId);
- }
- // 从UserOrders中删除:
- ConcurrentMap<Long, OrderEntity> uOrders = userOrders.get(removed.userId);
- if (uOrders == null) {
- throw new IllegalArgumentException("User orders not found by userId: " + removed.userId);
- }
- if (uOrders.remove(orderId) == null) {
- throw new IllegalArgumentException("Order not found by orderId in user orders: " + orderId);
- }
- }
删除订单时,必须从activeOrders和userOrders中全部成功删除,否则会造成OrderService内部状态混乱。
最后,根据业务需求,我们加上根据订单ID查询、根据用户ID查询的方法:
- // 根据订单ID查询Order,不存在返回null:
- public OrderEntity getOrder(Long orderId) {
- return this.activeOrders.get(orderId);
- }
- // 根据用户ID查询用户所有活动Order,不存在返回null:
- public ConcurrentMap<Long, OrderEntity> getUserOrders(Long userId) {
- return this.userOrders.get(userId);
- }
整个订单子系统的实现就是这么简单。
Order的id和sequenceId为何不合并使用一个ID?
订单ID是Order.id,是用户看到的订单标识,而Order.sequenceId是系统内部给订单的定序序列号,用于后续撮合时进入订单簿的排序,两者功能不同。
可以使用一个简单的算法来根据Sequence ID计算Order ID:
OrderID = SequenceID * 10000 + today("YYmm")
因为SequenceID是全局唯一的,我们给SequenceID添加创建日期的"YYmm"部分,可轻松实现按月分库保存和查询。
4. 撮合引擎
现撮合引擎的关键在于将业务模型转换为高效的数据结构。只要保证核心数据结构的简单和高效,撮合引擎的业务逻辑编写是非常容易的。
在证券交易系统中,撮合引擎是实现买卖盘成交的关键组件。我们先分析撮合引擎的工作原理,然后设计并实现一个最简化的撮合引擎。
在证券市场中,撮合交易是一种微观价格发现模型,它允许买卖双方各自提交买卖订单并报价,按价格优先,时间优先的顺序,凡买单价格大于等于卖单价格时,双方即达成价格协商并成交。在A股身经百战的老股民对此规则应该非常熟悉,这里不再详述。
我们将讨论如何从技术上来实现它。对于撮合引擎来说,它必须维护两个买卖盘列表,一个买盘,一个卖盘,买盘按价格从高到低排序,确保报价最高的订单排在最前面;卖盘则相反,按照价格从低到高排序,确保报价最低的卖单排在最前面。
下图是一个实际的买卖盘:
对于买盘来说,上图的订单排序为2086.50,2086.09,2086.06,20860,2085.97,……
对于卖盘来说,上图的订单排序为2086.55,2086.75,2086.77,2086.90,2086.99,……
不可能出现买1价格大于等于卖1价格的情况,因为这意味着应该成交的买卖订单没有成交却在订单簿上等待成交。
对于多个价格相同的订单,例如2086.55,很可能张三卖出1,李四卖出3,累计数量是4。当一个新的买单价格≥2086.55时,到底优先和张三的卖单成交还是优先和李四的卖单成交呢?这要看张三和李四的订单时间谁更靠前。
我们在订单上虽然保存了创建时间,但排序时,是根据定序ID即sequenceId来排序,以确保全局唯一。时间本身实际上是订单的一个普通属性,仅展示给用户,不参与业务排序。
下一步是实现订单簿OrderBook的表示。一个直观的想法是使用List<Order>,并对订单进行排序。但是,在证券交易中,使用List会导致两个致命问题:
更好的方法是使用红黑树,它是一种自平衡的二叉排序树,插入和删除的效率都是O(logN),对应的Java类是TreeMap。
所以我们定义OrderBook的结构就是一个TreeMap<OrderKey, OrderEntity>,它的排序根据OrderKey决定。由业务规则可知,负责排序的OrderKey只需要sequenceId和price即可:
- // 以record实现的OrderKey:
- public record OrderKey(long sequenceId, BigDecimal price) {
- }
因此,OrderBook的核心数据结构就可以表示如下:
- public class OrderBook {
- public final Direction direction; // 方向
- public final TreeMap<OrderKey, Order> book; // 排序树
-
- public OrderBook(Direction direction) {
- this.direction = direction;
- this.book = new TreeMap<>(???);
- }
- }
有的童鞋注意到TreeMap的排序要求实现Comparable接口或者提供一个Comparator。我们之所以没有在OrderKey上实现Comparable接口是因为买卖盘排序的价格规则不同,因此,编写两个Comparator分别用于排序买盘和卖盘:
- private static final Comparator<OrderKey> SORT_SELL = new Comparator<>() {
- public int compare(OrderKey o1, OrderKey o2) {
- // 价格低在前:
- int cmp = o1.price().compareTo(o2.price());
- // 时间早在前:
- return cmp == 0 ? Long.compare(o1.sequenceId(), o2.sequenceId()) : cmp;
- }
- };
-
- private static final Comparator<OrderKey> SORT_BUY = new Comparator<>() {
- public int compare(OrderKey o1, OrderKey o2) {
- // 价格高在前:
- int cmp = o2.price().compareTo(o1.price());
- // 时间早在前:
- return cmp == 0 ? Long.compare(o1.sequenceId(), o2.sequenceId()) : cmp;
- }
- };
这样,OrderBook的TreeMap排序就由Direction指定:
- public OrderBook(Direction direction) {
- this.direction = direction;
- this.book = new TreeMap<>(direction == Direction.BUY ? SORT_BUY : SORT_SELL);
- }
这里友情提示Java的BigDecimal比较大小的大坑:比较两个BigDecimal是否值相等,一定要用compareTo(),不要用equals(),因为1.2和1.20因为scale不同导致equals()返回false。
在Java中比较两个BigDecimal的值只能使用compareTo(),不能使用equals()!
再给OrderBook添加插入、删除和查找首元素方法:
- public OrderEntity getFirst() {
- return this.book.isEmpty() ? null : this.book.firstEntry().getValue();
- }
-
- public boolean remove(OrderEntity order) {
- return this.book.remove(new OrderKey(order.sequenceId, order.price)) != null;
- }
-
- public boolean add(OrderEntity order) {
- return this.book.put(new OrderKey(order.sequenceId, order.price), order) == null;
- }
现在,有了买卖盘,我们就可以编写撮合引擎了。
定义MatchEngine核心数据结构如下:
- public class MatchEngine {
- public final OrderBook buyBook = new OrderBook(Direction.BUY);
- public final OrderBook sellBook = new OrderBook(Direction.SELL);
- public BigDecimal marketPrice = BigDecimal.ZERO; // 最新市场价
- private long sequenceId; // 上次处理的Sequence ID
- }
一个完整的撮合引擎包含一个买盘、一个卖盘和一个最新成交价(初始值为0)。撮合引擎的输入是一个OrderEntity实例,每处理一个订单,就输出撮合结果MatchResult,核心处理方法定义如下:
- public MatchResult processOrder(long sequenceId, OrderEntity order) {
- ...
- }
下面我们讨论如何处理一个具体的订单。对于撮合交易来说,如果新订单是一个买单,则首先尝试在卖盘中匹配价格合适的卖单,如果匹配成功则成交。一个大的买单可能会匹配多个较小的卖单。当买单被完全匹配后,说明此买单已完全成交,处理结束,否则,如果存在未成交的买单,则将其放入买盘。处理卖单的逻辑是类似的。
我们把已经挂在买卖盘的订单称为挂单(Maker),当前正在处理的订单称为吃单(Taker),一个Taker订单如果未完全成交则转为Maker挂在买卖盘,因此,处理当前Taker订单的逻辑如下:
- public MatchResult processOrder(long sequenceId, OrderEntity order) {
- switch (order.direction) {
- case BUY:
- // 买单与sellBook匹配,最后放入buyBook:
- return processOrder(order, this.sellBook, this.buyBook);
- case SELL:
- // 卖单与buyBook匹配,最后放入sellBook:
- return processOrder(order, this.buyBook, this.sellBook);
- default:
- throw new IllegalArgumentException("Invalid direction.");
- }
- }
-
- MatchResult processOrder(long sequenceId, OrderEntity takerOrder, OrderBook makerBook, OrderBook anotherBook) {
- ...
- }
根据价格匹配,直到成交双方有一方完全成交或成交条件不满足时结束处理,我们直接给出processOrder()的业务逻辑代码:
- MatchResult processOrder(long sequenceId, OrderEntity takerOrder, OrderBook makerBook, OrderBook anotherBook) {
- this.sequenceId = sequenceId;
- long ts = takerOrder.createdAt;
- MatchResult matchResult = new MatchResult(takerOrder);
- BigDecimal takerUnfilledQuantity = takerOrder.quantity;
- for (;;) {
- OrderEntity makerOrder = makerBook.getFirst();
- if (makerOrder == null) {
- // 对手盘不存在:
- break;
- }
- if (takerOrder.direction == Direction.BUY && takerOrder.price.compareTo(makerOrder.price) < 0) {
- // 买入订单价格比卖盘第一档价格低:
- break;
- } else if (takerOrder.direction == Direction.SELL && takerOrder.price.compareTo(makerOrder.price) > 0) {
- // 卖出订单价格比买盘第一档价格高:
- break;
- }
- // 以Maker价格成交:
- this.marketPrice = makerOrder.price;
- // 待成交数量为两者较小值:
- BigDecimal matchedQuantity = takerUnfilledQuantity.min(makerOrder.unfilledQuantity);
- // 成交记录:
- matchResult.add(makerOrder.price, matchedQuantity, makerOrder);
- // 更新成交后的订单数量:
- takerUnfilledQuantity = takerUnfilledQuantity.subtract(matchedQuantity);
- BigDecimal makerUnfilledQuantity = makerOrder.unfilledQuantity.subtract(matchedQuantity);
- // 对手盘完全成交后,从订单簿中删除:
- if (makerUnfilledQuantity.signum() == 0) {
- makerOrder.updateOrder(makerUnfilledQuantity, OrderStatus.FULLY_FILLED, ts);
- makerBook.remove(makerOrder);
- } else {
- // 对手盘部分成交:
- makerOrder.updateOrder(makerUnfilledQuantity, OrderStatus.PARTIAL_FILLED, ts);
- }
- // Taker订单完全成交后,退出循环:
- if (takerUnfilledQuantity.signum() == 0) {
- takerOrder.updateOrder(takerUnfilledQuantity, OrderStatus.FULLY_FILLED, ts);
- break;
- }
- }
- // Taker订单未完全成交时,放入订单簿:
- if (takerUnfilledQuantity.signum() > 0) {
- takerOrder.updateOrder(takerUnfilledQuantity,
- takerUnfilledQuantity.compareTo(takerOrder.quantity) == 0 ? OrderStatus.PENDING
- : OrderStatus.PARTIAL_FILLED,
- ts);
- anotherBook.add(takerOrder);
- }
- return matchResult;
- }
可见,撮合匹配的业务逻辑是相对简单的。撮合结果记录在MatchResult中,它可以用一个Taker订单和一系列撮合匹配记录表示:
- public class MatchResult {
- public final Order takerOrder;
- public final List<MatchDetailRecord> MatchDetails = new ArrayList<>();
-
- // 构造方法略
- }
每一笔撮合记录则由成交双方、成交价格与数量表示:
- public record MatchDetailRecord(
- BigDecimal price,
- BigDecimal quantity,
- OrderEntity takerOrder,
- OrderEntity makerOrder) {
- }
撮合引擎返回的MatchResult包含了本次处理的完整结果,下一步需要把MatchResult发送给清算系统,对交易双方进行清算即完成了整个交易的处理。
我们可以编写一个简单的测试来验证撮合引擎工作是否正常。假设如下的订单依次输入到撮合引擎:
- // 方向 价格 数量
- buy 2082.34 1
- sell 2087.6 2
- buy 2087.8 1
- buy 2085.01 5
- sell 2088.02 3
- sell 2087.60 6
- buy 2081.11 7
- buy 2086.0 3
- buy 2088.33 1
- sell 2086.54 2
- sell 2086.55 5
- buy 2086.55 3
经过撮合后最终买卖盘及市场价如下:
- 2088.02 3
- 2087.60 6
- 2086.55 4
- ---------
- 2086.55
- ---------
- 2086.00 3
- 2085.01 5
- 2082.34 1
- 2081.11 7
如果我们仔细观察整个系统的输入和输出,输入实际上是一系列按时间排序后的订单(实际排序按sequenceId),输出是一系列MatchResult,内部状态的变化就是买卖盘以及市场价的变化。如果两个初始状态相同的MatchEngine,输入的订单序列是完全相同的,则我们得到的MatchResult输出序列以及最终的内部状态也是完全相同的。
下面是问题解答。
如何实现多个交易对?
一个撮合引擎只能处理一个交易对,如果要实现多个交易对,则需要构造一个“多撮合实例”的引擎:
- class MatchEngineGroup {
- Map<Long, MatchEngine> engines = new HashMap<>();
- public MatchResult processOrder(long sequenceId, OrderEntity order) {
- // 获得订单的交易对ID:
- Long symbolId = order.symbolId;
- // 查找交易对所对应的引擎实例:
- MatchEngine engine = engines.get(symbolId);
- if (engine == null) {
- // 该交易对的第一个订单:
- engine = new MatchEngine();
- engines.put(symbolId, engine);
- }
- // 由该实例处理订单:
- return engine.processOrder(sequenceId, order);
- }
- }
需要给订单增加symbolId属性以标识该订单是哪个交易对。
5. 清算系统
清算系统只负责根据撮合引擎输出的结果进行清算,清算的本质就是根据成交价格和数量对买卖双方的对应资产互相划转。清算系统本身没有状态。
在证券交易系统中,一个订单成功创建后,经过撮合引擎,就可以输出撮合结果。但此时买卖双方的资产还没有变化,要把撮合结果最终实现为买卖双方的资产交换,就需要清算。
清算系统就是处理撮合结果,将买卖双方冻结的USD和BTC分别交换到对方的可用余额,就使得买卖双方真正完成了资产交换。
因此,我们设计清算系统ClearingService,需要引用AssetService和OrderService:
- public class ClearingService {
- final AssetService assetService;
- final OrderService orderService;
-
- public ClearingService(@Autowired AssetService assetService, @Autowired OrderService orderService) {
- this.assetService = assetService;
- this.orderService = orderService;
- }
- }
当撮合引擎输出MatchResult后,ClearingService需要处理该结果,该清算方法代码框架如下:
- public void clearMatchResult(MatchResult result) {
- OrderEntity taker = result.takerOrder;
- switch (taker.direction) {
- case BUY -> {
- // TODO
- }
- case SELL -> {
- // TODO
- }
- default -> throw new IllegalArgumentException("Invalid direction.");
- }
- }
对Taker买入成交的订单,处理时需要注意,成交价格是按照Maker的报价成交的,而Taker冻结的金额是按照Taker订单的报价冻结的,因此,解冻后,部分差额要退回至Taker可用余额:
- case BUY -> {
- // 买入时,按Maker的价格成交:
- for (MatchDetailRecord detail : result.matchDetails) {
- OrderEntity maker = detail.makerOrder();
- BigDecimal matched = detail.quantity();
- if (taker.price.compareTo(maker.price) > 0) {
- // 实际买入价比报价低,部分USD退回账户:
- BigDecimal unfreezeQuote = taker.price.subtract(maker.price).multiply(matched);
- assetService.unfreeze(taker.userId, AssetEnum.USD, unfreezeQuote);
- }
- // 买方USD转入卖方账户:
- assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, taker.userId, maker.userId, AssetEnum.USD, maker.price.multiply(matched));
- // 卖方BTC转入买方账户:
- assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, maker.userId, taker.userId, AssetEnum.BTC, matched);
- // 删除完全成交的Maker:
- if (maker.unfilledQuantity.signum() == 0) {
- orderService.removeOrder(maker.id);
- }
- }
- // 删除完全成交的Taker:
- if (taker.unfilledQuantity.signum() == 0) {
- orderService.removeOrder(taker.id);
- }
- }
对Taker卖出成交的订单,只需将冻结的BTC转入Maker,将Maker冻结的USD转入Taker即可:
- case SELL -> {
- for (MatchDetailRecord detail : result.matchDetails) {
- OrderEntity maker = detail.makerOrder();
- BigDecimal matched = detail.quantity();
- // 卖方BTC转入买方账户:
- assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, taker.userId, maker.userId, AssetEnum.BTC, matched);
- // 买方USD转入卖方账户:
- assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, maker.userId, taker.userId, AssetEnum.USD, maker.price.multiply(matched));
- // 删除完全成交的Maker:
- if (maker.unfilledQuantity.signum() == 0) {
- orderService.removeOrder(maker.id);
- }
- }
- // 删除完全成交的Taker:
- if (taker.unfilledQuantity.signum() == 0) {
- orderService.removeOrder(taker.id);
- }
- }
当用户取消订单时,ClearingService需要取消订单冻结的USD或BTC,然后将订单从OrderService中删除:
- public void clearCancelOrder(OrderEntity order) {
- switch (order.direction) {
- case BUY -> {
- // 解冻USD = 价格 x 未成交数量
- assetService.unfreeze(order.userId, AssetEnum.USD, order.price.multiply(order.unfilledQuantity));
- }
- case SELL -> {
- // 解冻BTC = 未成交数量
- assetService.unfreeze(order.userId, AssetEnum.BTC, order.unfilledQuantity);
- }
- default -> throw new IllegalArgumentException("Invalid direction.");
- }
- // 从OrderService中删除订单:
- orderService.removeOrder(order.id);
- }
这样,我们就完成了清算系统的实现。
如果有手续费,如何清算?
如果有交易手续费,则首先需要思考:手续费应该定义在哪?
如果我们把手续费定义为一个配置,注入到ClearingService:
- public class ClearingService {
- @Value("${exchange.fee-rate:0.0005}")
- BigDecimal feeRate;
- }
那么问题来了:对于同一个订单输入序列,设定手续费为万分之五,和设定手续费为万分之二,执行后交易引擎的状态和输出结果是不同的!这就使得交易引擎不再是一个确定性状态机,无法重复执行交易序列。
此外,不同用户通常可以有不同的交易费率,例如机构的费率比个人低,做市商的费率可以为0。
要支持不同用户不同的费率,以及保证交易引擎是一个确定性状态机,手续费必须作为订单的一个不变属性,从外部输入,这样交易引擎不再关心如何读取费率。
带手续费的订单在创建时,针对买单,冻结金额不再是价格x数量,而是:
freeze = order.price * order.quantity * (1 + order.feeRate)
首先,需要修改OrderService创建订单时的冻结逻辑。其次,在清算时,除了买卖双方交换资产,还需要设定一个系统用户,专门接收手续费,将买方手续费从冻结的金额转入系统手续费用户,而卖方获得转入的金额会扣除手续费。
可以为挂单和吃单设置不同的手续费率吗?
可以,需要给订单添加两个费率属性:takerFeeRate和makerFeeRate,买方下单冻结时,额外冻结的金额按takerFeeRate冻结。
清算逻辑会复杂一些,要针对Taker和Maker分别计算不同的费率。
可以设置负费率吗?
可以,通常可以给makerFeeRate设置负费率,以鼓励做市。清算逻辑会更复杂一些,因为针对负费率的Maker,需要从系统手续费用户转账给Maker。
6. 交易引擎
交易引擎是以事件驱动的状态机模型,同样的输入将得到同样的输出。为提高交易系统的健壮性,可以自动检测重复消息和消息丢失并自动恢复。
我们现在实现了资产模块、订单模块、撮合引擎和清算模块,现在,就可以把它们组合起来,实现一个完整的交易引擎:
- public class TradingEngineService {
- @Autowired
- AssetService assetService;
-
- @Autowired
- OrderService orderService;
-
- @Autowired
- MatchEngine matchEngine;
-
- @Autowired
- ClearingService clearingService;
- }
交易引擎由事件驱动,因此,通过订阅Kafka的Topic实现批量读消息,然后依次处理每个事件:
- void processMessages(List<AbstractEvent> messages) {
- for (AbstractEvent message : messages) {
- processEvent(message);
- }
- }
-
- void processEvent(AbstractEvent event) {
- if (event instanceof OrderRequestEvent) {
- createOrder((OrderRequestEvent) event);
- } else if (event instanceof OrderCancelEvent) {
- cancelOrder((OrderCancelEvent) event);
- } else if (event instanceof TransferEvent) {
- transfer((TransferEvent) event);
- }
- }
我们目前一共有3种类型的事件,处理都非常简单。以createOrder()为例,核心代码其实就几行:
- void createOrder(OrderRequestEvent event) {
- // 生成Order ID:
- long orderId = event.sequenceId * 10000 + (year * 100 + month);
- // 创建Order:
- OrderEntity order = orderService.createOrder(event.sequenceId, event.createdAt, orderId, event.userId, event.direction, event.price, event.quantity);
- if (order == null) {
- logger.warn("create order failed.");
- return;
- }
- // 撮合:
- MatchResult result = matchEngine.processOrder(event.sequenceId, order);
- // 清算:
- clearingService.clearMatchResult(result);
- }
核心的业务逻辑并不复杂,只是交易引擎在处理完订单后,仅仅改变自身状态是不够的,它还得向外输出具体的成交信息、订单状态等。
因此,需要根据业务需求,在清算后继续收集撮合结果、已完成订单、准备发送的通知等,通过消息系统或Redis向外输出交易信息。如果把这些功能放到同一个线程内同步完成是非常耗时的,更好的方法是把它们先存储起来,再异步处理。例如,对于已完成的订单,可以异步落库:
- Queue<List<OrderEntity>> orderQueue = new ConcurrentLinkedQueue<>();
- void createOrder(OrderRequestEvent event) {
- ...
- // 清算完成后,收集已完成Order:
- if (!result.matchDetails.isEmpty()) {
- List<OrderEntity> closedOrders = new ArrayList<>();
- if (result.takerOrder.status.isFinalStatus) {
- closedOrders.add(result.takerOrder);
- }
- for (MatchDetailRecord detail : result.matchDetails) {
- OrderEntity maker = detail.makerOrder();
- if (maker.status.isFinalStatus) {
- closedOrders.add(maker);
- }
- }
- this.orderQueue.add(closedOrders);
- }
- }
-
- // 启动一个线程将orderQueue的Order异步写入数据库:
- void saveOrders() {
- // TODO:
- }
类似的,输出OrderBook、通知用户成交等信息都是异步处理。
接下来,我们再继续完善processEvent(),处理单个事件时,在处理具体的业务逻辑之前,我们首先根据sequenceId判断是否是重复消息,是重复消息就丢弃:
- void processEvent(AbstractEvent event) {
- if (event.sequenceId <= this.lastSequenceId) {
- logger.warn("skip duplicate event: {}", event);
- return;
- }
- // TODO:
- }
紧接着,我们判断是否丢失了消息,如果丢失了消息,就根据上次处理的消息的sequenceId,从数据库里捞出后续消息,直到赶上当前消息的sequenceId为止:
- // 判断是否丢失了消息:
- if (event.previousId > this.lastSequenceId) {
- // 从数据库读取丢失的消息:
- List<AbstractEvent> events = storeService.loadEventsFromDb(this.lastSequenceId);
- if (events.isEmpty()) {
- // 读取失败:
- System.exit(1);
- return;
- }
- // 处理丢失的消息:
- for (AbstractEvent e : events) {
- this.processEvent(e);
- }
- return;
- }
- // 判断当前消息是否指向上一条消息:
- if (event.previousId != lastSequenceId) {
- System.exit(1);
- return;
- }
- // 正常处理:
- ...
- // 更新lastSequenceId:
- this.lastSequenceId = event.sequenceId;
这样一来,我们对消息系统的依赖就不是要求它100%可靠,遇到重复消息、丢失消息,交易引擎都可以从这些错误中自己恢复。
由于资产、订单、撮合、清算都在内存中完成,如何保证交易引擎每处理一个事件,它的内部状态都是正确的呢?我们可以为交易引擎增加一个自验证功能,在debug模式下,每处理一个事件,就自动验证内部状态的完整性,包括:
- void processEvent(AbstractEvent event) {
- ...
- if (debugMode) {
- this.validate();
- }
- }
这样我们就能快速在开发阶段尽可能早地发现问题。
交易引擎的测试也相对比较简单。对于同一组输入,每次运行都会得到相同的结果,所以我们可以构造几组确定的输入来验证交易引擎:
- class TradingEngineServiceTest {
- @Test
- public void testTradingEngine() {
- // TODO:
- }
- }
交易引擎崩溃后如何恢复?
交易引擎如果运行时崩溃,可以重启,重启后先把现有的所有交易事件重头开始执行一遍,即可得到最新的状态。
注意到重头开始执行交易事件,会导致重复发出市场成交、用户订单通知等事件,因此,可根据时间做判断,不再重复发通知。下游系统在处理通知事件时,也要根据通知携带的sequenceId做去重判断。
有的童鞋会问,如果现有的交易事件已经有几千万甚至几十亿,从头开始执行如果需要花费几个小时甚至几天,怎么办?
可以定期把交易引擎的状态序列化至文件系统,例如,每10分钟一次。当交易引擎崩溃时,读取最新的状态文件,即可恢复至约10分钟前的状态,后续追赶只需要执行很少的事件消息。
如何序列化交易引擎的状态?
交易引擎的状态包括:
序列化时,分别针对每个子系统进行序列化。对资产系统来说,每个用户的资产可序列化为用户ID: [USD可用, USD冻结, BTC可用, BTC冻结]的JSON格式,整个资产系统序列化后结构如下:
- {
- "1": [-123000, 0, -12.3, 0],
- "100": [60000, 20000, 9, 0],
- "200": [43000, 0, 3, 0.3]
- }
订单系统可序列化为一系列活动订单列表:
- [
- { "id": 10012207, "sequenceId": 1001, "price": 20901, ...},
- { "id": 10022207, "sequenceId": 1002, "price": 20902, ...},
- ]
撮合引擎可序列化为买卖盘列表(仅包含订单ID):
- {
- "BUY": [10012207, 10022207, ...],
- "SELL": [...],
- "marketPrice": 20901
- }
最后合并为一个交易引擎的状态文件:
- {
- "sequenceId": 189000,
- "assets": { ... },
- "orders": [ ... ],
- "match": { ... }
- }
交易引擎启动时,读取状态文件,然后依次恢复资产系统、订单系统和撮合引擎的状态,就得到了指定sequenceId的状态。
写入状态时,如果是异步写入,需要先复制状态、再写入,防止多线程读同一实例导致状态不一致。读写JSON时,要使用JSON库的流式API(例如Jackson的Streaming API),以免内存溢出。对BigDecimal进行序列化时,要注意不要误读为double类型以免丢失精度。
7. 定序系统
定序系统负责给每个事件一个唯一递增序列号。通过引用前一个事件的序列号,可以构造一个能自动检测连续性的事件流。
当系统通过API接收到所有交易员发送的订单请求后,就需要按接收顺序对订单请求进行定序。
定序的目的是在系统内部完成订单请求排序,排序的同时给每个订单请求一个全局唯一递增的序列号,然后将排序后的订单请求发送至交易引擎。
因此,定序系统的输入是上游发送的事件消息,输出是定序后的带Sequence ID的事件,这样,下游的交易引擎就可以由确定性的事件进行驱动。
除了对订单请求进行定序,定序系统还需要对撤消订单、转账请求进行定序,因此,输入的事件消息包括:
对于某些类型的事件,例如转账请求,它必须被处理一次且仅处理一次。而消息系统本质上也是一个分布式网络应用程序,它的内部也有缓存、重试等机制。一般来说,消息系统可以实现的消息传输模式有:
实际上,第3种理想情况基本不存在,没有任何基于网络的消息系统能实现这种模式,所以,大部分消息系统都是按照第1种方式来设计,也就是基于确认+重试的机制保证消息可靠到达。
而定序系统要处理的事件消息,例如转账请求,如果消息重复了多次,就会造成重复转账,所以,我们还需要对某些事件消息作特殊处理,让发送消息的客户端给这个事件消息添加一个全局唯一ID,定序系统根据全局唯一ID去重,而不是依赖消息中间件的能力。
此外,为了让下游系统,也就是交易引擎能一个不漏地按顺序接收定序后的事件消息,我们也不能相信消息中间件总是在理想状态下工作。
除了给每个事件消息设置一个唯一递增ID外,定序系统还同时给每个事件消息附带前一事件的ID,这样就形成了一个微型“区块链”:
由于下游接收方可以根据Sequence ID去重,因此,重复发送的消息会被忽略:
如果出现消息丢失:
由于存在Previous ID,下游接收方可以检测到丢失,于是,接收方可以根据上次收到的ID去数据库查询,直到读取到最新的Sequence ID为止。只要定序系统先将定序后的事件消息落库,再发送给下游,就可以保证无论是消息重复还是丢失,接收方都可以正确处理:
整个过程中,丢失极少量消息不会对系统的可用性造成影响,这样就极大地减少了系统的运维成本和线上排错成本。
最后,无论是接收方还是发送方,为了提高消息收发的效率,应该总是使用批处理方式。定序系统采用批量读+批量batch写入数据库+批量发送消息的模式,可以显著提高TPS。
下面我们一步一步地实现定序系统。
首先定义要接收的事件消息,它包含一个Sequence ID、上一个Sequence ID以及一个可选的用于去重的全局唯一ID:
- public class AbstractEvent extends AbstractMessage {
- // 定序后的Sequence ID:
- public long sequenceId;
-
- // 定序后的Previous Sequence ID:
- public long previousId;
-
- // 可选的全局唯一标识:
- @Nullable
- public String uniqueId;
- }
定序系统接收的事件仅包含可选的uniqueId,忽略sequenceId和previousId。定序完成后,把sequenceId和previousId设置好,再发送给下游。
SequenceService用于接收上游消息、定序、发送消息给下游:
- @Component
- public class SequenceService {
- @Autowired
- SequenceHandler sequenceHandler;
-
- // 全局唯一递增ID:
- private AtomicLong sequence;
-
- // 接收消息并定序再发送:
- synchronized void processMessages(List<AbstractEvent> messages) {
- // 定序后的事件消息:
- List<AbstractEvent> sequenced = null;
- try {
- // 定序:
- sequenced = this.sequenceHandler.sequenceMessages(this.messageTypes, this.sequence, messages);
- } catch (Throwable e) {
- // 定序出错时进程退出:
- logger.error("exception when do sequence", e);
- System.exit(1);
- throw new Error(e);
- }
- // 发送定序后的消息:
- sendMessages(sequenced);
- }
- }
SequenceHandler是真正写入Sequence ID并落库的:
- @Component
- @Transactional(rollbackFor = Throwable.class)
- public class SequenceHandler {
- public List<AbstractEvent> sequenceMessages(MessageTypes messageTypes, AtomicLong sequence, List<AbstractEvent> messages) throws Exception {
- // 利用UniqueEventEntity去重:
- List<UniqueEventEntity> uniques = null;
- Set<String> uniqueKeys = null;
- List<AbstractEvent> sequencedMessages = new ArrayList<>(messages.size());
- List<EventEntity> events = new ArrayList<>(messages.size());
- for (AbstractEvent message : messages) {
- UniqueEventEntity unique = null;
- final String uniqueId = message.uniqueId;
- // 在数据库中查找uniqueId检查是否已存在:
- if (uniqueId != null) {
- if ((uniqueKeys != null && uniqueKeys.contains(uniqueId))
- || db.fetch(UniqueEventEntity.class, uniqueId) != null) {
- // 忽略已处理的重复消息:
- logger.warn("ignore processed unique message: {}", message);
- continue;
- }
- unique = new UniqueEventEntity();
- unique.uniqueId = uniqueId;
- if (uniques == null) {
- uniques = new ArrayList<>();
- }
- uniques.add(unique);
- if (uniqueKeys == null) {
- uniqueKeys = new HashSet<>();
- }
- uniqueKeys.add(uniqueId);
- }
- // 上次定序ID:
- long previousId = sequence.get();
- // 本次定序ID:
- long currentId = sequence.incrementAndGet();
- // 先设置message的sequenceId / previouseId,再序列化并落库:
- message.sequenceId = currentId;
- message.previousId = previousId;
- // 如果此消息关联了UniqueEvent,给UniqueEvent加上相同的sequenceId:
- if (unique != null) {
- unique.sequenceId = message.sequenceId;
- }
- // 准备写入数据库的Event:
- EventEntity event = new EventEntity();
- event.previousId = previousId;
- event.sequenceId = currentId;
- event.data = messageTypes.serialize(message);
- events.add(event);
- // 添加到结果集:
- sequencedMessages.add(message);
- }
- // 落库:
- if (uniques != null) {
- db.insert(uniques);
- }
- db.insert(events);
- // 返回定序后的消息:
- return sequencedMessages;
- }
- }
在SequenceService中调用SequenceHandler是因为我们写入数据库时需要利用Spring提供的声明式数据库事务,而消息的接收和发送并不需要被包含在数据库事务中。
如何在定序器重启后正确初始化下一个序列号?
正确初始化下一个序列号实际上就是要把一个正确的初始值给AtomicLong sequence字段。可以读取数据库获得当前最大的Sequence ID,这个Sequence ID就是上次最后一次定序的ID。
如何在定序器崩溃后自动恢复?
由于任何一个时候都只能有一个定序器工作,这样才能保证Sequence ID的正确性,因此,无法让两个定序器同时工作。
虽然无法让两个定序器同时工作,但可以让两个定序器以主备模式同时运行,仅主定序器工作。当主定序器崩溃后,备用定序器自动切换为主定序器接管后续工作即可。
为了实现主备模式,可以启动两个定序器,然后抢锁的形式确定主备。抢到锁的定序器开始工作,并定期刷新锁,未抢到锁的定序器定期检查锁。可以用数据库锁实现主备模式。
如何解决定序的性能瓶颈?
通常来说,消息系统的吞吐量远超数据库。定序的性能取决于批量写入数据库的能力。首先要提高数据库的性能,其次考虑按Sequence ID进行分库,但分库会提高定序的复杂度,也会使下游从数据库读取消息时复杂度增加。
最后,可以考虑使用专门针对时序优化的数据库,但这样就不如MySQL这种数据库通用、易用。
8. API系统
API系统负责认证用户身份,并提供一个唯一的交易入口。
有了交易引擎和定序系统,我们还需要一个API系统,用于接收所有交易员的订单请求。
相比事件驱动的交易引擎,API系统就比较简单,因为它就是一个标准的Web应用。
在编写API之前,我们需要对请求进行认证,即识别出是哪个用户发出的请求。用户认证放在Filter中是最合适的。认证方式可以是简单粗暴的用户名+口令,也可以是Token,也可以是API Key+API Secret等模式。
我们先实现一个最简单的用户名+口令的认证方式。需要注意的是,API和Web页面不同,Web页面可以给用户一个登录页,登录成功后设置Session或Cookie,后续请求检查的是Session或Cookie。API不能使用Session,因为Session很难做无状态集群,API也不建议使用Cookie,因为API域名很可能与Web UI的域名不一致,拿不到Cookie。要在API中使用用户名+口令的认证方式,可以用标准的HTTP头Authorization的Basic模式:
Authorization: Basic 用户名:口令
因此,我们可以尝试从Authorization中获取用户名和口令来认证:
- Long parseUserFromAuthorization(String auth) {
- if (auth.startsWith("Basic ")) {
- // 用Base64解码:
- String eap = new String(Base64.getDecoder().decode(auth.substring(6)));
- // 分离email:password
- int pos = eap.indexOf(':');
- String email = eap.substring(0, pos);
- String passwd = eap.substring(pos + 1);
- // 验证:
- UserProfileEntity p = userService.signin(email, passwd);
- return p.userId;
- }
- throw new ApiException(ApiError.AUTH_SIGNIN_FAILED, "Invalid Authorization header.");
- }
在ApiFilter中完成认证后,使用UserContext传递用户ID:
- public class ApiFilter {
- @Override
- public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
- throws IOException, ServletException {
- // 尝试认证用户:
- String authHeader = req.getHeader("Authorization");
- Long userId = authHeader == null ? null : parseUserFromAuthorization(authHeader);
- if (userId == null) {
- // 匿名身份:
- chain.doFilter(req, resp);
- } else {
- // 用户身份:
- try (UserContext ctx = new UserContext(userId)) {
- chain.doFilter(req, resp);
- }
- }
- }
- }
Basic模式很简单,需要注意的是用户名:口令使用:分隔,然后整个串用Base64编码,因此,读取的时候需要先用Base64解码。
虽然Basic模式并不安全,但是有了一种基本的认证模式,我们就可以把API-定序-交易串起来了。后续我们再继续添加其他认证模式。
编写API Controller:
对于认证用户的操作,例如,查询资产余额,可通过UserContext获取当前用户,然后通过交易引擎查询并返回用户资产余额:
- @ResponseBody
- @GetMapping(value = "/assets", produces = "application/json")
- public String getAssets() throws IOException {
- Long userId = UserContext.getRequiredUserId();
- return tradingEngineApiProxyService.get("/internal/" + userId + "/assets");
- }
因为交易引擎返回的结果就是JSON字符串,没必要先反序列化再序列化,可以以String的方式直接返回给客户端,需要标注@ResponseBody表示不要对String再进行序列化处理。
对于无需认证的操作,例如,查询公开市场的订单簿,可以直接返回Redis缓存结果:
- @ResponseBody
- @GetMapping(value = "/orderBook", produces = "application/json")
- public String getOrderBook() {
- String data = redisService.get(RedisCache.Key.ORDER_BOOK);
- return data == null ? OrderBookBean.EMPTY : data;
- }
但是对于创建订单的请求,处理就麻烦一些,因为API收到请求后,仅仅通过消息系统给定序系统发了一条消息。消息系统本身并不是类似HTTP的请求-响应模式,我们拿不到消息处理的结果。
这里先借助Spring的异步响应模型DeferredResult,再借助Redis的pub/sub模型,当API发送消息时,使用全局唯一refId跟踪消息,当交易引擎处理完订单请求后,向Redis发送pub事件,API收到Redis推送的事件后,根据refId找到DeferredResult,设置结果后由Spring异步返回给客户端:
代码实现如下:
- public class TradingApiController {
- // 消息refId -> DeferredResult:
- Map<String, DeferredResult<ResponseEntity<String>>> deferredResultMap = new ConcurrentHashMap<>();
- @Autowired
- RedisService redisService;
-
- @PostConstruct
- public void init() {
- // 订阅Redis:
- this.redisService.subscribe(RedisCache.Topic.TRADING_API_RESULT, this::onApiResultMessage);
- }
-
- @PostMapping(value = "/orders", produces = "application/json")
- @ResponseBody
- public DeferredResult<ResponseEntity<String>> createOrder(@RequestBody OrderRequestBean orderRequest) {
- final Long userId = UserContext.getRequiredUserId();
- // 消息的Reference ID:
- final String refId = IdUtil.generateUniqueId();
- var event = new OrderRequestEvent();
- event.refId = refId;
- event.userId = userId;
- event.direction = orderRequest.direction;
- event.price = orderRequest.price;
- event.quantity = orderRequest.quantity;
- event.createdAt = System.currentTimeMillis();
- // 如果超时则返回:
- ResponseEntity<String> timeout = new ResponseEntity<>(getTimeoutJson(), HttpStatus.BAD_REQUEST);
- // 正常异步返回:
- DeferredResult<ResponseEntity<String>> deferred = new DeferredResult<>(500, timeout); // 0.5秒超时
- deferred.onTimeout(() -> {
- this.deferredResultMap.remove(event.refId);
- });
- // 根据refId跟踪消息处理结果:
- this.deferredResultMap.put(event.refId, deferred);
- // 发送消息:
- sendMessage(event);
- return deferred;
- }
-
- // 收到Redis的消息结果推送:
- public void onApiResultMessage(String msg) {
- ApiResultMessage message = objectMapper.readValue(msg, ApiResultMessage.class);
- if (message.refId != null) {
- // 根据消息refId查找DeferredResult:
- DeferredResult<ResponseEntity<String>> deferred = this.deferredResultMap.remove(message.refId);
- if (deferred != null) {
- // 找到DeferredResult后设置响应结果:
- ResponseEntity<String> resp = new ResponseEntity<>(JsonUtil.writeJson(message.result), HttpStatus.OK);
- deferred.setResult(resp);
- }
- }
- }
- }
如何实现API Key认证?
身份认证的本质是确认用户身份。用户身份其实并不包含密码,而是用户ID、email、名字等信息,可以看作数据库中的user_profiles表。
userId | name | |
---|---|---|
100 | bob@example.com | Bob |
101 | alice@example.com | alice |
102 | cook@example.com | Cook |
使用口令认证时,通过添加一个password_auths表,存储哈希后的口令,并关联至某个用户ID,即可完成口令认证:
userId | random | passwd |
---|---|---|
100 | c47snXI | 7b6da12c... |
101 | djEqC2I | f7b68248... |
并不是每个用户都必须有口令,没有口令的用户仅仅表示该用户不能通过口令来认证身份,但完全可以通过其他方式认证。
使用API Key认证同理,通过添加一个api_auths表,存储API Key、API Secret并关联至某个用户ID:
userId | apiKey | apiSecret |
---|---|---|
101 | 5b503947f4f5d34a | e57c677d4ab4c5a4 |
102 | 13a867e8da13c7f6 | 92e41573e833ae13 |
102 | 341a8e60baf5b824 | 302c9e195826267f |
用户使用API Key认证时,提供API Key,以及用API Secret计算的Hmac哈希,服务器验证Hmac哈希后,就可以确认用户身份,因为其他人不知道该用户的API Secret,无法计算出正确的Hmac。
发送API Key认证时,可以定义如下的HTTP头:
- API-Key: 5b503947f4f5d34a
- API-Timestamp: 20220726T092137Z <- 防止重放攻击的时间戳
- API-Signature: d7a567b6cab85bcd
计算签名的原始输入可以包括HTTP Method、Path、Timestamp、Body等关键信息,具体格式可参考AWS API签名方式。
一个用户可以关联多个API Key认证,还可以给每个API Key附加特定权限,例如只读权限,这样用API Key认证就更加安全。
内部系统调用API如何实现用户认证?
很多时候,内部系统也需要调用API,并且需要以特定用户的身份调用API。让内部系统去读用户的口令或者API Key都是不合理的,更好的方式是使用一次性Token,还是利用Authorization头的Bearer模式:
Authorization: Bearer 5NPtI6LW...
构造一次性Token可以用userId:expires:hmac,内部系统和API共享同一个Hmac Key,就可以正确计算并验证签名。外部用户因为无法获得Hmac Key而无法伪造Token。
如何跟踪API性能?
可以使用Spring提供的HandlerInterceptor和DeferredResultProcessingInterceptor跟踪API性能,它们分别用于拦截同步API和异步API。
9. 行情系统
行情系统用来生成公开市场的历史数据,主要是K线图。
行情系统是典型的少量写、大量读的模式,非常适合缓存。通过编写Lua脚本可使得更新Redis更加简单。
K线图的数据来源是交易引擎成交产生的一个个Tick。一个K线包括OHLC这4个价格数据。在一个时间段内,第一个Tick的价格是Open,最后一个Tick的价格是Close,最高的价格是High,最低的价格是Low:
给定一组Tick集合,就可以汇总成一个K线,对应一个Bar结构:
- public class AbstractBarEntity {
- public long startTime; // 开始时间
- public BigDecimal openPrice; // 开始价格
- public BigDecimal highPrice; // 最高价格
- public BigDecimal lowPrice; // 最低价格
- public BigDecimal closePrice; // 结束价格
- public BigDecimal quantity; // 成交数量
- }
通常我们需要按1秒、1分钟、1小时和1天来生成不同类型的K线,因此,行情系统的功能就是不断从消息系统中读取Tick,合并,然后输出不同类型的K线。
此外,API系统还需要提供查询公开市场信息的功能。对于最近的成交信息和K线图,可以缓存在Redis中,对于较早时期的K线图,可以通过数据库查询。因此,行情系统需要将生成的K线保存到数据库中,同时负责不断更新Redis的缓存。
对于最新成交信息,我们在Redis中用一个List表示,它的每一个元素是一个序列号后的JSON:
["{...}", "{...}", "{...}"...]
如果有新的Tick产生,就需要把它们追加到列表尾部,同时将最早的Tick删除,以便维护一个最近成交的列表。
直接读取Redis列表,操作后再写回Redis是可以的,但比较麻烦。这里我们直接用Lua脚本更新最新Tick列表。Redis支持将一个Lua脚本加载后,直接在Redis内部执行脚本:
- local KEY_LAST_SEQ = '_TickSeq_' -- 上次更新的SequenceID
- local LIST_RECENT_TICKS = KEYS[1] -- 最新Ticks的Key
-
- local seqId = ARGV[1] -- 输入的SequenceID
- local jsonData = ARGV[2] -- 输入的JSON字符串表示的tick数组:"["{...}","{...}",...]"
- local strData = ARGV[3] -- 输入的JSON字符串表示的tick数组:"[{...},{...},...]"
-
- -- 获取上次更新的sequenceId:
- local lastSeqId = redis.call('GET', KEY_LAST_SEQ)
- local ticks, len;
-
- if not lastSeqId or tonumber(seqId) > tonumber(lastSeqId) then
- -- 广播:
- redis.call('PUBLISH', 'notification', '{"type":"tick","sequenceId":' .. seqId .. ',"data":' .. jsonData .. '}')
- -- 保存当前sequence id:
- redis.call('SET', KEY_LAST_SEQ, seqId)
- -- 更新最新tick列表:
- ticks = cjson.decode(strData)
- len = redis.call('RPUSH', LIST_RECENT_TICKS, unpack(ticks))
- if len > 100 then
- -- 裁剪LIST以保存最新的100个Tick:
- redis.call('LTRIM', LIST_RECENT_TICKS, len-100, len-1)
- end
- return true
- end
- -- 无更新返回false
- return false
在API中,要获取最新成交信息,我们直接从Redis缓存取出列表,然后拼接成一个JSON字符串:
- @ResponseBody
- @GetMapping(value = "/ticks", produces = "application/json")
- public String getRecentTicks() {
- List<String> data = redisService.lrange(RedisCache.Key.RECENT_TICKS, 0, -1);
- if (data == null || data.isEmpty()) {
- return "[]";
- }
- StringJoiner sj = new StringJoiner(",", "[", "]");
- for (String t : data) {
- sj.add(t);
- }
- return sj.toString();
- }
用Lua脚本更新Redis缓存还有一个好处,就是Lua脚本执行的时候,不但可以更新List,还可以通过Publish命令广播事件,后续我们编写基于WebSocket的推送服务器时,直接监听Redis广播,就可以主动向浏览器推送Tick更新的事件。
类似的,针对每一种K线,我们都在Redis中用ZScoredSet存储,用K线的开始时间戳作为Score。更新K线时,从每种ZScoredSet中找出Score最大的Bar结构,就是最后一个Bar,然后尝试更新。如果可以持久化这个Bar就返回,如果可以合并这个Bar就刷新ZScoreSet,用Lua脚本实现如下:
- local function merge(existBar, newBar)
- existBar[3] = math.max(existBar[3], newBar[3]) -- 更新High Price
- existBar[4] = math.min(existBar[4], newBar[4]) -- 更新Low Price
- existBar[5] = newBar[5] -- close
- existBar[6] = existBar[6] + newBar[6] -- 更新quantity
- end
-
- local function tryMergeLast(barType, seqId, zsetBars, timestamp, newBar)
- local topic = 'notification'
- local popedScore, popedBar
- -- 查找最后一个Bar:
- local poped = redis.call('ZPOPMAX', zsetBars)
- if #poped == 0 then
- -- ZScoredSet无任何bar, 直接添加:
- redis.call('ZADD', zsetBars, timestamp, cjson.encode(newBar))
- redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}')
- else
- popedBar = cjson.decode(poped[1])
- popedScore = tonumber(poped[2])
- if popedScore == timestamp then
- -- 合并Bar并发送通知:
- merge(popedBar, newBar)
- redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar))
- redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(popedBar) .. '}')
- else
- -- 可持久化最后一个Bar,生成新的Bar:
- if popedScore < timestamp then
- redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar), timestamp, cjson.encode(newBar))
- redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}')
- return popedBar
- end
- end
- end
- return nil
- end
-
- local seqId = ARGV[1]
- local KEY_BAR_SEQ = '_BarSeq_'
-
- local zsetBars, topics, barTypeStartTimes
- local openPrice, highPrice, lowPrice, closePrice, quantity
- local persistBars = {}
-
- -- 检查sequence:
- local seq = redis.call('GET', KEY_BAR_SEQ)
- if not seq or tonumber(seqId) > tonumber(seq) then
- zsetBars = { KEYS[1], KEYS[2], KEYS[3], KEYS[4] }
- barTypeStartTimes = { tonumber(ARGV[2]), tonumber(ARGV[3]), tonumber(ARGV[4]), tonumber(ARGV[5]) }
- openPrice = tonumber(ARGV[6])
- highPrice = tonumber(ARGV[7])
- lowPrice = tonumber(ARGV[8])
- closePrice = tonumber(ARGV[9])
- quantity = tonumber(ARGV[10])
-
- local i, bar
- local names = { 'SEC', 'MIN', 'HOUR', 'DAY' }
- -- 检查是否可以merge:
- for i = 1, 4 do
- bar = tryMergeLast(names[i], seqId, zsetBars[i], barTypeStartTimes[i], { barTypeStartTimes[i], openPrice, highPrice, lowPrice, closePrice, quantity })
- if bar then
- persistBars[names[i]] = bar
- end
- end
- redis.call('SET', KEY_BAR_SEQ, seqId)
- return cjson.encode(persistBars)
- end
-
- redis.log(redis.LOG_WARNING, 'sequence ignored: exist seq => ' .. seq .. ' >= ' .. seqId .. ' <= new seq')
-
- return '{}'
接下来我们编写QuotationService,初始化的时候加载Redis脚本,接收到Tick消息时调用脚本更新Tick和Bar,然后持久化Tick和Bar,代码如下:
- @Component
- public class QuotationService {
-
- @Autowired
- RedisService redisService;
-
- @Autowired
- MessagingFactory messagingFactory;
-
- MessageConsumer tickConsumer;
-
- private String shaUpdateRecentTicksLua = null;
- private String shaUpdateBarLua = null;
-
- @PostConstruct
- public void init() throws Exception {
- // 加载Redis脚本:
- this.shaUpdateRecentTicksLua = this.redisService.loadScriptFromClassPath("/redis/update-recent-ticks.lua");
- this.shaUpdateBarLua = this.redisService.loadScriptFromClassPath("/redis/update-bar.lua");
- // 接收Tick消息:
- String groupId = Messaging.Topic.TICK.name() + "_" + IpUtil.getHostId();
- this.tickConsumer = messagingFactory.createBatchMessageListener(Messaging.Topic.TICK, groupId,
- this::processMessages);
- }
-
- // 处理接收的消息:
- public void processMessages(List<AbstractMessage> messages) {
- for (AbstractMessage message : messages) {
- processMessage((TickMessage) message);
- }
- }
-
- // 处理一个Tick消息:
- void processMessage(TickMessage message) {
- // 对一个Tick消息中的多个Tick先进行合并:
- final long createdAt = message.createdAt;
- StringJoiner ticksStrJoiner = new StringJoiner(",", "[", "]");
- StringJoiner ticksJoiner = new StringJoiner(",", "[", "]");
- BigDecimal openPrice = BigDecimal.ZERO;
- BigDecimal closePrice = BigDecimal.ZERO;
- BigDecimal highPrice = BigDecimal.ZERO;
- BigDecimal lowPrice = BigDecimal.ZERO;
- BigDecimal quantity = BigDecimal.ZERO;
- for (TickEntity tick : message.ticks) {
- String json = tick.toJson();
- ticksStrJoiner.add("\"" + json + "\"");
- ticksJoiner.add(json);
- if (openPrice.signum() == 0) {
- openPrice = tick.price;
- closePrice = tick.price;
- highPrice = tick.price;
- lowPrice = tick.price;
- } else {
- // open price is set:
- closePrice = tick.price;
- highPrice = highPrice.max(tick.price);
- lowPrice = lowPrice.min(tick.price);
- }
- quantity = quantity.add(tick.quantity);
- }
- // 计算应该合并的每种类型的Bar的开始时间:
- long sec = createdAt / 1000;
- long min = sec / 60;
- long hour = min / 60;
- long secStartTime = sec * 1000;
- long minStartTime = min * 60 * 1000;
- long hourStartTime = hour * 3600 * 1000;
- long dayStartTime = Instant.ofEpochMilli(hourStartTime).atZone(zoneId).withHour(0).toEpochSecond() * 1000;
-
- // 更新Tick缓存:
- String ticksData = ticksJoiner.toString();
- Boolean tickOk = redisService.executeScriptReturnBoolean(this.shaUpdateRecentTicksLua,
- new String[] { RedisCache.Key.RECENT_TICKS },
- new String[] { String.valueOf(this.sequenceId), ticksData, ticksStrJoiner.toString() });
- if (!tickOk.booleanValue()) {
- logger.warn("ticks are ignored by Redis.");
- return;
- }
- // 保存Tick至数据库:
- saveTicks(message.ticks);
-
- // 更新Redis缓存的各种类型的Bar:
- String strCreatedBars = redisService.executeScriptReturnString(this.shaUpdateBarLua,
- new String[] { RedisCache.Key.SEC_BARS, RedisCache.Key.MIN_BARS, RedisCache.Key.HOUR_BARS,
- RedisCache.Key.DAY_BARS },
- new String[] { // ARGV
- String.valueOf(this.sequenceId), // sequence id
- String.valueOf(secStartTime), // sec-start-time
- String.valueOf(minStartTime), // min-start-time
- String.valueOf(hourStartTime), // hour-start-time
- String.valueOf(dayStartTime), // day-start-time
- String.valueOf(openPrice), // open
- String.valueOf(highPrice), // high
- String.valueOf(lowPrice), // low
- String.valueOf(closePrice), // close
- String.valueOf(quantity) // quantity
- });
- Map<BarType, BigDecimal[]> barMap = JsonUtil.readJson(strCreatedBars, TYPE_BARS);
- if (!barMap.isEmpty()) {
- // 保存Bar:
- SecBarEntity secBar = createBar(SecBarEntity::new, barMap.get(BarType.SEC));
- MinBarEntity minBar = createBar(MinBarEntity::new, barMap.get(BarType.MIN));
- HourBarEntity hourBar = createBar(HourBarEntity::new, barMap.get(BarType.HOUR));
- DayBarEntity dayBar = createBar(DayBarEntity::new, barMap.get(BarType.DAY));
- saveBars(secBar, minBar, hourBar, dayBar);
- }
- }
- }
K线是一组Bar按ZSet缓存在Redis中,Score就是Bar的开始时间。更新Bar时,同时广播通知,以便后续推送。要查询某种K线图,在API中,需要传入开始和结束的时间戳,通过ZRANGE命令返回排序后的List:
- String getBars(String key, long start, long end) {
- List<String> data = redisService.zrangebyscore(key, start, end);
- if (data == null || data.isEmpty()) {
- return "[]";
- }
- StringJoiner sj = new StringJoiner(",", "[", "]");
- for (String t : data) {
- sj.add(t);
- }
- return sj.toString();
- }
10. 推送系统
要高效处理大量WebSocket连接,我们选择基于Netty的Vert.x框架,可以通过少量代码配合Redis实现推送。
推送系统负责将公开市场的实时信息,包括订单簿、最新成交、最新K线等推送给客户端,对于用户的订单,还需要将成交信息推送给指定用户。FIX(Financial Information eXchange)协议是金融交易的一种实时化通讯协议,但是它非常复杂,而且不同版本的规范也不同。对于Warp Exchange来说,我们先实现一版简单的基于WebSocket推送JSON格式的通知。
和普通Web应用不同的是,基于Servlet的线程池模型不能高效地支持成百上千的WebSocket长连接。Java提供了NIO能充分利用Linux系统的epoll机制高效支持大量的长连接,但是直接使用NIO的接口非常繁琐,通常我们会选择基于NIO的Netty服务器。直接使用Netty其实仍然比较繁琐,基于Netty开发我们可以选择:
这里我们选择Vert.x,因为它的API更简单。
Vert.x本身包含若干模块,根据需要,我们引入3个组件:
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-core</artifactId>
- <version>${vertx.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-web</artifactId>
- <version>${vertx.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-redis-client</artifactId>
- <version>${vertx.version}</version>
- </dependency>
我们先编写推送服务的入口:
- package com.itranswarp.exchange.push;
-
- @SpringBootApplication
- // 禁用数据库自动配置 (无DataSource, JdbcTemplate...)
- @EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
- public class PushApplication {
- public static void main(String[] args) {
- System.setProperty("vertx.disableFileCPResolving", "true");
- System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
- SpringApplication app = new SpringApplication(PushApplication.class);
- // 禁用Spring的Web:
- app.setWebApplicationType(WebApplicationType.NONE);
- app.run(args);
- }
- }
上述代码仍然是一个标准的Spring Boot应用,因为我们希望利用Spring Cloud Config读取配置。由于我们不使用Spring自身的Web功能,因此需要禁用Spring的Web功能。推送服务本身并不需要访问数据库,因此禁用数据库自动配置。最后,我们把PushApplication放在com.itranswarp.exchange.push包下面,以避免自动扫描到com.itranswarp.exchange包下的组件(如RedisService)。
下一步是编写PushService,注意它是一个Spring组件,由Spring初始化:
- @Component
- public class PushService extends LoggerSupport {
- @Value("${server.port}")
- private int serverPort;
-
- @Value("${exchange.config.hmac-key}")
- String hmacKey;
-
- @Value("${spring.redis.standalone.host:localhost}")
- private String redisHost;
-
- @Value("${spring.redis.standalone.port:6379}")
- private int redisPort;
-
- @Value("${spring.redis.standalone.password:}")
- private String redisPassword;
-
- @Value("${spring.redis.standalone.database:0}")
- private int redisDatabase = 0;
-
- private Vertx vertx;
-
- @PostConstruct
- public void startVertx() {
- // TODO: init Vert.x
- }
- }
由Spring初始化该组件的目的是注入各种配置。在初始化方法中,我们就可以启动Vert.x:
- @PostConstruct
- public void startVertx() {
- // 启动Vert.x:
- this.vertx = Vertx.vertx();
-
- // 创建一个Vert.x Verticle组件:
- var push = new PushVerticle(this.hmacKey, this.serverPort);
- vertx.deployVerticle(push);
-
- // 连接到Redis:
- String url = "redis://" + (this.redisPassword.isEmpty() ? "" : ":" + this.redisPassword + "@") + this.redisHost
- + ":" + this.redisPort + "/" + this.redisDatabase;
- Redis redis = Redis.createClient(vertx, url);
-
- redis.connect().onSuccess(conn -> {
- // 事件处理:
- conn.handler(response -> {
- // 收到Redis的PUSH:
- if (response.type() == ResponseType.PUSH) {
- int size = response.size();
- if (size == 3) {
- Response type = response.get(2);
- if (type instanceof BulkType) {
- // 收到PUBLISH通知:
- String msg = type.toString();
- // 由push verticle组件处理该通知:
- push.broadcast(msg);
- }
- }
- }
- });
- // 订阅Redis的Topic:
- conn.send(Request.cmd(Command.SUBSCRIBE).arg(RedisCache.Topic.NOTIFICATION)).onSuccess(resp -> {
- logger.info("subscribe ok.");
- }).onFailure(err -> {
- logger.error("subscribe failed.", err);
- System.exit(1);
- });
- }).onFailure(err -> {
- logger.error("connect to redis failed.", err);
- System.exit(1);
- });
- }
Vert.x用Verticle表示一个组件,我们编写PushVerticle来处理WebSocket连接:
- public class PushVerticle extends AbstractVerticle {
- @Override
- public void start() {
- // 创建VertX HttpServer:
- HttpServer server = vertx.createHttpServer();
-
- // 创建路由:
- Router router = Router.router(vertx);
-
- // 处理请求 GET /notification:
- router.get("/notification").handler(requestHandler -> {
- HttpServerRequest request = requestHandler.request();
- // 从token参数解析userId:
- Supplier<Long> supplier = () -> {
- String tokenStr = request.getParam("token");
- if (tokenStr != null && !tokenStr.isEmpty()) {
- AuthToken token = AuthToken.fromSecureString(tokenStr, this.hmacKey);
- if (!token.isExpired()) {
- return token.userId();
- }
- }
- return null;
- };
- final Long userId = supplier.get();
- logger.info("parse user id from token: {}", userId);
- // 将连接升级到WebSocket:
- request.toWebSocket(ar -> {
- if (ar.succeeded()) {
- initWebSocket(ar.result(), userId);
- }
- });
- });
-
- // 处理请求 GET /actuator/health:
- router.get("/actuator/health").respond(
- ctx -> ctx.response().putHeader("Content-Type", "application/json").end("{\"status\":\"UP\"}"));
-
- // 其他请求返回404错误:
- router.get().respond(ctx -> ctx.response().setStatusCode(404).setStatusMessage("No Route Found").end());
-
- // 绑定路由并监听端口:
- server.requestHandler(router).listen(this.serverPort, result -> {
- if (result.succeeded()) {
- logger.info("Vertx started on port(s): {} (http) with context path ''", this.serverPort);
- } else {
- logger.error("Start http server failed on port " + this.serverPort, result.cause());
- vertx.close();
- System.exit(1);
- }
- });
- }
- }
在PushVerticle中,start()方法由Vert.x回调。我们在start()方法中主要干这么几件事:
/notification
的GET请求,将其升级为WebSocket连接;在处理/notification时,我们尝试从URL的token参数解析出用户ID,这样我们就无需访问数据库而获得了当前连接的用户。升级到WebSocket连接后,再调用initWebSocket()继续处理WebSocket连接:
- public class PushVerticle extends AbstractVerticle {
- // 所有Handler:
- Map<String, Boolean> handlersSet = new ConcurrentHashMap<>(1000);
-
- // 用户ID -> Handlers
- Map<Long, Set<String>> userToHandlersMap = new ConcurrentHashMap<>(1000);
- // Handler -> 用户ID
- Map<String, Long> handlerToUserMap = new ConcurrentHashMap<>(1000);
-
- void initWebSocket(ServerWebSocket websocket, Long userId) {
- // 获取一个WebSocket关联的Handler ID:
- String handlerId = websocket.textHandlerID();
- // 处理输入消息:
- websocket.textMessageHandler(str -> {
- logger.info("text message: " + str);
- });
- websocket.exceptionHandler(t -> {
- logger.error("websocket error: " + t.getMessage(), t);
- });
- // 关闭连接时:
- websocket.closeHandler(e -> {
- unsubscribeClient(handlerId);
- unsubscribeUser(handlerId, userId);
- });
- subscribeClient(handlerId);
- subscribeUser(handlerId, userId);
- }
-
- void subscribeClient(String handlerId) {
- this.handlersSet.put(handlerId, Boolean.TRUE);
- }
-
- void unsubscribeClient(String handlerId) {
- this.handlersSet.remove(handlerId);
- }
-
- void subscribeUser(String handlerId, Long userId) {
- if (userId == null) {
- return;
- }
- handlerToUserMap.put(handlerId, userId);
- Set<String> set = userToHandlersMap.get(userId);
- if (set == null) {
- set = new HashSet<>();
- userToHandlersMap.put(userId, set);
- }
- set.add(handlerId);
- }
-
- void unsubscribeUser(String handlerId, Long userId) {
- if (userId == null) {
- return;
- }
- handlerToUserMap.remove(handlerId);
- Set<String> set = userToHandlersMap.get(userId);
- if (set != null) {
- set.remove(handlerId);
- }
- }
- }
在Vert.x中,每个WebSocket连接都有一个唯一的Handler标识,以String表示。我们用几个Map保存Handler和用户ID的映射关系,当关闭连接时,将对应的映射关系删除。
最后一个关键方法broadcast()由PushService中订阅的Redis推送时触发,该方法用于向用户主动推送通知:
- public void broadcast(String text) {
- NotificationMessage message = JsonUtil.readJson(text, NotificationMessage.class);
- if (message.userId == null) {
- // 没有用户ID时,推送给所有连接:
- EventBus eb = vertx.eventBus();
- for (String handler : this.handlersSet.keySet()) {
- eb.send(handler, text);
- }
- } else {
- // 推送给指定用户:
- Set<String> handlers = this.userToHandlersMap.get(message.userId);
- if (handlers != null) {
- EventBus eb = vertx.eventBus();
- for (String handler : handlers) {
- eb.send(handler, text);
- }
- }
- }
- }
当Redis收到PUBLISH调用后,它自动将String表示的JSON数据推送给所有订阅端。我们在PushService中订阅了notification这个Topic,然后通过broadcast()推送给WebSocket客户端。对于一个NotificationMessage,如果设置了userId,则推送给指定用户,适用于订单成交等针对用户ID的通知;如果没有设置userId,则推送给所有用户,适用于公开市场信息的推送。
整个推送服务仅包括3个Java文件,我们就实现了基于Redis和WebSocket的高性能推送。
我们已经实现了API系统、交易系统、定序系统、行情系统和推送系统,最后就差一个UI系统,让用户可以登录并通过浏览器下订单。
UI系统是标准的Web系统,除了注册、登录外,主要交易功能均由页面JavaScript实现。UI系统本身不是交易入口,它通过转发JavaScript请求至真正的API入口。
UI系统本质上是一个MVC模型的Web系统,我们先引入一个视图的第三方依赖:
- <dependency>
- <groupId>io.pebbletemplates</groupId>
- <artifactId>pebble-spring-boot-starter</artifactId>
- <version>${pebble.version}</version>
- </dependency>
在ui.yml加入最基本的配置:
- pebble:
- prefix: /templates/
- suffix: .html
注意到视图页面都放在src/main/resources/templates/目录下。编写MvcController,实现登录功能:
- @Controller
- public class MvcController extends LoggerSupport {
- // 显示登录页
- @GetMapping("/signin")
- public ModelAndView signin(HttpServletRequest request) {
- if (UserContext.getUserId() != null) {
- return redirect("/");
- }
- return prepareModelAndView("signin");
- }
-
- // 登录
- @PostMapping("/signin")
- public ModelAndView signIn(@RequestParam("email") String email, @RequestParam("password") String password, HttpServletRequest request, HttpServletResponse response) {
- try {
- UserProfileEntity userProfile = userService.signin(email, password);
- // 登录成功后设置Cookie:
- AuthToken token = new AuthToken(userProfile.userId, System.currentTimeMillis() + 1000 * cookieService.getExpiresInSeconds());
- cookieService.setSessionCookie(request, response, token);
- } catch (ApiException e) {
- // 登录失败:
- return prepareModelAndView("signin", Map.of("email", email, "error", "Invalid email or password."));
- } catch (Exception e) {
- // 登录失败:
- return prepareModelAndView("signin", Map.of("email", email, "error", "Internal server error."));
- }
- // 登录成功跳转:
- return redirect("/");
- }
- }
登录成功后,设置一个Cookie代表用户身份,以userId:expiresAt:hash表示。由于计算哈希引入了HmacKey,因此,客户端无法伪造Cookie。
继续编写UIFilter,用于验证Cookie并把特定用户的身份绑定到UserContext中:
- public class UIFilter {
- @Override
- public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
- throws IOException, ServletException {
- // 查找Cookie:
- AuthToken auth = cookieService.findSessionCookie(req);
- Long userId = auth == null ? null : auth.userId();
- try (UserContext ctx = new UserContext(userId)) {
- chain.doFilter(request, response);
- }
- }
- }
我们再编写一个ProxyFilter,它的目的是将页面JavaScript对API的调用转发给API系统:
- public class ProxyFilter {
- @Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
- throws IOException, ServletException {
- Long userId = UserContext.getUserId();
- // 构造一次性Token:
- String authToken = null;
- if (userId != null) {
- AuthToken token = new AuthToken(userId, System.currentTimeMillis() + 60_000);
- authToken = "Bearer " + token.toSecureString(hmacKey);
- }
- // 转发到API并读取响应:
- String responseJson = null;
- try {
- if ("GET".equals(request.getMethod())) {
- Map<String, String[]> params = request.getParameterMap();
- Map<String, String> query = params.isEmpty() ? null : convertParams(params);
- responseJson = tradingApiClient.get(String.class, request.getRequestURI(), authToken, query);
- } else if ("POST".equals(request.getMethod())) {
- responseJson = tradingApiClient.post(String.class, request.getRequestURI(), authToken,
- readBody(request));
- }
- // 写入响应:
- response.setContentType("application/json;charset=utf-8");
- PrintWriter pw = response.getWriter();
- pw.write(responseJson);
- pw.flush();
- } catch (ApiException e) {
- // 写入错误响应:
- writeApiException(request, response, e);
- } catch (Exception e) {
- // 写入错误响应:
- writeApiException(request, response,
- new ApiException(ApiError.INTERNAL_SERVER_ERROR, null, e.getMessage()));
- }
- }
- }
把ProxyFilter挂载到/api/*,通过UI转发请求的目的是简化页面JavaScript调用API,一是不再需要跨域,二是UI已经经过了登录认证,转发过程中自动生成一次性Token来调用API,这样JavaScript不再关心如何生成Authorization头。
下面我们就可以开始编写页面了:
页面功能主要由JavaScript实现,我们选择Vue前端框架,最终实现效果如下:
项目总结:
我们已经成功地完成了一个7x24运行的证券交易系统。虽然实现了基本功能,但仍有很多可改进的地方。
1)网关
直接给用户暴露API和UI是不合适的,通常我们会选择一个反向代理充当网关。可以使用Spring Cloud Gateway来实现网关。Spring Cloud Gateway是基于Netty的异步服务器,允许我们编写一系列过滤器来实现黑名单、权限检查、限流等功能。
也可以选择更通用的Nginx作为网关,相应的功能则需要由Lua脚本实现,具体可参考OpenResty。
2)远程调用
在系统内部,我们直接通过HTTP请求实现了远程调用,因为暴露的接口较少。如果接口比较多,可以考虑使用RPC调用,例如Spring Cloud OpenFeign。Spring Cloud OpenFeign把REST请求封装为Java接口方法,实现了一种声明式的RPC调用。也可以考虑更加通用的gRPC。
3)系统监控
要监控系统状态、性能等实时信息,我们需要构造一个监控系统。从零开始是不现实的,选择一个通用的标准协议比使用JMX要更简单。StatsD就是目前最流行的监控方案,它的基本原理是:
应用程序本身负责收集监控数据,然后以UDP协议发给StatsD守护进程,StatsD进程通常和应用程序运行在同一台机器上,它非常轻量级,并且StatsD是否运行都不影响应用程序的正常运行(因为UDP协议只管发不管能不能收到)。如果StatsD进程在运行中,它就把监控数据实时发送给聚合服务器如Graphite,再以可视化的形式展示出来。
StatsD是一个解决方案,既可以自己用开源组件搭建,又可以选择第三方商业服务商,例如DataDog。应用程序自身的数据采集则需要根据使用的服务商确定。如果使用DataDog,它会提供一个dd-java-agent.jar
,在启动应用程序时,以agent的方式注入到JVM中:
$ java -javaagent:dd-java-agent.jar -jar app.jar
再通过引入DataDog提供的API:
- <dependency>
- <groupId>com.datadoghq</groupId>
- <artifactId>dd-trace-api</artifactId>
- <version>{version}</version>
- </dependency>
就可以实现数据采集。DataDog提供的agent除了能采集应用程序的数据,还可以直接监控JVM、Linux系统,能大大简化监控配置。
对于分布式调用,例如UI调用API,API调用Engine,还可以集成Spring Cloud Sleuth来监控链路。它通过在入口调用每次生成一个唯一ID来跟踪链路,采集数据可直接与StatsD集成。
4)密钥管理
对于很多涉及密钥的配置来说,如数据库密码,系统AES密码,管理员口令等,直接存放在配置文件或数据库中都是不安全的。使用专业的密钥管理软件如Vault可以更安全地管理密钥。Spring Cloud Vault就是用于从Vault读取密钥,适合对安全性要求特别高的项目。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。