golang 源碼分析:nacos 服務發現
https://github.com/alibaba/nacos 是阿里開源的服務發現和配置同步組件,上手非常容易,我們介紹下如何部署,然後看下 nacos 提供的 golang sdk:https://github.com/nacos-group/nacos-sdk-go 如何使用,分析下具體的源碼實現。
docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:2.0.2
Unable to find image 'nacos/nacos-server:2.0.2' locally
2.0.2: Pulling from nacos/nacos-server
9a03b1668b6d: Pull complete
Digest: sha256:ac66d2fbc1ba432beff88beb165e5012686863d72a5e0f25da06e23c5e7b329d
Status: Downloaded newer image for nacos/nacos-server:2.0.2
db9558d41223b12bd58f2c120ead7d506a50bd40327a3fc6518178b27e50dd99
在 nacos 1.X 的版本中使用 http 方式來做服務註冊和發現,配置主端口 (默認 8848);在 2.0 版本支持了 grpc 服務發現:9848 是客戶端 gRPC 請求服務端端口,用於客戶端向服務端發起連接和請求 9849 是服務端 gRPC 請求服務端端口,用於服務間同步等。
我們實現一個服務註冊
curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=20.18.7.10&port=8080'
ok
拉取註冊結果
curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[],"lastRefTime":1667400684240,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[{"instanceId":"20.18.7.10#8080#DEFAULT#DEFAULT_GROUP@@nacos.naming.serviceName","ip":"20.18.7.10","port":8080,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@nacos.naming.serviceName","metadata":{},"instanceHeartBeatInterval":5000,"instanceIdGenerator":"simple","ipDeleteTimeout":30000,"instanceHeartBeatTimeOut":15000}],"lastRefTime":1667400719947,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
同樣我們也可以使用 nacos 的配置中心功能,發佈配置
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=helloWorld"
true
獲取配置
curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"
helloWorld
其實 golang 的 sdk 就是基於上述 api 做的封裝來實現服務註冊與發現的。
我們定義一個服務
syntax = "proto3";
import "google/protobuf/empty.proto";
package grpcnacos;
option go_package = ".;grpcnacos";
service Test{
rpc Test(google.protobuf.Empty) returns( TestResponse) {};
}
message TestResponse{
string msg = 1;
}
生成對應的 golang 代碼
mkdir -p ../pkg/protocol/grpcnacos
protoc --go_out=../pkg/protocol/grpcnacos --go_opt=paths=source_relative --go-grpc_out=../pkg/protocol/grpcnacos --go-grpc_opt=paths=source_relative grpcnacos.proto
定義 grpc 服務的實現邏輯
package service
import (
"context"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
type Service struct {
grpcnacos.UnimplementedTestServer
}
func (s Service) Test(ctx context.Context, empty *emptypb.Empty) (*grpcnacos.TestResponse, error) {
log.Println("收到一個請求")
return &grpcnacos.TestResponse{Msg: "test"}, nil
}
註冊我們的服務
package main
import (
"fmt"
"learn/learn/Nacos/exp1/service"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
"net"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"google.golang.org/grpc"
)
func main() {
server := grpc.NewServer()
service := service.Service{}
grpcnacos.RegisterTestServer(server, service)
port := GenFreePort()
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("監聽端口:%d失敗: %s", port, err.Error())
}
// 創建serverConfig
// 支持多個;至少一個ServerConfig
serverConfig := []constant.ServerConfig{
{
IpAddr: "127.0.0.1",
Port: 8848,
},
}
// 創建clientConfig
clientConfig := constant.ClientConfig{
NamespaceId: "", // 如果需要支持多namespace,我們可以場景多個client,它們有不同的NamespaceId。當namespace是public時,此處填空字符串。
TimeoutMs: 50000,
NotLoadCacheAtStart: true,
LogLevel: "debug",
}
// 創建服務發現客戶端的另一種方式 (推薦)
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
if err != nil {
log.Fatalf("初始化nacos失敗: %s", err.Error())
}
success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: "127.0.0.1",
Port: uint64(port),
ServiceName: "test-server",
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"name": "test"},
ClusterName: "DEFAULT", // 默認值DEFAULT
GroupName: "DEFAULT_GROUP", // 默認值DEFAULT_GROUP
})
if err != nil {
log.Fatalf("註冊服務失敗: %s", err.Error())
}
log.Println("success: ", success)
log.Printf("服務啓動成功;PORT:%d\n", port)
_ = server.Serve(listen)
}
// GenFreePort 獲取一個空閒的端口;端口避免寫死,因爲要啓動多個實例,測試負載均衡
func GenFreePort() int {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
panic(err)
}
listen, err := net.ListenTCP("tcp", addr)
if err != nil {
panic(err)
}
defer listen.Close()
return listen.Addr().(*net.TCPAddr).Port
}
通過名字獲取服務的實力,請求獲取結果
package main
import (
"context"
"fmt"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)
func main() {
// 創建serverConfig
// 支持多個;至少一個ServerConfig
serverConfig := []constant.ServerConfig{
{
IpAddr: "127.0.0.1",
Port: 8848,
},
}
// 創建clientConfig
clientConfig := constant.ClientConfig{
// 如果需要支持多namespace,我們可以場景多個client,它們有不同的NamespaceId。當namespace是public時,此處填空字符串。
NamespaceId: "",
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogLevel: "debug",
}
// 創建服務發現客戶端的另一種方式 (推薦)
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
if err != nil {
log.Fatalf("初始化nacos失敗: %s", err.Error())
}
// SelectOneHealthyInstance將會按加權隨機輪詢的負載均衡策略返回一個健康的實例
// 實例必須滿足的條件:health=true,enable=true and weight>0
instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: "test-server",
GroupName: "DEFAULT_GROUP", // 默認值DEFAULT_GROUP
Clusters: []string{"DEFAULT"}, // 默認值DEFAULT
})
log.Printf("獲取到的實例IP:%s;端口:%d", instance.Ip, instance.Port)
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", instance.Ip, instance.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("監聽%s:%d失敗:%s", instance.Ip, instance.Port, err.Error())
}
client := grpcnacos.NewTestClient(conn)
res, err := client.Test(context.Background(), &emptypb.Empty{})
if err != nil {
log.Fatalf("調用TestClient失敗: %s", err.Error())
}
log.Println(res.Msg)
}
至此我們完成了簡單的服務註冊和服務發現功能。測試下
% go run learn/Nacos/exp1/server/main.go
2022/11/04 00:04:38 success: true
2022/11/04 00:04:38 服務啓動成功;PORT:56358
2022/11/04 00:04:51 收到一個請求
% go run learn/Nacos/exp1/client/main.go
2022/11/04 00:04:51 獲取到的實例IP:127.0.0.1;端口:56358
2022/11/04 00:04:51 test
我們可以在頁面上看下我們服務的註冊情況 http://127.0.0.1:8848/nacos/#/login 用戶名密碼都是 nacos
可以看到,不論是服務端註冊還是客戶端拉取,我們首先都需要初始化 namingService 的客戶端,它需要兩組參數
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
其中 clientConfig 配置了客戶端,也就是我們的應用允許的超時時間等配置,serverConfigs 是一組服務端的地址和端口後,也就是我們的 nacos 服務的地址,可以配置多個實例實現多活。
對於 server 端來說是通過 RegisterInstance 來實現服務的註冊的
success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: "127.0.0.1",
Port: uint64(port),
ServiceName: "test-server",
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"name": "test"},
ClusterName: "DEFAULT", // 默認值DEFAULT
GroupName: "DEFAULT_GROUP", // 默認值DEFAULT_GROUP
})
客戶端是通過 SelectOneHealthyInstance 將會按加權隨機輪詢的負載均衡策略返回一個健康的實例
// 實例必須滿足的條件:health=true,enable=true and weight>0
instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: "test-server",
GroupName: "DEFAULT_GROUP", // 默認值DEFAULT_GROUP
Clusters: []string{"DEFAULT"}, // 默認值DEFAULT
})
log.Printf("獲取到的實例IP:%s;端口:%d", instance.Ip, instance.Port)
使用起來很簡單方便又沒有。下面分析下源碼實現,註冊參數定義如下
type RegisterInstanceParam struct {
Ip string `param:"ip"` //required
Port uint64 `param:"port"` //required
Weight float64 `param:"weight"` //required,it must be lager than 0
Enable bool `param:"enabled"` //required,the instance can be access or not
Healthy bool `param:"healthy"` //required,the instance is health or not
Metadata map[string]string `param:"metadata"` //optional
ClusterName string `param:"clusterName"` //optional,default:DEFAULT
ServiceName string `param:"serviceName"` //required
GroupName string `param:"groupName"` //optional,default:DEFAULT_GROUP
Ephemeral bool `param:"ephemeral"` //optional
}
註冊的時候先生成了服務的實例信息和心跳信息,然後請求 nacos 服務進行註冊
type Instance struct {
Valid bool `json:"valid"`
Marked bool `json:"marked"`
InstanceId string `json:"instanceId"`
Port uint64 `json:"port"`
Ip string `json:"ip"`
Weight float64 `json:"weight"`
Metadata map[string]string `json:"metadata"`
ClusterName string `json:"clusterName"`
ServiceName string `json:"serviceName"`
Enable bool `json:"enabled"`
Healthy bool `json:"healthy"`
Ephemeral bool `json:"ephemeral"`
}
type BeatInfo struct {
Ip string `json:"ip"`
Port uint64 `json:"port"`
Weight float64 `json:"weight"`
ServiceName string `json:"serviceName"`
Cluster string `json:"cluster"`
Metadata map[string]string `json:"metadata"`
Scheduled bool `json:"scheduled"`
Period time.Duration `json:"-"`
State int32 `json:"-"`
}
具體動作的執行是通過我們初始化 naming 客戶端的時候指定的 proxy agent 執行的,默認的 agent 是一個 httpagent
func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error)
_, err := sc.serviceProxy.RegisterInstance(util.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
其中註冊實例是直接調用的我們前面提到的服務註冊的 http 接口
return proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost)
SERVICE_BASE_PATH = "/v1/ns"
SERVICE_PATH = SERVICE_BASE_PATH + "/instance"
發送心跳是單獨啓用了一個協程
go br.sendInstanceBeat(k, beatInfo)
如果當前實例註銷,則進行停止心跳,否則進行心跳通信
beatInterval, err := br.serviceProxy.SendBeat(beatInfo)
api := constant.SERVICE_BASE_PATH + "/instance/beat"
result, err := proxy.nacosServer.ReqApi(api, params, http.MethodPut)
具體調用的是
SERVICE_BASE_PATH = "/v1/ns"
result, err = server.callServer(api, params, method, getAddress(curServer), curServer.ContextPath)
最終調用的 agent 實現位於 github.com/nacos-group/nacos-sdk-go@v1.1.2/common/http_agent/http_agent.go
type HttpAgent struct {
}
func (agent *HttpAgent) Get
get(path, header, timeoutMs, params)
func (agent *HttpAgent) RequestOnlyResult
agent.Get
agent.Post
agent.Put
agent.Delete
bytes, errRead := ioutil.ReadAll(response.Body)
func (agent *HttpAgent) Request
agent.Get
agent.Post
agent.Put
agent.Delete
其中 get 實現如下
func get(path string, header http.Header, timeoutMs uint64, params map[string]string) (response *http.Response, err error)
client := http.Client{}
resp, errDo := client.Do(request)
客戶端採用隨機策略選取一個實例
func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)
service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
return serviceName + constant.SERVICE_INFO_SPLITER + clusters
其中
SERVICE_INFO_SPLITER = "@@"
通過 list 方法獲取服務列表
cacheService, ok := hr.serviceInfoMap.Get(key)
hr.updateServiceNow(serviceName, clusters)
result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
api := constant.SERVICE_PATH + "/list"
return proxy.nacosServer.ReqApi(api, param, http.MethodGet)
然後解析 json
SERVICE_BASE_PATH = "/v1/ns"
SERVICE_PATH = SERVICE_BASE_PATH + "/instance"
hr.ProcessServiceJson(result)
獲取到實例列表後,就通過隨機算法選取一個活着的節點
return sc.selectOneHealthyInstances(service)
for _, host := range hosts {
if host.Healthy && host.Enable && host.Weight > 0 {
cw := int(math.Ceil(host.Weight))
if cw > mw {
mw = cw
}
result = append(result, host)
chooser := newChooser(result)
instance := chooser.pick()
其中選擇器定義:
sort.Sort(instance(instances))
return Chooser{data: instances, totals: totals, max: runningTotal}
選擇算法實現:
instance := chooser.pick()
r := rand.Intn(chs.max) + 1
i := sort.SearchInts(chs.totals, r)
return chs.data[i]
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rfFWyWJVgzP1TytLqJQsSg