服務註冊中心設計原理與 Golang 實現

內容提要

爲什麼引入服務註冊發現

從單體架構轉向微服務架構過程中,當服務調用其他服務時,如何找到正確的服務地址是最基礎問題。服務拆分的早期,將服務調用域名寫死到代碼或配置文件中,然後通過 Host 配置或 DNS 域名解析進行路由尋址,服務有多個實例,還會加入負載均衡 (Nginx、F5)。

(服務域名配置模式)

但人工維護慢慢會出現瓶頸和問題:新增服務或服務擴容,所有依賴需要新增修改配置;某臺服務器掛了還要手動摘流量;服務上下線變更時效慢;人工配置錯誤或漏配;RPC 類型服務不能滿足 ... 這時你會想如果能讓服務自動化完成配置(註冊)和查找(發現)就好了,於是乎服務註冊發現就應運而生。

(服務註冊發現模式)

可以看出,所有服務提供者在上下線時都會告知服務註冊中心,服務消費者要查找服務直接從註冊中心拉取。一切都變得更加美好,那麼服務註冊中心該如何實現呢?簡單!優秀的開源項目已有一大把,大名鼎鼎的 Zookeeper、Eureka,還有後期之秀 Consul、Nacos、Etcd,當然有些算是分佈式 KV 存儲,要實現服務註冊發現仍需些額外工作。如何技術選型,是 AP 模式更好還是 CP 模式更好?今天先拋開這些開源項目,我們親自動手來實現一個服務註冊中心,深入理解其設計原理,逐行代碼分析與實踐。PS:本文項目代碼參考 bilibili discover 開源項目進行改造。

註冊中心實現原理

設計思想

首先進行功能需求分析,作爲服務註冊中心,要實現如下基本功能:

構造註冊表

服務中心首先要維護一個服務地址註冊信息列表(簡稱註冊表)。通俗理解註冊表就像手機通訊錄,記錄了所有聯繫人(服務)的電話(服務地址),通過聯繫人姓名(服務名稱)即可找到。

那麼如何存儲註冊表呢?最普遍認知想到存數據庫(Redis 這種內存數據庫),Zookeeper、Etcd 本身作爲分佈式 KV 存儲天然具有成爲註冊中心的優勢,但這些都會引入新組件,要考慮其穩定性及性能。那麼我們可以直接將註冊信息存到內存中,這時候你會想如果服務掛了內存數據丟了怎麼辦?這個問題後面我們會想辦法解決。

首先構建一個註冊表 Registry 數據結構,定義如下:

type Registry struct {
    apps map[string]*Application
    lock sync.RWMutex
}

應用服務 Application 結構如下:

type Application struct {
    appid           string
    instances       map[string]*Instance
    latestTimestamp int64
    lock            sync.RWMutex
}

服務實例 Instance 的結構如下:

type Instance struct {
    Env      string   `json:"env"`
    AppId    string   `json:"appid"`
    Hostname string   `json:"hostname"`
    Addrs    []string `json:"addrs"`
    Version  string   `json:"version"`
    Status   uint32   `json:"status"`

    RegTimestamp    int64 `json:"reg_timestamp"`
    UpTimestamp     int64 `json:"up_timestamp"`
    RenewTimestamp  int64 `json:"renew_timestamp"`
    DirtyTimestamp  int64 `json:"dirty_timestamp"`
    LatestTimestamp int64 `json:"latest_timestamp"`
}

註冊表及相關依賴的結構體構建完成了,梳理一下所有概念和關係。註冊表 Registry 中存放多個應用服務 Application,每個應用服務又會有多個服務實例 Instance,服務實例中存儲服務的具體地址和其他信息。

服務註冊

**功能目標:**接受來自服務提交的註冊信息,並保存到註冊表中。先初始化註冊表 NewRegistry() ,根據提交信息構建實例 NewInstance(),然後進行註冊寫入。

