gRPC 客戶端長連接機制實現及 keepalive 分析

0x00 前言

HTTP2 是一個全雙工的流式協議, 服務端也可以主動 ping 客戶端, 且服務端還會有一些檢測連接可用性和控制客戶端 ping 包頻率的配置。gRPC 就是採用 HTTP2 來作爲其基礎通信模式的,所以默認的 gRPC 客戶端都是長連接。

有這麼一種場景,需要客戶端和服務端保持持久的長連接,即無論服務端、客戶端異常斷開或重啓,長連接都要具備重試保活(當然前提是兩方重啓都成功)的需求。在 gRPC 中,對於已經建立的長連接,服務端異常重啓之後,客戶端一般會收到如下錯誤:

rpc error: code = Unavailable desc = transport is closing

大部分的 gRPC 客戶端封裝都沒有很好的處理這類 case,參見 Warden 關於 Server 端服務重啓後 Client 連接斷開之後的重試問題 [1],對於這種錯誤,推薦有兩種處理方法:

  1. 重試:在客戶端調用失敗時,選擇以指數退避(Exponential Backoff )來優雅進行重試

  2. 增加 keepalive 的保活策略

  3. 增加重連(auto reconnect)策略

這篇文章就來分析下如何實現這樣的客戶端保活(keepalive)邏輯。提到保活機制,我們先看下 gRPC 的 keepalive 機制 [2]。

0x01 HTTP2 的 GOAWAY 幀

HTTP2 使用 GOAWAY 幀信號來控制連接關閉,GOAWAY 用於啓動連接關閉或發出嚴重錯誤狀態信號。GOAWAY 語義爲允許端點正常停止接受新的流,同時仍然完成對先前建立的流的處理,當 client 收到這個包之後就會主動關閉連接。下次需要發送數據時,就會重新建立連接。GOAWAY 是實現 grpc.gracefulStop 機制的重要保證。

0x02 gRPC 客戶端 keepalive

gRPC 客戶端提供 keepalive 配置如下:

var kacp = keepalive.ClientParameters{
 Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
 Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
 PermitWithoutStream: true,             // send pings even without active streams
}
//Dial 中傳入 keepalive 配置
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))

keepalive.ClientParameters 參數的含義如下:

聯想到,在項目中 ssh 客戶端 [3] 和 mysql 客戶端中都有着類似的實現,即單獨開啓協程來實現 keepalive:如下面的代碼(以 ssh 爲例):

go func() {
    t := time.NewTicker(2 * time.Second)
    defer t.Stop()
    for range t.C {
        _, _, err := client.Conn.SendRequest("keepalive@golang.org", true, nil)
        if err != nil {
            return
        }
    }
}()

gPRC 的實現

在 grpc-go 的 newHTTP2Client[4] 方法中,有下面的邏輯:即在新建一個 HTTP2Client 的時候會啓動一個 goroutine 來處理 keepalive

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
    ...
 if t.keepaliveEnabled {
  t.kpDormancyCond = sync.NewCond(&t.mu)
  go t.keepalive()
    }
    ...
}

接下來,看下 keepalive 方法 [5] 的實現:

func (t *http2Client) keepalive() {
 p := &ping{data: [8]byte{}} //ping 的內容
 timer := time.NewTimer(t.kp.Time) // 啓動一個定時器, 觸發時間爲配置的 Time 值
 //for loop
 for {
  select {
  // 定時器觸發
  case <-timer.C:
   if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
    timer.Reset(t.kp.Time)
    continue
   }
   // Check if keepalive should go dormant.
   t.mu.Lock()
   if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
    // Make awakenKeepalive writable.
    <-t.awakenKeepalive
    t.mu.Unlock()
    select {
    case <-t.awakenKeepalive:
     // If the control gets here a ping has been sent
     // need to reset the timer with keepalive.Timeout.
    case <-t.ctx.Done():
     return
    }
   } else {
    t.mu.Unlock()
    if channelz.IsOn() {
     atomic.AddInt64(&t.czData.kpCount, 1)
    }
    // Send ping.
    t.controlBuf.put(p)
   }

   // By the time control gets here a ping has been sent one way or the other.
   timer.Reset(t.kp.Timeout)
   select {
   case <-timer.C:
    if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
     timer.Reset(t.kp.Time)
     continue
    }
    t.Close()
    return
   case <-t.ctx.Done():
    if !timer.Stop() {
     <-timer.C
    }
    return
   }
  // 上層通知 context 結束
  case <-t.ctx.Done():
   if !timer.Stop() {
    // 返回 false,表示 timer 未被銷燬
    <-timer.C
   }
   return
  }
 }
}

