当前位置:   article > 正文

Go并发调用的超时处理

case <-h.ctx.done()

之前有聊过 golang 的协程,我发觉似乎还很理论,特别是在并发安全上,所以特结合网上的一些例子,来试验下go routine中 的 channel, select, context 的妙用。

场景-微服务调用

我们用 gin(一个web框架) 作为处理请求的工具,需求是这样的:
一个请求 X 会去并行调用 A, B, C 三个方法,并把三个方法返回的结果加起来作为 X 请求的 Response。
但是我们这个 Response 是有时间要求的(不能超过3秒的响应时间),

可能 A, B, C 中任意一个或两个,处理逻辑十分复杂,或者数据量超大,导致处理时间超出预期,
那么我们就马上切断,并返回已经拿到的任意个返回结果之和。

我们先来定义主函数:

  1. func main() {
  2. r := gin.New()
  3. r.GET("/calculate", calHandler)
  4. http.ListenAndServe(":8008", r)
  5. }

非常简单,普通的请求接受和 handler 定义。其中 calHandler 是我们用来处理请求的函数。

分别定义三个假的微服务,其中第三个将会是我们超时的哪位~

  1. func microService1() int {
  2. time.Sleep(1*time.Second)
  3. return 1
  4. }
  5. func microService2() int {
  6. time.Sleep(2*time.Second)
  7. return 2
  8. }
  9. func microService3() int {
  10. time.Sleep(10*time.Second)
  11. return 3
  12. }

接下来,我们看看 calHandler 里到底是什么

  1. func calHandler(c *gin.Context) {
  2. ...
  3. c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
  4. return
  5. }

一个典型的 gin Response,我们先不用在意 sum 是什么。

要点1--并发调用

直接用 go 就好了嘛~
所以一开始我们可能就这么写:

  1. go microService1()
  2. go microService2()
  3. go microService3()

很简单有没有,但是等等,说好的返回值我怎么接呢?
为了能够并行地接受处理结果,我们很容易想到用 channel 去接。
所以我们把调用服务改成这样:

  1. var resChan = make(chan int, 3) // 因为有3个结果,所以我们创建一个可以容纳3个值的 int channel。
  2. go func() {
  3. resChan <- microService1()
  4. }()
  5. go func() {
  6. resChan <- microService2()
  7. }()
  8. go func() {
  9. resChan <- microService3()
  10. }()

有东西接,那也要有方法去算,所以我们加一个一直循环拿 resChan 中结果并计算的方法:

  1. var resContainer, sum int
  2. for {
  3. resContainer = <-resChan
  4. sum += resContainer
  5. }

这样一来我们就有一个 sum 来计算每次从 resChan 中拿出的结果了。

要点2--超时信号

还没结束,说好的超时处理呢?
为了实现超时处理,我们需要引入一个东西,就是 context,什么是 context ?
我们这里只使用 context 的一个特性,超时通知(其实这个特性完全可以用 channel 来替代)。

可以看在定义 calHandler 的时候我们已经将 c *gin.Context 作为参数传了进来,那我们就不用自己在声明了。
gin.Context 简单理解为贯穿整个 gin 声明周期的上下文容器,有点像是分身,亦或是量子纠缠的感觉。

有了这个 gin.Context, 我们就能在一个地方对 context 做出操作,而其他正在使用 context 的函数或方法,也会感受到 context 做出的变化。

ctx, _ := context.WithTimeout(c, 3*time.Second) //定义一个超时的 context

