Go 程序如何實現優雅退出?來看看 K8s 是怎麼做的——下篇

K8s 的優雅退出

現在,我們已經掌握了 Go 中 HTTP Server 程序如何實現優雅退出,是時候看一看 K8s 中提供的一種更爲優雅的優雅退出退出方案了😄。

這要從 K8s API Server 啓動入口說起:

https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/apiserver.go

func main() {
 command := app.NewAPIServerCommand()
 code := cli.Run(command)
 os.Exit(code)
}

K8s API Server 啓動入口代碼非常簡單,我們可以進入 app.NewAPIServerCommand() 查看更多細節:

https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/app/server.go#L122

// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
 ...
 cmd := &cobra.Command{
  ...
  RunE: func(cmd *cobra.Command, args []string) error {
   ...
   return Run(cmd.Context(), completedOptions)
  },
  ...
 }
 cmd.SetContext(genericapiserver.SetupSignalContext())

 ...
 return cmd
}

NewAPIServerCommand 函數中,我們要關注的核心代碼只有兩行:

一行是 cmd.SetContext(genericapiserver.SetupSignalContext()),這是在爲 cmd 對象設置 ctx 屬性。

另一行是 RunE 屬性中最後一行代碼 Run(cmd.Context(), completedOptions),這裏是啓動程序,並使用了 cmd 對象的 ctx 屬性。

很明顯,K8s 使用了 Go 語言中流行的 Cobra 命令行框架作爲程序的啓動框架,Cobra 提供瞭如下兩個方法可以設置和獲取 Context

func (c *Command) Context() context.Context {
 return c.ctx
}

func (c *Command) SetContext(ctx context.Context) {
 c.ctx = ctx
}

NOTE: 如果你對 Cobra 不太熟悉,可以參考我的另一篇文章《萬字長文:Go 語言現代命令行框架 Cobra 詳解》

這裏的 ctx 就是串聯起 K8s 實現優雅退出的核心對象。

首先通過 genericapiserver.SetupSignalContext() 獲取到一個 context.Context 對象,根據函數名稱可以猜測到它可能跟信號有關。

對於 Run(cmd.Context(), completedOptions) 方法的調用,由於嵌套層級比較深,邏輯比較複雜,我就不把整個代碼調用鏈都貼出來講了。總之,這個啓動過程最終可以定位到 preparedGenericAPIServer.RunWithContext 這個方法的執行。在 RunWithContext 方法內部的第一行代碼 stopCh := ctx.Done() 是重點,它拿到了一個控制程序退出時機的 channel(這跟我們前文講解的優雅退出示例中 quit := make(chan os.Signal, 1) 變量作用相同),而這個 ctx 實際上就是 genericapiserver.SetupSignalContext() 的返回值,如果你感興趣可以詳細研究下這個 stopCh 的使用過程。

我們直接去分析 genericapiserver.SetupSignalContext() 的實現:

https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal.go

package server

import (
 "context"
 "os"
 "os/signal"
)

var onlyOneSignalHandler = make(chan struct{})
var shutdownHandler chan os.Signal

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} {
 return SetupSignalContext().Done()
}

// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
 close(onlyOneSignalHandler) // panics when called twice

 shutdownHandler = make(chan os.Signal, 2)

 ctx, cancel := context.WithCancel(context.Background())
 signal.Notify(shutdownHandler, shutdownSignals...)
 go func() {
  <-shutdownHandler
  cancel()
  <-shutdownHandler
  os.Exit(1) // second signal. Exit directly.
 }()

 return ctx
}

// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
// This returns whether a handler was notified
func RequestShutdown() bool {
 if shutdownHandler != nil {
  select {
  case shutdownHandler <- shutdownSignals[0]:
   return true
  default:
  }
 }

 return false
}

這裏代碼不多,但卻相當精妙,可以一窺 K8s 設計之優雅。

我們從 SetupSignalContext 函數開始分析。

