萬字長文 - 從實踐到原理,帶你參透 gRPC

大家好,我是煎魚。

gRPC 在 Go 語言中大放異彩,越來越多的小夥伴在使用,最近也在公司安利了一波,希望這一篇文章能帶你一覽 gRPC 的巧妙之處,本文篇幅比較長,請做好閱讀準備。

本文目錄如下:

簡述

gRPC 是一個高性能、開源和通用的 RPC 框架,面向移動和 HTTP/2 設計。目前提供 C、Java 和 Go 語言版本,分別是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。

gRPC 基於 HTTP/2 標準設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 連接上的多複用請求等特性。這些特性使得其在移動設備上表現更好,更省電和節省空間佔用。

調用模型

1、客戶端(gRPC Stub)調用 A 方法,發起 RPC 調用。

2、對請求信息使用 Protobuf 進行對象序列化壓縮(IDL)。

3、服務端(gRPC Server)接收到請求後,解碼請求體,進行業務邏輯處理並返回。

4、對響應結果使用 Protobuf 進行對象序列化壓縮(IDL)。

5、客戶端接受到服務端響應,解碼請求體。回調被調用的 A 方法,喚醒正在等待響應(阻塞)的客戶端調用並返回響應結果。

調用方式

一、Unary RPC:一元 RPC

Server

type SearchService struct{}

func (s *SearchService) Search(ctx context.Context, r *pb.SearchRequest) (*pb.SearchResponse, error) {
    return &pb.SearchResponse{Response: r.GetRequest() + " Server"}, nil
}

const PORT = "9001"

func main() {
    server := grpc.NewServer()
    pb.RegisterSearchServiceServer(server, &SearchService{})

    lis, err := net.Listen("tcp"":"+PORT)
    ...

    server.Serve(lis)
}

Client

func main() {
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    ...
    defer conn.Close()

    client := pb.NewSearchServiceClient(conn)
    resp, err := client.Search(context.Background()&pb.SearchRequest{
        Request: "gRPC",
    })
    ...
}

二、Server-side streaming RPC:服務端流式 RPC

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    for n := 0; n <= 6; n++ {
        stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                ...
            },
        })
    }

    return nil
}

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.List(context.Background(), r)
    ...

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    return nil
}

三、Client-side streaming RPC:客戶端流式 RPC

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{...}})
        }
        ...

    }

    return nil
}

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Record(context.Background())
    ...

    for n := 0; n < 6; n++ {
        stream.Send(r)
    }

    resp, err := stream.CloseAndRecv()
    ...

    return nil
}

四、Bidirectional streaming RPC:雙向流式 RPC

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    for {
        stream.Send(&pb.StreamResponse{...})
        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        ...
    }

    return nil
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Route(context.Background())
    ...

    for n := 0; n <= 6; n++ {
        stream.Send(r)
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        ...
    }

    stream.CloseSend()

    return nil
}

客戶端與服務端是如何交互的

在開始分析之前,我們要先 gRPC 的調用有一個初始印象。那麼最簡單的就是對 Client 端調用 Server 端進行抓包去剖析,看看整個過程中它都做了些什麼事。如下圖:

我們略加整理發現共有十二個行爲,是比較重要的。在開始分析之前,建議你自己先想一下,它們的作用都是什麼?大膽猜測一下,帶着疑問去學習效果更佳。

行爲分析

Magic

Magic 幀的主要作用是建立 HTTP/2 請求的前言。在 HTTP/2 中,要求兩端都要發送一個連接前言,作爲對所使用協議的最終確認,並確定 HTTP/2 連接的初始設置,客戶端和服務端各自發送不同的連接前言。

而上圖中的 Magic 幀是客戶端的前言之一,內容爲 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n,以確定啓用 HTTP/2 連接。

SETTINGS

SETTINGS 幀的主要作用是設置這一個連接的參數,作用域是整個連接而並非單一的流。

