golang 源碼分析:nacos 服務發現

        https://github.com/alibaba/nacos 是阿里開源的服務發現和配置同步組件,上手非常容易,我們介紹下如何部署,然後看下 nacos 提供的 golang sdk:https://github.com/nacos-group/nacos-sdk-go 如何使用,分析下具體的源碼實現。

docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:2.0.2
Unable to find image 'nacos/nacos-server:2.0.2' locally
2.0.2: Pulling from nacos/nacos-server
9a03b1668b6d: Pull complete 
Digest: sha256:ac66d2fbc1ba432beff88beb165e5012686863d72a5e0f25da06e23c5e7b329d
Status: Downloaded newer image for nacos/nacos-server:2.0.2
db9558d41223b12bd58f2c120ead7d506a50bd40327a3fc6518178b27e50dd99

在 nacos 1.X 版本中,服務註冊和發現使用 http 方式,通過主端口(默認 8848)通信;2.0 版本新增了 gRPC 通信方式:9848 是客戶端 gRPC 請求服務端的端口,用於客戶端向服務端發起連接和請求;9849 是服務端 gRPC 請求服務端的端口,用於服務端之間的數據同步等。因此上面的 docker 命令同時映射了 8848 和 9848 兩個端口。

        我們實現一個服務註冊

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=20.18.7.10&port=8080'
ok

    拉取註冊結果

curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[],"lastRefTime":1667400684240,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[{"instanceId":"20.18.7.10#8080#DEFAULT#DEFAULT_GROUP@@nacos.naming.serviceName","ip":"20.18.7.10","port":8080,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@nacos.naming.serviceName","metadata":{},"instanceHeartBeatInterval":5000,"instanceIdGenerator":"simple","ipDeleteTimeout":30000,"instanceHeartBeatTimeOut":15000}],"lastRefTime":1667400719947,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}

同樣我們也可以使用 nacos 的配置中心功能,發佈配置

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=helloWorld"
true

獲取配置

curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"
helloWorld

其實 golang 的 sdk 就是基於上述 api 做的封裝來實現服務註冊與發現的。

我們定義一個服務

syntax = "proto3";
import "google/protobuf/empty.proto";
package grpcnacos;
// The part after ';' sets the Go package name of the generated code (grpcnacos).
option go_package = ".;grpcnacos";
// Test is the demo service; the server registers it in nacos as "test-server".
service Test{
  // Test takes no input (google.protobuf.Empty) and returns a TestResponse.
  rpc Test(google.protobuf.Empty) returns( TestResponse) {};
}
// TestResponse carries the reply text of the Test RPC.
message TestResponse{
  string msg = 1;
}

    生成對應的 golang 代碼

mkdir -p ../pkg/protocol/grpcnacos
protoc --go_out=../pkg/protocol/grpcnacos --go_opt=paths=source_relative     --go-grpc_out=../pkg/protocol/grpcnacos  --go-grpc_opt=paths=source_relative grpcnacos.proto

定義 grpc 服務的實現邏輯

package service
import (
  "context"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
  "log"
  emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// Service is the server-side implementation of the Test gRPC service.
// It embeds grpcnacos.UnimplementedTestServer, as the protoc-gen-go-grpc
// output (--go-grpc_out) expects; the embedded type presumably supplies
// default handlers for any RPC not implemented here.
type Service struct {
	grpcnacos.UnimplementedTestServer
}

// Test handles one Test RPC: it logs that a request arrived and always
// answers with Msg "test". The context and the empty request are unused.
func (Service) Test(_ context.Context, _ *emptypb.Empty) (*grpcnacos.TestResponse, error) {
	log.Println("收到一個請求")
	reply := &grpcnacos.TestResponse{Msg: "test"}
	return reply, nil
}

註冊我們的服務

package main
import (
  "fmt"
  "learn/learn/Nacos/exp1/service"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
  "log"
  "net"
  "github.com/nacos-group/nacos-sdk-go/clients"
  "github.com/nacos-group/nacos-sdk-go/common/constant"
  "github.com/nacos-group/nacos-sdk-go/vo"
  "google.golang.org/grpc"
)
// main starts the Test gRPC server on a kernel-assigned port and registers it
// with the nacos server at 127.0.0.1:8848 as service "test-server", so that
// clients can discover it by name.
func main() {
	server := grpc.NewServer()
	svc := service.Service{}
	grpcnacos.RegisterTestServer(server, svc)

	// The port is not hard-coded because several instances are started to
	// exercise load balancing. Binding port 0 lets the kernel pick a free port
	// at bind time; the chosen port is then read back from the listener.
	// Probing with GenFreePort and binding afterwards is racy: its probe
	// listener is closed before this one opens, so another process could take
	// the port in between.
	const listenPort = 0
	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", listenPort))
	if err != nil {
		log.Fatalf("監聽端口:%d失敗: %s", listenPort, err.Error())
	}
	port := listen.Addr().(*net.TCPAddr).Port

	// Addresses of the nacos servers: at least one is required, several may
	// be listed.
	serverConfig := []constant.ServerConfig{
		{
			IpAddr: "127.0.0.1",
			Port:   8848,
		},
	}
	// Client-side settings. NamespaceId "" selects the public namespace; to
	// work with several namespaces, create one client per NamespaceId.
	clientConfig := constant.ClientConfig{
		NamespaceId:         "",
		TimeoutMs:           50000,
		NotLoadCacheAtStart: true,
		LogLevel:            "debug",
	}
	// Recommended way to create the naming (service discovery) client.
	namingClient, err := clients.NewNamingClient(
		vo.NacosClientParam{
			ClientConfig:  &clientConfig,
			ServerConfigs: serverConfig,
		},
	)
	if err != nil {
		log.Fatalf("初始化nacos失敗: %s", err.Error())
	}

	// Register only after the listener is open, so a client that discovers
	// this instance can connect to it right away.
	success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
		Ip:          "127.0.0.1",
		Port:        uint64(port),
		ServiceName: "test-server",
		Weight:      10,
		Enable:      true,
		Healthy:     true,
		Ephemeral:   true,
		Metadata:    map[string]string{"name": "test"},
		ClusterName: "DEFAULT",       // default: DEFAULT
		GroupName:   "DEFAULT_GROUP", // default: DEFAULT_GROUP
	})
	if err != nil {
		log.Fatalf("註冊服務失敗: %s", err.Error())
	}
	log.Println("success: ", success)
	if !success {
		log.Fatalf("註冊服務失敗: success=%t", success)
	}
	log.Printf("服務啓動成功;PORT:%d\n", port)

	// Serve blocks until the server stops; report why instead of exiting silently.
	if err := server.Serve(listen); err != nil {
		log.Fatalf("服務異常退出: %s", err.Error())
	}
}
// GenFreePort returns a TCP port that was free on localhost at the moment of
// the call; it exists so the port need not be hard-coded when several
// instances are started to test load balancing. It binds "localhost:0" so the
// kernel picks the port, reads that port back and releases the listener
// before returning, so another process may claim the port before the caller
// binds it. It panics if the address cannot be resolved or bound.
func GenFreePort() int {
	tcpAddr, resolveErr := net.ResolveTCPAddr("tcp", "localhost:0")
	if resolveErr != nil {
		panic(resolveErr)
	}
	ln, listenErr := net.ListenTCP("tcp", tcpAddr)
	if listenErr != nil {
		panic(listenErr)
	}
	chosen := ln.Addr().(*net.TCPAddr).Port
	// Best effort: the probe listener is only needed to learn the port.
	_ = ln.Close()
	return chosen
}

通過服務名獲取服務的實例,並發起請求獲取結果

package main
import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/nacos-group/nacos-sdk-go/clients"
  "github.com/nacos-group/nacos-sdk-go/common/constant"
  "github.com/nacos-group/nacos-sdk-go/vo"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/emptypb"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
)
// main looks up one healthy instance of "test-server" through the nacos
// server at 127.0.0.1:8848, calls its Test RPC once and logs the reply.
func main() {
	// Addresses of the nacos servers: at least one is required, several may
	// be listed.
	serverConfig := []constant.ServerConfig{
		{
			IpAddr: "127.0.0.1",
			Port:   8848,
		},
	}
	// Client-side settings. NamespaceId "" selects the public namespace; to
	// work with several namespaces, create one client per NamespaceId.
	clientConfig := constant.ClientConfig{
		NamespaceId:         "",
		TimeoutMs:           5000,
		NotLoadCacheAtStart: true,
		LogLevel:            "debug",
	}
	// Recommended way to create the naming (service discovery) client.
	namingClient, err := clients.NewNamingClient(
		vo.NacosClientParam{
			ClientConfig:  &clientConfig,
			ServerConfigs: serverConfig,
		},
	)
	if err != nil {
		log.Fatalf("初始化nacos失敗: %s", err.Error())
	}

	// SelectOneHealthyInstance returns one instance picked by weighted random
	// among those with healthy=true, enabled=true and weight>0.
	instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
		ServiceName: "test-server",
		GroupName:   "DEFAULT_GROUP",     // default: DEFAULT_GROUP
		Clusters:    []string{"DEFAULT"}, // default: DEFAULT
	})
	// Check the error before touching instance: on failure (e.g. no healthy
	// instance registered) it must not be dereferenced.
	if err != nil {
		log.Fatalf("獲取服務實例失敗: %s", err.Error())
	}
	log.Printf("獲取到的實例IP:%s;端口:%d", instance.Ip, instance.Port)

	target := fmt.Sprintf("%s:%d", instance.Ip, instance.Port)
	conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("連接%s失敗:%s", target, err.Error())
	}
	defer conn.Close()

	// Bound the call so an unresponsive instance cannot block the client forever.
	const rpcTimeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()

	client := grpcnacos.NewTestClient(conn)
	res, err := client.Test(ctx, &emptypb.Empty{})
	if err != nil {
		log.Fatalf("調用TestClient失敗: %s", err.Error())
	}
	log.Println(res.Msg)
}