SetupSignalContext 函數第一行代碼,通過調用 close(onlyOneSignalHandler) 來確保在整個程序中只調用一次 SetupSignalContext 函數,調用多次則直接 panic。這能強制調用方寫出正確的代碼,避免出現意料之外的情況。

shutdownHandler 是一個包含了兩個緩衝區的 channel,而不像我們定義的 quit := make(chan os.Signal, 1) 那樣只有一個緩衝區大小。

我們前文講過,通過 signal.Notify(c chan<- os.Signal, sig ...os.Signal) 函數註冊所關注的信號後,signal 包在給 c 發送信號時不會阻塞。因爲我們要接收兩次退出信號,所以 shutdownHandler 緩衝區大小爲 2

這也是 SetupSignalContext 函數的精髓所在,它實現了收到一次 SIGINT/SIGTERM 信號,程序優雅退出,收到兩次 SIGINT/SIGTERM 信號,程序強制退出的功能。

代碼片段如下:

ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
 <-shutdownHandler
 cancel()
 <-shutdownHandler
 os.Exit(1) // second signal. Exit directly.
}()

這裏使用一個帶有取消功能的 Context,當第一次收到信號時,就調用 cancel() 取消這個 ctx。而這個 ctx 會作爲函數返回值返給調用方,調用方拿到它,就可以在需要的地方調用 <-ctx.Done() 來等待退出信號了。這就是 preparedGenericAPIServer.RunWithContext 方法中調用 stopCh := ctx.Done() 拿到 channel,然後等待 <-stopCh 退出信號的邏輯了。

這裏用到的 shutdownSignals 變量,定義在 signal_posix.go 文件中:

https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal_posix.go

package server

import (
 "os"
 "syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

shutdownSignals 是一個保存了兩個信號的切片對象。

os.Interrupt 實際上是一個變量,它的值等於 syscall.SIGINT

// The only signal values guaranteed to be present in the os package on all
// systems are os.Interrupt (send the process an interrupt) and os.Kill (force
// the process to exit). On Windows, sending os.Interrupt to a process with
// os.Process.Signal is not implemented; it will return an error instead of
// sending a signal.
var (
 Interrupt Signal = syscall.SIGINT
 Kill      Signal = syscall.SIGKILL
)

這裏爲實現優雅退出,監控了兩個信號 SIGINTSIGTERM,並沒有監控 SIGQUIT 信號。不過這已經足夠用了,根據我的經驗,絕大多數情況下我們都會使用 Ctrl + C 終止程序,而非使用 Ctrl + \

SetupSignalHandler 函數內部調用了 SetupSignalContext 函數,它唯一的作用就是直接返回給調用方 ctx.Done() 所返回的 channel,以此來方便調用方。

RequestShutdown 函數可以主動觸發退出事件信號(SIGTERM/SIGINT),返回值表示是否觸發成功。

現在將 K8s 優雅退出方案集成進我們的 net/http 優雅退出示例程序中:

package main

import (
 "context"
 "errors"
 "log"
 "net/http"
 "time"

 genericapiserver "k8s.io/apiserver/pkg/server"
)

func main() {
 srv := &http.Server{
  Addr: ":8000",
 }

 http.HandleFunc("/sleep", func(w http.ResponseWriter, r *http.Request) {
  duration, err := time.ParseDuration(r.FormValue("duration"))
  if err != nil {
   http.Error(w, err.Error(), 400)
   return
  }

  time.Sleep(duration)
  _, _ = w.Write([]byte("Welcome HTTP Server"))
 })

 go func() {
  if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
   log.Fatalf("HTTP server error: %v", err)
  }
  log.Println("Stopped serving new connections")
 }()

 // NOTE: 只需要替換這 3 行代碼,Gin 版本同理
 // quit := make(chan os.Signal, 1)
 // signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 // <-quit

 // 可以直接丟棄,context.Context.Done() 返回的就是普通空結構體
 <-genericapiserver.SetupSignalHandler()

 log.Println("Shutdown Server...")

 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()

 // We received an SIGINT/SIGTERM signal, shut down.
 if err := srv.Shutdown(ctx); err != nil {
  // Error from closing listeners, or context timeout:
  log.Printf("HTTP server Shutdown: %v", err)
 }
 log.Println("HTTP server graceful shutdown completed")
}