而上圖的 SETTINGS 幀都是空 SETTINGS 幀,圖一是客戶端連接的前言(Magic 和 SETTINGS 幀分別組成連接前言)。圖二是服務端的。另外我們從圖中可以看到多個 SETTINGS 幀,這是爲什麼呢?是因爲發送完連接前言後,客戶端和服務端還需要有一步互動確認的動作。對應的就是帶有 ACK 標識 SETTINGS 幀。

HEADERS

HEADERS 幀的主要作用是存儲和傳播 HTTP 的標頭信息。我們關注到 HEADERS 裏有一些眼熟的信息,分別如下:

你會發現這些東西非常眼熟,其實都是 gRPC 的基礎屬性,實際上遠遠不止這些,只是設置了多少展示多少。例如像平時常見的 grpc-timeoutgrpc-encoding 也是在這裏設置的。

DATA

DATA 幀的主要作用是裝填主體信息,是數據幀。而在上圖中,可以很明顯看到我們的請求參數 gRPC 存儲在裏面。只需要瞭解到這一點就可以了。

HEADERS, DATA, HEADERS

在上圖中 HEADERS 幀比較簡單,就是告訴我們 HTTP 響應狀態和響應的內容格式。

在上圖中 DATA 幀主要承載了響應結果的數據集,圖中的 gRPC Server 就是我們 RPC 方法的響應結果。

在上圖中 HEADERS 幀主要承載了 gRPC 狀態 和 gRPC 狀態消息,圖中的 grpc-statusgrpc-message 就是我們的 gRPC 調用狀態的結果。

其它步驟

WINDOW_UPDATE

主要作用是管理和流的窗口控制。通常情況下打開一個連接後,服務器和客戶端會立即交換 SETTINGS 幀來確定流控制窗口的大小。默認情況下,該大小設置爲約 65 KB,但可通過發出一個 WINDOW_UPDATE 幀爲流控制設置不同的大小。

PING/PONG

主要作用是判斷當前連接是否仍然可用,也常用於計算往返時間。其實也就是 PING/PONG,大家對此應該很熟。

小結

這塊 gRPC 的基礎使用,你可以看看我另外的 《gRPC 入門系列》,相信對你一定有幫助。

淺談理解

服務端

爲什麼四行代碼,就能夠起一個 gRPC Server,內部做了什麼邏輯。你有想過嗎?接下來我們一步步剖析,看看裏面到底是何方神聖。

一、初始化

// grpc.NewServer()
func NewServer(opt ...ServerOption) *Server {
 opts := defaultServerOptions
 for _, o := range opt {
  o(&opts)
 }
 s := &Server{
  lis:    make(map[net.Listener]bool),
  opts:   opts,
  conns:  make(map[io.Closer]bool),
  m:      make(map[string]*service),
  quit:   make(chan struct{}),
  done:   make(chan struct{}),
  czData: new(channelzData),
 }
 s.cv = sync.NewCond(&s.mu)
 ...

 return s
}

這塊比較簡單,主要是實例 grpc.Server 並進行初始化動作。涉及如下:

二、註冊

pb.RegisterSearchServiceServer(server, &SearchService{})

步驟一:Service API interface

// search.pb.go
type SearchServiceServer interface {
 Search(context.Context, *SearchRequest) (*SearchResponse, error)
}

func RegisterSearchServiceServer(s *grpc.Server, srv SearchServiceServer) {
 s.RegisterService(&_SearchService_serviceDesc, srv)
}

還記得我們平時編寫的 Protobuf 嗎?在生成出來的 .pb.go 文件中,會定義出 Service APIs interface 的具體實現約束。而我們在 gRPC Server 進行註冊時,會傳入應用 Service 的功能接口實現,此時生成的 RegisterServer 方法就會保證兩者之間的一致性。

步驟二:Service API IDL

你想亂傳糊弄一下?不可能的,請乖乖定義與 Protobuf 一致的接口方法。但是那個 &_SearchService_serviceDesc 又有什麼作用呢?代碼如下:

// search.pb.go
var _SearchService_serviceDesc = grpc.ServiceDesc{
 ServiceName: "proto.SearchService",
 HandlerType: (*SearchServiceServer)(nil),
 Methods: []grpc.MethodDesc{
  {
   MethodName: "Search",
   Handler:    _SearchService_Search_Handler,
  },
 },
 Streams:  []grpc.StreamDesc{},
 Metadata: "search.proto",
}

