Implementing gRPC client long-lived connections and analyzing keepalive
0x00 Preface
HTTP2 is a full-duplex, stream-based protocol: the server can also actively ping the client, and it offers configuration both for probing connection liveness and for throttling how often the client may send ping frames. gRPC uses HTTP2 as its underlying transport, so a gRPC client keeps a long-lived connection by default.
Consider a scenario in which client and server must maintain a persistent long-lived connection: whichever side disconnects abnormally or restarts, the connection should be retried and kept alive (provided, of course, that both sides come back up successfully). In gRPC, once the server behind an established connection restarts abnormally, the client usually receives an error like this:
rpc error: code = Unavailable desc = transport is closing
Most gRPC client wrappers do not handle this case well; see the Warden issue about clients failing to reconnect after the server restarts [1]. The recommended ways to deal with this error are:
- Retry: when a client call fails, retry gracefully with exponential backoff
- Add a keepalive policy
- Add an auto-reconnect strategy (a dialing sketch follows this list)
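For the reconnect strategy, grpc-go already re-establishes broken connections automatically with exponential backoff; only the backoff parameters need tuning. Below is a minimal sketch of such a dial helper, assuming the google.golang.org/grpc/backoff package (illustration only, not code from the issue above):

package client

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/backoff"
)

// dialWithBackoff dials addr and lets grpc-go retry broken connections
// with exponential backoff: 1s, 1.6s, 2.56s, ... capped at 30s, with jitter.
func dialWithBackoff(addr string) (*grpc.ClientConn, error) {
    return grpc.Dial(addr,
        grpc.WithInsecure(),
        grpc.WithConnectParams(grpc.ConnectParams{
            Backoff: backoff.Config{
                BaseDelay:  1.0 * time.Second,
                Multiplier: 1.6,
                Jitter:     0.2,
                MaxDelay:   30 * time.Second,
            },
            MinConnectTimeout: 5 * time.Second,
        }),
    )
}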
This article analyzes how to implement such client-side keepalive logic. Before doing so, let's look at gRPC's own keepalive mechanism [2].
0x01 The HTTP2 GOAWAY frame
HTTP2 uses the GOAWAY frame to control connection shutdown: it either initiates an orderly shutdown or signals a serious error condition. Its semantics allow an endpoint to stop accepting new streams gracefully while still finishing the processing of previously established streams. When the client receives a GOAWAY it closes the connection proactively, and the next time it needs to send data it establishes a new connection. GOAWAY is also an important building block of gRPC's GracefulStop mechanism.
0x02 gRPC client keepalive
The gRPC client exposes the following keepalive configuration:
var kacp = keepalive.ClientParameters{
    Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
    Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
    PermitWithoutStream: true,             // send pings even without active streams
}

// pass the keepalive configuration to Dial
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
The keepalive.ClientParameters fields mean the following:
- Time: if there is no activity on the connection, send a ping every 10s
- Timeout: if the ping ack does not arrive within 1s, consider the connection dead
- PermitWithoutStream: whether to send pings when there is no active stream
This is reminiscent of the ssh client [3] and the mysql client used in our projects, both of which do something similar: a dedicated goroutine is started to perform keepalive, as in the following code (taking ssh as the example; a comparable database/sql sketch follows it):
go func() {
    t := time.NewTicker(2 * time.Second)
    defer t.Stop()
    for range t.C {
        _, _, err := client.Conn.SendRequest("keepalive@golang.org", true, nil)
        if err != nil {
            return
        }
    }
}()
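The mysql case can be sketched the same way with database/sql (an illustration, not the article's own code): ping the server from a dedicated goroutine and stop once a ping fails or the context is cancelled.

package client

import (
    "context"
    "database/sql"
    "time"
)

// keepAliveDB pings db every interval; it returns when a ping fails or
// when ctx is cancelled. Start it with: go keepAliveDB(ctx, db, 2*time.Second)
func keepAliveDB(ctx context.Context, db *sql.DB, interval time.Duration) {
    t := time.NewTicker(interval)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            if err := db.PingContext(ctx); err != nil {
                return
            }
        }
    }
}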
The gRPC implementation
In grpc-go's newHTTP2Client [4] there is the following logic: when a new http2Client is constructed, a goroutine is started to handle keepalive:
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
    ...
    if t.keepaliveEnabled {
        t.kpDormancyCond = sync.NewCond(&t.mu)
        go t.keepalive()
    }
    ...
}
Next, let's look at the implementation of the keepalive method [5]:
func (t *http2Client) keepalive() {
    p := &ping{data: [8]byte{}}       // the ping payload
    timer := time.NewTimer(t.kp.Time) // a timer that fires after the configured Time
    // for loop
    for {
        select {
        // the timer fired
        case <-timer.C:
            if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                timer.Reset(t.kp.Time)
                continue
            }
            // Check if keepalive should go dormant.
            t.mu.Lock()
            if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
                // Make awakenKeepalive writable.
                <-t.awakenKeepalive
                t.mu.Unlock()
                select {
                case <-t.awakenKeepalive:
                    // If the control gets here a ping has been sent
                    // need to reset the timer with keepalive.Timeout.
                case <-t.ctx.Done():
                    return
                }
            } else {
                t.mu.Unlock()
                if channelz.IsOn() {
                    atomic.AddInt64(&t.czData.kpCount, 1)
                }
                // Send ping.
                t.controlBuf.put(p)
            }
            // By the time control gets here a ping has been sent one way or the other.
            timer.Reset(t.kp.Timeout)
            select {
            case <-timer.C:
                if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                    timer.Reset(t.kp.Time)
                    continue
                }
                t.Close()
                return
            case <-t.ctx.Done():
                if !timer.Stop() {
                    <-timer.C
                }
                return
            }
        // the caller cancelled the context
        case <-t.ctx.Done():
            if !timer.Stop() {
                // Stop returned false: the timer already fired, so drain the channel
                <-timer.C
            }
            return
        }
    }
}
From the client keepalive implementation we can summarize the execution flow:
- Build the ping payload ([8]byte{}) and create a timer whose initial firing interval is the Time value from the user configuration
- Loop over a select with two branches: one runs when the timer fires, the other on t.ctx.Done(), i.e. when the application above keepalive cancels the context tree
- The core logic sits in the timer branch: if there has been activity since the last tick, simply reset the timer; if there is no active stream and PermitWithoutStream is false, go dormant until awakened via awakenKeepalive; otherwise send a ping, reset the timer to Timeout, and if no activity is observed before it fires again, call t.Close() to tear the connection down
0x03 gRPC server keepalive
The gRPC server side has two main pieces of logic:
- receive and respond to the client's ping frames
- run a dedicated goroutine that probes whether the client is still alive
The server's keepalive configuration comes in two parts, keepalive.EnforcementPolicy and keepalive.ServerParameters:
var kaep = keepalive.EnforcementPolicy{
    MinTime:             5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
    PermitWithoutStream: true,            // Allow pings even when there are no active streams
}

var kasp = keepalive.ServerParameters{
    MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
    MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
    MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
    Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
    Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

func main() {
    ...
    s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
    ...
}
keepalive.EnforcementPolicy:
- MinTime: if the interval between two pings from the client is less than 5s, close the connection (see the pairing sketch after this list)
- PermitWithoutStream: allow pings even when there is no active stream
keepalive.ServerParameters:
- MaxConnectionIdle: if a client connection stays idle for more than 15s, send a GOAWAY; to avoid sending a burst of GOAWAYs at the same instant, the 15s interval is jittered by ±10%, i.e. anywhere between 13.5s and 16.5s
- MaxConnectionAge: if any connection has been alive for more than 30s, send a GOAWAY
- MaxConnectionAgeGrace: after that, allow 5s for pending RPCs to complete before forcibly closing the connection
- Time: if a client has been idle for more than 5s, send it a ping
- Timeout: if the ping is not acknowledged within 1s, consider the connection dead
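The two sides interact: if the client's keepalive Time is shorter than the server's EnforcementPolicy.MinTime, or the client pings without streams while the server forbids it, the client's own keepalive pings accumulate ping strikes and eventually get the connection closed with GOAWAY ("too_many_pings"), as the handlePing code below shows. A minimal sketch of a matching pair of configurations, using the values above (illustration only):

package kademo

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

// Server side: tolerate at most one client ping every 5 seconds,
// and accept pings even when no stream is open.
var kaep = keepalive.EnforcementPolicy{
    MinTime:             5 * time.Second,
    PermitWithoutStream: true,
}

// Client side: Time must not be smaller than the server's MinTime,
// otherwise the keepalive pings themselves violate the policy.
var kacp = keepalive.ClientParameters{
    Time:                10 * time.Second,
    Timeout:             time.Second,
    PermitWithoutStream: true,
}

func newServer() *grpc.Server {
    return grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep))
}

func dial(addr string) (*grpc.ClientConn, error) {
    return grpc.Dial(addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
}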
The gRPC implementation
The server-side handling of the client's ping frames lives in the handlePing method [6]. handlePing checks whether the ping violates either of the two policy conditions; if so it increments pingStrikes, and once the number of violations exceeds maxPingStrikes (2) it logs an error and sends a goAway frame that closes the connection. The implementation is as follows:
func (t *http2Server) handlePing(f *http2.PingFrame) {
    if f.IsAck() {
        if f.Data == goAwayPing.data && t.drainChan != nil {
            close(t.drainChan)
            return
        }
        // Maybe it's a BDP ping.
        if t.bdpEst != nil {
            t.bdpEst.calculate(f.Data)
        }
        return
    }
    pingAck := &ping{ack: true}
    copy(pingAck.data[:], f.Data[:])
    t.controlBuf.put(pingAck)

    now := time.Now()
    defer func() {
        t.lastPingAt = now
    }()
    // A reset ping strikes means that we don't need to check for policy
    // violation for this ping and the pingStrikes counter should be set
    // to 0.
    if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
        t.pingStrikes = 0
        return
    }
    t.mu.Lock()
    ns := len(t.activeStreams)
    t.mu.Unlock()
    if ns < 1 && !t.kep.PermitWithoutStream {
        // Keepalive shouldn't be active thus, this new ping should
        // have come after at least defaultPingTimeout.
        if t.lastPingAt.Add(defaultPingTimeout).After(now) {
            t.pingStrikes++
        }
    } else {
        // Check if keepalive policy is respected.
        if t.lastPingAt.Add(t.kep.MinTime).After(now) {
            t.pingStrikes++
        }
    }

    if t.pingStrikes > maxPingStrikes {
        // Send goaway and close the connection.
        if logger.V(logLevel) {
            logger.Errorf("transport: Got too many pings from the client, closing the connection.")
        }
        t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
    }
}
Note the two conditions under which pingStrikes is incremented:
- t.lastPingAt.Add(defaultPingTimeout).After(now): when there is no active stream and PermitWithoutStream is disabled, client keepalive pings should not be arriving at all, so any ping that comes sooner than defaultPingTimeout (two hours in grpc-go) after the previous one counts as a strike
- t.lastPingAt.Add(t.kep.MinTime).After(now): otherwise, a ping that arrives sooner than MinTime after the previous one violates the enforcement policy and counts as a strike
func (t *http2Server) handlePing(f *http2.PingFrame) {
    ...
    if ns < 1 && !t.kep.PermitWithoutStream {
        // Keepalive shouldn't be active thus, this new ping should
        // have come after at least defaultPingTimeout.
        if t.lastPingAt.Add(defaultPingTimeout).After(now) {
            t.pingStrikes++
        }
    } else {
        // Check if keepalive policy is respected.
        if t.lastPingAt.Add(t.kep.MinTime).After(now) {
            t.pingStrikes++
        }
    }
    if t.pingStrikes > maxPingStrikes {
        // Send goaway and close the connection.
        errorf("transport: Got too many pings from the client, closing the connection.")
        t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
    }
}
The keepalive code
When the gRPC server creates a new HTTP2 server it starts a dedicated goroutine for the keepalive logic; see the newHTTP2Server method [7]:
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
    ...
    go t.keepalive()
    ...
}
Briefly, the core of the keepalive implementation is to start three timers, maxIdle, maxAge and keepalive, and then handle their firing events inside a for select loop:
- maxIdle: checks whether the client has been idle longer than the configured duration; if so, t.drain is called, which sends a GOAWAY frame
- maxAge: when it fires, t.drain is called first to send a GOAWAY, then the timer is reset to MaxConnectionAgeGrace; when it fires again, t.Close() is called to shut the connection down outright (a somewhat graceful sequence)
- keepalive: first checks whether activity is 1; if not, pingSent is set to true, a ping is sent and the timer is reset to Timeout; when it fires again, if activity is still not 1 (i.e. no reply to the ping was observed) and pingSent is true, t.Close() is called to close the connection
func (t *http2Server) keepalive() {
    p := &ping{}
    var pingSent bool
    maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
    maxAge := time.NewTimer(t.kp.MaxConnectionAge)
    keepalive := time.NewTimer(t.kp.Time)
    // NOTE: All exit paths of this function should reset their
    // respective timers. A failure to do so will cause the
    // following clean-up to deadlock and eventually leak.
    defer func() {
        // stop and drain the timers before returning
        if !maxIdle.Stop() {
            <-maxIdle.C
        }
        if !maxAge.Stop() {
            <-maxAge.C
        }
        if !keepalive.Stop() {
            <-keepalive.C
        }
    }()
    for {
        select {
        case <-maxIdle.C:
            t.mu.Lock()
            idle := t.idle
            if idle.IsZero() { // The connection is non-idle.
                t.mu.Unlock()
                maxIdle.Reset(t.kp.MaxConnectionIdle)
                continue
            }
            val := t.kp.MaxConnectionIdle - time.Since(idle)
            t.mu.Unlock()
            if val <= 0 {
                // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
                // Gracefully close the connection.
                t.drain(http2.ErrCodeNo, []byte{})
                // Resetting the timer so that the clean-up doesn't deadlock.
                maxIdle.Reset(infinity)
                return
            }
            maxIdle.Reset(val)
        case <-maxAge.C:
            t.drain(http2.ErrCodeNo, []byte{})
            maxAge.Reset(t.kp.MaxConnectionAgeGrace)
            select {
            case <-maxAge.C:
                // Close the connection after grace period.
                t.Close()
                // Resetting the timer so that the clean-up doesn't deadlock.
                maxAge.Reset(infinity)
            case <-t.ctx.Done():
            }
            return
        case <-keepalive.C:
            if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
                pingSent = false
                keepalive.Reset(t.kp.Time)
                continue
            }
            if pingSent {
                t.Close()
                // Resetting the timer so that the clean-up doesn't deadlock.
                keepalive.Reset(infinity)
                return
            }
            pingSent = true
            if channelz.IsOn() {
                atomic.AddInt64(&t.czData.kpCount, 1)
            }
            t.controlBuf.put(p)
            keepalive.Reset(t.kp.Timeout)
        case <-t.ctx.Done():
            return
        }
    }
}
0x04 Building a robust long-lived-connection client
The official repository ships keepalive examples for both sides (a combined client sketch follows this list):
- server [8]
- client [9]
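Building on those examples, here is a minimal sketch of a long-lived-connection client (a combination for illustration, not the official example verbatim): keepalive detects dead connections, grpc-go's built-in backoff re-establishes them, and WaitForReady makes RPCs wait for the transport to recover instead of failing immediately with "transport is closing".

package client

import (
    "context"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

// Dial returns a connection that keeps itself alive and on which RPCs
// block through transient connection failures.
func Dial(ctx context.Context, addr string) (*grpc.ClientConn, error) {
    return grpc.DialContext(ctx, addr,
        grpc.WithInsecure(),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                10 * time.Second, // probe the connection when idle for 10s
            Timeout:             time.Second,      // declare it dead if the ack takes more than 1s
            PermitWithoutStream: true,             // probe even with no in-flight RPCs
        }),
        // Wait for the channel to become READY again (reconnect in progress)
        // instead of failing RPCs fast with Unavailable.
        grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
    )
}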
0x05 References
- GRPC 開箱手冊 (a gRPC handbook) [10]
Reference links
[1] Warden issue: client reconnect after the server-side service restarts: https://github.com/go-kratos/kratos/issues/177
[2] keepalive mechanism: https://github.com/grpc/grpc/blob/master/doc/keepalive.md
[3] ssh client: https://pandaychen.github.io/2019/10/20/HOW-TO-BUILD-A-SSHD-WITH-GOLANG/#客戶端-keepalive-機制
[4] newHTTP2Client: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L166
[5] keepalive method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_client.go#L1350
[6] handlePing method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L693
[7] newHTTP2Server method: https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L129
[8] server example: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/server/main.go
[9] client example: https://github.com/grpc/grpc-go/blob/master/examples/features/keepalive/client/main.go
[10] GRPC 開箱手冊 (a gRPC handbook): https://juejin.im/post/6844904096474857485