Golang实现平滑重启(优雅重启)

栏目: Go · 发布时间: 5年前

内容简介:最近在看traefik的源代码, 看到其中有一个功能是平滑重启, 不过他是通过一个叫做上面说的graceful这个库,如它在Github的简介所说: "Graceful is a Go package enabling graceful shutdown of an http.Handler server." 只提供了优雅关闭, 不提供优雅重启. 那么什么叫做优雅关闭呢? 意思就是服务器要关闭了, 会拒绝新的连接,但是老的连接不会被强制关闭,而是 会等待一定时间, 等待客户端主动关闭, 除非客户端一直没有

最近在看traefik的源代码, 看到其中有一个功能是平滑重启, 不过他是通过一个叫做 graceful 的库来做到的, 是在HTTP Server的层级. 于是我探索了一下方案, 在TCP层级做了一个demo出来.

先看traefik的实现方案

上面说的graceful这个库,如它在Github的简介所说: "Graceful is a Go package enabling graceful shutdown of an http.Handler server." 只提供了优雅关闭, 不提供优雅重启. 那么什么叫做优雅关闭呢? 意思就是服务器要关闭了, 会拒绝新的连接,但是老的连接不会被强制关闭,而是 会等待一定时间, 等待客户端主动关闭, 除非客户端一直没有关闭, 到了预设的超时时间才进行服务器端关闭.

我们来看看traefik是怎么做的:

func main() { // goroutine 0
    goAway := false
    go func() {  // goroutine 1
        sig := <-sigs
        fmt.Println("I have to go...", sig)
        goAway = true
        srv.Stop(10 * time.Second)
    }()

    for{
        if (goAway){
            break
        }
        fmt.Println("Started")
        srv = &graceful.Server{
            Timeout: 10 * time.Second,
            NoSignalHandling: true,

            ConnState: func(conn net.Conn, state http.ConnState) {
                fmt.Println( "Connection ", state)
            },

            Server: &http.Server{
                Addr: ":8001",
                Handler: userRouter,
            },
        }

        go srv.ListenAndServe()  // goroutine 2
        <- srv.StopChan() // goroutine 0
        fmt.Println("Stopped")
    }
}

可以看到, 我们用 goroutine + 数字 来表示所注释的代码会在哪个goroutine里执行, main函数我们假设是在goroutine 0里执行.

sigs

而追踪进去就会发现, srv.StopChan() 是一个确保 graceful Server正确初始化 srv.stopChan 的函数. 搜索stopChan, 我们可以看到

func (srv *Server) shutdown(shutdown chan chan struct{}, kill chan struct{}) {
    // Request done notification
    done := make(chan struct{})
    shutdown <- done

    srv.stopLock.Lock()
    defer srv.stopLock.Unlock()
    if srv.Timeout > 0 {
        select {
        case <-done:
        case <-time.After(srv.Timeout):
            close(kill)
        }
    } else {
        <-done
    }
    // Close the stopChan to wake up any blocked goroutines.
    srv.chanLock.Lock()
    if srv.stopChan != nil {
        close(srv.stopChan)
    }
    srv.chanLock.Unlock()
}

在调用shutdown之后, 就会关闭这个channel, 然后上面所说的for循环就会重新初始化. 于是似乎就实现了一次 "平滑重启". 为什么打 引号呢? 因为在关闭服务器端的监听和下一次for循环重新执行到 srv.ListenAndServe() 之间的这一段时间间隙, 很有可能会有新的 连接到来却因为服务器端没有监听而连接失败. 所以这个实现和我们直接执行 sudo systemctl restart nginx 是类似的.

更详细的traefik源码分析我会另外再写一篇博客来分析, 这里就此打住. 接下来来看一下简单的在TCP层面实现平滑重启的服务器.

TCP平滑重启

首先我们来看看怎么起一个TCP服务器:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println("new conn...")
            go handleConnection(conn)
        }
    }
}

为了测试,我们写一个 Python 脚本(Python果然还是更加简洁):

import socket


def foo():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", 8080))
    s.close()


if __name__ == "__main__":
    while True:
        foo()

执行之后就可以看到输出.

我们知道多个进程是不可以监听在同一个(IP地址,端口号)对上的, 即,不能对同一对(IP地址,端口号) 执行多次listen函数,我们可以做个实验,把ListenAndServe抽出来,起另外一个goroutine去执行,为了方便 区分,我们加入一个参数,就是goroutine的名字:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func ListenAndServe(name string) {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }

    fmt.Println(name)
    for {
        if conn, err := ln.Accept(); err == nil {
            go handleConnection(conn)
        }
    }
}

