golang 源碼分析:grpc context

        gRPC 是基於 HTTP/2 協議的。進程間傳輸定義了一個 metadata 對象,該對象放在 Request-Headers 內,所以通過 metadata 我們可以將上一個進程中的全局對象透傳到下一個被調用的進程。

type MD map[string][]string

進程內部我們通過 context 來傳輸上下文數據,進程間傳遞 MD 的時候,我們也可以從 ctx,取出來,進行傳遞

//set 數據到 metadata
md := metadata.Pairs("key", "val")
// 新建一個有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

        爲什麼不直接把 context 裏面的數據全取出來,傳遞給下游呢?這是出於可維護性和安全性兩方面的考慮,如果將 ctx 所有信息都傳遞下去,很有可能將一些內部信息泄漏,另一方面,下游在取 ctx 的時候,不知道到底傳了哪些數據。所以 grpc 定義了兩個 context:

OutgoingContext
IncomingContext

OutgoingContext 用於發送請求一方,包裝下游依賴的數據,傳遞出去。IncomingContext 用於服務端接受,客戶端傳遞來的 context 信息。context 中間通過序列化成 http2 header 的方式進行傳輸。metadata/metadata.go,我們可以看到這兩個 context 雖然也是通過 context.WithValue 設置數據,通過 context.Value 來讀取數據。

type mdIncomingKey struct{}
type mdOutgoingKey struct{}
// NewIncomingContext creates a new context with incoming md attached.
func NewIncomingContext(ctx context.Context, md MD) context.Context {
  return context.WithValue(ctx, mdIncomingKey{}, md)
}
// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
  return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
func FromIncomingContext(ctx context.Context) (MD, bool) {
  md, ok := ctx.Value(mdIncomingKey{}).(MD)
  if !ok {
    return nil, false
  }
  out := MD{}
  for k, v := range md {
    // We need to manually convert all keys to lower case, because MD is a
    // map, and there's no guarantee that the MD attached to the context is
    // created using our helper functions.
    key := strings.ToLower(k)
    out[key] = v
  }
  return out, true
}
func FromOutgoingContext(ctx context.Context) (MD, bool) {
  raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
  if !ok {
    return nil, false
  }
  out := MD{}
  for k, v := range raw.md {
    // We need to manually convert all keys to lower case, because MD is a
    // map, and there's no guarantee that the MD attached to the context is
    // created using our helper functions.
    key := strings.ToLower(k)
    out[key] = v
  }
  for _, added := range raw.added {
    if len(added)%2 == 1 {
      panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
    }
    for i := 0; i < len(added); i += 2 {
      key := strings.ToLower(added[i])
      out[key] = append(out[key], added[i+1])
    }
  }
  return out, ok
}

但是,和普通 context 也是有差別的,MD 的存儲的時候,key 是 string,value 是 []string,context 爲了儘可能地防止覆蓋,key 、value 都是 interface 類型的,並且通過 lint 等方式,儘可能做到不讓修改,也就是說用戶自己存入的數據的 key 儘量要是新定義的類型,類型別名也不可以。

            直觀理解,客戶端在發送請求的時候,會初始化一個 OutgoingContext,服務端在取的時候,用的是 IncomingContext,中間必然存在一個從 OutgoingContext 取數據,方讓 http2 header,從 http2 header 取數據存入 IncomingContext 的過程。我們通過源碼來分析下:

1,server 端構造 IncomingContext 的過程:

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) 
        s.serveStreams(st)

我們從 server.go 文件 ServeHTTP 函數開始:

func (s *Server) serveStreams(st transport.ServerTransport) 
        st.HandleStreams(func(stream *transport.Stream)

它調用了 internal/transport/http2_server.go 裏面的函數

func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) 
     case *http2.MetaHeadersFrame:
       if t.operateHeaders(frame, handle, traceCtx) {
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool)
   ctx = metadata.NewIncomingContext(ctx, ht.headerMD)

可以看到,通過 http2 的 header 構造了我們的 IncomingContext

2,client 從 OutgoingContext 取數據的過程

客戶端的請求調用是從 call.go 的 Invoke 函數開始的

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error 
        cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
         err := cs.SendMsg(req); err != nil
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption)
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
          op := func(a *csAttempt) error { return a.newStream() }
            s, err := a.t.NewStream(cs.ctx, cs.callHdr)

最終調用啦 a.t.NewStream

實現在 internal/transport/http2_client.go

func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error)
     headerFields, err := t.createHeaderFields(ctx, callHdr)
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error)
     md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok

至此,完成了,數據的轉換。那麼問題來了,對於一個處於中游的 grpc 服務,每個請求,我都去先獲取 IncomingContext 然後設置 OutgoingContext 是不是很麻煩我們有沒有相關的簡單方案呢?答案是 middleware

3,客戶端 middleware

    在客戶端發起的請求連接的時候,我們可以在 options 裏面添加攔截器 unaryClientInterceptors

    conn, err := grpc.Dial(target, dialOptions...)
    dialOptions := append([]grpc.DialOption{
      grpc.WithUnaryInterceptor(grpcMiddleware.ChainUnaryClient(unaryClientInterceptors...)),

客戶端的攔截器有很多,比如:

clientinterceptors.UnaryTracingInterceptor,
clientinterceptors.DurationInterceptor,
clientinterceptors.PrometheusInterceptor,
clientinterceptors.BreakerInterceptor,
clientinterceptors.TimeoutInterceptor(cliOpts.Timeout),

一個常見的客戶端攔截器可以這麼寫,攔截器的入參有我們需要的一切:

func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
  cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   //do some thing
  err := invoker(ctx, method, req, reply, cc, opts...)

4,服務端 middleware

server = grpc.NewServer(dialOptions...)
    dialOptions := []grpc.ServerOption{
      grpc_middleware.WithUnaryServerChain(unaryServerInterceptors...),

常見的服務端攔截器長這樣:

func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
  o := evaluateOptions(opts)
  return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
  //do some thing
  }
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/O-6FdYn2l8hxOfGUUz9CHg