我們只需要將如下 3 行代碼:

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

替換成 K8s 提供的 SetupSignalHandler 函數調用即可:

<-genericapiserver.SetupSignalHandler()

其他代碼都不用修改。

執行示例程序,按一次 Ctrl + C 測試優雅退出:

$ go build -o main main.go && ./main
^C2024/08/22 09:24:46 Shutdown Server...
2024/08/22 09:24:46 Stopped serving new connections
2024/08/22 09:24:49 HTTP server graceful shutdown completed
$ echo $?
0
$ curl "http://localhost:8000/sleep?duration=5s"
Welcome HTTP Server

執行示例程序,按兩次 Ctrl + C 測試強制退出:

$ go build -o main main.go && ./main
^C2024/08/22 09:25:28 Shutdown Server...
2024/08/22 09:25:28 Stopped serving new connections
^C
$ echo $?                                       
1
$ curl "http://localhost:8000/sleep?duration=5s"
curl: (52) Empty reply from server

完美,K8s 爲我們提供了優雅退出的新思路。這樣在開發環境,爲了方便調試,我們可以無需等待優雅退出,只要連續發送兩次 SIGTERM/SIGINT 即可強制退出程序。在生產環境發送一次 SIGTERM/SIGINT 信號等待優雅退出。

使用 Gin 框架開發的 Web 程序也可以這樣修改,你可以自行嘗試。

gRPC 的優雅退出

gRPC Server 優雅退出

接下來我們一起看下 gRPC Server 程序如何實現優雅退出。

示例程序目錄結構如下:

$ tree grpc
grpc
├── Makefile
├── client
│   └── main.go
├── pb
│   ├── helloworld.pb.go
│   ├── helloworld.proto
│   └── helloworld_grpc.pb.go
└── server
    └── main.go

helloworld.proto 中定義了 gRPC Server 支持的服務接口:

syntax = "proto3";

option go_package = ".;pb";

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
  string duration = 2;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

server/main.go 中 Server 端代碼如下:

// Package main implements a server for Greeter service.
package main

import (
 "context"
 "flag"
 "fmt"
 "log"
 "net"
 "time"

 "google.golang.org/grpc"
 genericapiserver "k8s.io/apiserver/pkg/server"

 "github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)

var (
 port = flag.Int("port", 50051, "The server port")
)

// server is used to implement helloworld.GreeterServer.
type server struct {
 pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
 log.Printf("Received: %v", in.GetName())

 duration, _ := time.ParseDuration(in.GetDuration())
 time.Sleep(duration)

 return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
 flag.Parse()
 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
 if err != nil {
  log.Fatalf("failed to listen: %v", err)
 }

 s := grpc.NewServer()
 pb.RegisterGreeterServer(s, &server{})
 log.Printf("server listening at %v", lis.Addr())

 go func() {
  if err := s.Serve(lis); err != nil {
   log.Fatalf("failed to serve: %v", err)
  }
 }()

 <-genericapiserver.SetupSignalHandler()
 log.Printf("Shutdown Server...")
 s.GracefulStop()
 log.Println("gRPC server graceful shutdown completed")
}

這與 HTTP Server 的優雅退出邏輯基本相同,同樣由 grpc 包提供了優雅退出方法 GracefulStop

在接收到退出信號以後,調用 s.GracefulStop() 方法即可實現優雅退出。可以發現,這其實是一個優雅退出的套路。

client/main.go 中 Client 端代碼如下:

// Package main implements a client for Greeter service.
package main

import (
 "context"
 "flag"
 "log"
 "time"

 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials/insecure"

 "github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)

const (
 defaultName = "world"
)

var (
 addr = flag.String("addr""localhost:50051""the address to connect to")
 name = flag.String("name", defaultName, "Name to greet")
)

func main() {
 flag.Parse()
 // Set up a connection to the server.
 conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 if err != nil {
  log.Fatalf("did not connect: %v", err)
 }
 defer conn.Close()
 c := pb.NewGreeterClient(conn)

 // Contact the server and print out its response.
 ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
 defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name, Duration: "10s"})
 if err != nil {
  log.Fatalf("could not greet: %v", err)
 }
 log.Printf("Greeting: %s", r.GetMessage())
}

