当前位置:   article > 正文

Go优雅重启Web server示例-讲解版

func waitforsigterm()

本文参考 GRACEFULLY RESTARTING A GOLANG WEB SERVER 进行归纳和说明。 你也可以从这里拿到添加备注的代码版本。 我做了下分割,方便你能看懂。

问题

因为 golang 是编译型的,所以当我们修改一个用 go 写的服务的配置后,需要重启该服务,有的甚至还需要重新编译,再发布。如果在重启的过程中有大量的请求涌入,能做的无非是分流,或者堵塞请求。不论哪一种,都不优雅~,所以slax0r以及他的团队,就试图探寻一种更加平滑的,便捷的重启方式。

原文章中除了排版比较帅外,文字内容和说明还是比较少的,所以我希望自己补充一些说明。

原理

上述问题的根源在于,我们无法同时让两个服务,监听同一个端口。 解决方案就是复制当前的 listen 文件,然后在新老进程之间通过 socket 直接传输参数和环境变量。 新的开启,老的关掉,就这么简单。

防看不懂须知

Unix domain socket

一切皆文件

先玩一下

运行程序,过程中打开一个新的 console,输入 kill -1 [进程号],你就能看到优雅重启的进程了。

代码思路

  1. func main() {
  2. 主函数,初始化配置
  3. 调用serve()
  4. }
  5. func serve() {
  6. 核心运行函数
  7. getListener() // 1. 获取监听 listener
  8. start() // 2. 用获取到的 listener 开启 server 服务
  9. waitForSignal() // 3. 监听外部信号,用来控制程序 fork 还是 shutdown
  10. }
  11. func getListener() {
  12. 获取正在监听的端口对象
  13. (第一次运行新建)
  14. }
  15. func start() {
  16. 运行 http server
  17. }
  18. func waitForSignal() {
  19. for {
  20. 等待外部信号
  21. 1. fork子进程
  22. 2. 关闭进程
  23. }
  24. }
  25. 复制代码

上面是代码思路的说明,基本上我们就围绕这个大纲填充完善代码。

定义结构体

我们抽象出两个结构体,描述程序中公用的数据结构

  1. var cfg *srvCfg
  2. type listener struct {
  3. // Listener address
  4. Addr string `json:"addr"`
  5. // Listener file descriptor
  6. FD int `json:"fd"`
  7. // Listener file name
  8. Filename string `json:"filename"`
  9. }
  10. type srvCfg struct {
  11. sockFile string
  12. addr string
  13. ln net.Listener
  14. shutDownTimeout time.Duration
  15. childTimeout time.Duration
  16. }
  17. 复制代码

listener 是我们的监听者,他包含了监听地址,文件描述符,文件名。 文件描述符其实就是进程所需要打开的文件的一个索引,非负整数。 我们平时创建一个进程时候,linux都会默认打开三个文件,标准输入stdin,标准输出stdout,标准错误stderr, 这三个文件各自占用了 0,1,2 三个文件描述符。所以之后你进程还要打开文件的话,就得从 3 开始了。 这个listener,就是我们进程之间所要传输的数据了。

srvCfg 是我们的全局环境配置,包含 socket file 路径,服务监听地址,监听者对象,父进程超时时间,子进程超时时间。 因为是全局用的配置数据,我们先 var 一下。

入口

看看我们的 main 长什么样子

  1. func main() {
  2. serve(srvCfg{
  3. sockFile: "/tmp/api.sock",
  4. addr: ":8000",
  5. shutDownTimeout: 5*time.Second,
  6. childTimeout: 5*time.Second,
  7. }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  8. w.Write([]byte(`Hello, world!`))
  9. }))
  10. }
  11. func serve(config srvCfg, handler http.Handler) {
  12. cfg = &config
  13. var err error
  14. // get tcp listener
  15. cfg.ln, err = getListener()
  16. if err != nil {
  17. panic(err)
  18. }
  19. // return an http Server
  20. srv := start(handler)
  21. // create a wait routine
  22. err = waitForSignals(srv)
  23. if err != nil {
  24. panic(err)
  25. }
  26. }
  27. 复制代码

很简单,我们把配置都准备好了,然后还注册了一个 handler--输出 Hello, world!

serve 函数的内容就和我们之前的思路一样,只不过多了些错误判断。

接下去,我们一个一个看里面的函数...

