grpc Go Client 源碼分析

grpc client 代碼非常簡潔,分三步

1,獲取連接

2,初始化客戶端

3,發送請求

// Step 1: obtain a connection. The error is checked before Close is deferred,
// because conn may be nil when Dial fails and must not be closed then.
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
  // Replace with the caller's own error handling as appropriate.
  panic(err)
}
defer conn.Close()
// Step 2: initialize the client on top of the connection.
c := pb.NewGreeterClient(conn)
// Step 3: send the request and check its error before using the reply.
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
  panic(err)
}

首先看下發送請求

// GreeterClient is the client-side interface of the Greeter service. It
// declares a single method, SayHello.
type GreeterClient interface {
  // Sends a greeting
  SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}
// SayHello issues the "/helloworld.Greeter/SayHello" call through the
// underlying connection interface. It returns the decoded reply, or nil and
// the error reported by Invoke.
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
  reply := new(HelloReply)
  if invokeErr := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, reply, opts...); invokeErr != nil {
    return nil, invokeErr
  }
  return reply, nil
}

調用了 cc 的 Invoke 方法

// greeterClient is the concrete type returned by NewGreeterClient. Its only
// state is the connection interface that its methods call Invoke on.
type greeterClient struct {
  // cc is the connection abstraction used to issue RPCs.
  cc grpc.ClientConnInterface
}

ClientConnInterface 的定義如下

// ClientConnInterface lists the operations a generated client needs from a
// connection: Invoke for unary RPCs and NewStream for streaming RPCs.
type ClientConnInterface interface {
  // Invoke performs a unary RPC and returns after the response is received
  // into reply.
  Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
  // NewStream begins a streaming RPC.
  NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

該接口包含 Invoke 和 NewStream 兩個方法,定義在 clientconn.go 中

接着看下 client 初始化的代碼

// NewGreeterClient wraps the given connection interface in a greeterClient
// and returns it as a GreeterClient. No other work is done here.
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
  client := &greeterClient{cc: cc}
  return client
}

僅僅把 conn(ClientConnInterface)保存到了 client 結構體中

最後看下獲取連接的實現

// Dial creates a ClientConn for target by delegating to DialContext with a
// background context.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  ctx := context.Background()
  return DialContext(ctx, target, opts...)
}

clientconn.go 的 Dial 方法返回了 ClientConn 指針

// DialContext builds the ClientConn for target. This is an abridged excerpt:
// the "...." lines stand for code that was left out, so only the steps shown
// below are visible here (the return statement is among the omitted parts).
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  // Allocate the ClientConn with its initial bookkeeping state and the
  // default dial options.
  cc := &ClientConn{
    target:            target,
    csMgr:             &connectivityStateManager{},
    conns:             make(map[*addrConn]struct{}),
    dopts:             defaultDialOptions(),
    blockingpicker:    newPickerWrapper(),
    czData:            new(channelzData),
    firstResolveEvent: grpcsync.NewEvent(),
  }
  ....
// Parse the target string, then fetch the resolver builder for the parsed
// scheme via cc.getResolver.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
  ....
  // Populate the balancer build options from the dial options, the channelz
  // ID and the parsed target (credsClone is set up in omitted code).
  cc.balancerBuildOpts = balancer.BuildOptions{
    DialCreds:        credsClone,
    CredsBundle:      cc.dopts.copts.CredsBundle,
    Dialer:           cc.dopts.copts.Dialer,
    CustomUserAgent:  cc.dopts.copts.UserAgent,
    ChannelzParentID: cc.channelzID,
    Target:           cc.parsedTarget,
  }
  .....
// Create the resolver wrapper from cc and the resolver builder.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) 
}

其中 ClientConn 的結構體定義如下