執行示例程序,測試優雅退出邏輯:

# 執行服務端代碼
$ go build -o main main.go && ./main
2024/08/22 09:26:17 server listening at [::]:50051
2024/08/22 09:26:24 Received: world
^C2024/08/22 09:26:26 Shutdown Server...
2024/08/22 09:26:34 gRPC server graceful shutdown completed
$ echo $?
0
# 執行客戶端代碼
$ go build -o main main.go && ./main
2024/08/22 09:26:34 Greeting: Hello world

優雅退出生效。

既然 gRPC Server 中的優雅退出方案已經介紹完了,同講解 HTTP Server 優雅退出一樣,接下來我再帶你一起深入瞭解一下 GracefulStop 的源碼是如何實現的。

GracefulStop 源碼

GracefulStop 方法源碼如下:

https://github.com/grpc/grpc-go/blob/v1.65.0/server.go#L1882

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
 s.stop(true)
}

func (s *Server) stop(graceful bool) {
 s.quit.Fire()
 defer s.done.Fire()

 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
 s.mu.Lock()
 s.closeListenersLocked()
 // Wait for serving threads to be ready to exit.  Only then can we be sure no
 // new conns will be created.
 s.mu.Unlock()
 s.serveWG.Wait()

 s.mu.Lock()
 defer s.mu.Unlock()

 if graceful {
  s.drainAllServerTransportsLocked()
 } else {
  s.closeServerTransportsLocked()
 }

 for len(s.conns) != 0 {
  s.cv.Wait()
 }
 s.conns = nil

 if s.opts.numServerWorkers > 0 {
  // Closing the channel (only once, via grpcsync.OnceFunc) after all the
  // connections have been closed above ensures that there are no
  // goroutines executing the callback passed to st.HandleStreams (where
  // the channel is written to).
  s.serverWorkerChannelClose()
 }

 if graceful || s.opts.waitForHandlers {
  s.handlersWG.Wait()
 }

 if s.events != nil {
  s.events.Finish()
  s.events = nil
 }
}

GracefulStop 方法直接調用了 s.stop(true) 方法。

stop 方法的 graceful 參數用來決定是否啓用優雅退出,傳遞 true 表示優雅退出,傳遞 false 表示強制退出。

stop 方法第一段代碼邏輯如下:

s.quit.Fire()
defer s.done.Fire()

Server 對象的 quitdone 屬性類型都爲 *grpcsync.Event,前者用來標記 gRPC Server 正在執行退出流程,後者標記退出完成。

Event 定義如下:

https://github.com/grpc/grpc-go/blob/v1.65.0/internal/grpcsync/event.go

// Event represents a one-time event that may occur in the future.
type Event struct {
 fired int32
 c     chan struct{}
 o     sync.Once
}

// Fire causes e to complete.  It is safe to call multiple times, and
// concurrently.  It returns true iff this call to Fire caused the signaling
// channel returned by Done to close.
func (e *Event) Fire() bool {
 ret := false
 e.o.Do(func() {
  atomic.StoreInt32(&e.fired, 1)
  close(e.c)
  ret = true
 })
 return ret
}

// Done returns a channel that will be closed when Fire is called.
func (e *Event) Done() <-chan struct{} {
 return e.c
}

// HasFired returns true if Fire has been called.
func (e *Event) HasFired() bool {
 return atomic.LoadInt32(&e.fired) == 1
}

// NewEvent returns a new, ready-to-use Event.
func NewEvent() *Event {
 return &Event{c: make(chan struct{})}
}