func NewRegistry() *Registry {
    registry := &Registry{
        apps: make(map[string]*Application),
    }
    return registry
}
func NewInstance(req *RequestRegister) *Instance {
    now := time.Now().UnixNano()
    instance := &Instance{
        Env:             req.Env,
        AppId:           req.AppId,
        Hostname:        req.Hostname,
        Addrs:           req.Addrs,
        Version:         req.Version,
        Status:          req.Status,
        RegTimestamp:    now,
        UpTimestamp:     now,
        RenewTimestamp:  now,
        DirtyTimestamp:  now,
        LatestTimestamp: now,
    }
    return instance
}
r := NewRegistry()
instance := NewInstance(&req)
r.Register(instance, req.LatestTimestamp)

註冊時,先從 apps 中查找是否已註冊過,根據唯一標識 key = appid + env  確定。如果沒有註冊過,先新建應用 app,然後將 instance 加入到 app 中,最後 app 放入註冊表中。這裏分別使用了讀鎖和寫鎖,保障數據安全同時,儘量減少鎖時間和鎖搶佔影響。

func (r *Registry) Register(instance *Instance, latestTimestamp int64) (*Application, *errcode.Error) {
    key := getKey(instance.AppId, instance.Env)
    r.lock.RLock()
    app, ok := r.apps[key]
    r.lock.RUnlock()
    if !ok { //new app
        app = NewApplication(instance.AppId)
    }
    //add instance
    _, isNew := app.AddInstance(instance, latestTimestamp)
    if isNew { //todo }
    //add into registry apps
    r.lock.Lock()
    r.apps[key] = app
    r.lock.Unlock()
    return app, nil
}

新建應用服務 app,初始化 instances 

func NewApplication(appid string) *Application {
    return &Application{
        appid:     appid,
        instances: make(map[string]*Instance),
    }
}

將服務主機實例 instance 加入應用 app 中,注意判斷是否已存在,存在根據髒時間戳  DirtyTimestamp 比對,是否進行替換,添加實例信息,更新最新時間 latestTimestamp ,並返回實例。

func (app *Application) AddInstance(in *Instance, latestTimestamp int64) (*Instance, bool) {
    app.lock.Lock() 
    defer app.lock.Unlock()
    appIns, ok := app.instances[in.Hostname]
    if ok { //exist
        in.UpTimestamp = appIns.UpTimestamp
        //dirtytimestamp
        if in.DirtyTimestamp < appIns.DirtyTimestamp {
            log.Println("register exist dirty timestamp")
            in = appIns
        }
    }
    //add or update instances
    app.instances[in.Hostname] = in
    app.upLatestTimestamp(latestTimestamp)
    returnIns := new(Instance)
    *returnIns = *in
    return returnIns, !ok
}

返回 !ok (isNew)表明,本次服務註冊時,實例爲新增還是替換,用來維護服務健康信息(後面會再次提到)。

服務註冊完成了,編寫測試用例看下效果。

var req = &model.RequestRegister{AppId: "com.xx.testapp", Hostname: "myhost", Addrs: []string{"http://testapp.xx.com/myhost"}, Status: 1}
func TestRegister(t *testing.T) {
    r := model.NewRegistry()
    instance := model.NewInstance(req)
    app, _ := r.Register(instance, req.LatestTimestamp)
    t.Log(app)
}

服務發現

**功能目標:**查找已註冊的服務獲取信息,可以指定條件查找,也可以全量查找。這裏以指定過濾條件 appid 、env 和 status 爲例。

r := model.NewRegistry()
fetchData, err := r.Fetch(req.Env, req.AppId, req.Status, 0)

根據 appid 和 env 組合成 key,然後從註冊表的 apps 中獲取應用 app,然後通過 app 獲取服務實例 GetInstance()

func (r *Registry) Fetch(env, appid string, status uint32, latestTime int64) (*FetchData, *errcode.Error) {
    app, ok := r.getApplication(appid, env)
    if !ok {
        return nil, errcode.NotFound
    }
    return app.GetInstance(status, latestTime)
}
func (r *Registry) getApplication(appid, env string) (*Application, bool) {
    key := getKey(appid, env)
    r.lock.RLock() 
    app, ok := r.apps[key]
    r.lock.RUnlock()
    return app, ok
}

根據 app 獲取所有應用實例,並用 status 過濾,這裏對返回結果 instances 中的 Addr 進行了拷貝返回一個新的切片。

func (app *Application) GetInstance(status uint32, latestTime int64) (*FetchData, *errcode.Error) {
    app.lock.RLock()
    defer app.lock.RUnlock()
    if latestTime >= app.latestTimestamp {
        return nil, errcode.NotModified
    }
    fetchData := FetchData{
        Instances:       make([]*Instance, 0),
        LatestTimestamp: app.latestTimestamp,
    }
    var exists bool
    for _, instance := range app.instances {
        if status&instance.Status > 0 {
            exists = true
            newInstance := copyInstance(instance)
            fetchData.Instances = append(fetchData.Instances, newInstance)
        }
    }
    if !exists {
        return nil, errcode.NotFound
    }
    return &fetchData, nil
}
//deep copy
func copyInstance(src *Instance) *Instance {
    dst := new(Instance)
    *dst = *src
    //copy addrs
    dst.Addrs = make([]string, len(src.Addrs))
    for i, addr := range src.Addrs {
        dst.Addrs[i] = addr
    }
    return dst
}

編寫測試用例,先註冊再獲取,看到可以正常獲取到信息。

服務下線

**功能目標:**接受服務的下線請求,並將服務從註冊信息列表中刪除。通過傳入 env, appid, hostname 三要素信息進行對應服務實例的取消。

r := model.NewRegistry()
r.Cancel(req.Env, req.AppId, req.Hostname, 0)

根據 appid 和 env 找到對象的 app,然後刪除 app 中對應的 hostname。如果 hostname 後 app.instances 爲空,那麼將 app 從註冊表中清除。

func (r *Registry) Cancel(env, appid, hostname string, latestTimestamp int64) (*Instance, *errcode.Error) {
    log.Println("action cancel...")
    //find app
    app, ok := r.getApplication(appid, env)
    if !ok {
        return nil, errcode.NotFound
    }   
    instance, ok, insLen := app.Cancel(hostname, latestTimestamp)
    if !ok {
        return nil, errcode.NotFound
    }   
    //if instances is empty, delete app from apps
    if insLen == 0 { 
        r.lock.Lock()
        delete(r.apps, getKey(appid, env))
        r.lock.Unlock()
    }   
    return instance, nil 
}
func (app *Application) Cancel(hostname string, latestTimestamp int64) (*Instance, bool, int) {
    newInstance := new(Instance)
    app.lock.Lock()
    defer app.lock.Unlock()
    appIn, ok := app.instances[hostname]
    if !ok {
        return nil, ok, 0
    }   
    //delete hostname
    delete(app.instances, hostname)
    appIn.LatestTimestamp = latestTimestamp
    app.upLatestTimestamp(latestTimestamp)
    *newInstance = *appIn
    return newInstance, true, len(app.instances)
}

編寫測試用例先註冊,再取消,然後獲取信息,發現 404 not found。

服務續約

**功能目標:**實現服務的健康檢查機制,服務註冊後,如果沒有取消,那麼就應該在註冊表中,可以隨時查到,如果某個服務實例掛了,能否自動的從註冊表中刪除,保障註冊表中的服務實例都是正常的。

通常有兩種方式做法:註冊中心(服務端)主動探活,通過請求指定接口得到正常響應來確認;服務實例(客戶端)主動上報,調用續約接口進行續約,續約設有時效 TTL (time to live)。兩種方式各有優缺點,大家可以思考一下,不同的註冊中心也採用了不同的方式,這裏選型第二種方案。

r := model.NewRegistry()
r.Renew(req.Env, req.AppId, req.Hostname)

根據 appid 和 env 找到對象的 app,再根據 hostname 找到對應主機實例,更新其 RenewTimestamp 爲當前時間。

func (r *Registry) Renew(env, appid, hostname string) (*Instance, *errcode.Error) {
    app, ok := r.getApplication(appid, env)
    if !ok {
        return nil, errcode.NotFound
    }
    in, ok := app.Renew(hostname)
    if !ok {
        return nil, errcode.NotFound
    }       
    return in, nil
}  
func (app *Application) Renew(hostname string) (*Instance, bool) {
    app.lock.Lock()
    defer app.lock.Unlock()
    appIn, ok := app.instances[hostname]
    if !ok {
        return nil, ok
    }
    appIn.RenewTimestamp = time.Now().UnixNano()
    return copyInstance(appIn)true
}

服務剔除

**功能目標:**既然有服務定期續約,那麼對應的如果服務沒有續約呢?服務如果下線可以使用 Cancel 進行取消,但如果服務因爲網絡故障或掛了導致不能提供服務,那麼可以通過檢查它是否按時續約來判斷,把 TTL 達到閾值的服務實例剔除(Cancel),實現服務的被動下線。

首先在新建註冊表時開啓一個定時任務,新啓一個 goroutine 來實現。

func NewRegistry() *Registry {
++ go r.evictTask()
}

配置定時檢查的時間間隔,默認 60 秒,通過 Tick 定時器開啓 evict。

func (r *Registry) evictTask() {
    ticker := time.Tick(configs.CheckEvictInterval)
    for {
        select {
        case <-ticker:
            r.evict()
        }
    }
}

遍歷註冊表的所有 apps,然後再遍歷其中的 instances,如果當前時間減去實例上一次續約時間 instance.RenewTimestamp 達到閾值(默認 90 秒),那麼將其加入過期隊列中。這裏並沒有直接將過期隊列所有實例都取消,考慮 GC 以及 本地時間漂移的因素,設定了一個剔除的上限 evictionLimit,隨機剔除一些過期實例。

func (r *Registry) evict() {
    now := time.Now().UnixNano()
    var expiredInstances []*Instance
    apps := r.getAllApplications()
    var registryLen int
    for _, app := range apps {
        registryLen += app.GetInstanceLen()
        allInstances := app.GetAllInstances()
        for _, instance := range allInstances {
            if now-instance.RenewTimestamp > int64(configs.InstanceExpireDuration) {
                expiredInstances = append(expiredInstances, instance)
            }
        }
    }
    evictionLimit := registryLen - int(float64(registryLen)*configs.SelfProtectThreshold)
    expiredLen := len(expiredInstances)
    if expiredLen > evictionLimit {
        expiredLen = evictionLimit
    }

    if expiredLen == 0 {
        return
    }
    for i := 0; i < expiredLen; i++ {
        j := i + rand.Intn(len(expiredInstances)-i)
        expiredInstances[i], expiredInstances[j] = expiredInstances[j], expiredInstances[i]
        expiredInstance := expiredInstances[i]
        r.Cancel(expiredInstance.Env, expiredInstance.AppId, expiredInstance.Hostname, now)
    }
}

剔除上限數量,是通過當前註冊表大小(註冊表所有 instances 實例數)減去 觸發自我保護機制的閾值(當前註冊表大小 * 保護自我機制比例值),保護機制稍後會具體解釋。

剔除過期時,採用了 Knuth-Shuffle 算法,也叫公平洗牌算法來實現隨機剔除。當然如果 expiredLen <= evictionLimit,隨機剔除的意義不大,如果前者大於後者,隨機剔除能最大程度保障,剔除的實例均勻分散到所有應用實例中,降低某服務被全部清空的風險。公平洗牌算法實現也比較簡單,循環遍歷過期列表,將當前數與特定隨機數交換,和我們打牌時兩兩交換洗牌過程類似,它實現了 O(n) 的時間複雜度,由 Knuth 發明。

自我保護

**功能目標:**既然服務會定期剔除超時未續約的服務,那麼假設一種情況,網絡一段時間發生了異常,所有服務都沒成功續約,這時註冊中心是否將所有服務全部剔除?當然不行!所以,我們需要一個自我保護的機制防止此類事情的發生。

怎麼設計自我保護機制呢?按短時間內失敗的比例達到某特定閾值就開啓保護,保護模式下不進行服務剔除。所以我們需要一個統計模塊,續約成功 +1。默認情況下,服務剔除每 60 秒執行一次,服務續約每 30 秒執行一次,那麼一個服務實例在檢查時應該有 2 次續約。

type Guard struct {
    renewCount     int64
    lastRenewCount int64
    needRenewCount int64
    threshold      int64
    lock           sync.RWMutex
}
func (gd *Guard) incrNeed() {
    gd.lock.Lock()
    defer gd.lock.Unlock()
    gd.needRenewCount += int64(configs.CheckEvictInterval / configs.RenewInterval)
    gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) decrNeed() {
    gd.lock.Lock()
    defer gd.lock.Unlock()
    gd.needRenewCount -= int64(configs.CheckEvictInterval / configs.RenewInterval)
    gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) setNeed(count int64) {
    gd.lock.Lock()
    defer gd.lock.Unlock()
    gd.needRenewCount = count * int64(configs.CheckEvictInterval/configs.RenewInterval)
    gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) incrCount() {
    atomic.AddInt64(&gd.renewCount, 1)
}

在註冊表中增加 Guard 模塊並初始化,在服務註冊成功,服務取消,服務續約時操作統計。

type Registry struct {
++    gd   *Guard
}
func NewRegistry() *Registry {
    r := &Registry{
++      gd:   new(Guard),
    } 
}
func (r *Registry) Register(...) {
    if isNew {
++      r.gd.incrNeed()
    }
}
func (r *Registry) Cancel(...) {
++   r.gd.decrNeed()
}
func (r *Registry) Renew(...) {
++   r.gd.incrCount()
}

在服務剔除前進行上一週期計數統計,並判斷是否達到自我保護開啓狀態。

func (gd *Guard) storeLastCount() {
    atomic.StoreInt64(&gd.lastRenewCount, atomic.SwapInt64(&gd.needRenewCount, 0))
}
func (gd *Guard) selfProtectStatus() bool {
    return atomic.LoadInt64(&gd.lastRenewCount) < atomic.LoadInt64(&gd.threshold)
}

如果開啓自我保護,那麼續約時間超過閾值(默認 90 秒)忽略不會剔除。但如果續約時間超過最大閾值(默認 3600 秒),那麼不管是否開啓保護都要剔除。因爲自我保護只是保護短時間由於網絡原因未續約的服務,長時間未續約大概率已經有問題了。

func (r *Registry) evictTask() {
        case <-ticker:
++          r.gd.storeLastCount()
            r.evict()
        }
}
func (r *Registry) evict() {
   delta := now - instance.RenewTimestamp
++ if !protectStatus && delta > int64(configs.InstanceExpireDuration) ||
      delta > int64(configs.InstanceMaxExpireDuration) {
      expiredInstances = append(expiredInstances, instance)
   }
}

