寫給 go 開發者的 gRPC 教程 - metadata


導語

和在普通HTTP請求中一樣,gRPC 提供了在每一次 RPC 中攜帶上下文的結構:metadata。在 Go 語言中,它與context.Context緊密結合,幫助我們實現服務端與客戶端之間互相傳遞信息

什麼是 metadata

gRPC 的 metadata 簡單理解,就是 HTTP Header  中的 key-value 對

metadata 創建

🌲 使用 New():

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

🌲 使用 Pairs():

要注意如果有相同的 key 會自動合併

md := metadata.Pairs(
    "key1""value1",
    "key1""value1.2", // "key1" will have map value []string{"value1""value1.2"}
    "key2""value2",
)

🌲 合併多個 metadata

md1 :=  metadata.Pairs("k1""v1""k2""v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

md := metadata.Join(md1, md2)

🌲 存儲二進制數據

在 metadata 中,key 永遠是 string 類型,但是 value 可以是 string 也可以是二進制數據。爲了在 metadata 中存儲二進制數據,我們僅僅需要在 key 的後面加上一個 - bin 後綴。具有 - bin 後綴的 key 所對應的 value 在創建 metadata 時會被編碼(base64),收到的時候會被解碼:

md := metadata.Pairs(
    "key""string value",
    "key-bin", string([]byte{96, 102}),
)

metadata 結構本身也有一些操作方法,參考文檔非常容易理解。這裏不再贅述:https://pkg.go.dev/google.golang.org/grpc@v1.44.0/metadata

metadata 發送與接收

讓我們再次回顧下 pb 文件和生成出來的 client 與 server 端的接口

service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
 GetOrder(ctx context.Context, 
           in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {
 GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
 mustEmbedUnimplementedOrderManagementServer()
}

可以看到相比 pb 中的接口定義,生成出來的 Go 代碼除了增加了error返回值,還多了context.Context

和錯誤處理類似,gRPC 中的context.Context 也符合 Go 語言的使用習慣:通常情況下我們在函數首個參數放置context.Context用來傳遞一次 RPC 中有關的上下文,藉助context.WithValue()ctx.Value()context添加變量或讀取變量

metadata就是 gRPC 中可以傳遞的上下文信息之一,所以metadata的使用方式就是:metadata記錄到context,從context讀取metadata

Clinet 發送 Server 接收

client發送metadata,那就是把metadata存儲到contex.Context

server接收metadata,就是從contex.Context中讀取Metadata

Clinet 發送 Metadata

Metadata放到contex.Context,有幾種方式

🌲 使用NewOutgoingContext

將新創建的metadata添加到context中,這樣會 覆蓋 掉原來已有的metadata

// 將metadata添加到context中,獲取新的context
md := metadata.Pairs("k1""v1""k1""v2""k2""v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)

// unary RPC
response, err := client.SomeRPC(ctx, someRequest)

// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)

🌲 使用AppendToOutgoingContext

可以直接將 key-value 對添加到已有的context

// 如果對應的 context 沒有 metadata,那麼就會創建一個
ctx := metadata.AppendToOutgoingContext(ctx, "k1""v1""k1""v2""k2""v3")

// 如果已有 metadata 了,那麼就將數據添加到原來的 metadata  (例如在攔截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3""v4")

// 普通RPC(unary RPC)
response, err := client.SomeRPC(ctx, someRequest)

// 流式RPC(streaming RPC)
stream, err := client.SomeStreamingRPC(ctx)

Server 接收 Metedata

普通 RPC 與流式 RPC 的區別不大,都是從contex.Context中讀取metadata

🌲 使用FromIncomingContext

普通 RPC(unary RPC)

//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

流式 RPC(streaming RPC)

//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
    md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
    // do something with metadata
}

Server 發送 Clinet 接收

服務端發送的metadata被分成了headertrailer兩者,因而客戶端也可以讀取兩者

Server 發送 Metadata

對於**普通 RPC(unary RPC)**server 可以使用 grpc 包中提供的函數向 client 發送 headertrailer

對於 ** 流式 RPC(streaming RPC)server 可以使用 ServerStream[1] 接口中定義的函數向 client 發送headertrailer

🌲 普通 RPC(unary RPC)

使用 grpc.SendHeader()  和 grpc.SetTrailer() 方法 ,這兩個函數將context.Context作爲第一個參數

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 創建併發送header
  header := metadata.Pairs("header-key""val")
  grpc.SendHeader(ctx, header)
  
  // 創建併發送trailer
  trailer := metadata.Pairs("trailer-key""val")
  grpc.SetTrailer(ctx, trailer)
}

如果不想立即發送header,也可以使用grpc.SetHeader()grpc.SetHeader()可以被多次調用,在如下時機會把多個metadata合併發送出去

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 創建header,在適當時機會被髮送
  header := metadata.Pairs("header-key1""val1")
  grpc.SetHeader(ctx, header)
    
  // 創建header,在適當時機會被髮送
  header := metadata.Pairs("header-key2""val2")
  grpc.SetHeader(ctx, header)
  
  // 創建併發送trailer
  trailer := metadata.Pairs("trailer-key""val")
  grpc.SetTrailer(ctx, trailer)
}

🌲 流式 RPC(streaming RPC)

使用 ServerStream.SendHeader()ServerStream.SetTrailer() 方法

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key""val")
  stream.SendHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key""val")
  stream.SetTrailer(trailer)
}

