基於 etcd 實現 grpc 服務註冊與發現

0 前言

幾周前和大家一起走讀了 grpc-go 客戶端的源碼鏈路,本篇則是想着重探討一下其中涉及到的 “服務發現” 以及 “負載均衡” 的相關內容. 本文會貼近於生產環境,使用到分佈式存儲組件 etcd 作爲 grpc 服務的註冊與發現模塊,並引用 roundRobin 輪詢算法作爲負載均衡策略.

1 背景

1.1 grpc 源碼

本系列探討的主題是由 google 研發的開源 rpc 框架 grpc-go.

對應的開源地址爲:https://github.com/grpc/grpc-go/ . 走讀的源碼版本爲 Release 1.54.0.

1.2 grpc 負載均衡

C-S 架構中負載均衡策略可以分爲兩大類——基於服務端實現負載均衡的模式以及基於客戶端實現負載均衡的模式.

grpc-go 中採用的是基於服務端實現負載均衡的模式. 在這種模式下,客戶端會首先取得服務端的節點(endpoint)列表,然後基於一定的負載均衡策略選擇到特定的 endpoint,然後直連發起請求.

1.3 etcd 服務註冊與發現

etcd 是一個分佈式 KV 存儲組件,協議層通過 raft 算法保證了服務的強一致性和高可用性,同時,etcd 還提供了針對於存儲數據的 watch 監聽回調功能,基於這一特性,etcd 很適合用於作爲配置管理中心或者服務註冊 / 發現模塊.

etcd 的開源地址爲 https://github.com/etcd-io/etcd

本文走讀的 etcd 源碼版本爲 v3.5.8.

在使用 etcd 作爲服務註冊 / 發現模塊時,同一個服務組在 etcd 中會以相同的服務名作爲共同的標識鍵前綴,與各服務節點的信息建立好映射關係,以實現所謂的 “服務註冊” 功能.

在客戶端使用 “服務發現” 功能時,則會在 etcd 中通過服務名取得對應的服務節點列表緩存在本地,然後在客戶端本地基於負載均衡策略選擇 endpoint 進行連接請求. 在這個過程中,客戶端還會利用到 etcd 的 watch 功能,在服務端節點發生變化時,及時感知到變更事件,然後對本地緩存的服務端節點列表進行更新.

1.4 etcd-grpc

etcd 是用 go 語言編寫的,和 grpc-go 具有很好的兼容性. 在 etcd 官方文檔中就給出了在 grpc-go 服務中使用 etcd 作爲服務註冊 / 發現模塊的示例,參考文檔見:https://etcd.io/docs/v3.5/dev-guide/grpc_naming/ .

官方文檔的使用示例在作爲本文源碼走讀的方法入口,下面開始.

2 服務端

2.1 啓動入口

首先給出,grpc-go 服務端啓動並通過 etcd 實現服務註冊的代碼示例.

package main


import (
    // 標準庫
    "context"
    "flag"
    "fmt"
    "net"
    "time"


    // grpc 樁代碼
    "github.com/grpc_demo/proto"


    // etcd
    eclient "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/naming/endpoints"


    // grpc
    "google.golang.org/grpc"
)


const (
    // grpc 服務名
    MyService = "xiaoxu/demo"
    // etcd 端口
    MyEtcdURL = "http://localhost:2379"
)


type Server struct {
    proto.UnimplementedHelloServiceServer
}


func main() {
    // 接收命令行指定的 grpc 服務端口
    var port int
    flag.IntVar(&port, "port", 8080, "port")
    flag.Parse()
    addr := fmt.Sprintf("http://localhost:%d", port)


    // 創建 tcp 端口監聽器
    listener, _ := net.Listen("tcp", addr)
    
    // 創建 grpc server
    server := grpc.NewServer()
    proto.RegisterHelloServiceServer(server, &Server{})


    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // 註冊 grpc 服務節點到 etcd 中
    go registerEndPointToEtcd(ctx, addr)
   
    // 啓動 grpc 服務
    if err := server.Serve(listener); err != nil {
        fmt.Println(err)
    }
}

