赞
踩
随着业务的发展壮大,对后端服务的压力也会越来越大,为了打造高效稳定的系统, 产生了分布式,微服务等等系统设计,因为这个原因,设计复杂度也随之增加,基于此 诞生了高并发系统三大利器限流,缓存,降级/熔断
。
限流: 从系统的流量入口考虑,从进入的流量上进行限制,达到保护系统的作用;
缓存: 将数据库中的数据缓存起来,提升系统访问速度和并发度,保护数据库资源。
降级和熔断比较类似,都是属于过载保护机制,但是实现上有着如下区别
降级: 从系统内部的平级服务或者业务的维度考虑,流量大了,可以暂停或延迟一些非重要服务,如日志收集等等,保护其他正常使用;
熔断: 当某一服务出现了过载现象,为防止整个系统故障,直接关闭该服务或者保证部分请求成功,另一部分返回失败。比如10s内连续请求失败次数达到20次, 触发熔断机制,过滤60%请求。
本文主要介绍第一利器限流,后续会有详细的文章介绍缓存和降级。
应对秒杀,大促等高性能压力的场景时,为了保证系统的平稳运行,必须针对超过预期的流量,通过预先设定的限流规则选择性的对某些请求进行限流。
在分布式系统中,其接口的调用来自多个系统,有的调用方可能会请求数量突增,过去争夺服务器资源, 而来自其他调用方的接口请求因此来不及响应而排队等待,微服务整体的请求响应时间变长甚至超时。 所以为了防止接口被过度调用,需要对每个调用方进行细粒度的访问限流。
上图是一个请求流量漏斗模型,执行过程依次是 1. 用户请求经过网关将请求下发到服务层 2. 服务层针对每条请求获取缓存数据 3. 缓存没有命中,直接请求数据
网关作为服务的入口,承接了系统整个系统的所有流量,是整个访问链路的源头,具有"一夫当关,万夫莫开"的角色,所以是最适合做限流的的途径。
另外,引入网关层还可以解决分布式系统中的如下问题: 客户端会多次请求不同的微服务,增加了客户端的复杂性 存在跨域请求,在一定场景下处理相对复杂 认证复杂,每个服务都需要独立认证 难以重构,随着项目的迭代,可能需要重新划分微服务。例如,可能将多个服务合并成一个或者将一个服务拆分成多个。如果客户端直接与微服务通信, 那么重构将会很难实施 某些微服务可能使用了防火墙 / 浏览器不友好的协议,直接访问会有一定的困难 恶意请求,存在安全问题
目前主流的网关层有以软件为代表的Nginx,还有Spring Cloud中的Gateway和Zuul这类网关层组件,也有以硬件+软件为代表的F5(F5价钱贵到你怀疑人生)。 除此之外,还有许多大厂开发的组件,比如腾讯里约。
提到中间件首先想到的就是消息队列(Message queue),简称MQ, 顾名思义,消息队列就是一个 存放“消息”的“队列”,有着 FIFO 的特性。这里的“消息”实际就是“数据”的意思,因此消息队列本身就是一个简单的数据结构——队列。
在设计模式上,消息队列是“生产者-消费者”模式的一个经典实现。 一般而言,用户就是请求的“生产者”,而后台服务就是请求的“消费者”,kafka、RabbitMQ,RocketMQ等等都是目前系统中常见的MQ, 在系统中可以达到异步解耦,削峰填谷的作用。
限流算法常见有三种,分别是计数器、漏桶、令牌桶。
计数器算法是限流算法里最简单也是最容易实现的一种算法。比如对于A接口来说,我们1分钟的访问次数不能超过100个。 我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内, 那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter。
在对计数器进行技术的问题,要注意原子性,防止并发问题,导致计数不准。 代码设计如下:
- // CounterLimit 计数器
- type CounterLimit struct {
- counter int64 //计数器
- limit int64 //指定时间窗口内允许的最大请求数
- intervalNano int64 //指定的时间窗口
- unixNano int64 //unix时间戳,单位为纳秒
- }
-
- // NewCounterLimit 初始化
- func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {
-
- return &CounterLimit{
- counter: 0,
- limit: limit,
- intervalNano: int64(interval),
- unixNano: time.Now().UnixNano(),
- }
- }
-
- // Allow 判断当前时间窗口是否允许请求
- func (c *CounterLimit) Allow() bool {
-
- now := time.Now().UnixNano()
- if now-c.unixNano > c.intervalNano { //如果当前过了当前的时间窗口,则重新进行计数
- atomic.StoreInt64(&c.counter, 0)
- atomic.StoreInt64(&c.unixNano, now)
- return true
- }
-
- atomic.AddInt64(&c.counter, 1)
- return c.counter < c.limit //判断是否要进行限流
- }
滑动窗口固定窗口的优化版本,主要解决临界问题。
如上图,假设0:59时,瞬间收到100个请求,并且1:00时候又瞬间收到了100个请求,那么其实这个服务在 1秒里面,收到了200个请求。 我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求,可以瞬间超过我们的速率限制,压垮我们的应用。
滑动窗口协议(Sliding Window Protocol),属于TCP协议的一种应用,用于网络数据传输时的流量控制,以避免拥塞的发生。 该协议允许发送方在停止并等待确认前发送多个数据分组。由于发送方不必每发一个分组就停下来等待确认。因此该协议可以加速数据的传输,提高网络吞吐量。
滑动窗口计数器是通过将窗口再细分,并且按照时间"滑动",这种算法避免了固定窗口计数器带来的双倍突发请求,但时间区间的精度越高,算法所需的空间容量就越大。
在上图中,整个红色的矩形框表示一个时间窗口,也就是一分钟。然后我们将时间窗口进行划分,每格代表的是10秒钟,总共6格。每过10秒钟,我们的时间窗口就会 往右滑动一格。每一个格子都有自己独立的计数器counter,假设一个请求 在0:25秒的时候到达,那么0:20~0:29对应的counter就会加1。
滑动窗口怎么解决刚才的临界问题的呢?
0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的 总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。
当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
- var (
- limitCount int = 10 // 6s限频
- limitBucket int = 6 // 滑动窗口个数
- curCount int32 = 0 // 记录限频数量
- head *ring.Ring // 环形队列(链表)
- )
-
- func main() {
- tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr
- checkError(err)
- listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口
- checkError(err)
- defer listener.Close()
- // 初始化滑动窗口
- head = ring.New(limitBucket)
- for i := 0; i < limitBucket; i++ {
- head.Value = 0
- head = head.Next()
- }
- // 启动执行器
- go func() {
- timer := time.NewTicker(time.Second * 1)
- for range timer.C { // 定时每隔1秒刷新一次滑动窗口数据
- subCount := int32(0 - head.Value.(int))
- newCount := atomic.AddInt32(&curCount, subCount)
-
- arr := [6]int{}
- for i := 0; i < limitBucket; i++ { // 这里是为了方便打印
- arr[i] = head.Value.(int)
- head = head.Next()
- }
- fmt.Println("move subCount,newCount,arr", subCount, newCount, arr)
- head.Value = 0
- head = head.Next()
- }
- }()
-
- for {
- conn, err := listener.Accept() // 在此处阻塞,每次来一个请求才往下运行handle函数
- if err != nil {
- fmt.Println(err)
- continue
- }
- go handle(&conn) // 起一个单独的协程处理,有多少个请求,就起多少个协程,协程之间共享同一个全局变量limiting,对其进行原子操作。
- }
- }
-
- func handle(conn *net.Conn) {
- defer (*conn).Close()
- n := atomic.AddInt32(&curCount, 1)
- //fmt.Println("handler n:", n)
- if n > int32(limitCount) { // 超出限频
- atomic.AddInt32(&curCount, -1) // add 1 by atomic,业务处理完毕,放回令牌
- (*conn).Write([]byte("HTTP/1.1 404 NOT FOUNDrnrnError, too many request, please try again."))
- } else {
- mu := sync.Mutex{}
- mu.Lock()
- pos := head.Prev()
- val := pos.Value.(int)
- val++
- pos.Value = val
- mu.Unlock()
- time.Sleep(1 * time.Second) // 假设我们的应用处理业务用了1s的时间
- (*conn).Write([]byte("HTTP/1.1 200 OKrnrnI can change the world!")) // 业务处理结束后,回复200成功。
- }
- }
-
- // 异常报错的处理
- func checkError(err error) {
- if err != nil {
- fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
- os.Exit(1)
- }
- }
由于滑动窗口比较难于理解,打印了如下压测日志,压测工具地址 https://github.com/rakyll/hey
- $ ./hey -c 6 -n 300 -q 6 -t 80 http://localhost:9090
-
- move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
- move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
- move subCount,newCount,arr 0 6 [0 0 0 0 0 6]
- move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
- move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
- move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
- move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
- move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
- move subCount,newCount,arr -4 6 [4 0 0 0 0 6]
- move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
- move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
- move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
- move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
- move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
- move subCount,newCount,arr -4 3 [4 0 0 0 0 3]
- move subCount,newCount,arr 0 3 [0 0 0 0 3 0]
- move subCount,newCount,arr 0 3 [0 0 0 3 0 0]
- move subCount,newCount,arr 0 3 [0 0 3 0 0 0]
- move subCount,newCount,arr 0 3 [0 3 0 0 0 0]
- move subCount,newCount,arr -3 0 [3 0 0 0 0 0]
- move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
- move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
漏桶算法概念如下:
- // BucketLimit 漏桶结构
- type BucketLimit struct {
- rate float64 //漏桶中水的漏出速率
- bucketSize float64 //漏桶最多能装的水大小
- unixNano int64 //unix时间戳
- curWater float64 //当前桶里面的水
- }
-
- // NewBucketLimit 初始化
- func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
- return &BucketLimit{
- bucketSize: float64(bucketSize),
- rate: rate,
- unixNano: time.Now().UnixNano(),
- curWater: 0,
- }
- }
-
- // refresh 更新当前桶的容量
- func (b *BucketLimit) refresh() {
- now := time.Now().UnixNano()
- //时间差, 把纳秒换成秒
- diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000
- b.curWater = math.Max(0, b.curWater-diffSec*b.rate)
- b.unixNano = now
- return
- }
-
- // Allow 允许请求,是否超过桶的容量
- func (b *BucketLimit) Allow() bool {
- b.refresh()
- if b.curWater < b.bucketSize {
- b.curWater = b.curWater + 1
- return true
- }
-
- return false
- }
漏桶算法也存在着明显缺陷,当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。 令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
- // TokenBucket 令牌桶
- type TokenBucket struct {
- rate int // 令牌放入速度
- tokenSize int // 令牌桶的容量
- curNum int // 当前桶中token
- }
-
- // NewTokenBucket 初始化
- func NewTokenBucket(rate, tokenSize int) *TokenBucket {
- return &TokenBucket{
- rate: rate,
- tokenSize: tokenSize,
- }
- }
-
- // PushToken 在桶中存放token
- func (t *TokenBucket) PushToken() chan struct{} {
- tb := make(chan struct{}, t.tokenSize)
- ticker := time.NewTicker(time.Duration(1000) * time.Microsecond)
- //初始化token
- for i := 0; i < t.tokenSize; i++ {
- tb <- struct{}{}
- }
- t.curNum = t.tokenSize
-
- // 指定速率放入token
- go func() {
- for {
- for i := 0; i < t.rate; i++ {
- tb <- struct{}{}
- }
- t.curNum += t.rate
- if t.curNum > t.tokenSize {
- t.curNum = t.tokenSize
- }
- <-ticker.C
- }
- }()
- return tb
- }
-
- // popToken 取出token
- func (t *TokenBucket) PopToken(bucket chan struct{}, n int) {
- for i := 0; i < n; i++ {
- _, ok := <-bucket
- if ok {
- t.curNum -= 1
- fmt.Println("get token success")
- } else {
- fmt.Println("get token fail")
- }
- }
- }
文中涉及代码已上传至代码库,参见 https://github.com/MerlinFeng/codenote/tree/main/rate_limit
在实际的系统设计中,令牌桶和漏桶算法的应用是比较广泛的,分别有着不同的使用场景。
令牌桶可以用来保护自己,主要用来对上游调用者频率进行限流,为的是让自己不被打垮。 所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制),那么实际处理速率可以超过配置的限制。
漏桶算法,用来保护他人,也就是保护他所调用的下游系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候, 我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只有在主调方控制。这个时候,即使流量突发,也必须舍弃。 因为消费能力是第三方决定的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。