可以發現,*Event 對象的 Fire 方法就是將 fired 字段值置爲 1,並且關閉類型爲 channel 的字段 c,所以其實只要調用了 Fire,那麼調用 Done 方法將立即返回,調用 HasFired 方法就是在判斷 fired 字段值是否爲 1(即是否調用過 Fire 方法)。

s.quit.Fire() 代碼被調用以後,Serve 方法就能夠感知到當前服務正在退出,接下來就不會再接收新的請求進來了。

Serve 方法源碼如下:

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
//   - pass a net.Listener created by calling the Listen method on a
//     net.ListenConfig with the `KeepAlive` field set to a negative value. This
//     will result in the Go standard library not overriding OS defaults for TCP
//     keepalive interval and time. But this will also result in the Go standard
//     library not enabling TCP keepalives by default.
//   - override the Accept method on the passed in net.Listener and set the
//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
 s.mu.Lock()
 s.printf("serving")
 s.serve = true
 if s.lis == nil {
  // Serve called after Stop or GracefulStop.
  s.mu.Unlock()
  lis.Close()
  return ErrServerStopped
 }

 s.serveWG.Add(1)
 defer func() {
  s.serveWG.Done()
  // 判斷當前服務是否正在退出
  if s.quit.HasFired() {
   // Stop or GracefulStop called; block until done and return nil.
   <-s.done.Done()
  }
 }()

 ls := &listenSocket{
  Listener: lis,
  channelz: channelz.RegisterSocket(&channelz.Socket{
   SocketType:    channelz.SocketTypeListen,
   Parent:        s.channelz,
   RefName:       lis.Addr().String(),
   LocalAddr:     lis.Addr(),
   SocketOptions: channelz.GetSocketOption(lis)},
  ),
 }
 s.lis[ls] = true

 defer func() {
  s.mu.Lock()
  if s.lis != nil && s.lis[ls] {
   ls.Close()
   delete(s.lis, ls)
  }
  s.mu.Unlock()
 }()

 s.mu.Unlock()
 channelz.Info(logger, ls.channelz, "ListenSocket created")

 var tempDelay time.Duration // how long to sleep on accept failure
 for {
  rawConn, err := lis.Accept()
  if err != nil {
   if ne, ok := err.(interface {
    Temporary() bool
   }); ok && ne.Temporary() {
    if tempDelay == 0 {
     tempDelay = 5 * time.Millisecond
    } else {
     tempDelay *= 2
    }
    if max := 1 * time.Second; tempDelay > max {
     tempDelay = max
    }
    s.mu.Lock()
    s.printf("Accept error: %v; retrying in %v", err, tempDelay)
    s.mu.Unlock()
    timer := time.NewTimer(tempDelay)
    select {
    case <-timer.C:
    // 判斷當前服務是否正在退出
    case <-s.quit.Done():
     timer.Stop()
     return nil
    }
    continue
   }
   s.mu.Lock()
   s.printf("done serving; Accept = %v", err)
   s.mu.Unlock()

   // 判斷當前服務是否正在退出
   if s.quit.HasFired() {
    return nil
   }
   return err
  }
  tempDelay = 0
  // Start a new goroutine to deal with rawConn so we don't stall this Accept
  // loop goroutine.
  //
  // Make sure we account for the goroutine so GracefulStop doesn't nil out
  // s.conns before this conn can be added.
  s.serveWG.Add(1)
  go func() {
   s.handleRawConn(lis.Addr().String(), rawConn)
   s.serveWG.Done()
  }()
 }
}

我們可以直接跳到 for 循環部分的代碼段,每次通過 rawConn, err := lis.Accept() 接收到一個新的請求進來,都會使用 if s.quit.HasFired() 來判斷當前服務是否正在退出,如果返回結果爲 true,則 Serve 方法直接退出。

此時 Serve 方法的 defer 語句開始執行,這裏會再次使用 if s.quit.HasFired() 判斷當前服務是否正在退出(之所以判斷兩次,因爲 Serve 方法也可能由於其他原因導致退出,進入 defer 邏輯),如果是,則調用 <-s.done.Done() 阻塞在這裏,直到 GracefulStop 優雅退出邏輯執行完成。