思考下,服務續約比例未達到 85% 就會觸發自我保護,還記不記得在服務剔除那塊有一個剔除數量上限不能超過 15%,這裏就 match 了,否則還沒來得及進入自我保護程序就把服務都剔除了。

最後增加一個定時器,如果超過一定時間(15 分鐘),重新計算下當前實例數,重置保護閾值,降低髒數據風險。

func (r *Registry) evictTask() {
    resetTicker := time.Tick(configs.ResetGuardNeedCountInterval)
    for {
        select { 
        case <-resetTicker:
            var count int64
            for _, app := range r.getAllApplications() {
                count += int64(app.GetInstanceLen())
            }
            r.gd.setNeed(count)
        }
    }
}

註冊中心對外提供服務

目前註冊中心基本功能已實現,需要對外提供服務了,我們採用 gin 來實現一個 web 服務,接受 http 請求進行服務的註冊、查找、續約、下線操作,這樣保障註冊中心可以方便的接受來自任何語言客戶端請求。

func main() {
    //init config
    c := flag.String("c""""config file path")
    flag.Parse()
    config, err := configs.LoadConfig(*c)
    if err != nil {
        log.Println("load config error:", err)
        return
    }  
    //global discovery
    global.Discovery = model.NewDiscovery(config)
    //init router and start server
    router := api.InitRouter()
    srv := &http.Server{
        Addr:    config.HttpServer,
        Handler: router,
    }
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("listen:%s\n", err)
        }
    }()
}

增加一個 discovery 結構,並開啓一個全局變量  global.Discovery ,該結構中維護註冊表 Registry,然後就可以根據註冊表實現各種操作了。

type Discovery struct {
    config    *configs.GlobalConfig
    protected bool
    Registry  *Registry
}
func NewDiscovery(config *configs.GlobalConfig) *Discovery {
    dis := &Discovery{
        protected: false,
        config:    config,
        Registry:  NewRegistry(), //init registry
    }  
    return dis 
}
//init discovery
var Discovery *model.Discovery

api.InitRouter() 綁定 url 路由和 Handler,以註冊爲例,接受請求入參,調用 global.Discovery.Registry.Register() 進行註冊,成功返回。

router.POST("api/register", handler.RegisterHandler)
func RegisterHandler(c *gin.Context) {
    var req model.RequestRegister
    if e := c.ShouldBindJSON(&req); e != nil {
        err := errcode.ParamError
        c.JSON(http.StatusOK, gin.H{
            "code":    err.Code(),
            "message": err.Error(),
        })
        return
    }
    //bind instance
    instance := model.NewInstance(&req)
    if instance.Status == 0 || instance.Status > 2 {
        err := errcode.ParamError
        c.JSON(http.StatusOK, gin.H{
            "code":    err.Code(),
            "message": err.Error(),
        })
        return
    }
    //dirtytime
    if req.DirtyTimestamp > 0 {
        instance.DirtyTimestamp = req.DirtyTimestamp
    }
    global.Discovery.Registry.Register(instance, req.LatestTimestamp)
    c.JSON(http.StatusOK, gin.H{
        "code":    200,
        "message""",
        "data":    "",
    })
}

接着要實現平滑重啓,在 main 啓動時增加接收信號後關閉服務。

func main() {
    //...
    //graceful restart
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
    <-quit
    log.Println("shutdown discovery server...")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal("server shutdown error:", err)
    }
    select {
    case <-ctx.Done():
        log.Println("timeout of 5 seconds")
    }
    log.Println("server exiting")
}

實現效果如圖所示:

工程實踐

總結與問題

註冊中心功能實現

至此,一個單機版的註冊中心就可以工作了,但生產環境單點肯定是不能容忍的,因此有必要實現一個註冊中心集羣。那麼是否部署多個註冊中心實例就可以了,當然 .... 不行!這隻能保障有多個註冊中心節點,而每個節點中維護自己的註冊表,那麼就需要進行註冊表數據同步。多節點數據同步又會涉及著名的一致性問題,這時 Paxos、Raft、ZAB、Gossip 等算法名詞湧現,而我們將使用 P2P(Peer to Peer)對等網絡協議來實現。關於集羣設計與實現我們將在後續文章中展開。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WZ6HG-ZLFiBJLbE29vTr3g