// ClientConn is the connection object that Dial and DialContext return. It
// holds the dial target and options, the resolver and balancer wrappers, and
// the set of addrConns.
type ClientConn struct {
  ctx    context.Context
  cancel context.CancelFunc
  // target is the string passed to Dial; parsedTarget is its parsed form.
  target       string
  parsedTarget resolver.Target
  authority    string
  dopts        dialOptions
  csMgr        *connectivityStateManager
  balancerBuildOpts balancer.BuildOptions
  // blockingpicker is what getTransport calls pick on to obtain a transport.
  blockingpicker    *pickerWrapper
  // safeConfigSelector is consulted by newClientStream via SelectConfig.
  safeConfigSelector iresolver.SafeConfigSelector
  mu              sync.RWMutex
  resolverWrapper *ccResolverWrapper
  sc              *ServiceConfig
  // conns is used as a set: the map values are empty structs.
  conns           map[*addrConn]struct{}
  // Keepalive parameter can be updated if a GoAway is received.
  mkp             keepalive.ClientParameters
  curBalancerName string
  balancerWrapper *ccBalancerWrapper
  retryThrottler  atomic.Value
  firstResolveEvent *grpcsync.Event
  channelzID int64 // channelz unique identification number
  czData     *channelzData
  lceMu               sync.Mutex // protects lastConnectionError
  lastConnectionError error
}

可以看出 Dial 僅僅做了 connection 的初始化

call.go 裏定義了 Invoke 方法

// Invoke performs a unary RPC on cc. The dial-time default call options are
// combined with the per-call options first, so that a configured unary
// interceptor sees every applicable option. When an interceptor is set, it is
// handed the call together with invoke as the invoker; otherwise invoke is
// called directly.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
  opts = combine(cc.dopts.callOptions, opts)
  interceptor := cc.dopts.unaryInt
  if interceptor == nil {
    return invoke(ctx, method, args, reply, cc, opts...)
  }
  return interceptor(ctx, method, args, reply, cc, invoke, opts...)
}
// invoke runs one unary call in three steps: create a client stream for
// method, send req on it, and receive the response into reply. The first
// error encountered is returned as-is.
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  if err != nil {
    return err
  }
  if sendErr := stream.SendMsg(req); sendErr != nil {
    return sendErr
  }
  return stream.RecvMsg(reply)
}

裏面分了三步:創建客戶端流(newClientStream),發送請求(SendMsg),接收結果(RecvMsg)

newClientStream 函數定義在 stream.go 文件裏

// newClientStream builds the clientStream for one RPC and starts its first
// attempt. This is an abridged excerpt: several locals used below (c, mc, cp,
// comp, cancel, beginTime, onCommit, sh, trInfo) are set up in code that is
// not shown, and the success-path return is omitted.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  // Ask the connection's config selector for the config of this method.
  rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
    // Call header: the connection's authority as host, the method name, and
    // the content subtype taken from c.
    callHdr := &transport.CallHdr{
    Host:           cc.authority,
    Method:         method,
    ContentSubtype: c.contentSubtype,
  }
  // Assemble the stream state from the header, options, codec and
  // compressor settings.
  cs := &clientStream{
    callHdr:      callHdr,
    ctx:          ctx,
    methodConfig: &mc,
    opts:         opts,
    callInfo:     c,
    cc:           cc,
    desc:         desc,
    codec:        c.codec,
    cp:           cp,
    comp:         comp,
    cancel:       cancel,
    beginTime:    beginTime,
    firstAttempt: true,
    onCommit:     onCommit,
  }
  // Create the first attempt; on failure the stream is finished with the
  // error and the error is returned.
  if err := cs.newAttemptLocked(sh, trInfo); err != nil {
    cs.finish(err)
    return nil, err
  }
}
// newAttemptLocked prepares a csAttempt for the stream and asks the
// ClientConn for a transport. Abridged excerpt: ctx is defined in code that is
// not shown, and what is done with newAttempt, t, done and err afterwards is
// omitted.
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
  // The attempt records its owning stream, the decompressor from the dial
  // options, the stats handler and the trace info.
  newAttempt := &csAttempt{
    cs:           cs,
    dc:           cs.cc.dopts.dc,
    statsHandler: sh,
    trInfo:       trInfo,
  }
  // Obtain a transport for this call's method, honoring the failFast setting.
  t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
 }

getTransport 定義在 clientconn.go 中