2.2 服務註冊

registerEndPointToEtcd 方法給出了將 grpc 服務端節點註冊到 etcd 模塊的示例:

func registerEndPointToEtcd(ctx context.Context, addr string) {
    // 創建 etcd 客戶端
    etcdClient, _ := eclient.NewFromURL(MyEtcdURL)
    etcdManager, _ := endpoints.NewManager(etcdClient, MyService)


    // 創建一個租約,每隔 10s 需要向 etcd 彙報一次心跳,證明當前節點仍然存活
    var ttl int64 = 10
    lease, _ := etcdClient.Grant(ctx, ttl)
    
    // 添加註冊節點到 etcd 中,並且攜帶上租約 id
    _ = etcdManager.AddEndpoint(ctx, fmt.Sprintf("%s/%s", MyService, addr), endpoints.Endpoint{Addr: addr}, eclient.WithLease(lease.ID))


    // 每隔 5 s進行一次延續租約的動作
    for {
        select {
        case <-time.After(5 * time.Second):
            // 續約操作
            resp, _ := etcdClient.KeepAliveOnce(ctx, lease.ID)
            fmt.Printf("keep alive resp: %+v", resp)
        case <-ctx.Done():
            return
        }
    }
}

2.4 註冊節點

在 grpc 服務端註冊 endpoint 時,調用了方法鏈 endpointManager.AddEndpoint -> endpointManager.Update,將服務節點 endpoint 以共同的服務名作爲標識鍵 key 的前綴,添加到 kv 存儲介質當中.

由於 endpoint 的註冊信息關聯到了租約,因此倘若租約過期,endpoint 的註冊信息也隨之失效. 所以 endpoint 在運行過程中,需要持續向 etcd 發送心跳以進行租約的續期,背後的作用正是通過這種續約機制向 etcd 服務註冊模塊證明 endpoint 自身的仍然處於存活狀態.

func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
    return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
}
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
    ops := make([]clientv3.Op, 0, len(updates))
    for _, update := range updates {
        if !strings.HasPrefix(update.Key, m.target+"/") {
            return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
        }


        switch update.Op {
        case Add:
            internalUpdate := &internal.Update{
                Op:       internal.Add,
                Addr:     update.Endpoint.Addr,
                Metadata: update.Endpoint.Metadata,
            }


            var v []byte
            if v, err = json.Marshal(internalUpdate); err != nil {
                return status.Error(codes.InvalidArgument, err.Error())
            }
            ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...))
        // ...
        }
    }
    _, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
    return err
}

3 客戶端

首先曬一下前文——grpc-go 客戶端源碼走讀 展示過的客戶端架構圖,這些內容會爲本文的展開打下鋪墊.

3.1 啓動入口

下面給出 grpc-go 客戶端啓動的代碼示例,核心點的註釋已經給出.

package main

import (
    // 標準庫
    "context"
    "fmt"
    "time"

    // grpc 樁文件
    "github.com/grpc_demo/proto"

    // etcd
    eclient "go.etcd.io/etcd/client/v3"
    eresolver "go.etcd.io/etcd/client/v3/naming/resolver"


    // grpc
    "google.golang.org/grpc"
    "google.golang.org/grpc/balancer/roundrobin"
    "google.golang.org/grpc/credentials/insecure"
)

const MyService = "xiaoxu/demo"