從客戶端的 keepalive 實現中梳理下執行邏輯:

  1. 填充 ping 包內容, 爲 [8]byte{},創建定時器, 觸發時間爲用戶配置中的 Time

  2. 循環處理,select 的兩大分支,一爲定時器觸發後執行的邏輯,另一分支爲 t.ctx.Done(),即 keepalive 的上層應用調用了 cancel 結束 context 子樹

  3. 核心邏輯在定時器觸發的過程中

0x03 gRPC 服務端的 keepalive

gRPC 的服務端主要有兩塊邏輯:

  1. 接收並相應客戶端的 ping 包

  2. 單獨啓動 goroutine 探測客戶端是否存活

gRPC 服務端提供 keepalive 配置,分爲兩部分 keepalive.EnforcementPolicykeepalive.ServerParameters,如下:

var kaep = keepalive.EnforcementPolicy{
 MinTime:             5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
 PermitWithoutStream: true,            // Allow pings even when there are no active streams
}

var kasp = keepalive.ServerParameters{
 MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
 MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
 MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
 Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
 Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

func main(){
 ...
 s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
 ...
}

keepalive.EnforcementPolicy

keepalive.ServerParameters

gRPC 的實現

服務端處理客戶端的 ping 包的 response 的邏輯在 handlePing 方法 [6] 中。handlePing 方法會判斷是否違反兩條 policy, 如果違反則將 pingStrikes++, 當違反次數大於 maxPingStrikes(2) 時, 打印一條錯誤日誌並且發送一個 goAway 包,斷開這個連接,具體實現如下:

func (t *http2Server) handlePing(f *http2.PingFrame) {
 if f.IsAck() {
  if f.Data == goAwayPing.data && t.drainChan != nil {
   close(t.drainChan)
   return
  }
  // Maybe it's a BDP ping.
  if t.bdpEst != nil {
   t.bdpEst.calculate(f.Data)
  }
  return
 }
 pingAck := &ping{ack: true}
 copy(pingAck.data[:], f.Data[:])
 t.controlBuf.put(pingAck)

 now := time.Now()
 defer func() {
  t.lastPingAt = now
 }()
 // A reset ping strikes means that we don't need to check for policy
 // violation for this ping and the pingStrikes counter should be set
 // to 0.
 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  t.pingStrikes = 0
  return
 }
 t.mu.Lock()
 ns := len(t.activeStreams)
 t.mu.Unlock()
 if ns < 1 && !t.kep.PermitWithoutStream {
  // Keepalive shouldn't be active thus, this new ping should
  // have come after at least defaultPingTimeout.
  if t.lastPingAt.Add(defaultPingTimeout).After(now) {
   t.pingStrikes++
  }
 } else {
  // Check if keepalive policy is respected.
  if t.lastPingAt.Add(t.kep.MinTime).After(now) {
   t.pingStrikes++
  }
 }

 if t.pingStrikes > maxPingStrikes {
  // Send goaway and close the connection.
  if logger.V(logLevel) {
   logger.Errorf("transport: Got too many pings from the client, closing the connection.")
  }
  t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
 }
}

注意,對 pingStrikes 累加的邏輯:

