寫給 go 開發者的 gRPC 教程 - metadata
導語
和在普通
HTTP
請求中一樣,gRPC 提供了在每一次 RPC 中攜帶上下文的結構:metadata
。在 Go 語言中,它與context.Context
緊密結合,幫助我們實現服務端與客戶端之間互相傳遞信息
什麼是 metadata
?
gRPC 的 metadata
簡單理解,就是 HTTP Header
中的 key-value 對
-
metadata
是以key-value
的形式存儲數據的,其中 key 是 string 類型,而 value 是 []string,即一個字符串數組類型 -
metadata
使得 client 和 server 能夠爲對方提供關於本次調用的一些信息,就像一次 HTTP 請求的Request Header
和Response Header
一樣 -
HTTP Header
的生命週期是一次 HTTP 請求,那麼metadata
的生命週期就是一次 RPC 調用
metadata 創建
🌲 使用 New():
md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
🌲 使用 Pairs():
要注意如果有相同的 key 會自動合併
md := metadata.Pairs(
"key1", "value1",
"key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
"key2", "value2",
)
🌲 合併多個 metadata
md1 := metadata.Pairs("k1", "v1", "k2", "v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
md := metadata.Join(md1, md2)
🌲 存儲二進制數據
在 metadata 中,key 永遠是 string 類型,但是 value 可以是 string 也可以是二進制數據。爲了在 metadata 中存儲二進制數據,我們僅僅需要在 key 的後面加上一個 - bin 後綴。具有 - bin 後綴的 key 所對應的 value 在創建 metadata 時會被編碼(base64),收到的時候會被解碼:
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}),
)
metadata 結構本身也有一些操作方法,參考文檔非常容易理解。這裏不再贅述:https://pkg.go.dev/google.golang.org/grpc@v1.44.0/metadata
metadata 發送與接收
讓我們再次回顧下 pb 文件和生成出來的 client 與 server 端的接口
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
GetOrder(ctx context.Context,
in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {
GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
mustEmbedUnimplementedOrderManagementServer()
}
可以看到相比 pb 中的接口定義,生成出來的 Go 代碼除了增加了error
返回值,還多了context.Context
和錯誤處理類似,gRPC 中的context.Context
也符合 Go 語言的使用習慣:通常情況下我們在函數首個參數放置context.Context
用來傳遞一次 RPC 中有關的上下文,藉助context.WithValue()
或ctx.Value()
往context
添加變量或讀取變量
metadata
就是 gRPC 中可以傳遞的上下文信息之一,所以metadata
的使用方式就是:metadata
記錄到context
,從context
讀取metadata
Clinet 發送 Server 接收
client
發送metadata
,那就是把metadata
存儲到contex.Context
server
接收metadata
,就是從contex.Context
中讀取Metadata
Clinet 發送 Metadata
把Metadata
放到contex.Context
,有幾種方式
🌲 使用NewOutgoingContext
將新創建的metadata
添加到context
中,這樣會 覆蓋 掉原來已有的metadata
// 將metadata添加到context中,獲取新的context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)
🌲 使用AppendToOutgoingContext
可以直接將 key-value 對添加到已有的context
中
-
如果
context
中沒有metadata
,那麼就會 創建 一個 -
如果已有
metadata
,那麼就將數據 添加 到原來的metadata
// 如果對應的 context 沒有 metadata,那麼就會創建一個
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 如果已有 metadata 了,那麼就將數據添加到原來的 metadata (例如在攔截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 普通RPC(unary RPC)
response, err := client.SomeRPC(ctx, someRequest)
// 流式RPC(streaming RPC)
stream, err := client.SomeStreamingRPC(ctx)
Server 接收 Metedata
普通 RPC 與流式 RPC 的區別不大,都是從contex.Context
中讀取metadata
🌲 使用FromIncomingContext
普通 RPC(unary RPC)
//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
流式 RPC(streaming RPC)
//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
Server 發送 Clinet 接收
服務端發送的metadata
被分成了header
和 trailer
兩者,因而客戶端也可以讀取兩者
Server 發送 Metadata
對於**普通 RPC(unary RPC)**server 可以使用 grpc 包中提供的函數向 client 發送 header
和trailer
-
grpc.SendHeader()
-
grpc.SetHeader()
-
grpc.SetTrailer()
對於 ** 流式 RPC(streaming RPC)server 可以使用 ServerStream[1] 接口中定義的函數向 client 發送header
和 trailer
-
ServerStream.SendHeader()
-
ServerStream.SetHeader()
-
ServerStream.SetTrailer()
🌲 普通 RPC(unary RPC)
使用 grpc.SendHeader()
和 grpc.SetTrailer()
方法 ,這兩個函數將context.Context
作爲第一個參數
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 創建併發送header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 創建併發送trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
如果不想立即發送header
,也可以使用grpc.SetHeader()
。grpc.SetHeader()
可以被多次調用,在如下時機會把多個metadata
合併發送出去
-
調用
grpc.SendHeader()
-
第一個響應被髮送時
-
RPC 結束時(包含成功或失敗)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 創建header,在適當時機會被髮送
header := metadata.Pairs("header-key1", "val1")
grpc.SetHeader(ctx, header)
// 創建header,在適當時機會被髮送
header := metadata.Pairs("header-key2", "val2")
grpc.SetHeader(ctx, header)
// 創建併發送trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
🌲 流式 RPC(streaming RPC)
使用 ServerStream.SendHeader()
和 ServerStream.SetTrailer()
方法
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
如果不想立即發送header
,也可以使用ServerStream.SetHeader()
。ServerStream.SetHeader()
可以被多次調用,在如下時機會把多個metadata
合併發送出去
-
調用
ServerStream.SendHeader()
-
第一個響應被髮送時
-
RPC 結束時(包含成功或失敗)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SetHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
Client 接收 Metadata
🌲 普通 RPC(unary RPC)
**普通 RPC(unary RPC)**使用grpc.Header()
和grpc.Trailer()
方法來接收 Metadata
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
panic(err)
}
🌲 流式 RPC(streaming RPC)
**流式 RPC(streaming RPC)**通過調用返回的 ClientStream
接口的Header()
和 Trailer()
方法接收 metadata
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
stream.CloseAndRecv()
// retrieve trailer
trailer := stream.Trailer()
Header
和Trailer
區別
根本區別:發送的時機不同!
✨ headers
會在下面三種場景下被髮送
-
SendHeader()
被調用時(包含grpc.SendHeader
和stream.SendHeader
) -
第一個響應被髮送時
-
RPC 結束時(包含成功或失敗)
✨ trailer
會在 rpc 返回的時候,即這個請求結束的時候被髮送
差異在流式 RPC(streaming RPC)中比較明顯:
因爲trailer
是在服務端發送完請求之後才發送的,所以 client 獲取trailer
的時候需要在stream.CloseAndRecv
或者stream.Recv
返回非 nil 錯誤 (包含 io.EOF) 之後
如果stream.CloseAndRecv
之前調用stream.Trailer()
獲取的是空
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
// `trailer`會在rpc返回的時候,即這個請求結束的時候被髮送
// 因此此時調用`stream.Trailer()`獲取的是空
trailer := stream.Trailer()
stream.CloseAndRecv()
// retrieve trailer
// `trailer`會在rpc返回的時候,即這個請求結束的時候被髮送
// 因此此時調用`stream.Trailer()`纔可以獲取到值
trailer := stream.Trailer()
使用場景
既然我們把metadata
類比成HTTP Header
,那麼metadata
的使用場景也可以借鑑HTTP
的Header
。如傳遞用戶token
進行用戶認證,傳遞trace
進行鏈路追蹤等
攔截器中的 metadata
在攔截器中,我們不但可以獲取或修改接收到的metadata
,甚至還可以截取並修改要發送出去的metadata
還記得攔截器如何實現麼?如果已經忘了快快回顧一下吧:
🌰 舉個例子:
我們在客戶端攔截器中從要發送給服務端的metadata
中讀取一個時間戳字段,如果沒有則補充這個時間戳字段
注意這裏用到了一個上文沒有提到的FromOutgoingContext(ctx)
函數
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var s string
// 獲取要發送給服務端的`metadata`
md, ok := metadata.FromOutgoingContext(ctx)
if ok && len(md.Get("time")) > 0 {
s = md.Get("time")[0]
} else {
// 如果沒有則補充這個時間戳字段
s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
}
log.Printf("call timestamp: %s", s)
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
return err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithChainUnaryInterceptor(
orderUnaryClientInterceptor,
),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
"raw"+strconv.FormatInt(time.Now().UnixNano(), 10))
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{
Id: "101",
Items: []string{"iPhone XS", "Mac Book Pro"},
Destination: "San Jose, CA",
Price: 2300.00,
}
res, err := c.AddOrder(ctx, &order)
if err != nil {
panic(err)
}
}
以上的思路在 server 同樣適用。基於以上原理我們可以實現鏈路追蹤、用戶認證等功能
錯誤信息
還記得錯誤處理一文中留下的問題麼:gRPC 中如何傳遞錯誤消息Status
的呢?沒錯!也是使用的metadata
或者說http2.0
的header
。Status
的三種信息分別使用了三個header
頭
-
Grpc-Status
: 傳遞Status
的code
-
Grpc-Message
: 傳遞Status
的message
-
Grpc-Status-Details-Bin
: 傳遞Status
的details
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
// ...
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
if m := st.Message(); m != "" {
h.Set("Grpc-Message", encodeGrpcMessage(m))
}
if p := st.Proto(); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}
h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
}
// ...
}
總結
一張圖總結下整個metadata
的使用方法(公衆號發送:metadata 總結獲取高清原文件)
參考資料
[1]
ServerStream: https://godoc.org/google.golang.org/grpc#ServerStream
[2]
gRPC 中的 metadata: https://pandaychen.github.io/2020/02/22/GRPC-METADATA-INTRO/
[3]
pkg.go.dev/grpc@v1.44.0/metadata: https://pkg.go.dev/google.golang.org/grpc@v1.44.0/metadata
[4]
concept of metadata: https://grpc.io/docs/guides/concepts.html#metadata
[5]
Documentation/grpc-metadata: https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GRmEjnd9D16b_5mPG038-A