func main() {
    // 創建 etcd 客戶端
    etcdClient, _ := eclient.NewFromURL("my_etcd_url")
    
    // 創建 etcd 實現的 grpc 服務註冊發現模塊 resolver
    etcdResolverBuilder, _ := eresolver.NewBuilder(etcdClient)
    
    // 拼接服務名稱,需要固定義 etcd:/// 作爲前綴
    etcdTarget := fmt.Sprintf("etcd:///%s", MyService)
    
    // 創建 grpc 連接代理
    conn, _ := grpc.Dial(
        // 服務名稱
        etcdTarget,
        // 注入 etcd resolver
        grpc.WithResolvers(etcdResolverBuilder),
        // 聲明使用的負載均衡策略爲 roundrobin     grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy""%s"}`, roundrobin.Name)),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    defer conn.Close()

    // 創建 grpc 客戶端
    client := proto.NewHelloServiceClient(conn)
    for {
        // 發起 grpc 請求
        resp, _ := client.SayHello(context.Background()&proto.HelloReq{
            Name: "xiaoxuxiansheng",
        })
        fmt.Printf("resp: %+v", resp)
        // 每隔 1s 發起一輪請求
        <-time.After(time.Second)
    }
}

3.2 注入 etcd resolver

在 grpc 客戶端啓動時,首先會獲取到 etcd 中提供的 grpc 服務發現構造器 resolverBuilder,然後在調用 grpc.Dial 方法創建連接代理 ClientConn 時,將其注入其中.

func main() {
    // ...
    // 創建 etcd 實現的 grpc 服務註冊發現模塊 resolver
    etcdResolverBuilder, _ := eresolver.NewBuilder(etcdClient)
    
    // ...
    // 創建 grpc 連接代理
    conn, _ := grpc.Dial(
        // ...
        // 注入 etcd resolver
        grpc.WithResolvers(etcdResolverBuilder),
        // ...
    )
    // ...
}
func WithResolvers(rs ...resolver.Builder) DialOption {
    return newFuncDialOption(func(o *dialOptions) {
        o.resolvers = append(o.resolvers, rs...)
    })
}

etcd 實現的 resolverBuilder 源碼如下,其中內置了一個 etcd 客戶端用於獲取 endpoint 註冊信息. etcdResolverBuilder 的 schema 是 ”etcd“,因此後續在通過 etcd 作爲服務發現模塊時,使用的服務名標識鍵需要以 etcd 作爲前綴.

type builder struct {
    c *clientv3.Client
}


func (b builder) Scheme() string {
    return "etcd"
}


// NewBuilder creates a resolver builder.
func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) {
    return builder{c: client}, nil
}

3.3 啓動 grpc balancerWrapper

在 grpc-go 客戶端啓動時,會調用方法鏈 DialContext -> newCCBalancerWrapper -> go ccBalancerWrapper.watcher,啓動一個 balancerWrapper 的守護協程,持續監聽 ClientConn 更新、balancer 更新等事件並進行處理.

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    // ...
    cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
        DialCreds:        credsClone,
        CredsBundle:      cc.dopts.copts.CredsBundle,
        Dialer:           cc.dopts.copts.Dialer,
        Authority:        cc.authority,
        CustomUserAgent:  cc.dopts.copts.UserAgent,
        ChannelzParentID: cc.channelzID,
        Target:           cc.parsedTarget,
    })


    // ...
    return cc, nil
}
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
    ccb := &ccBalancerWrapper{
        cc:       cc,
        updateCh: buffer.NewUnbounded(),
        resultCh: buffer.NewUnbounded(),
        closed:   grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
    }
    go ccb.watcher()
    ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
    return ccb
}

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {
        case u := <-ccb.updateCh.Get():
            // ...
            switch update := u.(type) {
            case *ccStateUpdate:
                ccb.handleClientConnStateChange(update.ccs)
            case *switchToUpdate:
                ccb.handleSwitchTo(update.name)                
            // ...
            }
        case <-ccb.closed.Done():
        }
        // ...
    }
}

3.4 獲取 etcd resolver builder

在 grpc-go 客戶端啓動時,還有一條方法鏈路是 DialContext -> ClientConn.parseTargetAndFindResolver -> ClientConn.getResolver,通過 target 的 schema(etcd),讀取此前通過 option 注入的 etcd resolverBuilder.

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    // ...
    // Determine the resolver to use.
    resolverBuilder, err := cc.parseTargetAndFindResolver()  
    // ...
    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    // ...
}
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
    // ...
    var rb resolver.Builder
    // ...
    rb = cc.getResolver(parsedTarget.URL.Scheme)
    // ...
    return rb, nil
    // ...
}
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
    for _, rb := range cc.dopts.resolvers {
        if scheme == rb.Scheme() {
            return rb
        }
    }
    return resolver.Get(scheme)
}