// getTransport asks the connection's blocking picker for a transport to carry
// the RPC identified by method. On success it returns the transport and the
// picker's done callback; on failure the pick error is converted with
// toRPCErr and returned with nil results.
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  pickInfo := balancer.PickInfo{
    Ctx:            ctx,
    FullMethodName: method,
  }
  tr, doneFn, pickErr := cc.blockingpicker.pick(ctx, failfast, pickInfo)
  if pickErr != nil {
    return nil, nil, toRPCErr(pickErr)
  }
  return tr, doneFn, nil
}

pick 函數定義在 picker_wrapper.go

// pick selects a transport for the RPC described by info. Abridged excerpt:
// p and acw are obtained in code that is not shown, and the loop's exit and
// return paths are omitted; only the two key calls remain.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
for{
// Ask the picker p for a pick result for this RPC.
pickResult, err := p.Pick(info)
// Check whether the addrConn behind acw has a ready transport (the body of
// this branch is elided in the excerpt).
if t, ok := acw.getAddrConn().getReadyTransport(); ok {}
}
}

在 stream.go 文件裏定義了 ClientStream 的接口

// ClientStream is the client-side view of a stream: it gives access to the
// header and trailer metadata, lets the caller close the send direction, and
// sends and receives messages.
type ClientStream interface {
// Header returns the header metadata received from the server if there
  // is any. It blocks if the metadata is not ready to read.
  Header() (metadata.MD, error)
  // Trailer returns the trailer metadata from the server, if there is any.
  // It must only be called after stream.CloseAndRecv has returned, or
  // stream.Recv has returned a non-nil error (including io.EOF).
  Trailer() metadata.MD
  // CloseSend closes the send direction of the stream. It closes the stream
  // when non-nil error is met. It is also not safe to call CloseSend
  // concurrently with SendMsg.
  CloseSend() error
  // Context returns the context for this stream.
  //
  // It should not be called until after Header or RecvMsg has returned. Once
  // called, subsequent client-side retries are disabled.
  Context() context.Context
  // SendMsg is generally called by generated code. On error, SendMsg aborts
  // the stream. If the error was generated by the client, the status is
  // returned directly; otherwise, io.EOF is returned and the status of
  // the stream may be discovered using RecvMsg.
  //
  // SendMsg blocks until:
  //   - There is sufficient flow control to schedule m with the transport, or
  //   - The stream is done, or
  //   - The stream breaks.
  //
  // SendMsg does not wait until the message is received by the server. An
  // untimely stream closure may result in lost messages. To ensure delivery,
  // users should ensure the RPC completed successfully using RecvMsg.
  //
  // It is safe to have a goroutine calling SendMsg and another goroutine
  // calling RecvMsg on the same stream at the same time, but it is not safe
  // to call SendMsg on the same stream in different goroutines. It is also
  // not safe to call CloseSend concurrently with SendMsg.
  SendMsg(m interface{}) error
  // RecvMsg blocks until it receives a message into m or the stream is
  // done. It returns io.EOF when the stream completes successfully. On
  // any other error, the stream is aborted and the error contains the RPC
  // status.
  //
  // It is safe to have a goroutine calling SendMsg and another goroutine
  // calling RecvMsg on the same stream at the same time, but it is not
  // safe to call RecvMsg on the same stream in different goroutines.
  RecvMsg(m interface{}) error
}

clientStream 結構體實現了上述接口

// clientStream is the stream state assembled by newClientStream. It holds the
// call header and options, the codec and compressor settings, retry
// bookkeeping, and the currently active attempt.
type clientStream struct {
  callHdr  *transport.CallHdr
  opts     []CallOption
  callInfo *callInfo
  cc       *ClientConn
  desc     *StreamDesc
  codec baseCodec
  cp    Compressor
  comp  encoding.Compressor
  cancel context.CancelFunc // cancels all attempts
  sentLast  bool // sent an end stream
  beginTime time.Time
  methodConfig *MethodConfig
  ctx context.Context // the application's context, wrapped by stats/tracing
  retryThrottler *retryThrottler // The throttler active when the RPC began.
  binlog *binarylog.MethodLogger // Binary logger, can be nil.
  // serverHeaderBinlogged is a boolean for whether server header has been
  // logged. Server header will be logged when the first time one of those
  // happens: stream.Header(), stream.Recv().
  //
  // It's only read and used by Recv() and Header(), so it doesn't need to be
  // synchronized.
  serverHeaderBinlogged bool
  mu                      sync.Mutex
  firstAttempt            bool // if true, transparent retry is valid
  numRetries              int  // exclusive of transparent retry attempt(s)
  numRetriesSincePushback int  // retries since pushback; to reset backoff
  finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
  // attempt is the active client stream attempt.
  // The only place where it is written is the newAttemptLocked method and this method never writes nil.
  // So, attempt can be nil only inside newClientStream function when clientStream is first created.
  // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
  // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
  // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
  // place where we need to check if the attempt is nil.
  attempt *csAttempt
  // TODO(hedging): hedging will have multiple attempts simultaneously.
  committed  bool // active attempt committed for retry?
  onCommit   func()
  buffer     []func(a *csAttempt) error // operations to replay on retry
  bufferSize int                        // current size of buffer
}

