微服務 · 深入理解 gRPC - Part1

本文爲系列篇微服務的關於 深入 gRPC 的文章。本篇將會從 gRPC 的基本概念、gRPC 的使用、gRPC 的編程模型、gRPC 的編程模型的實現、gRPC 的編程模型的實現的細節等多個角度來了解。

本篇爲 深入瞭解gRPC 的下篇,篇幅原因,將這篇文章拆分成上下篇,下篇繼續更新中。

  1. 前言

gRPC 作爲一個 Google 開源的 RPC 框架,由於其優異的性能和支持多種流行語言的特點,被衆多的開發者所熟悉。我接觸 gRPC 也有至少五年的時間,但是由於種種原因,在很長時間內對 gRPC 的瞭解處於一個入門或者只是知道個大概的水平。直到大概 2~3 年前在上家公司機緣巧合的緣故,需要對部門內做一次關於 gRPC 的知識分享,而那次我花了 2 周多的時間去了解去背後的原理、實現、數據流向。那時候我記得是白班分享沒有寫 PPT,所以那時候對這些知識點有了比較深刻的理解。

然而,我上家我所在部門的業務幾乎沒有涉及到 gRPC 的開發,因此這些理解只是變成一個知道的概念,並沒有在實際開發工作中提到實際的應用。但是從那次分享後,我對 gRPC 有了一些迷戀現象,想做一些實際的 gRPC 相關項目,從實際項目中提煉自己的知識面。

到現在,我回過頭來看,已經參與了幾個基於 gRPC 通信的項目以及基於 gRPC 的微服務框架,最近也在寫一個比較完整的微服務項目,也是基於 gRPC 通信。的確從實踐中提煉到了一定的知識,自己對整體的理解也有了一定的提升。

今天想寫這篇文章的原因有兩個,其一是我前前後後對 gRPC 有了很多的交集並且也在上家極力推薦使用(但是能力不夠,沒能推廣起來),我對這塊有了一些自己的看法和觀點,但是一直沒有一個比較完整的記錄。其二是之前與大學同學做一次線上分享的時候,有人提問關於 gRPC 的性能問題(由於其基於 HTTP/2, 所以對其性能持懷疑態度),我覺得這個問題確實也是需要一個深究的問題,所以這篇文章也會提到相關內容。

因此,這篇文件將會從 gRPC 的基本概念、gRPC 的使用、gRPC 的編程模型、gRPC 的編程模型的實現、gRPC 的編程模型的實現的細節等多個角度來一一進行講解,給自己一個總結,給對這方面有疑問的同學一定的幫助。

  1. 本篇所有的示例代碼均用 Go

  2. 本篇完全以個人的理解和官方文檔爲準,若有錯誤不準之處,請幫忙支持評論一下,謝謝!

  1. gRPC 的基本概念

gRPC is a modern open source high performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It is also applicable in last mile of distributed computing to connect devices, mobile applications and browsers to backend services.

簡單來說,gRPC 是一個高性能的遠程過程調用框架,可以在任何環境中運行,可以在數據中心之間高效地連接服務,並且支持負載均衡、跟蹤、健康檢查和身份驗證。它還適用於分佈式計算,將設備、移動應用和瀏覽器連接到後端服務。gRPC 是由 CNCF 孵化的項目, 目前在 GitHub 上有 43.8k 的 star 和 9.2k 的 fork。gRPC 有以下幾個核心特點:

  1. 簡單的服務定義。通過 Protocol Buffer 去定義數據結構和服務的接口 (關於 pb 更詳細的介紹請查這篇:[系列] 微服務 · 如何通過 protobuf 定義數據和服務)。

  2. 快速使用。僅通過一行代碼就進行服務註冊和遠程調用。

  3. 跨語言和平臺。gRPC 支持衆多主流語言,可以在不同語言之間無縫遠程調用且均可通過 pb 生成對應語言的相關代碼。

  4. 支持雙向流。gRPC 支持基於 HTTP/2 的雙向流,即客戶端和服務端均可以向對方讀寫流數據。

  5. 插件化。內置可插拔的負載均衡、跟蹤、健康檢查和身份驗證插件。

  6. 微服務。gRPC 非常適合微服務框架,且有衆多微服務框架均支持 gRPC。

  7. 高性能。得益於 HTTP/2 的鏈路複用能力,gRPC 可以在同一個連接上同時處理多個請求,同時得益於 pb 爲編碼出包更快更小的二進制數據包,從而提高了性能。