如果不想立即發送header,也可以使用ServerStream.SetHeader()ServerStream.SetHeader()可以被多次調用,在如下時機會把多個metadata合併發送出去

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key""val")
  stream.SetHeader(header)
  
  // create and set trailer
  trailer := metadata.Pairs("trailer-key""val")
  stream.SetTrailer(trailer)
}

Client 接收 Metadata

🌲 普通 RPC(unary RPC)

**普通 RPC(unary RPC)**使用grpc.Header()grpc.Trailer()方法來接收 Metadata

// RPC using the context with new metadata.
var header, trailer metadata.MD

// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS""Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
  panic(err)
}

🌲 流式 RPC(streaming RPC)

**流式 RPC(streaming RPC)**通過調用返回的 ClientStream接口的Header()Trailer()方法接收 metadata

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

stream.CloseAndRecv()

// retrieve trailer
trailer := stream.Trailer()

HeaderTrailer區別

根本區別:發送的時機不同!

headers會在下面三種場景下被髮送

trailer會在 rpc 返回的時候,即這個請求結束的時候被髮送

差異在流式 RPC(streaming RPC)中比較明顯:

因爲trailer是在服務端發送完請求之後才發送的,所以 client 獲取trailer的時候需要在stream.CloseAndRecv或者stream.Recv 返回非 nil 錯誤 (包含 io.EOF) 之後

如果stream.CloseAndRecv之前調用stream.Trailer()獲取的是空

stream, err := client.SomeStreamingRPC(ctx)

// retrieve header
header, err := stream.Header()

// retrieve trailer 
// `trailer`會在rpc返回的時候,即這個請求結束的時候被髮送
// 因此此時調用`stream.Trailer()`獲取的是空
trailer := stream.Trailer()

stream.CloseAndRecv()

// retrieve trailer 
// `trailer`會在rpc返回的時候,即這個請求結束的時候被髮送
// 因此此時調用`stream.Trailer()`纔可以獲取到值
trailer := stream.Trailer()

使用場景

既然我們把metadata類比成HTTP Header,那麼metadata的使用場景也可以借鑑HTTPHeader。如傳遞用戶token進行用戶認證,傳遞trace進行鏈路追蹤等

攔截器中的 metadata

在攔截器中,我們不但可以獲取或修改接收到的metadata,甚至還可以截取並修改要發送出去的metadata

還記得攔截器如何實現麼?如果已經忘了快快回顧一下吧:

🌰 舉個例子:

我們在客戶端攔截器中從要發送給服務端的metadata中讀取一個時間戳字段,如果沒有則補充這個時間戳字段

注意這裏用到了一個上文沒有提到的FromOutgoingContext(ctx)函數

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

 var s string

 // 獲取要發送給服務端的`metadata`
 md, ok := metadata.FromOutgoingContext(ctx)
 if ok && len(md.Get("time")) > 0 {
  s = md.Get("time")[0]
 } else {
        // 如果沒有則補充這個時間戳字段
  s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
  ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
 }

 log.Printf("call timestamp: %s", s)

 // Invoking the remote method
 err := invoker(ctx, method, req, reply, cc, opts...)

 return err
}

func main() {
 conn, err := grpc.Dial("127.0.0.1:8009",
  grpc.WithInsecure(),
  grpc.WithChainUnaryInterceptor(
   orderUnaryClientInterceptor,
  ),
 )
 if err != nil {
  panic(err)
 }
    
    c := pb.NewOrderManagementClient(conn)

 ctx = metadata.AppendToOutgoingContext(context.Background()"time",
  "raw"+strconv.FormatInt(time.Now().UnixNano(), 10))

 // RPC using the context with new metadata.
 var header, trailer metadata.MD

 // Add Order
 order := pb.Order{
  Id:          "101",
  Items:       []string{"iPhone XS""Mac Book Pro"},
  Destination: "San Jose, CA",
  Price:       2300.00,
 }
 res, err := c.AddOrder(ctx, &order)
 if err != nil {
  panic(err)
 }
}

以上的思路在 server 同樣適用。基於以上原理我們可以實現鏈路追蹤、用戶認證等功能

錯誤信息

還記得錯誤處理一文中留下的問題麼:gRPC 中如何傳遞錯誤消息Status的呢?沒錯!也是使用的metadata或者說http2.0headerStatus的三種信息分別使用了三個header

func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
 // ...
  h := ht.rw.Header()
  h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
  if m := st.Message(); m != "" {
   h.Set("Grpc-Message", encodeGrpcMessage(m))
  }

  if p := st.Proto(); p != nil && len(p.Details) > 0 {
   stBytes, err := proto.Marshal(p)
   if err != nil {
    // TODO: return error instead, when callers are able to handle it.
    panic(err)
   }

   h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
  }
    // ...
}

總結

一張圖總結下整個metadata的使用方法(公衆號發送:metadata 總結獲取高清原文件)


參考資料

[1]

ServerStream: https://godoc.org/google.golang.org/grpc#ServerStream

[2]

gRPC 中的 metadata: https://pandaychen.github.io/2020/02/22/GRPC-METADATA-INTRO/

[3]

pkg.go.dev/grpc@v1.44.0/metadata: https://pkg.go.dev/google.golang.org/grpc@v1.44.0/metadata

[4]

concept of metadata: https://grpc.io/docs/guides/concepts.html#metadata

[5]

Documentation/grpc-metadata: https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GRmEjnd9D16b_5mPG038-A