golang 源碼分析:grpc context
gRPC 是基於 HTTP/2 協議的。進程間傳輸定義了一個 metadata 對象,該對象放在 Request-Headers 內,所以通過 metadata 我們可以將上一個進程中的全局對象透傳到下一個被調用的進程。
type MD map[string][]string
進程內部我們通過 context 來傳輸上下文數據,進程間傳遞 MD 的時候,我們也可以從 ctx,取出來,進行傳遞
//set 數據到 metadata
md := metadata.Pairs("key", "val")
// 新建一個有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)
爲什麼不直接把 context 裏面的數據全取出來,傳遞給下游呢?這是出於可維護性和安全性兩方面的考慮,如果將 ctx 所有信息都傳遞下去,很有可能將一些內部信息泄漏,另一方面,下游在取 ctx 的時候,不知道到底傳了哪些數據。所以 grpc 定義了兩個 context:
OutgoingContext
IncomingContext
OutgoingContext 用於發送請求一方,包裝下游依賴的數據,傳遞出去。IncomingContext 用於服務端接受,客戶端傳遞來的 context 信息。context 中間通過序列化成 http2 header 的方式進行傳輸。metadata/metadata.go,我們可以看到這兩個 context 雖然也是通過 context.WithValue 設置數據,通過 context.Value 來讀取數據。
type mdIncomingKey struct{}
type mdOutgoingKey struct{}
// NewIncomingContext creates a new context with incoming md attached.
func NewIncomingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdIncomingKey{}, md)
}
// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
func FromIncomingContext(ctx context.Context) (MD, bool) {
md, ok := ctx.Value(mdIncomingKey{}).(MD)
if !ok {
return nil, false
}
out := MD{}
for k, v := range md {
// We need to manually convert all keys to lower case, because MD is a
// map, and there's no guarantee that the MD attached to the context is
// created using our helper functions.
key := strings.ToLower(k)
out[key] = v
}
return out, true
}
func FromOutgoingContext(ctx context.Context) (MD, bool) {
raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
if !ok {
return nil, false
}
out := MD{}
for k, v := range raw.md {
// We need to manually convert all keys to lower case, because MD is a
// map, and there's no guarantee that the MD attached to the context is
// created using our helper functions.
key := strings.ToLower(k)
out[key] = v
}
for _, added := range raw.added {
if len(added)%2 == 1 {
panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
}
for i := 0; i < len(added); i += 2 {
key := strings.ToLower(added[i])
out[key] = append(out[key], added[i+1])
}
}
return out, ok
}
但是,和普通 context 也是有差別的,MD 的存儲的時候,key 是 string,value 是 []string,context 爲了儘可能地防止覆蓋,key 、value 都是 interface 類型的,並且通過 lint 等方式,儘可能做到不讓修改,也就是說用戶自己存入的數據的 key 儘量要是新定義的類型,類型別名也不可以。
直觀理解,客戶端在發送請求的時候,會初始化一個 OutgoingContext,服務端在取的時候,用的是 IncomingContext,中間必然存在一個從 OutgoingContext 取數據,方讓 http2 header,從 http2 header 取數據存入 IncomingContext 的過程。我們通過源碼來分析下:
1,server 端構造 IncomingContext 的過程:
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
s.serveStreams(st)
我們從 server.go 文件 ServeHTTP 函數開始:
func (s *Server) serveStreams(st transport.ServerTransport)
st.HandleStreams(func(stream *transport.Stream)
它調用了 internal/transport/http2_server.go 裏面的函數
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context)
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool)
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
可以看到,通過 http2 的 header 構造了我們的 IncomingContext
2,client 從 OutgoingContext 取數據的過程
客戶端的請求調用是從 call.go 的 Invoke 函數開始的
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
err := cs.SendMsg(req); err != nil
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
op := func(a *csAttempt) error { return a.newStream() }
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
最終調用啦 a.t.NewStream
實現在 internal/transport/http2_client.go
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error)
headerFields, err := t.createHeaderFields(ctx, callHdr)
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error)
md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok
至此,完成了,數據的轉換。那麼問題來了,對於一個處於中游的 grpc 服務,每個請求,我都去先獲取 IncomingContext 然後設置 OutgoingContext 是不是很麻煩我們有沒有相關的簡單方案呢?答案是 middleware
3,客戶端 middleware
在客戶端發起的請求連接的時候,我們可以在 options 裏面添加攔截器 unaryClientInterceptors
conn, err := grpc.Dial(target, dialOptions...)
dialOptions := append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpcMiddleware.ChainUnaryClient(unaryClientInterceptors...)),
客戶端的攔截器有很多,比如:
clientinterceptors.UnaryTracingInterceptor,
clientinterceptors.DurationInterceptor,
clientinterceptors.PrometheusInterceptor,
clientinterceptors.BreakerInterceptor,
clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),
一個常見的客戶端攔截器可以這麼寫,攔截器的入參有我們需要的一切:
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
//do some thing
err := invoker(ctx, method, req, reply, cc, opts...)
4,服務端 middleware
server = grpc.NewServer(dialOptions...)
dialOptions := []grpc.ServerOption{
grpc_middleware.WithUnaryServerChain(unaryServerInterceptors...),
常見的服務端攔截器長這樣:
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOptions(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
//do some thing
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/O-6FdYn2l8hxOfGUUz9CHg