這些特性使得 gRPC 在微服務架構中的應用非常廣泛。以 Go 語言爲例,主流的微服務框架 go-micro, go-zero, go-kit, kratos 等都是默認支持 gRPC 的。

  1. gRPC 的使用

3.1 生成 gRPC 代碼

proto 文件定義服務後,我們通過 protoc 工具生成 gRPC 的代碼。此時需要在生成命令中添加 --go-grpc_out 參數來指定生成代碼的路徑和其他參數。以下面的簡單 proto 文件爲例:

// 爲了演示,這裏返回值定義爲空的結構
message Empty {}

// 定義服務和其方法
// 爲確保生成的代碼儘量簡單,我們只定義了兩個方法
service OrderService {
  rpc GetOrder(Empty) returns (Empty) {}
  rpc CreateOrder(Empty) returns (Empty) {}
}

我們執行 protoc --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. proto_file 命令,生成代碼後,我們可以看到在當前目錄下會生成兩個文件,分別是 order_service.pb.goorder_service_grpc.pb.go。第一個文件包含定義的 enum, message 以及 pb 文件的信息所對應的 Go 代碼,第二個文件包含定義的 service 所對應的 Go 代碼。本篇不討論第一個文件內容。我們現在來看一下 order_service_grpc.pb.go 文件和核心內容(篇幅原因會忽略一些非必要代碼的展示)。

3.1.1 客戶端相關代碼

客戶端代碼相對來說比較簡單好理解,定了 OrderServiceClient 之後實現這個接口,而顯示方式就是通過 gRPC 連接去調用服務端的 OrderService 服務的對應的方法。我們看的類似這種 /api.user.session.v1.OrderService/GetOrder 字符串可以理解爲路由地址,server 端代碼生成時會將同樣的字符串與其對應的方法共同註冊上去,從而確定唯一的方法。