至此我們完成了簡單的服務註冊和服務發現功能。測試下

% go run learn/Nacos/exp1/server/main.go
2022/11/04 00:04:38 success:  true
2022/11/04 00:04:38 服務啓動成功;PORT:56358
2022/11/04 00:04:51 收到一個請求
 % go run learn/Nacos/exp1/client/main.go
2022/11/04 00:04:51 獲取到的實例IP:127.0.0.1;端口:56358
2022/11/04 00:04:51 test

我們可以在頁面上看下我們服務的註冊情況 http://127.0.0.1:8848/nacos/#/login 用戶名密碼都是 nacos

可以看到,不論是服務端註冊還是客戶端拉取,我們首先都需要初始化 namingService 的客戶端,它需要兩組參數

  namingClient, err := clients.NewNamingClient(
    vo.NacosClientParam{
      ClientConfig:  &clientConfig,
      ServerConfigs: serverConfig,
    },
  )

其中 clientConfig 是客戶端配置,也就是我們應用自身的超時時間等設置;serverConfigs 是一組服務端的地址和端口,也就是我們的 nacos 服務的地址,可以配置多個實例實現多活。

        對於 server 端來說是通過 RegisterInstance 來實現服務的註冊的

  success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
    Ip:          "127.0.0.1",
    Port:        uint64(port),
    ServiceName: "test-server",
    Weight:      10,
    Enable:      true,
    Healthy:     true,
    Ephemeral:   true,
    Metadata:    map[string]string{"name": "test"},
    ClusterName: "DEFAULT",       // 默認值DEFAULT
    GroupName:   "DEFAULT_GROUP", // 默認值DEFAULT_GROUP
  })

客戶端則通過 SelectOneHealthyInstance 獲取實例,它會按加權隨機的負載均衡策略返回一個健康的實例

  // 實例必須滿足的條件:health=true,enable=true and weight>0
  instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
    ServiceName: "test-server",
    GroupName:   "DEFAULT_GROUP",     // 默認值DEFAULT_GROUP
    Clusters:    []string{"DEFAULT"}, // 默認值DEFAULT
  })
  log.Printf("獲取到的實例IP:%s;端口:%d", instance.Ip, instance.Port)