這看上去像服務的描述代碼,用來向內部表述 “我” 都有什麼。涉及如下:

步驟三:Register Service

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ...
 srv := &service{
  server: ss,
  md:     make(map[string]*MethodDesc),
  sd:     make(map[string]*StreamDesc),
  mdata:  sd.Metadata,
 }
 for i := range sd.Methods {
  d := &sd.Methods[i]
  srv.md[d.MethodName] = d
 }
 for i := range sd.Streams {
  ...
 }
 s.m[sd.ServiceName] = srv
}

在最後一步中,我們會將先前的服務接口信息、服務描述信息給註冊到內部 service 去,以便於後續實際調用的使用。涉及如下:

小結

在這一章節中,主要介紹的是 gRPC Server 在啓動前的整理和註冊行爲,看上去很簡單,但其實一切都是爲了後續的實際運行的預先準備。因此我們整理一下思路,將其串聯起來看看,如下:

三、監聽

接下來到了整個流程中,最重要也是大家最關注的監聽 / 處理階段,核心代碼如下:

func (s *Server) Serve(lis net.Listener) error {
 ...
 var tempDelay time.Duration
 for {
  rawConn, err := lis.Accept()
  if err != nil {
   if ne, ok := err.(interface {
    Temporary() bool
   }); ok && ne.Temporary() {
    if tempDelay == 0 {
     tempDelay = 5 * time.Millisecond
    } else {
     tempDelay *= 2
    }
    if max := 1 * time.Second; tempDelay > max {
     tempDelay = max
    }
    ...
    timer := time.NewTimer(tempDelay)
    select {
    case <-timer.C:
    case <-s.quit:
     timer.Stop()
     return nil
    }
    continue
   }
   ...
   return err
  }
  tempDelay = 0

  s.serveWG.Add(1)
  go func() {
   s.handleRawConn(rawConn)
   s.serveWG.Done()
  }()
 }
}

Serve 會根據外部傳入的 Listener 不同而調用不同的監聽模式,這也是 net.Listener 的魅力,靈活性和擴展性會比較高。而在 gRPC Server 中最常用的就是 TCPConn,基於 TCP Listener 去做。接下來我們一起看看具體的處理邏輯,如下:

客戶端

一、創建撥號連接

// grpc.Dial(":"+PORT, grpc.WithInsecure())
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
 cc := &ClientConn{
  target:            target,
  csMgr:             &connectivityStateManager{},
  conns:             make(map[*addrConn]struct{}),
  dopts:             defaultDialOptions(),
  blockingpicker:    newPickerWrapper(),
  czData:            new(channelzData),
  firstResolveEvent: grpcsync.NewEvent(),
 }
 ...
 chainUnaryClientInterceptors(cc)
 chainStreamClientInterceptors(cc)

 ...
}

grpc.Dial 方法實際上是對於 grpc.DialContext 的封裝,區別在於 ctx 是直接傳入 context.Background。其主要功能是創建與給定目標的客戶端連接,其承擔了以下職責:

連沒連

之前聽到有的人說調用 grpc.Dial 後客戶端就已經與服務端建立起了連接,但這對不對呢?我們先鳥瞰全貌,看看正在跑的 goroutine。如下:

我們可以有幾個核心方法一直在等待 / 處理信號,通過分析底層源碼可得知。涉及如下:

func (ac *addrConn) connect()
func (ac *addrConn) resetTransport()
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time)
func (ac *addrConn) getReadyTransport()

在這裏主要分析 goroutine 提示的 resetTransport 方法,看看都做了啥。核心代碼如下:

func (ac *addrConn) resetTransport() {
 for i := 0; ; i++ {
  if ac.state == connectivity.Shutdown {
   return
  }
  ...
  connectDeadline := time.Now().Add(dialDuration)
  ac.updateConnectivityState(connectivity.Connecting)
  newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
  if err != nil {
   if ac.state == connectivity.Shutdown {
    return
   }
   ac.updateConnectivityState(connectivity.TransientFailure)
   timer := time.NewTimer(backoffFor)
   select {
   case <-timer.C:
    ...
   }
   continue
  }

  if ac.state == connectivity.Shutdown {
   newTr.Close()
   return
  }
  ...
  if !healthcheckManagingState {
   ac.updateConnectivityState(connectivity.Ready)
  }
  ...

  if ac.state == connectivity.Shutdown {
   return
  }
  ac.updateConnectivityState(connectivity.TransientFailure)
 }
}

在該方法中會不斷地去嘗試創建連接,若成功則結束。否則不斷地根據 Backoff 算法的重試機制去嘗試創建連接,直到成功爲止。從結論上來講,單純調用 DialContext 是異步建立連接的,也就是並不是馬上生效,處於 Connecting 狀態,而正式下要到達 Ready 狀態纔可用。

真的連了嗎

在抓包工具上提示一個包都沒有,那麼這算真正連接了嗎?我認爲這是一個表述問題,我們應該儘可能的嚴謹。如果你真的想通過 DialContext 方法就打通與服務端的連接,則需要調用 WithBlock 方法,雖然會導致阻塞等待,但最終連接會到達 Ready 狀態(握手成功)。如下圖:

二、實例化 Service API

type SearchServiceClient interface {
 Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error)
}

type searchServiceClient struct {
 cc *grpc.ClientConn
}

func NewSearchServiceClient(cc *grpc.ClientConn) SearchServiceClient {
 return &searchServiceClient{cc}
}

這塊就是實例 Service API interface,比較簡單。

三、調用

// search.pb.go
func (c *searchServiceClient) Search(ctx context.Context, in *SearchRequest, opts ...grpc.CallOption) (*SearchResponse, error) {
 out := new(SearchResponse)
 err := c.cc.Invoke(ctx, "/proto.SearchService/Search", in, out, opts...)
 if err != nil {
  return nil, err
 }
 return out, nil
}

proto 生成的 RPC 方法更像是一個包裝盒,把需要的東西放進去,而實際上調用的還是 grpc.invoke 方法。如下:

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
 cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
 if err != nil {
  return err
 }
 if err := cs.SendMsg(req); err != nil {
  return err
 }
 return cs.RecvMsg(reply)
}

通過概覽,可以關注到三塊調用。如下:

連接

// clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
  FullMethodName: method,
 })
 if err != nil {
  return nil, nil, toRPCErr(err)
 }
 return t, done, nil
}

newClientStream 方法中,我們通過 getTransport 方法獲取了 Transport 層中抽象出來的 ClientTransport 和 ServerTransport,實際上就是獲取一個連接給後續 RPC 調用傳輸使用。

四、關閉連接

// conn.Close()
func (cc *ClientConn) Close() error {
 defer cc.cancel()
    ...
 cc.csMgr.updateState(connectivity.Shutdown)
    ...
 cc.blockingpicker.close()
 if rWrapper != nil {
  rWrapper.close()
 }
 if bWrapper != nil {
  bWrapper.close()
 }

 for ac := range conns {
  ac.tearDown(ErrClientConnClosing)
 }
 if channelz.IsOn() {
  ...
  channelz.AddTraceEvent(cc.channelzID, ted)
  channelz.RemoveEntry(cc.channelzID)
 }
 return nil
}

該方法會取消 ClientConn 上下文,同時關閉所有底層傳輸。涉及如下:

Q&A

1. gRPC Metadata 是通過什麼傳輸?

2. 調用 grpc.Dial 會真正的去連接服務端嗎?

會,但是是異步連接的,連接狀態爲正在連接。但如果你設置了 grpc.WithBlock 選項,就會阻塞等待(等待握手成功)。另外你需要注意,當未設置 grpc.WithBlock 時,ctx 超時控制對其無任何效果。

3. 調用 ClientConn 不 Close 會導致泄露嗎?

會,除非你的客戶端不是常駐進程,那麼在應用結束時會被動地回收資源。但如果是常駐進程,你又真的忘記執行 Close 語句,會造成的泄露。如下圖:

3.1. 客戶端

3.2. 服務端

3.3. TCP

4. 不控制超時調用的話,會出現什麼問題?

短時間內不會出現問題,但是會不斷積蓄泄露,積蓄到最後當然就是服務無法提供響應了。如下圖:

5. 爲什麼默認的攔截器不可以傳多個?

func chainUnaryClientInterceptors(cc *ClientConn) {
 interceptors := cc.dopts.chainUnaryInts
 if cc.dopts.unaryInt != nil {
  interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
 }
 var chainedInt UnaryClientInterceptor
 if len(interceptors) == 0 {
  chainedInt = nil
 } else if len(interceptors) == 1 {
  chainedInt = interceptors[0]
 } else {
  chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
   return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
  }
 }
 cc.dopts.unaryInt = chainedInt
}

當存在多個攔截器時,取的就是第一個攔截器。因此結論是允許傳多個,但並沒有用。

6. 真的需要用到多個攔截器的話,怎麼辦?

可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptorgrpc.StreamInterceptor 鏈式方法,方便快捷省心。

單單會用還不行,我們再深剖一下,看看它是怎麼實現的。核心代碼如下:

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
 n := len(interceptors)
 if n > 1 {
  lastI := n - 1
  return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   var (
    chainHandler grpc.UnaryInvoker
    curI         int
   )

   chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
    if curI == lastI {
     return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
    }
    curI++
    err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
    curI--
    return err
   }

   return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
  }
 }
    ...
}

當攔截器數量大於 1 時,從 interceptors[1] 開始遞歸,每一個遞歸的攔截器 interceptors[i] 會不斷地執行,最後才真正的去執行 handler 方法。同時也經常有人會問攔截器的執行順序是什麼,通過這段代碼你得出結論了嗎?

7. 頻繁創建 ClientConn 有什麼問題?

這個問題我們可以反向驗證一下,假設不公用 ClientConn 看看會怎麼樣?如下:

func BenchmarkSearch(b *testing.B) {
 for i := 0; i < b.N; i++ {
  conn, err := GetClientConn()
  if err != nil {
   b.Errorf("GetClientConn err: %v", err)
  }
  _, err = Search(context.Background(), conn)
  if err != nil {
   b.Errorf("Search err: %v", err)
  }
 }
}

輸出結果:

    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
    ... connection error: desc = "transport: Error while dialing dial tcp :10001: socket: too many open files"
FAIL
exit status 1

當你的應用場景是存在高頻次同時生成 / 調用 ClientConn 時,可能會導致系統的文件句柄佔用過多。這種情況下你可以變更應用程序生成 / 調用 ClientConn 的模式,又或是池化它,這塊可以參考 grpc-go-pool 項目。

8. 客戶端請求失敗後會默認重試嗎?

會不斷地進行重試,直到上下文取消。而重試時間方面採用 backoff 算法作爲的重連機制,默認的最大重試時間間隔是 120s。

9. 爲什麼要用 HTTP/2 作爲傳輸協議?

許多客戶端要通過 HTTP 代理來訪問網絡,gRPC 全部用 HTTP/2 實現,等到代理開始支持 HTTP/2 就能透明轉發 gRPC 的數據。不光如此,負責負載均衡、訪問控制等等的反向代理都能無縫兼容 gRPC,比起自己設計 wire protocol 的 Thrift,這樣做科學不少。@ctiller @滕亦飛

10. 在 Kubernetes 中 gRPC 負載均衡有問題?

gRPC 的 RPC 協議是基於 HTTP/2 標準實現的,HTTP/2 的一大特性就是不需要像 HTTP/1.1 一樣,每次發出請求都要重新建立一個新連接,而是會複用原有的連接。

所以這將導致 kube-proxy 只有在連接建立時纔會做負載均衡,而在這之後的每一次 RPC 請求都會利用原本的連接,那麼實際上後續的每一次的 RPC 請求都跑到了同一個地方。

注:使用 k8s service 做負載均衡的情況下

總結

參考

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/o-K7G9ywCdmW7et6Q4WMeA