type OrderServiceClient interface {
    GetOrder(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
    CreateOrder(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
}

type orderServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewOrderServiceClient(cc grpc.ClientConnInterface) OrderServiceClient {
    return &orderServiceClient{cc}
}

func (c *orderServiceClient) GetOrder(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
    out := new(Empty)
    err := c.cc.Invoke(ctx, "/api.user.session.v1.OrderService/GetOrder", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

func (c *orderServiceClient) CreateOrder(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
    out := new(Empty)
    err := c.cc.Invoke(ctx, "/api.user.session.v1.OrderService/CreateOrder", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

我們在自己程序內如果需要調用第三方服務的話,只需要通過 NewOrderServiceClient 函數生成 OrderServiceClient 實例,然後調用對應的方法即可。如:

// conn 爲 grpc connection,可以通過 grpc.Dial 來生成或大部分微服務框架都提供了連接方法
resp,err := NewOrderServiceClient(conn).GetOrder(context.Background()&Empty{})
if err != nil {
    fmt.Println(err)
}
// end of rpc call, do own biz

3.1.2 服務端相關代碼

服務端代碼相對客戶端代碼會多一些,生成代碼分爲兩部分,一部分是定義 interface 然後由一個默認實現類來實現,另一部分是提供註冊實現接口的方法。因爲我們需要自己去實現定義的服務邏輯,然後註冊上去,這樣才能讓客戶端調用。

第一部分代碼:

// OrderServiceServer is the server API for OrderService service.
// All implementations must embed UnimplementedOrderServiceServer
// for forward compatibility
// 這裏需要說明一下,爲了確保服務的穩定性,實現該接口的結構必須包含 UnimplementedOrderServiceServer,這樣即便我們只實現其中一部分的方法,也不會導致服務崩潰或不可用。
type OrderServiceServer interface {
    GetOrder(context.Context, *Empty) (*Empty, error)
    CreateOrder(context.Context, *Empty) (*Empty, error)
    mustEmbedUnimplementedOrderServiceServer()
}

// UnimplementedOrderServiceServer must be embedded to have forward compatible implementations.
type UnimplementedOrderServiceServer struct {
}

func (UnimplementedOrderServiceServer) GetOrder(context.Context, *Empty) (*Empty, error) {
    return nil, status.Errorf(codes.Unimplemented, "method GetOrder not implemented")
}
func (UnimplementedOrderServiceServer) CreateOrder(context.Context, *Empty) (*Empty, error) {
    return nil, status.Errorf(codes.Unimplemented, "method CreateOrder not implemented")
}
func (UnimplementedOrderServiceServer) mustEmbedUnimplementedOrderServiceServer() {}

// UnsafeOrderServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to OrderServiceServer will
// result in compilation errors.
type UnsafeOrderServiceServer interface {
    mustEmbedUnimplementedOrderServiceServer()
}

第二部分代碼:

// 這裏是我們外部註冊入口
func RegisterOrderServiceServer(s grpc.ServiceRegistrar, srv OrderServiceServer) {
    s.RegisterService(&OrderService_ServiceDesc, srv)
}
// 每個接口的處理方法,內部調用的是這個方法
func _OrderService_GetOrder_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(Empty)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(OrderServiceServer).GetOrder(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/api.user.session.v1.OrderService/GetOrder",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(OrderServiceServer).GetOrder(ctx, req.(*Empty))
    }
    return interceptor(ctx, in, info, handler)
}

func _OrderService_CreateOrder_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(Empty)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(OrderServiceServer).CreateOrder(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/api.user.session.v1.OrderService/CreateOrder",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(OrderServiceServer).CreateOrder(ctx, req.(*Empty))
    }
    return interceptor(ctx, in, info, handler)
}

// OrderService_ServiceDesc is the grpc.ServiceDesc for OrderService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var OrderService_ServiceDesc = grpc.ServiceDesc{
    ServiceName: "api.user.session.v1.OrderService",
    HandlerType: (*OrderServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            // 內部實現時,先根據 serviceName 確定 service,再根據 methodName 確定 method,然後調用 Handler
            MethodName: "GetOrder", 
            Handler:    _OrderService_GetOrder_Handler,
        },
        {
            MethodName: "CreateOrder",
            Handler:    _OrderService_CreateOrder_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "user/session/v1/session.proto",
}

服務端作爲實現者,需要定義一個 struct 類型且包含 UnimplementedOrderServiceServer 的結構體,然後實現 OrderServiceServer 的方法,並在服務啓動時 註冊到 grpc.Server 中。如:

// --- service package
package service
// ...
type BizOrder struct {
    // orderpb 包包含我們之前生成的文件
    orderpb.UnimplementedOrderServiceServer
}

func (s *BizOrder) GetOrder(ctx context.Context, in *Empty) (*Empty, error) {
    // do something
    return &Empty{}, nil
}

func (s *BizOrder) CreateOrder(ctx context.Context, in *Empty) (*Empty, error) {
    // do something
    return &Empty{}, nil
}
// --- main package
package main

func main() {
    // ... init gprc server

    // register service
    orderpb.RegisterOrderServiceServer(grpcServer, &service.BizOrder{})
}
  1. gRPC 的編程模型

grpc 編程模型可以從大體上分爲兩種情況,分別是應答模式,數據流模式。應答模式是指客戶端發送一個請求,服務端返回一個響應(常見的 http request-response 模式),然後這次請求完成。而數據流模式是客戶端和服務端其中一方以流的形式持續讀 / 寫數據(也可能雙方都是持續讀寫,雙向流),另一方只需要一次請求或響應(如果是雙向流則均可以多次讀寫)。

4.1 應答模式

這個模式屬於是最常見大家最熟悉的一種模式,在我們定義服務的方法的時候也是基本用的是應答模式。我們上面提到的 GetOrder 方法,就是一個應答模式的例子。請求時構造輸入參數,然後等到響應返回,然後結束這次遠程調用,這就是應答模式。

4.1.1 使用

該方式的使用我們在上面其實已經演示過了,這裏不再贅述。

4.1.2 實現

一次客戶端遠程調用服務端方法的流程步驟大體如下:

  1. 客戶端調用對應的 Client 方法

  2. client 方法實現內調用 invoke 方法 並帶上對應的 method 和其他參數

  3. invoke 方法內總共分三步:

    write:

  4. 創建一個 ClientStream 對象,初始化請求需要的參數,確定請求 endpoint 地址,初始化 buffer size,獲取 http2 transport 對象等

  5. 調用 ClientStream.SendMsq 方法。首先初始化請求 header, payload 和 data, 然後調用 http2 client 的 Write 方法,該方法是異步處理請求的,會把 send request 寫入到一個單向鏈表內,然後由一個單獨的 goroutine 去消費這個鏈表上的數據,然後批量寫入到 socket 中。

// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
 // should proceed only if Write returns nil.
 func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
     if opts.Last {
         // If it's the last message, update stream state.
         if !s.compareAndSwapState(streamActive, streamWriteDone) {
             return errStreamDone
         }
     } else if s.getState() != streamActive {
         return errStreamDone
     }
     df := &dataFrame{
         streamID:  s.id,
         endStream: opts.Last,
         h:         hdr,
         d:         data,
     }
     if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
         if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
             return err
         }
     }
     // controlBuf 底層爲一個緩衝區,用於存儲控制數據,比如 header 和 data。基於單向鏈表實現
     return t.controlBuf.put(df)
 }
 // writeLoop 內部調用 write 方法,循環發送數據

read from buf and write to socket:

 // 這段註釋其實寫的很詳細了,我們可以看到,這裏的 writeLoop 內部調用了 write 方法,然後再調用了一個單獨的 goroutine,這個 goroutine 就
 // 是一個單向鏈表的消費者,直到鏈表爲空,然後再一次性寫入到 socket 中。
 // run should be run in a separate goroutine.
 // It reads control frames from controlBuf and processes them by:
 // 1. Updating loopy's internal state, or/and
 // 2. Writing out HTTP2 frames on the wire.
 //
 // Loopy keeps all active streams with data to send in a linked-list.
 // All streams in the activeStreams linked-list must have both:
 // 1. Data to send, and
 // 2. Stream level flow control quota available.
 //
 // In each iteration of run loop, other than processing the incoming control
 // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
 // This results in writing of HTTP2 frames into an underlying write buffer.
 // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
 // As an optimization, to increase the batch size for each flush, loopy yields the processor, once
 // if the batch size is too low to give stream goroutines a chance to fill it up.
 func (l *loopyWriter) run() (err error) {
     defer func() {
         if err == ErrConnClosing {
             // Don't log ErrConnClosing as error since it happens
             // 1. When the connection is closed by some other known issue.
             // 2. User closed the connection.
             // 3. A graceful close of connection.
             if logger.V(logLevel) {
                 logger.Infof("transport: loopyWriter.run returning. %v", err)
             }
             err = nil
         }
     }()
     for {
         it, err := l.cbuf.get(true)
         if err != nil {
             return err
         }
         if err = l.handle(it); err != nil {
             return err
         }
         if _, err = l.processData(); err != nil {
             return err
         }
         gosched := true
     hasdata:
         for {
             it, err := l.cbuf.get(false)
             if err != nil {
                 return err
             }
             if it != nil {
                 // 根據數據類型做不同的處理
                 // 如果是stream data,則會把數據寫入到 loopWriter 的 activeStreams 中, 也是個單向鏈表
                 if err = l.handle(it); err != nil {
                     return err
                 }
                 // 從 activeStreams 中讀取一個數據 然後把數據寫入到 loopWriter 的 frameBuf 中
                 // 該方法的第一參數爲 bool,當 activeStreams 爲空是返回true,否則返回false
                 if _, err = l.processData(); err != nil {
                     return err
                 }
                 // 讀完讀取下一個
                 continue hasdata
             }
             isEmpty, err := l.processData()
             if err != nil {
                 return err
             }
             // activeStreams 中依然有數據還沒 process
             if !isEmpty {
                 continue hasdata
             }
             if gosched {
                 gosched = false
                 // 如果當前處理的數據大小小於 minBatchSize(1000),則休眠一下,等待下一次的數據
                 if l.framer.writer.offset < minBatchSize {
                     runtime.Gosched()
                     continue hasdata
                 }
             }
             // 數據 flush 到 socket
             l.framer.writer.Flush()
             break hasdata

         }
     }
 }
  1. 調用 ClientStream.RecvMsg 方法。該方法會先響應的 header 消息,從 header 讀取數據 encoding,然後根據 encoding 讀取數據解壓數據,並把數據綁定到這次請求響應的 pb message 結構上。最後會調用 ClientStream.finish 方法,表示結束該請求。

客戶端請求流程

一次服務端收到一個請求,然後處理完響應回去的流程是這樣的:

  1. grpc 服務啓動,開始監聽端口

  2. net.Listener.Accept() 獲取到一個連接

  3. 啓動一個 goroutine, 調用 s.handleRawConn 方法去處理這個連接

  4. s.handleRawConn 方法先創建一個 http2Transport 實例,並把這個實例存到 server 的 conns 字段中

  5. s.handleRawConn 方法起一個 goroutine, 調用 s.serveStreams 方法去處理這個連接,這個方法結束後調用 s.removeConn 方法,從 server 的 conns 字段中刪除這個連接

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
    if s.quit.HasFired() {
        rawConn.Close()
        return
    }
    rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(rawConn)
    rawConn.SetDeadline(time.Time{})
    if st == nil {
        return
    }

    if !s.addConn(lisAddr, st) {
        return
    }
    go func() {
        s.serveStreams(st)
        s.removeConn(lisAddr, st)
    }()
}
  1. s.serveStreams 方法是服務端處理連接的主要邏輯,它會調用 transport.HandleStreams,然後等待該方法結束

  2. HandleStreams 方法會處理這次請求的數據和 header,並構造一個 Stream 對象,然後調用 HandleStreams 傳參的 handler