此外,for 循環內部的 select 語句中,有一個 case 調用了 <-s.quit.Done(),也是在判斷當前服務是否正在退出,如果是,則調用 timer.Stop() 清理定時器後,Serve 方法直接退出。

我們接着往下看:

s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
s.mu.Lock()
s.closeListenersLocked()
// Wait for serving threads to be ready to exit.  Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()

這裏的 channelz 是 gRPC 的一個監控工具,用於跟蹤 gRPC 的內部狀態,RemoveEntry 會移除該服務器的監控數據。

s.closeListenersLocked() 方法是不是很熟悉,這與 net/http 包中的 Shutdown 方法命名都一樣,作用也就不言而喻了。

定義如下:

// s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
 for lis := range s.lis {
  lis.Close()
 }
 s.lis = nil
}

對於 s.serveWG.Wait() 這行代碼,根據這個操作的屬性名和方法名可以猜到,serveWG 明顯是 sync.WaitGroup 類型。

既然有 Wait(),那就應該會有 Add(1) 操作。其正在前文中貼出 Serve 代碼中。

剛進入 Serve 方法時,就會調用 s.serveWG.Add(1)Serve 方法退出時執行 s.serveWG.Done()

s.serveWG.Add(1)
defer func() {
 s.serveWG.Done()
 if s.quit.HasFired() {
  // Stop or GracefulStop called; block until done and return nil.
  <-s.done.Done()
 }
}()

並且,在 Serve 方法的 for 循環邏輯中,每次有新的請求進來,s.serveWG.Add(1)s.serveWG.Done() 也會被調用一次:

s.serveWG.Add(1)
go func() {
 s.handleRawConn(lis.Addr().String(), rawConn)
 s.serveWG.Done()
}()

所以,這裏其實是在等待 Serve 方法執行完成並退出。

接下來的代碼段是根據是否要進行優雅退出,執行不同的邏輯:

s.mu.Lock()
defer s.mu.Unlock()

if graceful {
    s.drainAllServerTransportsLocked()
} else {
    s.closeServerTransportsLocked()
}

可以發現,接下來的全部操作都加鎖處理。

優雅退出走 s.drainAllServerTransportsLocked() 邏輯:

// s.mu must be held by the caller.
func (s *Server) drainAllServerTransportsLocked() {
 if !s.drain {
  for _, conns := range s.conns {
   for st := range conns {
    st.Drain("graceful_stop")
   }
  }
  s.drain = true
 }
}

它的主要作用是在服務器優雅停止的過程中,讓所有的服務器傳輸層(ServerTransports)停止接收新的請求,但繼續處理現有的請求,直到它們完成。

這裏有一行註釋:s.mu must be held by the caller

說明了在調用 drainAllServerTransportsLocked 方法之前,調用者必須已經持有 s.mu 鎖。這是爲了確保在執行方法體時,服務器的狀態不會被併發修改。

對於 if !s.drain 這個條件判斷,其用於確保 drainAllServerTransportsLocked 方法只會執行一次。

s.conns 屬性保存了所有連接,是 map[string]map[transport.ServerTransport]bool 類型。

通過嵌套的 for 循環遍歷每個連接中的 ServerTransport 實例。ServerTransport 是 gRPC 中的一個概念,它表示一個抽象的傳輸層實現,負責處理客戶端和服務器之間的實際數據傳輸。

調用 st.Drain("graceful_stop") 方法的作用,是告訴傳輸層不要再接收新的請求或連接了,但允許繼續處理現有的請求,直到它們完成。

這個方法會向客戶端發送信號,表明服務器正在進行優雅關閉。它會給所有的客戶端發送一個控制幀 GOAWAY(因爲 gRPC 是基於 HTTP/2 的,所以纔會這樣處理),告訴客戶端關閉 TCP 連接。

Drain 方法實現如下:

func (t *http2Server) Drain(debugData string) {
 t.mu.Lock()
 defer t.mu.Unlock()
 if t.drainEvent != nil {
  return
 }
 t.drainEvent = grpcsync.NewEvent()
 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}

在完成對所有連接的遍歷和 Drain 操作後,將 s.drain 設置爲 true,表示服務器已經進入了 "drain" 狀態,這樣後續不會再次執行相同的操作。

結合之前持有鎖的操作,這裏不會重複執行。

相對來說,沒有采用優雅退出的另一個方法 closeServerTransportsLocked 就要暴力一些:

// s.mu must be held by the caller.
func (s *Server) closeServerTransportsLocked() {
 for _, conns := range s.conns {
  for st := range conns {
   st.Close(errors.New("Server.Stop called"))
  }
 }
}

這裏直接調用 Close 方法關閉連接,省略了控制幀 GOAWAY 的發送。

stop 函數接下來會等待所有現有的連接被安全關閉:

for len(s.conns) != 0 {
    s.cv.Wait()
}
s.conns = nil

繼續往下執行:

if s.opts.numServerWorkers > 0 {
    // Closing the channel (only once, via grpcsync.OnceFunc) after all the
    // connections have been closed above ensures that there are no
    // goroutines executing the callback passed to st.HandleStreams (where
    // the channel is written to).
    s.serverWorkerChannelClose()
}

這段代碼用於關閉工作線程的 channel,確保所有處理程序都已經終止,不會再處理新的請求。

s.serverWorkerChannelClose 在初始化操作時被賦值:

// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
 s.serverWorkerChannel = make(chan func())
 s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
  close(s.serverWorkerChannel)
 })
 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  go s.serverWorker()
 }
}

接下來,如果是優雅退出或者配置了 s.opts.waitForHandlers 選項,則代碼會調用 s.handlersWG.Wait(),等待所有的處理程序完成:

if graceful || s.opts.waitForHandlers {
 s.handlersWG.Wait()
}

不知你有沒有注意到,Serve 方法內部,調用了 s.handleRawConn(lis.Addr().String(), rawConn) 來處理每一個請求。

handleRawConn 方法內部會調用 s.serveStreams(context.Background(), st, rawConn) 方法。

serveStreams 方法定義如下:

func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
 ctx = transport.SetConnection(ctx, rawConn)
 ctx = peer.NewContext(ctx, st.Peer())
 for _, sh := range s.opts.statsHandlers {
  ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
   RemoteAddr: st.Peer().Addr,
   LocalAddr:  st.Peer().LocalAddr,
  })
  sh.HandleConn(ctx, &stats.ConnBegin{})
 }

 defer func() {
  st.Close(errors.New("finished serving streams for the server transport"))
  for _, sh := range s.opts.statsHandlers {
   sh.HandleConn(ctx, &stats.ConnEnd{})
  }
 }()

 streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
 st.HandleStreams(ctx, func(stream *transport.Stream) {
  s.handlersWG.Add(1)
  streamQuota.acquire()
  f := func() {
   defer streamQuota.release()
   defer s.handlersWG.Done()
   s.handleStream(st, stream)
  }

  if s.opts.numServerWorkers > 0 {
   select {
   case s.serverWorkerChannel <- f:
    return
   default:
    // If all stream workers are busy, fallback to the default code path.
   }
  }
  go f()
 })
}

serveStreams 方法內部會調用 s.handlersWG.Add(1)s.handlersWG.Done() 操作。

serveStreams 執行完成後,stop 方法就可以繼續執行了:

if s.events != nil {
 s.events.Finish()
 s.events = nil
}

stop 方法最後,對事件進行了清理,檢查 s.events 是否不爲空,如果不爲空,則調用 s.events.Finish(),完成事件的清理工作。

至此,GracefulStop 的源碼就分析完成了。

可以發現 grpc 的優雅退出跟 net/http 的優雅退出還是有很多相似之處的,這也很好理解,gRPC 是基於 HTTP/2 的,所以底層也是 HTTP 協議。

普通 Go 程序的優雅退出