3.5 創建並啓動 etcd resolver

在取得 etcd resolver builder 後,會在 newCCResolverWrapper 方法中,執行 builder.Build 方法進行 etcd resolver 的創建.

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    // ...
    // Build the resolver.
    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    // ...


    return cc, nil
}
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    // ...
    ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
    // ...
}

 被構建出來的 etcd resolver 定義如下:

type resolver struct {
    // etcd 客戶端
    c      *clientv3.Client
    target string
    // grpc 連接代理
    cc     gresolver.ClientConn
    // 持續監聽的 etcd chan,能夠獲取到服務端 endpoint 的變更事件
    wch    endpoints.WatchChannel
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

在 etcd resolver builder 構建 resolver 的過程中,會獲取到一個來自 etcd 客戶端的 channel,用於持續監聽 endpoint 的變更事件,以維護更新客戶端緩存的 endpoint 列表.

構建出 resolver 後,會調用 go resolver.watch 方法開啓一個守護協程,持續監聽 channel.

func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
    r := &resolver{
        c:      b.c,
        target: target.Endpoint,
        cc:     cc,
    }
    r.ctx, r.cancel = context.WithCancel(context.Background())


    // 創建 etcd endpoint 管理服務實例
    em, err := endpoints.NewManager(r.c, r.target)
    // 獲取 endpoint 變更事件的監聽 channel
    r.wch, err = em.NewWatchChannel(r.ctx)
    
    // ...
    r.wg.Add(1)
    // 開啓對 endpoint 變更事件的監聽
    go r.watch()
    return r, nil
}

在守護協程 watcher 中,每當感知到 endpoint 的變更,則會此時全量的 endpoints 作爲入參,通過調用 ccResolverWrapper.UpdateState 方法對 grpc 連接代理 ClientConn 進行更新,保證 grpc-go 客戶端維護到最新的 endpoint 實時數據.

func (r *resolver) watch() {
    defer r.wg.Done()


    allUps := make(map[string]*endpoints.Update)
    for {
        select {
        case <-r.ctx.Done():
            return
        // 監聽到 grpc 服務端 endpoint 變更事件
        case ups, ok := <-r.wch:
            // ...
            // 處理監聽事件
            for _, up := range ups {
                switch up.Op {
                case endpoints.Add:
                    allUps[up.Key] = up
                case endpoints.Delete:
                    delete(allUps, up.Key)
                }
            }
  
            addrs := convertToGRPCAddress(allUps)
            // 監聽到 endpoint 變更時,需要將其更新到 grpc 客戶端本地維護的 subConns 列表當中
            r.cc.UpdateState(gresolver.State{Addresses: addrs})
        }
    }
}

3.6 接收 endpoint 更新事件

在 etcd resolver 的守護協程接收到 endpoint 變更事件後,會經歷 ccResolverWrapper.UpdateState -> ClientConn.updateResolverState 方法鏈路的調用,其中會執行兩項任務:

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
    // ...
    if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
    // ...
}
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
    // ...
    if s.ServiceConfig == nil {
        cc.maybeApplyDefaultServiceConfig(s.Addresses)
    }
    // ...
    uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
    // ...
    return ret
}

3.7 啓用 roundrobin balancer

首先聊聊 grpc 客戶端啓用負載均衡器 round-robin 的鏈路.

經由 ClientConn.maybeApplyDefaultServiceConfig -> ClientConn.applyServiceConfigAndBalancer -> balancerWrapper.switchTo 的方法鏈路,會通過 grpc 客戶端啓動時注入的 defaultServiceConfig 中獲取本次使用的負載均衡策略名 "round_robin",接下來會調用 ccBalancerWrapper.switchTo 方法,將當前使用的負載均衡器切換成 round-robin 類型.

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
    // ...
    if cc.dopts.defaultServiceConfig != nil {
        cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
    } 
    // ...
}
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
    // ... 
    cc.sc = sc
    if configSelector != nil {
        cc.safeConfigSelector.UpdateConfigSelector(configSelector)
    }




    // ...
 
    // 讀取配置,設定 newBalancer 爲 defaultServiceConfig 中傳入的 roundrobin
    var newBalancerName string
    // ...
    if cc.sc != nil && cc.sc.LB != nil {
        newBalancerName = *cc.sc.LB
    } 
    cc.balancerWrapper.switchTo(newBalancerName)
}