func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
    // With this transport type there will be exactly 1 stream: this HTTP request.
    // ...ominous code here...
    s := &Stream{
        id:             0, // irrelevant
        requestRead:    func(int) {},
        cancel:         cancel,
        buf:            newRecvBuffer(),
        st:             ht,
        method:         req.URL.Path,
        recvCompress:   req.Header.Get("grpc-encoding"),
        contentSubtype: ht.contentSubtype,
    }
    pr := &peer.Peer{
        Addr: ht.RemoteAddr(),
    }
    if req.TLS != nil {
        pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
    }
    ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
    s.ctx = peer.NewContext(ctx, pr)
    if ht.stats != nil {
        s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
        inHeader := &stats.InHeader{
            FullMethod:  s.method,
            RemoteAddr:  ht.RemoteAddr(),
            Compression: s.recvCompress,
        }
        ht.stats.HandleRPC(s.ctx, inHeader)
    }
    // data reader
    s.trReader = &transportReader{
        reader:        &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
        windowHandler: func(int) {},
    }

    // readerDone is closed when the Body.Read-ing goroutine exits.
    readerDone := make(chan struct{})
    go func() {
        defer close(readerDone)

        // TODO: minimize garbage, optimize recvBuffer code/ownership
        const readSize = 8196
        for buf := make([]byte, readSize); ; {
            n, err := req.Body.Read(buf)
            if n > 0 {
                s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
                buf = buf[n:]
            }
            if err != nil {
                s.buf.put(recvMsg{err: mapRecvMsgError(err)})
                return
            }
            if len(buf) == 0 {
                buf = make([]byte, readSize)
            }
        }
    }()

    // startStream is provided by the *grpc.Server's serveStreams.
    // It starts a goroutine serving s and exits immediately.
    // The goroutine that is started is the one that then calls
    // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
    startStream(s)

    ht.runStream()
    close(requestOver)

    // Wait for reading goroutine to finish.
    req.Body.Close()
    <-readerDone
}
  1. HandleStreams 傳參的 handler 是主要處理 stream 並調用用戶實現的方法。