获取 listener

也就是我们的 getListener() 函数

  1. func getListener() (net.Listener, error) {
  2. // 第一次执行不会 importListener
  3. ln, err := importListener()
  4. if err == nil {
  5. fmt.Printf("imported listener file descriptor for addr: %s\n", cfg.addr)
  6. return ln, nil
  7. }
  8. // 第一次执行会 createListener
  9. ln, err = createListener()
  10. if err != nil {
  11. return nil, err
  12. }
  13. return ln, err
  14. }
  15. func importListener() (net.Listener, error) {
  16. ...
  17. }
  18. func createListener() (net.Listener, error) {
  19. fmt.Println("首次创建 listener", cfg.addr)
  20. ln, err := net.Listen("tcp", cfg.addr)
  21. if err != nil {
  22. return nil, err
  23. }
  24. return ln, err
  25. }
  26. 复制代码

因为第一次不会执行 importListener, 所以我们暂时不需要知道 importListener 里是怎么实现的。 只肖明白 createListener 返回了一个监听对象。

而后就是我们的 start 函数

  1. func start(handler http.Handler) *http.Server {
  2. srv := &http.Server{
  3. Addr: cfg.addr,
  4. Handler: handler,
  5. }
  6. // start to serve
  7. go srv.Serve(cfg.ln)
  8. fmt.Println("server 启动完成,配置信息为:",cfg.ln)
  9. return srv
  10. }
  11. 复制代码

很明显,start 传入一个 handler,然后协程运行一个 http server。

监听信号

监听信号应该是我们这篇里面重头戏的入口,我们首先来看下代码:

  1. func waitForSignals(srv *http.Server) error {
  2. sig := make(chan os.Signal, 1024)
  3. signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
  4. for {
  5. select {
  6. case s := <-sig:
  7. switch s {
  8. case syscall.SIGHUP:
  9. err := handleHangup() // 关闭
  10. if err == nil {
  11. // no error occured - child spawned and started
  12. return shutdown(srv)
  13. }
  14. case syscall.SIGTERM, syscall.SIGINT:
  15. return shutdown(srv)
  16. }
  17. }
  18. }
  19. }
  20. 复制代码

首先建立了一个通道,这个通道用来接收系统发送到程序的命令,比如kill -9 myprog, 这个 9 就是传到通道里的。我们用 Notify 来限制会产生响应的信号,这里有:

如果实在搞不清这三个信号的区别,只要明白我们通过区分信号,留给了进程自己判断处理的余地。

然后我们开启了一个循环监听,显而易见地,监听的就是系统信号。 当信号为 syscall.SIGHUP ,我们就要重启进程了。 而当信号为 syscall.SIGTERM, syscall.SIGINT 时,我们直接关闭进程。

于是乎,我们就要看看,handleHangup 里面到底做了什么。

父子间的对话

进程之间的优雅重启,我们可以看做是一次愉快的父子对话, 爸爸给儿子开通了一个热线,爸爸通过热线把现在正在监听的端口信息告诉儿子, 儿子在接受到必要的信息后,子承父业,开启新的空进程,告知爸爸,爸爸正式退休。

  1. func handleHangup() error {
  2. c := make(chan string)
  3. defer close(c)
  4. errChn := make(chan error)
  5. defer close(errChn)
  6. // 开启一个热线通道
  7. go socketListener(c, errChn)
  8. for {
  9. select {
  10. case cmd := <-c:
  11. switch cmd {
  12. case "socket_opened":
  13. p, err := fork()
  14. if err != nil {
  15. fmt.Printf("unable to fork: %v\n", err)
  16. continue
  17. }
  18. fmt.Printf("forked (PID: %d), waiting for spinup", p.Pid)
  19. case "listener_sent":
  20. fmt.Println("listener sent - shutting down")
  21. return nil
  22. }
  23. case err := <-errChn:
  24. return err
  25. }
  26. }
  27. return nil
  28. }
  29. 复制代码

socketListener 开启了一个新的 unix socket 通道,同时监听通道的情况,并做相应的处理。 处理的情况说白了就只有两种:

  1. 通道开了,说明我可以造儿子了(fork),儿子来接爸爸的信息
  2. 爸爸把监听对象文件都传给儿子了,爸爸完成使命