在 ccBalancerWrapper 守護協程 watcher 接收到 switchToUpdate 類型的變更事件後,會順沿 ccBalancerWrapper.handleSwtichTo -> Balancer.SwitchTo -> baseBuilder.Build 的方法鏈路,最終真正構造出 round-robin 類型的負載均衡器,此時 baseBalancer 中內置的關鍵字段 pickerBuilder 爲 rrPickerBuilder(rr 爲 round-robin 的簡寫).

func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
    // ...
    // 從全局 balancer map 中獲取 roundrobin 對應的 balancerBuilder
    builder := balancer.Get(name)
    // ...


    if err := ccb.balancer.SwitchTo(builder); err != nil {
        // ...
        return
    }
    ccb.curBalancerName = builder.Name()
}
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
    // ...
    bw := &balancerWrapper{
        gsb: gsb,
        lastState: balancer.State{
            ConnectivityState: connectivity.Connecting,
            Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
        },
        subconns: make(map[balancer.SubConn]bool),
    }
    
    // ...
    // 創建 roundrobin balancer
    newBalancer := builder.Build(bw, gsb.bOpts)
    // ...
    bw.Balancer = newBalancer
    return nil
}
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    bal := &baseBalancer{
        cc:            cc,
        pickerBuilder: bb.pickerBuilder,


        subConns: resolver.NewAddressMap(),
        scStates: make(map[balancer.SubConn]connectivity.State),
        csEvltr:  &balancer.ConnectivityStateEvaluator{},
        config:   bb.config,
        state:    connectivity.Connecting,
    }
    // ...
    bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
    return bal
}

3.8 更新 endpoint

接下來需要對 grpc 客戶端負載均衡器 balancer 中的 endpoint 信息進行更新.

方法鏈路爲 ccBalancerWrapper -> handleClientConnStateChange -> Balancer.UpdateClientConnState -> baseBalancer.UpdateClientConnState

其中,會獲取到實時的全量 endpoints 數據,然後調用 baseBalancer.regeneratePicker 方法進行 rrPicker 的重鑄,並且將最新的數據注入其中.

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
    ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
    // ...
}
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
    if ccb.curBalancerName != grpclbName {
        // Filter any grpclb addresses since we don't have the grpclb balancer.
        var addrs []resolver.Address
        for _, addr := range ccs.ResolverState.Addresses {
            if addr.Type == resolver.GRPCLB {
                continue
            }
            addrs = append(addrs, addr)
        }
        ccs.ResolverState.Addresses = addrs
    }
    ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
    // ...
    balToUpdate := gsb.latestBalancer()
    // ...
    return balToUpdate.UpdateClientConnState(state)
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
    // ...
    addrsSet := resolver.NewAddressMap()
    // 更新服務對應的 endpoint 信息,存放到 baseBalancer.subConns 當中
    for _, a := range s.ResolverState.Addresses {
        addrsSet.Set(a, nil)
        if _, ok := b.subConns.Get(a); !ok {
            // a is a new address (not existing in b.subConns).
            sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
            // ...
            b.subConns.Set(a, sc)
            b.scStates[sc] = connectivity.Idle
            // ...
            sc.Connect()
        }
    }
    for _, a := range b.subConns.Keys() {
        sci, _ := b.subConns.Get(a)
        sc := sci.(balancer.SubConn)
        // a was removed by resolver.
        if _, ok := addrsSet.Get(a); !ok {
            b.cc.RemoveSubConn(sc)
            b.subConns.Delete(a)
        }
    }
    // ...
    // 將 baseBalancer.subConns 注入到 picker 當中
    b.regeneratePicker()
    b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
    return nil
}
func (b *baseBalancer) regeneratePicker() {
    // ...
    readySCs := make(map[balancer.SubConn]SubConnInfo)


    // Filter out all ready SCs from full subConn map.
    for _, addr := range b.subConns.Keys() {
        sci, _ := b.subConns.Get(addr)
        sc := sci.(balancer.SubConn)
        if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
            readySCs[sc] = SubConnInfo{Address: addr}
        }
    }
    
    // 基於當前最新的 endpoint 信息構建 picker
    b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
    // ...
    scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
    for sc := range info.ReadySCs {
        scs = append(scs, sc)
    }
    return &rrPicker{
        subConns: scs,
        // ...
        next: uint32(grpcrand.Intn(len(scs))),
    }
}