使用起來是不是很簡單方便?下面分析下源碼實現,註冊參數定義如下

// RegisterInstanceParam (quoted from nacos-sdk-go's vo package) holds the
// arguments of NamingClient.RegisterInstance. The `param` tag names match the
// query parameters of the /v1/ns/instance HTTP API used in the curl example
// (ip, port, serviceName, ...). Note that Enable maps to "enabled".
type RegisterInstanceParam struct {
  Ip          string            `param:"ip"`          // required
  Port        uint64            `param:"port"`        // required
  Weight      float64           `param:"weight"`      // required, must be larger than 0
  Enable      bool              `param:"enabled"`     // required, whether the instance can be accessed
  Healthy     bool              `param:"healthy"`     // required, whether the instance is healthy
  Metadata    map[string]string `param:"metadata"`    // optional
  ClusterName string            `param:"clusterName"` // optional, default: DEFAULT
  ServiceName string            `param:"serviceName"` // required
  GroupName   string            `param:"groupName"`   // optional, default: DEFAULT_GROUP
  Ephemeral   bool              `param:"ephemeral"`   // optional
}

註冊的時候先生成了服務的實例信息和心跳信息,然後請求 nacos 服務進行註冊

// Instance describes one registered service instance (quoted from the SDK;
// presumably the model.Instance returned by SelectOneHealthyInstance — TODO
// confirm). Most json tags match the host objects in the
// /v1/ns/instance/list response (instanceId, ip, port, weight, ...); valid
// and marked do not appear in that hosts array.
type Instance struct {
  Valid       bool              `json:"valid"`
  Marked      bool              `json:"marked"`
  InstanceId  string            `json:"instanceId"`
  Port        uint64            `json:"port"`
  Ip          string            `json:"ip"`
  Weight      float64           `json:"weight"` // must be > 0 for the instance to be selectable
  Metadata    map[string]string `json:"metadata"`
  ClusterName string            `json:"clusterName"`
  ServiceName string            `json:"serviceName"`
  Enable      bool              `json:"enabled"` // JSON key is "enabled", not "enable"
  Healthy     bool              `json:"healthy"`
  Ephemeral   bool              `json:"ephemeral"`
}
// BeatInfo is the heartbeat information the SDK builds when an instance is
// registered and sends through SendBeat (PUT /v1/ns/instance/beat).
// Period and State are tagged json:"-", so they are never serialized.
type BeatInfo struct {
  Ip          string            `json:"ip"`
  Port        uint64            `json:"port"`
  Weight      float64           `json:"weight"`
  ServiceName string            `json:"serviceName"`
  Cluster     string            `json:"cluster"`
  Metadata    map[string]string `json:"metadata"`
  Scheduled   bool              `json:"scheduled"`
  Period      time.Duration     `json:"-"` // presumably the interval between beats — TODO confirm
  State       int32             `json:"-"` // presumably marks a stopped beat after deregistration — TODO confirm
}

具體動作的執行是通過我們初始化 naming 客戶端的時候指定的 proxy agent 執行的,默認的 agent 是一個 httpagent

func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) 
    _, err := sc.serviceProxy.RegisterInstance(util.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
    sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)

其中註冊實例是直接調用的我們前面提到的服務註冊的 http 接口

return proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost)
    SERVICE_BASE_PATH           = "/v1/ns"
    SERVICE_PATH                = SERVICE_BASE_PATH + "/instance"

發送心跳是通過單獨啓動一個協程完成的

  go br.sendInstanceBeat(k, beatInfo)

如果當前實例註銷,則進行停止心跳,否則進行心跳通信

beatInterval, err := br.serviceProxy.SendBeat(beatInfo)
api := constant.SERVICE_BASE_PATH + "/instance/beat"
result, err := proxy.nacosServer.ReqApi(api, params, http.MethodPut)