在日常開發中,除了 HTTP Server 或者 gRPC Server 程序,我們也可能會開發一些其他需要優雅退出的程序。

在文章的最後一個小節,我再來介紹一種常見的週期性執行任務的 Go 程序如何實現優雅退出。

示例代碼如下:

package main

import (
 "log"
 "time"

 genericapiserver "k8s.io/apiserver/pkg/server"
)

type Syncer struct {
 interval time.Duration
}

func (s *Syncer) Run(quit <-chan struct{}) error {
 ticker := time.NewTicker(s.interval)
 defer ticker.Stop()

 for {
  select {
  case <-ticker.C:
   // 業務邏輯
   log.Println("do something")
  case <-quit:
   log.Println("Stop loop")
   s.Stop()
   return nil
  }
 }
}

func (s *Syncer) Stop() {
 log.Println("Stop Syncer start")
 time.Sleep(time.Second * 5)
 log.Println("Stop Syncer done")
}

func main() {
 s := Syncer{interval: time.Second}
 quit := genericapiserver.SetupSignalHandler()

 if err := s.Run(quit); err != nil {
  log.Fatalf("Syncer run err: %s", err.Error())
 }
}

這裏定義了一個 Syncer 結構體,用來實現週期性執行一段代碼邏輯。

Run 方法中用到了 time.Ticker,在 for 循環中週期執行某些業務邏輯,比如定時同步數據狀態、生成數據報表等。

Run 方法中還用到了類似 http.Server.Shutdown 方法內部的 for + select 代碼結構,來實現接收退出信號 <-quit。當收到退出信號,就調用 s.Stop() 進行優雅退出邏輯。

執行示例代碼,中途按 Ctrl + C 進行優雅退出操作:

$ go build -o main main.go && ./main
2024/08/22 09:27:34 do something
2024/08/22 09:27:35 do something
2024/08/22 09:27:36 do something
^C2024/08/22 09:27:37 Stop loop
2024/08/22 09:27:37 Stop Syncer start
2024/08/22 09:27:42 Stop Syncer done

也可以嘗試強制退出:

$ go build -o main main.go && ./main
2024/08/22 09:28:05 do something
2024/08/22 09:28:06 do something
2024/08/22 09:28:07 do something
^C2024/08/22 09:28:07 Stop loop
2024/08/22 09:28:07 Stop Syncer start
^C

兩種操作都沒有問題。

可以將這個示例程序作爲優雅退出的代碼模板,集成進你的 Go 程序中。

至此,本文要講解的內容就全部都寫完了。

總結

所謂的優雅退出,其實就是在關閉進程的時候,不能 “暴力” 關閉,而是要等待進程中的邏輯(比如一次完整的 HTTP 請求)處理完成後,才關閉進程。

Go 爲我們提供的 os/singal 包進行信號處理。默認情況下接收到退出信號 Go 程序會立即退出,使用 signal.Notify 註冊關注的退出信號以後,我們可以實現自己的處理邏輯。

常見退出信號有 SIGINTSIGQUITSIGTERMSIGKILLSIGKILL 不能被 Go 程序捕獲。

我們分別爲 HTTP Server、gRPC Server 以及週期性執行任務的 Go 程序實現了優雅退出功能,並且對 net/http 包的 Shutdown 源碼以及 grpc 包的 GracefulStop 源碼都進行了分析和講解。

本文還重點講解了 K8s 爲我們提供了一種更加優雅的方式,來實現優雅退出功能。我們可以實現收到一次 SIGINT/SIGTERM 信號,程序優雅退出,收到兩次 SIGINT/SIGTERM 信號,程序強制退出。

切記,忽略優雅退出可能會導致數據的不一致問題,因此實現優雅退出功能是非常有必要的。

實現優雅退出最核心的兩點:

  1. 接收退出信號。

  2. 如何等待正在處理的任務完成,還要考慮超時機制。

這兩步做完,程序就可以退出了。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

延伸閱讀

聯繫我

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/oztrz9WfCDnyZC6s4b__LQ