實現了 SendMsg 和 RecvMsg 兩個方法

// SendMsg prepares m for the wire and wraps the actual send in an operation
// that takes a csAttempt. Abridged excerpt: how op is executed and how the
// result is returned are not shown.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
// Encode and compress the message once, using the stream's codec and
// compressors.
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
  // op performs the send on the given attempt.
  op := func(a *csAttempt) error {
    err := a.sendMsg(m, hdr, payload, data)
    // nil out the message and uncomp when replaying; they are only needed for
    // stats which is disabled for subsequent attempts.
    m, data = nil, nil
    return err
  }
}
// prepareMsg turns m into wire form: it encodes the message with codec,
// compresses the encoded bytes, and builds the message header and payload.
// Abridged excerpt: the error checks and the return statement are not shown.
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
  // data holds the encoded, uncompressed message.
  data, err = encode(codec, m)
  compData, err := compress(data, cp, comp)
  // The header and payload are derived from both the raw and compressed bytes.
  hdr, payload = msgHeader(data, compData)
  }

實現了數據的編碼壓縮

緊接着發送數據

// sendMsg writes the prepared header and payload to the attempt's transport
// stream and, when a stats handler is set, reports the outgoing payload to it.
// Abridged excerpt: cs is defined in code that is not shown, and the body of
// the error branch and the return statement are omitted.
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
  // Last is set when the stream description is not client-streaming.
  if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
  }
  if a.statsHandler != nil {
    a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
  }  
}

最後是接收消息

// RecvMsg receives a message into m by running the attempt's recvMsg through
// cs.withRetry, passing cs.commitAttemptLocked as the second argument.
// Abridged excerpt: recvInfo is set up in code that is not shown, and the
// handling and return of err are omitted.
func (cs *clientStream) RecvMsg(m interface{}) error {
  err := cs.withRetry(func(a *csAttempt) error {
    return a.recvMsg(m, recvInfo)
  }, cs.commitAttemptLocked)
}
// recvMsg reads one message for this attempt. Abridged excerpt: cs is defined
// in code that is not shown, and the conditions, error handling and returns
// around the three visible statements are omitted.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
// Receive and decode a message into m, bounded by the call's maximum receive
// size; payInfo is passed through to recv.
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
// Report the inbound payload to the stats handler.
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
      Client:   true,
      RecvTime: time.Now(),
      Payload:  m,
      // TODO truncate large payload.
      Data:       payInfo.uncompressedBytes,
      WireLength: payInfo.wireLength + headerLen,
      Length:     len(payInfo.uncompressedBytes),
    })
// recv is called a second time, here with a nil payInfo.
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)    
}
// recv reads one message from s, decompresses it, and unmarshals it into m
// using codec c. An error from recvAndDecompress is returned unchanged; an
// unmarshal failure is reported as a codes.Internal status error. When
// payInfo is non-nil, the decompressed bytes are stored in it on success.
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
  raw, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
  if err != nil {
    return err
  }
  if unmarshalErr := c.Unmarshal(raw, m); unmarshalErr != nil {
    return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", unmarshalErr)
  }
  if payInfo == nil {
    return nil
  }
  payInfo.uncompressedBytes = raw
  return nil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SQ22u7l17mWl2nQlSqvGXg