具體調用的是

SERVICE_BASE_PATH           = "/v1/ns"
result, err = server.callServer(api, params, method, getAddress(curServer), curServer.ContextPath)

最終調用的 agent 實現位於 github.com/nacos-group/nacos-sdk-go@v1.1.2/common/http_agent/http_agent.go

// HttpAgent (outline of nacos-sdk-go common/http_agent/http_agent.go) is the
// default agent through which the naming proxy calls the nacos HTTP API.
// Method bodies are elided; only the calls they make are listed.
type HttpAgent struct {
}
// Get delegates to the package-level get helper.
func (agent *HttpAgent) Get
      get(path, header, timeoutMs, params)
// RequestOnlyResult presumably selects Get/Post/Put/Delete by HTTP method and
// then reads the whole response body.
// NOTE(review): ioutil.ReadAll is deprecated since Go 1.16; io.ReadAll replaces it.
func (agent *HttpAgent) RequestOnlyResult
      agent.Get
      agent.Post
      agent.Put
      agent.Delete
      bytes, errRead := ioutil.ReadAll(response.Body)
    // Request presumably dispatches to Get/Post/Put/Delete the same way.
    func (agent *HttpAgent) Request
      agent.Get
      agent.Post
      agent.Put
      agent.Delete

其中 get 實現如下

// get (excerpt) sends the HTTP request with a freshly constructed http.Client.
// NOTE(review): the excerpt does not show timeoutMs being applied (for example
// via client.Timeout or a request context). A zero-value http.Client has no
// timeout, so confirm against the full source.
func get(path string, header http.Header, timeoutMs uint64, params map[string]string) (response *http.Response, err error) 
  client := http.Client{}
  resp, errDo := client.Do(request)

客戶端採用加權隨機策略選取一個實例

func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)
  service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
  return serviceName + constant.SERVICE_INFO_SPLITER + clusters

其中

  SERVICE_INFO_SPLITER        = "@@"

通過 list 方法獲取服務列表

    cacheService, ok := hr.serviceInfoMap.Get(key)
    hr.updateServiceNow(serviceName, clusters)
      result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
        api := constant.SERVICE_PATH + "/list"
  return proxy.nacosServer.ReqApi(api, param, http.MethodGet)

然後解析 json

  SERVICE_BASE_PATH           = "/v1/ns"
  SERVICE_PATH                = SERVICE_BASE_PATH + "/instance"
  hr.ProcessServiceJson(result)

獲取到實例列表後,就通過加權隨機算法選取一個健康的節點

// selectOneHealthyInstances (excerpt): keep the hosts that are healthy,
// enabled and have weight > 0, record in mw the largest weight rounded up,
// then pick one of the kept hosts with a weighted chooser.
return sc.selectOneHealthyInstances(service)
      for _, host := range hosts {
    if host.Healthy && host.Enable && host.Weight > 0 {
      cw := int(math.Ceil(host.Weight))
      if cw > mw {
        mw = cw
      }
      result = append(result, host)
    chooser := newChooser(result)
    instance := chooser.pick()

其中選擇器定義:

// newChooser (excerpt): sorts the instances and builds a Chooser holding them,
// their running weight totals, and max set to the final running total.
// NOTE(review): presumably totals[i] is the cumulative weight of data[0..i],
// so max is the total weight — confirm in the full source.
sort.Sort(instance(instances))
      return Chooser{data: instances, totals: totals, max: runningTotal}

選擇算法實現:

// pick (excerpt): r is drawn uniformly from [1, max]. sort.SearchInts returns
// the first index i with totals[i] >= r. If totals holds ascending cumulative
// weights, instance i is therefore chosen with probability weight_i / max
// (weighted random selection). rand.Intn panics when max <= 0, so this relies
// on at least one candidate with a positive weight.
instance := chooser.pick()
  r := rand.Intn(chs.max) + 1
  i := sort.SearchInts(chs.totals, r)
  return chs.data[i]
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rfFWyWJVgzP1TytLqJQsSg