func main() {
    go ListenAndServe("server1")
    ListenAndServe("server2")
}

执行一下:

$ go run t.go
[::]:8080
server2
listen tcp :8080: bind: address already in use
server1
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x4dc9a4]

但是我们可以在同一个socket对上, 共享同一个监听套接字地址, 然后在多个goroutine中执行accept函数:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func ListenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }

    go ListenAndServe(ln, "server1")
    ListenAndServe(ln, "server2")
}

但是这还远远不是平滑重启, 只是证明了平滑重启是可行的, 毕竟平滑重启的前提就是在父子进程中能够 共享同一个套接字, 而且在不同的地方可以进行 accept 操作. 接下来我们来看一下怎么fork, 然后带上 socket套接字的文件描述符, 然后再在子进程中重新把套接字描述符还原成 tcp.Listener :

  • 先来看怎么把套接字转换成文件描述符, 传递给另外一个goroutine, 然后这个goroutine还原成listener:
package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }
    l := ln.(*net.TCPListener)
    newFile, _ := l.File()
    fmt.Println(newFile.Fd())

    anotherListener, _ := net.FileListener(newFile)

    go listenAndServe(anotherListener, "listener 1")
    listenAndServe(ln, "listener 2")
}

接下来我们要在 go 中进行fork并且传递文件描述符, 查看了文档, 可以通过 exec.Cmd 里的 ExtraFiles []*os.File 来传递:

package main

import (
    "flag"
    "fmt"
    "net"
    "os"
    "os/exec"
)

var (
    graceful = flag.Bool("graceful", false, "-graceful")
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func gracefulRestart() {
    ln, err := net.FileListener(os.NewFile(3, "graceful server"))
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln)
    }

    listenAndServe(ln, "graceful server")
}

func main() {
    flag.Parse()
    fmt.Printf("given args: %t\n", *graceful)

    if *graceful {
        gracefulRestart()
    } else {
        ln, err := net.Listen("tcp", ":8080")
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(ln.Addr())
        }
        l := ln.(*net.TCPListener)
        newFD, _ := l.File()
        fmt.Println(newFD.Fd())

        cmd := exec.Command(os.Args[0], "-graceful")
        cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
        cmd.ExtraFiles = []*os.File{newFD}
        cmd.Run()
    }
}

当然我们还可以做的更好, 例如让graceful server支持再次graceful restart, 于是代码变成了这样:

ckage main

import (
    "flag"
    "fmt"
    "net"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
)

var (
    graceful = flag.Bool("graceful", false, "-graceful")
)

// Accepted accepted connection
type Accepted struct {
    conn net.Conn
    err  error
}

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, sig chan os.Signal) {
    accepted := make(chan Accepted, 1)
    go func() {
        for {
            conn, err := ln.Accept()
            accepted <- Accepted{conn, err}
        }
    }()

    for {
        select {
        case a := <-accepted:
            if a.err == nil {
                fmt.Println("handle connection")
                go handleConnection(a.conn)
            }
        case _ = <-sig:
            fmt.Println("gonna fork and run")
            forkAndRun(ln)
            break
        }
    }
}

func gracefulListener() net.Listener {
    ln, err := net.FileListener(os.NewFile(3, "graceful server"))
    if err != nil {
        fmt.Println(err)
    }

    return ln
}

func firstBootListener() net.Listener {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    }

    return ln
}

func forkAndRun(ln net.Listener) {
    l := ln.(*net.TCPListener)
    newFile, _ := l.File()
    fmt.Println(newFile.Fd())

    cmd := exec.Command(os.Args[0], "-graceful")
    cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
    cmd.ExtraFiles = []*os.File{newFile}
    cmd.Run()
}

func main() {
    flag.Parse()
    fmt.Printf("given args: %t, pid: %d\n", *graceful, os.Getpid())
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGUSR1)

    var ln net.Listener
    if *graceful {
        ln = gracefulListener()
    } else {
        ln = firstBootListener()
    }

    listenAndServe(ln, c)
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

智能优化算法及其应用

智能优化算法及其应用

王凌 / 清华大学出版社 / 2001-10 / 22.00元

智能优化算法及其应用,ISBN:9787302044994,作者:王凌著一起来看看 《智能优化算法及其应用》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具