handleHangup 里面的东西有点多,不要慌,我们一个一个来看。 先来看 socketListener

  1. func socketListener(chn chan<- string, errChn chan<- error) {
  2. // 创建 socket 服务端
  3. fmt.Println("创建新的socket通道")
  4. ln, err := net.Listen("unix", cfg.sockFile)
  5. if err != nil {
  6. errChn <- err
  7. return
  8. }
  9. defer ln.Close()
  10. // signal that we created a socket
  11. fmt.Println("通道已经打开,可以 fork 了")
  12. chn <- "socket_opened"
  13. // accept
  14. // 阻塞等待子进程连接进来
  15. c, err := acceptConn(ln)
  16. if err != nil {
  17. errChn <- err
  18. return
  19. }
  20. // read from the socket
  21. buf := make([]byte, 512)
  22. nr, err := c.Read(buf)
  23. if err != nil {
  24. errChn <- err
  25. return
  26. }
  27. data := buf[0:nr]
  28. fmt.Println("获得消息子进程消息", string(data))
  29. switch string(data) {
  30. case "get_listener":
  31. fmt.Println("子进程请求 listener 信息,开始传送给他吧~")
  32. err := sendListener(c) // 发送文件描述到新的子进程,用来 import Listener
  33. if err != nil {
  34. errChn <- err
  35. return
  36. }
  37. // 传送完毕
  38. fmt.Println("listener 信息传送完毕")
  39. chn <- "listener_sent"
  40. }
  41. }
  42. 复制代码

sockectListener创建了一个 unix socket 通道,创建完毕后先发送了 socket_opened 这个信息。 这时候 handleHangup 里的 case "socket_opened" 就会有反应了。 同时,socketListener 一直在 accept 阻塞等待新程序的信号,从而发送原 listener 的文件信息。 直到发送完毕,才会再告知 handlerHangup listener_sent

下面是 acceptConn 的代码,并没有复杂的逻辑,就是等待子程序请求、处理超时和错误。

  1. func acceptConn(l net.Listener) (c net.Conn, err error) {
  2. chn := make(chan error)
  3. go func() {
  4. defer close(chn)
  5. fmt.Printf("accept 新连接%+v\n", l)
  6. c, err = l.Accept()
  7. if err != nil {
  8. chn <- err
  9. }
  10. }()
  11. select {
  12. case err = <-chn:
  13. if err != nil {
  14. fmt.Printf("error occurred when accepting socket connection: %v\n",
  15. err)
  16. }
  17. case <-time.After(cfg.childTimeout):
  18. fmt.Println("timeout occurred waiting for connection from child")
  19. }
  20. return
  21. }
  22. 复制代码

还记的我们之前定义的 listener 结构体吗?这时候就要派上用场了:

  1. func sendListener(c net.Conn) error {
  2. fmt.Printf("发送老的 listener 文件 %+v\n", cfg.ln)
  3. lnFile, err := getListenerFile(cfg.ln)
  4. if err != nil {
  5. return err
  6. }
  7. defer lnFile.Close()
  8. l := listener{
  9. Addr: cfg.addr,
  10. FD: 3, // 文件描述符,进程初始化描述符为0 stdin 1 stdout 2 stderr,所以我们从3开始
  11. Filename: lnFile.Name(),
  12. }
  13. lnEnv, err := json.Marshal(l)
  14. if err != nil {
  15. return err
  16. }
  17. fmt.Printf("将 %+v\n 写入连接\n", string(lnEnv))
  18. _, err = c.Write(lnEnv)
  19. if err != nil {
  20. return err
  21. }
  22. return nil
  23. }
  24. func getListenerFile(ln net.Listener) (*os.File, error) {
  25. switch t := ln.(type) {
  26. case *net.TCPListener:
  27. return t.File()
  28. case *net.UnixListener:
  29. return t.File()
  30. }
  31. return nil, fmt.Errorf("unsupported listener: %T", ln)
  32. }
  33. 复制代码

sendListener 先将我们正在使用的tcp监听文件(一切皆文件)做了一份拷贝,并把必要的信息塞进了 listener 结构体中,序列化后用 unix socket 传输给新的子进程。