3.9 grpc 客戶端請求

在 grpc 客戶端實際發起請求時,會順延 invoke -> newClientStream -> newClientStreamWithParams -> csAttemp.getTransport -> ClientConn.getTransport 的方法鏈路進行調用,接着調用 pickerWrapper.pick 方法,獲取到其中內置的 rrPicker(round-robin)連接選擇器,調用其 Pick 方法進行服務端節點的選擇.

func main() {
    // ...


    client := proto.NewHelloServiceClient(conn)
    for {
        resp, _ := client.SayHello(context.Background()&proto.HelloReq{
            Name: "xiaoxuxiansheng",
        })
        fmt.Printf("resp: %+v", resp)
        // 每隔 1s 發起一輪請求
        <-time.After(time.Second)
    }
}
func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) {
    out := new(HelloResp)
    err := c.cc.Invoke(ctx, "/pb.HelloService/SayHello", in, out, opts...)
    // ...
    return out, nil
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
    // ...
    return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    // ...
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
    // ...
    var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
    }
    // ...
    return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
    // ...
    op := func(a *csAttempt) error {
        if err := a.getTransport(); err != nil {
            return err
        }
        if err := a.newStream(); err != nil {
            return err
        }
        // ...
        cs.attempt = a
        return nil
    }
    if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
        return nil, err
    }
    // ...
    return cs, nil
}
func (a *csAttempt) getTransport() error {
    cs := a.cs

    var err error
    a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
    // ...
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
    return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
        Ctx:            ctx,
        FullMethodName: method,
    })
}
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
    var ch chan struct{}


    var lastPickErr error
    for {
        // ...

        ch = pw.blockingCh
        p := pw.picker
        pw.mu.Unlock()

        pickResult, err := p.Pick(info)
        // ...

        acw, ok := pickResult.SubConn.(*acBalancerWrapper)
        // ...
        if t := acw.getAddrConn().getReadyTransport(); t != nil {
            // ...
            return t, pickResult, nil
        }
        // ...
 }

3.10 roundrobin 負載均衡

grpc-go 中,對 round-robin picker 的實現源碼如下,其中包含了兩個核心字段:

// ...
type rrPicker struct {
    // ...
    // subconn 列表
    subConns []balancer.SubConn
    // 最後一次獲取 subconn 時對應的 index
    next     uint32
}

每次調用 rrPicker.Pick 方法,會對 next 的數值進行加一,然後取 next 對 endpoints 連接列表 subConnss 取模,獲取到對應的一筆 subConn 進行返回,以達到負載均衡的效果.

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
    subConnsLen := uint32(len(p.subConns))
    // 更新 next
    nextIndex := atomic.AddUint32(&p.next, 1)

    // 輪流依次取 subconn
    sc := p.subConns[nextIndex%subConnsLen]
    return balancer.PickResult{SubConn: sc}, nil
}

4 總結

本文以 etcd 作爲 grpc 的服務註冊 / 發現模塊,round-robin 作爲負載均衡策略,以 grpc 客戶端的運行鏈路爲主線,進行了原理分析和源碼走讀:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/x-vC1gz7-x6ELjU-VYOTmA