func (t *http2Server) handlePing(f *http2.PingFrame) {
 ...
 if ns < 1 && !t.kep.PermitWithoutStream {
  // Keepalive shouldn't be active thus, this new ping should
  // have come after at least defaultPingTimeout.
  if t.lastPingAt.Add(defaultPingTimeout).After(now) {
   t.pingStrikes++
  }
 } else {
  // Check if keepalive policy is respected.
  if t.lastPingAt.Add(t.kep.MinTime).After(now) {
   t.pingStrikes++
  }
 }
 if t.pingStrikes > maxPingStrikes {
  // Send goaway and close the connection.
  errorf("transport: Got too many pings from the client, closing the connection.")
  t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
 }
}

keepalive 相關代碼

gRPC 服務端新建一個 HTTP2 server 的時候會啓動一個單獨的 goroutine 處理 keepalive 邏輯,newHTTP2Server 方法 [7]:

func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
 ...
 go t.keepalive()
 ...
}

簡單分析下 keepalive 的實現,核心邏輯是啓動 3 個定時器,分別爲 maxIdlemaxAgekeepAlive,然後在 for select 中處理相關定時器觸發事件:

func (t *http2Server) keepalive() {
 p := &ping{}
 var pingSent bool
 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
 keepalive := time.NewTimer(t.kp.Time)
 // NOTE: All exit paths of this function should reset their
 // respective timers. A failure to do so will cause the
 // following clean-up to deadlock and eventually leak.
 defer func() {
  // 退出前,完成定時器的回收工作
  if !maxIdle.Stop() {
   <-maxIdle.C
  }
  if !maxAge.Stop() {
   <-maxAge.C
  }
  if !keepalive.Stop() {
   <-keepalive.C
  }
 }()
 for {
  select {
  case <-maxIdle.C:
   t.mu.Lock()
   idle := t.idle
   if idle.IsZero() { // The connection is non-idle.
    t.mu.Unlock()
    maxIdle.Reset(t.kp.MaxConnectionIdle)
    continue
   }
   val := t.kp.MaxConnectionIdle - time.Since(idle)
   t.mu.Unlock()
   if val <= 0 {
    // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
    // Gracefully close the connection.
    t.drain(http2.ErrCodeNo, []byte{})
    // Resetting the timer so that the clean-up doesn't deadlock.
    maxIdle.Reset(infinity)
    return
   }
   maxIdle.Reset(val)
  case <-maxAge.C:
   t.drain(http2.ErrCodeNo, []byte{})
   maxAge.Reset(t.kp.MaxConnectionAgeGrace)
   select {
   case <-maxAge.C:
    // Close the connection after grace period.
    t.Close()
    // Resetting the timer so that the clean-up doesn't deadlock.
    maxAge.Reset(infinity)
   case <-t.ctx.Done():
   }
   return
  case <-keepalive.C:
   if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
    pingSent = false
    keepalive.Reset(t.kp.Time)
    continue
   }
   if pingSent {
    t.Close()
    // Resetting the timer so that the clean-up doesn't deadlock.
    keepalive.Reset(infinity)
    return
   }
   pingSent = true
   if channelz.IsOn() {
    atomic.AddInt64(&t.czData.kpCount, 1)
   }
   t.controlBuf.put(p)
   keepalive.Reset(t.kp.Timeout)
  case <-t.ctx.Done():
   return
  }
 }
}

0x04 實現健壯的長連接客戶端

官方提供了 keepalive 的實例:

0x05 參考

參考資料

[1] Warden 關於 Server 端服務重啓後 Client 連接斷開之後的重試問題: https://github.com/go-kratos/kratos/issues/177

[2] keepalive 機制: https://github.com/grpc/grpc/blob/master/doc/keepalive.md

[3] ssh 客戶端: https://pandaychen.github.io/2019/10/20/HOW-TO-BUILD-A-SSHD-WITH-GOLANG/# 客戶端 - keepalive - 機制

[4] newHTTP2Client: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L166

[5] keepalive 方法: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L1350

[6] handlePing 方法: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L693

[7] newHTTP2Server 方法: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L129

[8] 服務端: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go

[9] 客戶端: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go

[10] GRPC 開箱手冊: https://juejin.im/post/6844904096474857485

歡迎關注 Go 生態。生態君會不定期分享 Go 語言生態相關內容。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/U6WMwrowutsvfvrYLz0wOQ