只要时间到了,我们就能用 ctx.Done() 获取到一个超时的 channel(通知),然后其他用到这个 ctx 的地方也会停掉,并释放 ctx。
一般来说,ctx.Done() 是结合 select 使用的。
所以我们又需要一个循环来监听 ctx.Done()

  1. for {
  2. select {
  3. case <- ctx.Done():
  4. // 返回结果
  5. }

现在我们有两个 for 了,是不是能够合并下?

  1. for {
  2. select {
  3. case resContainer = <-resChan:
  4. sum += resContainer
  5. fmt.Println("add", resContainer)
  6. case <- ctx.Done():
  7. fmt.Println("result:", sum)
  8. return
  9. }
  10. }

诶嘿,看上去不错。
不过我们怎么在正常完成微服务调用的时候输出结果呢?
看来我们还需要一个 flag

  1. var count int
  2. for {
  3. select {
  4. case resContainer = <-resChan:
  5. sum += resContainer
  6. count ++
  7. fmt.Println("add", resContainer)
  8. if count > 2 {
  9. fmt.Println("result:", sum)
  10. return
  11. }
  12. case <- ctx.Done():
  13. fmt.Println("timeout result:", sum)
  14. return
  15. }
  16. }

我们加入一个计数器,因为我们只是调用3次微服务,所以当 count 大于2的时候,我们就应该结束并输出结果了。

要点3--并发中的等待

上面的计时器是一种偷懒的方法,因为我们知道了调用微服务的次数,如果我们并不知道,或者之后还要添加呢?
手动每次改 count 的判断阈值会不会太不优雅了?这时候我们就可以加入 sync 包。
我们将会使用的 sync 的一个特性是 WaitGroup。它的作用是等待一组协程运行完毕后,执行接下去的步骤。

我们来改下之前微服务调用的代码块:

  1. var success = make(chan int, 1) // 成功的通道标识
  2. wg := sync.WaitGroup{} // 创建一个 waitGroup 组
  3. wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
  4. go func() {
  5. resChan <- microService1()
  6. wg.Done() // 完成一个,Done()一个
  7. }()
  8. go func() {
  9. resChan <- microService2()
  10. wg.Done()
  11. }()
  12. go func() {
  13. resChan <- microService3()
  14. wg.Done()
  15. }()
  16. wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
  17. success <- 1 // 我们发送一个成功信号到通道中

注意:如果我们直接把上面的代码放到 calHandler 里,会出现一个问题,WaitGroup不论怎么样都会堵塞我们的正常情况输出(死活都要让你超时)。
所以,我们把上面这段和业务逻辑相关的代码单独抽离出来,并包装一下。

  1. // rc 是结果 channel, success 是成功与否的 flag channel
  2. func MyLogic(rc chan<- int, success chan<- int) {
  3. wg := sync.WaitGroup{} // 创建一个 waitGroup 组
  4. wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
  5. go func() {
  6. rc <- microService1()
  7. wg.Done() // 完成一个,Done()一个
  8. }()
  9. go func() {
  10. rc <- microService2()
  11. wg.Done()
  12. }()
  13. go func() {
  14. rc <- microService3()
  15. wg.Done()
  16. }()
  17. wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
  18. success <- 1 // 我们发送一个成功信号到通道中
  19. }

最终,这个 MyLogic 还是要作为一个协程运行的。

既然我们有了 success 这个信号,那么再把它加入到监控 for 循环中,并做些修改,删除原来 count 判断的部分。

  1. for {
  2. select {
  3. case resContainer = <-resChan:
  4. sum += resContainer
  5. fmt.Println("add", resContainer)
  6. case <- success:
  7. fmt.Println("result:", sum)
  8. return
  9. case <- ctx.Done():
  10. fmt.Println("result:", sum)
  11. return
  12. }
  13. }

三个 case,分工明确,

case resContainer = <-resChan:用来拿逻辑的输出的结果并计算

case <- success:是理想情况下的正常输出

case <- ctx.Done():是超时情况下的输出

我们再润色一下,把后两个 case 的 fmt.Println("result:", sum)改为 gin 的标准 http Response

  1. c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
  2. return

至此,所有的主要代码都完成了。下面是完全版

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. )
  10. // 一个请求会触发调用三个服务,每个服务输出一个 int,
  11. // 请求要求结果为三个服务输出 int 之和
  12. // 请求返回时间不超过3秒,大于3秒只输出已经获得的 int 之和
  13. func calHandler(c *gin.Context) {
  14. var resContainer, sum int
  15. var success, resChan = make(chan int), make(chan int, 3)
  16. ctx, cancel := context.WithTimeout(c, 5*time.Second)
  17. defer cancel()
  18. // 真正的业务逻辑
  19. go MyLogic(resChan, success)
  20. for {
  21. select {
  22. case resContainer = <-resChan:
  23. sum += resContainer
  24. fmt.Println("add", resContainer)
  25. case <- success:
  26. c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
  27. return
  28. case <- ctx.Done():
  29. c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
  30. return
  31. }
  32. }
  33. }
  34. func main() {
  35. r := gin.New()
  36. r.GET("/calculate", calHandler)
  37. http.ListenAndServe(":8008", r)
  38. }
  39. func MyLogic(rc chan<- int, success chan<- int) {
  40. wg := sync.WaitGroup{} // 创建一个 waitGroup 组
  41. wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
  42. go func() {
  43. rc <- microService1()
  44. wg.Done() // 完成一个,Done()一个
  45. }()
  46. go func() {
  47. rc <- microService2()
  48. wg.Done()
  49. }()
  50. go func() {
  51. rc <- microService3()
  52. wg.Done()
  53. }()
  54. wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
  55. success <- 1 // 我们发送一个成功信号到通道中
  56. }
  57. func microService1() int {
  58. time.Sleep(1*time.Second)
  59. return 1
  60. }
  61. func microService2() int {
  62. time.Sleep(2*time.Second)
  63. return 2
  64. }
  65. func microService3() int {
  66. time.Sleep(6*time.Second)
  67. return 3
  68. }

上面的程序只是简单描述了一个调用其他微服务超时的处理场景。
实际过程中还需要加很多很多调料,才能保证接口的对外完整性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/624897
推荐阅读
相关标签
  

闽ICP备14008679号