st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        // 注意 numServerWorkers 默認是 0,所以不會啓動 goroutine
        if s.opts.numServerWorkers > 0 {
            data := &serverWorkerData{st: st, wg: &wg, stream: stream}
            select {
                // 如果配置多個 worker,則一個連接由多個 worker 處理,這些 worker 在初始化時 啓動 goroutine,
                // 並讀取各自 channel 的值,然後還是會調用 handleStream 方法
            case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
            default:
                // If all stream workers are busy, fallback to the default code path.
                go func() {
                    s.handleStream(st, stream, s.traceInfo(st, stream))
                    wg.Done()
                }()
            }
        } else {
            // 默認情況下走這個邏輯
            go func() {
                defer wg.Done()
                s.handleStream(st, stream, s.traceInfo(st, stream))
            }()
        }
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
  1. s.handleStream 方法是從 stream 讀取 serviceName 和 method,並查找對應的 handler,然後調用 s.processUnaryRPC 去處理之後的邏輯。如果沒有找到服務或方法,則調用 processStreamingRPC 並傳空的服務信息,由該方法去處理,這個方法在下面的單向流的實現中提到。

  2. s.processUnaryRPC 從請求 header 讀取壓縮算法解壓數據,讀取 encode 類型 unmarshal 數據,然後調用我們實現的方法。調用完成後,將 reply 用同樣的壓縮算法和 encode 類型進行編碼壓縮,然後寫入到 response 中。

服務的請求處理

  1. 總結

由於篇幅原因,本篇將在這裏結束,關於 grpc 的數據流變成模式和相關實現以及其他更多關於 grpc 的內容,請持續關注,我會在下一篇中進行詳細的介紹。

本篇主要講述了:

  1. grpc 的概念

  2. grpc 在 go 語言環境下的使用

  3. grpc 的常見編程模式之一的應答模式的使用和實現源碼解析

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/yGmu_K8nDWESl-Fb68eQyA