说了这么多都是爸爸进程的代码,中间我们跳过了创建子进程, 那下面我们来看看 fork,也是一个重头戏:

  1. func fork() (*os.Process, error) {
  2. // 拿到原监听文件描述符并打包到元数据中
  3. lnFile, err := getListenerFile(cfg.ln)
  4. fmt.Printf("拿到监听文件 %+v\n,开始创建新进程\n", lnFile.Name())
  5. if err != nil {
  6. return nil, err
  7. }
  8. defer lnFile.Close()
  9. // 创建子进程时必须要塞的几个文件
  10. files := []*os.File{
  11. os.Stdin,
  12. os.Stdout,
  13. os.Stderr,
  14. lnFile,
  15. }
  16. // 拿到新进程的程序名,因为我们是重启,所以就是当前运行的程序名字
  17. execName, err := os.Executable()
  18. if err != nil {
  19. return nil, err
  20. }
  21. execDir := filepath.Dir(execName)
  22. // 生孩子了
  23. p, err := os.StartProcess(execName, []string{execName}, &os.ProcAttr{
  24. Dir: execDir,
  25. Files: files,
  26. Sys: &syscall.SysProcAttr{},
  27. })
  28. fmt.Println("创建子进程成功")
  29. if err != nil {
  30. return nil, err
  31. }
  32. // 这里返回 nil 后就会直接 shutdown 爸爸进程
  33. return p, nil
  34. }
  35. 复制代码

当执行 StartProcess 的那一刻,你会意识到,子进程的执行会回到最初的地方,也就是 main 开始。 这时候,我们 获取 listener中的 importListener 方法就会被激活:

  1. func importListener() (net.Listener, error) {
  2. // 向已经准备好的 unix socket 建立连接,这个是爸爸进程在之前就建立好的
  3. c, err := net.Dial("unix", cfg.sockFile)
  4. if err != nil {
  5. fmt.Println("no unix socket now")
  6. return nil, err
  7. }
  8. defer c.Close()
  9. fmt.Println("准备导入原 listener 文件...")
  10. var lnEnv string
  11. wg := sync.WaitGroup{}
  12. wg.Add(1)
  13. go func(r io.Reader) {
  14. defer wg.Done()
  15. // 读取 conn 中的内容
  16. buf := make([]byte, 1024)
  17. n, err := r.Read(buf[:])
  18. if err != nil {
  19. return
  20. }
  21. lnEnv = string(buf[0:n])
  22. }(c)
  23. // 写入 get_listener
  24. fmt.Println("告诉爸爸我要 'get-listener' 了")
  25. _, err = c.Write([]byte("get_listener"))
  26. if err != nil {
  27. return nil, err
  28. }
  29. wg.Wait() // 等待爸爸传给我们参数
  30. if lnEnv == "" {
  31. return nil, fmt.Errorf("Listener info not received from socket")
  32. }
  33. var l listener
  34. err = json.Unmarshal([]byte(lnEnv), &l)
  35. if err != nil {
  36. return nil, err
  37. }
  38. if l.Addr != cfg.addr {
  39. return nil, fmt.Errorf("unable to find listener for %v", cfg.addr)
  40. }
  41. // the file has already been passed to this process, extract the file
  42. // descriptor and name from the metadata to rebuild/find the *os.File for
  43. // the listener.
  44. // 我们已经拿到了监听文件的信息,我们准备自己创建一份新的文件并使用
  45. lnFile := os.NewFile(uintptr(l.FD), l.Filename)
  46. fmt.Println("新文件名:", l.Filename)
  47. if lnFile == nil {
  48. return nil, fmt.Errorf("unable to create listener file: %v", l.Filename)
  49. }
  50. defer lnFile.Close()
  51. // create a listerer with the *os.File
  52. ln, err := net.FileListener(lnFile)
  53. if err != nil {
  54. return nil, err
  55. }
  56. return ln, nil
  57. }
  58. 复制代码

这里的 importListener 执行时间,就是在父进程创建完新的 unix socket 通道后。

至此,子进程开始了新的一轮监听,服务...

结束

代码量虽然不大,但是传递了一个很好的优雅重启思路,有些地方还是要实践一下才能理解(对于我这种新手而言)。 其实网上还有很多其他优雅重启的方式,大家可以 Google 一下。 希望我上面简单的讲解能够帮到你,如果有错误的话请及时指出,我会更正的。

你也可以从这里拿到添加备注的代码版本。 我做了下分割,方便你能看懂。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/681884
推荐阅读
相关标签
  

闽ICP备14008679号