赞
踩
代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/29-limiter-breaker
限流与熔断、降级在微服务中是非常常见的概念,但是尽管我们耳熟能详了,却未必实际去了解过底层原理及其实现。
之前已经有博客介绍并实现过限流与降低了,博客地址如下:
限流:13. Go中常见限流算法示例代码
降级:17. 灰度开关、降级开关、灰度放量
主要知识点
Gin
Gin
作用:拒绝上游服务
发起的超过服务器承载能力的流量。是服务保护自身、质疑上游的一种体现,避免上游打挂自己。
作用:防止在下游服务
不可用的情况下造成雪崩效应。是服务保护自身、怀疑下游的一种体现,避免自身被下游拖垮,导致雪崩。
算法简介
令牌桶算法(Token Bucket)
是网络流量整形(Traffic Shaping)
和速率限制(Rate Limiting)
中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送
。想象有一个木桶,以固定的速度往木桶里加入令牌,木桶满了则不再加入令牌。服务收到请求时尝试从木桶中取出一个令牌,如果能够得到令牌则继续执行后续的业务逻辑;如果没有得到令牌,直接返回访问频率超限的错误码或页面等,不继续执行后续的业务逻辑
由于木桶内只要有令牌,请求就可以被处理,所以令牌桶算法可以支持突发流量。同时由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以单位时间内处理的请求数目也能够得到控制,起到限流的目的。假设加入令牌的速度为 1token/10ms
(则1s
内最多放置100
个令牌,因此QPS
期望是100
左右),另一方面,桶的容量为500
,在请求比较的少的时候(小于每10
毫秒1
个请求)时,木桶可以先"攒"一些令牌(最多500
个)。当有突发流量时,一下把木桶内的令牌取空,也就是有500
个在并发执行的业务逻辑,之后要等每10ms
补充一个新的令牌才能接收一个新的请求。
木桶的容量设置:需要考虑业务逻辑的资源消耗和机器能承载并发处理多少业务逻辑。
生成令牌的速度设置 :太慢的话起不到“攒”令牌应对突发流量的效果,可根据预估或压测的QPS
进行设置。
B
个令牌,当桶满时,新添加的令牌被丢弃或拒绝1
个,则不会删除令牌,且请求将被限流(丢弃或阻塞等待)令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌…),并允许一定程度突发流量。
适用场景
适合电商抢购或者微博出现热点事件这种场景,因为在限流的同时可以应对一定的突发流量。如果采用均匀速度处理请求的算法,在发生热点时间的时候,会造成大量的用户无法访问,对用户体验的损害比较大。
实现思路:
capacity
)rate
用于记录填充速率,lastTime
用于记录上次补充令牌的时间),令牌最多补充到桶的大小tokens
记录桶中当前的令牌数量lock
字段type TokenBucketLimiter struct {
lock sync.Mutex
rate time.Duration // 多长时间放入一个令牌,即放入令牌的速率
capacity int64 // 令牌桶的容量,控制最多放入多少令牌,也即突发最大并发量
tokens int64 // 当前桶中已有的令牌数量
lastTime time.Time // 上次放入令牌的时间,避免开启协程定时去放入令牌,而是请求到来时懒加载的方式(now - lastTime) / rate放入令牌
}
Go 实现
假设设置每100ms
生产一个令牌,记录最近一次访问的时间戳 lastTime
和令牌数
,每次请求时如果 now - lastTime > 100ms
, 增加 (now - lastTime) / 100ms
个令牌。然后,如果令牌数 > 0
,令牌数 -1
继续执行后续的业务逻辑,否则返回请求频率超限的错误码或页面。
上面的算法是对整体的请求进行的限流,如果是要对用户或IP
进行限流,则可以使用map[string]Limiter
控制,key
为userId
或IP
,value
为对应的限流器。
package limiter import ( "sync" "time" ) type TokenBucketLimiter struct { lock sync.Mutex rate time.Duration // 多长时间放入一个令牌,即放入令牌的速率 capacity int64 // 令牌桶的容量,控制最多放入多少令牌,也即突发最大并发量 tokens int64 // 当前桶中已有的令牌数量 lastTime time.Time // 上次放入令牌的时间,避免开启协程定时去放入令牌,而是请求到来时懒加载的方式(now - lastTime) / rate放入令牌 } func NewTokenBucketLimiter(rate time.Duration, capacity int64) *TokenBucketLimiter { if capacity < 1 { panic(any("token bucket capacity must be large 1")) } return &TokenBucketLimiter{ lock: sync.Mutex{}, rate: rate, capacity: capacity, tokens: 0, lastTime: time.Time{}, } } func (tbl *TokenBucketLimiter) Allow() bool { tbl.lock.Lock() // 加锁避免并发错误 defer tbl.lock.Unlock() // 如果 now 与上次请求的间隔超过了 token rate // 则增加令牌,更新lastTime now := time.Now() if now.Sub(tbl.lastTime) > tbl.rate { tbl.tokens += int64((now.Sub(tbl.lastTime)) / tbl.rate) // 放入令牌 if tbl.tokens > tbl.capacity { tbl.tokens = tbl.capacity // 总令牌数不能大于桶的容量 } tbl.lastTime = now // 更新上次往桶中放入令牌的时间 } if tbl.tokens > 0 { // 令牌数是否充足 tbl.tokens -= 1 return true } return false // 令牌不足,拒绝请求 }
一个组件要集成Gin
,一般情况下都是通过中间件实现的,所以我们首先建立一个middleware
包,写一个限流中间件
package middleware import ( "github.com/gin-gonic/gin" "golang-trick/29-limiter-breaker/limiter" "net/http" ) func Limiter(l *limiter.TokenBucketLimiter) gin.HandlerFunc { return func(context *gin.Context) { if !l.Allow(){ context.JSON(http.StatusForbidden,gin.H{ "error":"当前可用令牌数为0,请稍后再试!", }) context.Abort() } context.Next() } }
我们不想全局使用该中间件,而是写到了指定的接口上,即对指定的接口才限流,如下
package main import ( "github.com/gin-gonic/gin" "golang-trick/29-limiter-breaker/limiter" "golang-trick/29-limiter-breaker/middleware" "net/http" "time" ) func main(){ r := gin.Default() // 每秒放入一个令牌,最多应对4个突发流量 limitMiddleware := middleware.Limiter(limiter.NewTokenBucketLimiter(time.Second,4)) // 工作中并不是所有的接口都有限流诉求的,所以我们将限流中间件用在指定的接口上 r.GET("/ping", limitMiddleware,func(context *gin.Context) { context.JSON(http.StatusOK,gin.H{ "message":"pong", }) }) r.Run() }
测试:
实现思路
0
,不可能同时不为0
。状态机如下:
代码实现
由于熔断器有明显的三个状态,以及会有状态之间的转换,所以我们可以将其定为常量
const (
STATE_CLOSE = iota
STATE_OPEN
STATE_HALF_OPEN
)
时间周期可以定义两个,一个是正常情况下的时间周期,一个是打开态经历多久后可以进入半开半闭状态的时间周期,为了简便,我们将这两个字段合为了一个,即认为这两个时间周期设置的值(时长)是一样的
结构体字段以及构造方法的思路看如下代码注释:
type Breaker struct { mu sync.Mutex state int // 当前状态 failureThreshold int // 连续失败的阈值,用于控制由关闭->打开态 failureCount int // 已经连续失败的次数,用于计数以及和连续失败的阈值做比较,进行状态是否需要转换的判断 successThreshold int // 连续成功的阈值,用于控制由半开半闭状态->关闭 successCount int // 已经连续成功的次数,用于计数以及和连续成功的阈值做比较,进行状态是否需要转换的判断 halfMaxRequest int // 半开半闭状态下最大可放行请求数 halfCycleReqCount int // 半开半闭状态下已经请求了多少次 timeout time.Duration // 时间周期 cycleStartTime time.Time // 当前周期的开始时间 } // 通过观察Breaker结构体不难看出,很多字段都是用于计数的,在代码运行时变化,不需要用户设置 // 需要用户设置的值我们才放到构造方法中 func NewBreaker(failureThreshold, successThreshold, halfMaxRequest int, timeout time.Duration) *Breaker { return &Breaker{ state: STATE_CLOSE, //初始为关闭状态 failureThreshold: failureThreshold, successThreshold: successThreshold, halfMaxRequest: halfMaxRequest, timeout: timeout, } }
具体实现代码如下,主要看代码注释哦,应该还是比较清晰的,主要是在每次请求前后的代码
package breaker import ( "errors" "sync" "time" ) const ( STATE_CLOSE = iota STATE_OPEN STATE_HALF_OPEN ) type Breaker struct { mu sync.Mutex state int // 当前状态 failureThreshold int // 连续失败的阈值,用于控制由关闭->打开态 failureCount int // 已经连续失败的次数,用于计数以及和连续失败的阈值做比较,进行状态是否需要转换的判断 successThreshold int // 连续成功的阈值,用于控制由半开半闭状态->关闭 successCount int // 已经连续成功的次数,用于计数以及和连续成功的阈值做比较,进行状态是否需要转换的判断 halfMaxRequest int // 半开半闭状态下最大可放行请求数 halfCycleReqCount int // 半开半闭状态下已经请求了多少次 timeout time.Duration // 时间周期 cycleStartTime time.Time // 当前周期的开始时间 } // 通过观察Breaker结构体不难看出,很多字段都是用于计数的,在代码运行时变化,不需要用户设置 // 需要用户设置的值我们才放到构造方法中 func NewBreaker(failureThreshold, successThreshold, halfMaxRequest int, timeout time.Duration) *Breaker { return &Breaker{ state: STATE_CLOSE, //初始为关闭状态 failureThreshold: failureThreshold, successThreshold: successThreshold, halfMaxRequest: halfMaxRequest, timeout: timeout, } } // 熔断器是具体针对某个方法而言的,所以执行的时候需要传入一个方法 func (b *Breaker) Exec(f func() error) error { // 请求到来时根据时间是否超出当前周倩判断是否需要状态变更 b.before() // 前置状态判断与变更结束后,还是打开状态,那么可以直接拒绝请求了 if b.state == STATE_OPEN { return errors.New("熔断器处于打开状态,无法访问服务!") } // 关闭状态,可以直接放行 if b.state == STATE_CLOSE { // 实际的业务逻辑 err := f() // 请求结束后,判断是否需要状态变更 b.after(err) return err } if b.state == STATE_HALF_OPEN { // 半开状态下,判断当前周期内是否达到半开允许请求的最大次数 if b.halfCycleReqCount < b.halfMaxRequest { err := f() b.after(err) return err } else { return errors.New("熔断器处于半开状态,且当前周期内请求次数超出半开状态下所允许的最大值,请稍后重试!") } } return nil } // 我们不需要用专门的协程去变更状态,那样比较麻烦且耗费资源 // 请求到来时,我们再判断是否需要变更状态就行了 func (b *Breaker) before() { b.mu.Lock() defer b.mu.Unlock() // 由于总共就三个状态,所以不必要使用状态模式或者状态机FSM,直接用switch case就行了 switch b.state { case STATE_OPEN: // 如果之前处于了打开状态,那么本次请求到来时,如果时间已经过去一个周期了,那么应该进入半开半闭状态了 if b.cycleStartTime.Add(b.timeout).Before(time.Now()) { b.state = STATE_HALF_OPEN // 状态变更时,各种计数以及周期的开始时间都应该被重置了 b.reset() return } case STATE_HALF_OPEN: // 如果时间过去一个周期了,半开下的计数和周期开始时间需要重置,但是连续成功的次数不需要重置哦 // 比如我们设置了连续成功四次才改为关闭状态,但半开状态一个周期内最大允许请求数才设置两个 // 那么就应该是可以统计多个周期内的连续成功次数累计的,否则永远达不到一个周期内连续成功大于四次了 if b.cycleStartTime.Add(b.timeout).Before(time.Now()) { b.halfCycleReqCount = 0 b.cycleStartTime = time.Now() } case STATE_CLOSE: // 关闭状态下不需要比较什么阈值之类的,只要周期过了就重置计数和周期开始时间即可 if b.cycleStartTime.Add(b.timeout).Before(time.Now()) { b.reset() return } } } // 根据请求下游成功还是失败来变更熔断器的状态以及相应的计数 func (b *Breaker) after(err error) { b.mu.Lock() defer b.mu.Unlock() if err == nil { b.onSuccess() } else { b.onFailure() } } func (b *Breaker) reset() { b.failureCount = 0 b.successCount = 0 b.halfCycleReqCount = 0 b.cycleStartTime = time.Now() } func (b *Breaker) onSuccess() { b.failureCount = 0 // 请求只要成功一次,连续请求失败次数就归零 // 该onSuccess方法只有在关闭和半开状态才可能进入这里,而关闭状态下请求成功了,不需要判断是否需要变更状态 // 所以只需要判断是否半开状态即可 if b.state == STATE_HALF_OPEN { b.successCount++ // 需要累计,用于判断是否可以进入关闭状态 b.halfCycleReqCount++ // 需要累计,用于判断半开状态下当前周期已经达到半开的最大请求限制 if b.successCount >= b.successThreshold { // 连续成功次数大于等于设置的阈值了,可以进入关闭状态了 b.state = STATE_CLOSE b.reset() // 状态变更时,一定记住要重置计数和当前周期开始时间 } } } func (b *Breaker) onFailure() { b.successCount = 0 // 请求只要失败一次,连续请求成功次数就归零 b.failureCount++ // 该onFailure方法也只有在关闭和半开状态才可能进入这里 if b.state == STATE_CLOSE { if b.failureCount >= b.failureThreshold { // 连续失败次数达到连续失败阈值了,应该打开熔断器 b.state = STATE_OPEN b.reset() return } } if b.state == STATE_HALF_OPEN { b.state = STATE_OPEN // 半开状态下,只要失败一次,就重新进入打开状态 b.reset() return } }
package main import ( "errors" "golang-trick/29-limiter-breaker/breaker" "golang-trick/29-limiter-breaker/limiter" "golang-trick/29-limiter-breaker/middleware" "net/http" "time" "github.com/gin-gonic/gin" ) func main() { r := gin.Default() // 注意熔断器无法使用中间件,因为中间件是没有返回值的,而熔断器需要判断请求下游后的结果是成功还是失败 b := breaker.NewBreaker(4, 4, 2, time.Second*15) r.GET("/ping1", func(context *gin.Context) { err := b.Exec(func() error { value, _ := context.GetQuery("value") // 模拟,当请求参数为a时,我们认为请求下游失败 if value == "a" { return errors.New("value为a认为请求下游失败") } return nil // 不为a认为请求下游成功了,所以返回的错误为nil }) if err != nil { context.JSON(http.StatusInternalServerError, gin.H{ "error": err.Error(), }) return } context.JSON(http.StatusOK, gin.H{ "message": "pong1", }) }) r.Run() }
测试:
1、启动服务后正常访问
2、输入参数a
则认为请求失败
3、15秒内连续失败次数超出四次,熔断器打开
4、等待15
秒后,熔断器进入半开状态,可以正常放行少于半开状态下最大请求次数2
次的请求
5、半开状态15
秒内,成功次数超过两次后,拦截后续请求(不管是会成功还是会失败的请求,都会被拦截)
6、半开状态下当请求次数还没有超出半开请求最大次数限制时,有一次失败请求,熔断器就立即再次进入关闭状态
7、两个半开周期内,各成功两次,连续成功次数达到连续成功阈值4
次后,熔断器进入关闭状态
在Go
中,我们可以使用golang.org/x/time/rate
这个包实现令牌桶限流策略。其中,rate.Limiter
类型提供了每秒产生固定令牌数的功能,这意味着,我们可以定义每秒允许执行的令牌数量,从而实现限流。
以下是一个令牌桶限流的简单示例:
package main import ( "context" "fmt" "golang.org/x/time/rate" "time" ) func main() { // 创建一个限流器,r为每秒生成令牌的数量,b为最多存储的令牌数量。 r := rate.Limit(1) // 生成令牌的速率 b := int(5) // 令牌桶大小 limiter := rate.NewLimiter(r, b) ctx := context.Background() // 模拟20个请求 for i := 1; i <= 20; i++ { err := limiter.Wait(ctx) // 阻塞等待直到有令牌可取 if err != nil { fmt.Println(i, "limiter.Wait()失败:", err) continue } fmt.Println(i, "请求通过", time.Now().Format("2006-01-02 15:04:05")) } }
这段代码运行后,你会看到一开始有5
个请求瞬间通过,这是因为一开始令牌桶是满的,然后开始限制,每秒只能通过一次请求,因为我们设置的rate.Limit(1)
,即每秒生成一个令牌。
需要强调的是,rate.Limiter
两个方法:
limiter.Allow()
,非阻塞,如果取不到令牌直接返回limiter.Wait(ctx)
,阻塞等待直到取到令牌上述代码使用的是后者,所以如果取不到令牌就会阻塞等待。如果你想要非阻塞地获取令牌,就需要使用Allow()
方法。
在Go
中,熔断器是一种能够防止系统过载并减少失败风险的机制。它是通过控制服务调用、设置超时、限制请求次数等手段来实现的。一种称为 hystrix-go
的库是对 Netflix
的熔断器模式的一个实现。以下是如何在Go
语言中使用 hystrix-go
。
首先,你需要安装 hystrix-go
:
go get github.com/Netflix/hystrix-go/hystrix
然后,你可以在你的代码中使用它:
package main import ( "github.com/Netflix/hystrix-go/hystrix" "log" "time" ) func main() { hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{ Timeout: 1000, // 超时时间设置: MaxConcurrentRequests: 100, // 最大并发数设置 ErrorPercentThreshold: 50, // 错误百分比线设置,超过该百分比就启动熔断 }) for i := 0; i < 10000; i++ { // 使用熔断器执行命令 err := hystrix.Do("my_command", func() error { // 实际的业务逻辑, // 如果调用失败或者超过了超时时间,就会开始计算错误的比例。 // 比如这里我们模拟一个每1毫秒执行1次的任务 time.Sleep(1 * time.Millisecond) return nil }, nil) if err != nil { log.Printf("错误: %s", err.Error()) } } }
以上代码中,创建了一个名为 my_command
的熔断器,设置了超时时间、最大并发数和错误百分比阈值。然后不断地执行一个任务,模拟业务逻辑。如果任务出现错误或者超时,hystrix-go
就会开始计算错误的比例,一旦错误比例超过了我们设置的阈值,就会启动熔断,后续的任务调用将自动被拒绝,直到一段时间后(默认是5
秒)再尝试放行部分流量,测试系统的状态。
在 Go
中实现服务的降级,我们可以根据情况采用诸如限流、熔断等方案。下面以使用 Go
开源工具库 hystrix-go
来实现熔断降级为例。
首先,需要安装 hystrix-go 库:
go get github.com/afex/hystrix-go/hystrix
然后,可以按照以下步骤进行编码:
package main import ( "github.com/afex/hystrix-go/hystrix" "fmt" ) func main() { // 配置熔断器 hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{ Timeout: 1000, // 执行command的超时时间 MaxConcurrentRequests: 100, // command的最大并发量 SleepWindow: 5000, // 降级后尝试恢复正常的间隔,单位毫秒 ErrorPercentThreshold: 1, // 触发熔断错误比率,超过这个错误率,断路器将会从关闭打开 }) // 使用熔断器 output := make(chan bool, 1) errors := hystrix.Go("my_command", func() error { // 这是你要执行的命令 output <- call() return nil }, func(err error) error { // 这里是你的降级逻辑 output <- false return nil }) // 在你的业务逻辑中处理结果 select { case out := <-output: // success fmt.Println("success:", out) case err := <-errors: // failure fmt.Println("error:", err) } } func call() bool { // 这里模拟你的业务逻辑 return true }
在这个例子中,我们使用 hystrix
对一段需要降级处理的代码进行了包装。当这段代码运行时如果发生错误,会触发我们设置的降级逻辑。这样,即使在面临大量错误的情况下,我们的系统也能够保持稳定运行。
需要注意的是,降级处理的方法需要依据业务具体情况和需要来设计。以上例子为最基础的模板,真实的使用环境中需要根据业务需求进行更复杂的设计。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。