服務註冊中心設計原理與 Golang 實現
內容提要
-
微服務爲什麼引入服務註冊發現
-
服務註冊中心設計原理
-
Golang 代碼實現服務註冊中心
爲什麼引入服務註冊發現
從單體架構轉向微服務架構過程中,當服務調用其他服務時,如何找到正確的服務地址是最基礎問題。服務拆分的早期,將服務調用域名寫死到代碼或配置文件中,然後通過 Host 配置或 DNS 域名解析進行路由尋址,服務有多個實例,還會加入負載均衡 (Nginx、F5)。
(服務域名配置模式)
但人工維護慢慢會出現瓶頸和問題:新增服務或服務擴容,所有依賴需要新增修改配置;某臺服務器掛了還要手動摘流量;服務上下線變更時效慢;人工配置錯誤或漏配;RPC 類型服務不能滿足 ... 這時你會想如果能讓服務自動化完成配置(註冊)和查找(發現)就好了,於是乎服務註冊發現就應運而生。
(服務註冊發現模式)
可以看出,所有服務提供者在上下線時都會告知服務註冊中心,服務消費者要查找服務直接從註冊中心拉取。一切都變得更加美好,那麼服務註冊中心該如何實現呢?簡單!優秀的開源項目已有一大把,大名鼎鼎的 Zookeeper、Eureka,還有後期之秀 Consul、Nacos、Etcd,當然有些算是分佈式 KV 存儲,要實現服務註冊發現仍需些額外工作。如何技術選型,是 AP 模式更好還是 CP 模式更好?今天先拋開這些開源項目,我們親自動手來實現一個服務註冊中心,深入理解其設計原理,逐行代碼分析與實踐。PS:本文項目代碼參考 bilibili discover 開源項目進行改造。
註冊中心實現原理
設計思想
首先進行功能需求分析,作爲服務註冊中心,要實現如下基本功能:
-
服務註冊:接受來自服務提交的註冊信息,並保存起來
-
服務下線:接受服務的主動下線請求,並將服務從註冊信息表中刪除
-
服務獲取:調用方從註冊中心拉取服務信息
-
服務續約:服務健康檢查,服務通過心跳保持(主動續約)告知註冊中心服務可用
-
服務剔除:註冊中心將長時間不續約的服務實例從註冊信息表中刪除
構造註冊表
服務中心首先要維護一個服務地址註冊信息列表(簡稱註冊表)。通俗理解註冊表就像手機通訊錄,記錄了所有聯繫人(服務)的電話(服務地址),通過聯繫人姓名(服務名稱)即可找到。
那麼如何存儲註冊表呢?最普遍認知想到存數據庫(Redis 這種內存數據庫),Zookeeper、Etcd 本身作爲分佈式 KV 存儲天然具有成爲註冊中心的優勢,但這些都會引入新組件,要考慮其穩定性及性能。那麼我們可以直接將註冊信息存到內存中,這時候你會想如果服務掛了內存數據丟了怎麼辦?這個問題後面我們會想辦法解決。
首先構建一個註冊表 Registry 數據結構,定義如下:
type Registry struct {
apps map[string]*Application
lock sync.RWMutex
}
-
apps 記錄應用服務 Application 的信息,使用 map 結構,key 爲應用服務的唯一標識,值爲應用服務結構類型
-
lock 讀寫鎖,保障併發讀寫安全
應用服務 Application 結構如下:
type Application struct {
appid string
instances map[string]*Instance
latestTimestamp int64
lock sync.RWMutex
}
-
appid 記錄應用服務唯一標識
-
lock 讀寫鎖,保障併發讀寫安全
-
latestTimestamp 記錄更新時間
-
instances 記錄服務實例 Instance 的信息,使用 map 結構,key 爲實例的 hostname (唯一標識),值爲實例結構類型
服務實例 Instance 的結構如下:
type Instance struct {
Env string `json:"env"`
AppId string `json:"appid"`
Hostname string `json:"hostname"`
Addrs []string `json:"addrs"`
Version string `json:"version"`
Status uint32 `json:"status"`
RegTimestamp int64 `json:"reg_timestamp"`
UpTimestamp int64 `json:"up_timestamp"`
RenewTimestamp int64 `json:"renew_timestamp"`
DirtyTimestamp int64 `json:"dirty_timestamp"`
LatestTimestamp int64 `json:"latest_timestamp"`
}
-
Env 服務環境標識,如 online、dev、test
-
AppId 應用服務的唯一標識
-
Hostname 服務實例的唯一標識
-
Addrs 服務實例的地址,可以是 http 或 rpc 地址,多個地址可以維護數組
-
Version 服務實例版本
-
Status 服務實例狀態,用於控制上下線
-
xxTimestamp 依次記錄服務實例註冊時間戳,上線時間戳,最近續約時間戳,髒時間戳(後面解釋),最後更新時間戳
註冊表及相關依賴的結構體構建完成了,梳理一下所有概念和關係。註冊表 Registry 中存放多個應用服務 Application,每個應用服務又會有多個服務實例 Instance,服務實例中存儲服務的具體地址和其他信息。
服務註冊
**功能目標:**接受來自服務提交的註冊信息,並保存到註冊表中。先初始化註冊表 NewRegistry() ,根據提交信息構建實例 NewInstance(),然後進行註冊寫入。
func NewRegistry() *Registry {
registry := &Registry{
apps: make(map[string]*Application),
}
return registry
}
func NewInstance(req *RequestRegister) *Instance {
now := time.Now().UnixNano()
instance := &Instance{
Env: req.Env,
AppId: req.AppId,
Hostname: req.Hostname,
Addrs: req.Addrs,
Version: req.Version,
Status: req.Status,
RegTimestamp: now,
UpTimestamp: now,
RenewTimestamp: now,
DirtyTimestamp: now,
LatestTimestamp: now,
}
return instance
}
r := NewRegistry()
instance := NewInstance(&req)
r.Register(instance, req.LatestTimestamp)
註冊時,先從 apps 中查找是否已註冊過,根據唯一標識 key = appid + env 確定。如果沒有註冊過,先新建應用 app,然後將 instance 加入到 app 中,最後 app 放入註冊表中。這裏分別使用了讀鎖和寫鎖,保障數據安全同時,儘量減少鎖時間和鎖搶佔影響。
func (r *Registry) Register(instance *Instance, latestTimestamp int64) (*Application, *errcode.Error) {
key := getKey(instance.AppId, instance.Env)
r.lock.RLock()
app, ok := r.apps[key]
r.lock.RUnlock()
if !ok { //new app
app = NewApplication(instance.AppId)
}
//add instance
_, isNew := app.AddInstance(instance, latestTimestamp)
if isNew { //todo }
//add into registry apps
r.lock.Lock()
r.apps[key] = app
r.lock.Unlock()
return app, nil
}
新建應用服務 app,初始化 instances
func NewApplication(appid string) *Application {
return &Application{
appid: appid,
instances: make(map[string]*Instance),
}
}
將服務主機實例 instance 加入應用 app 中,注意判斷是否已存在,存在根據髒時間戳 DirtyTimestamp 比對,是否進行替換,添加實例信息,更新最新時間 latestTimestamp ,並返回實例。
func (app *Application) AddInstance(in *Instance, latestTimestamp int64) (*Instance, bool) {
app.lock.Lock()
defer app.lock.Unlock()
appIns, ok := app.instances[in.Hostname]
if ok { //exist
in.UpTimestamp = appIns.UpTimestamp
//dirtytimestamp
if in.DirtyTimestamp < appIns.DirtyTimestamp {
log.Println("register exist dirty timestamp")
in = appIns
}
}
//add or update instances
app.instances[in.Hostname] = in
app.upLatestTimestamp(latestTimestamp)
returnIns := new(Instance)
*returnIns = *in
return returnIns, !ok
}
返回 !ok (isNew)表明,本次服務註冊時,實例爲新增還是替換,用來維護服務健康信息(後面會再次提到)。
服務註冊完成了,編寫測試用例看下效果。
var req = &model.RequestRegister{AppId: "com.xx.testapp", Hostname: "myhost", Addrs: []string{"http://testapp.xx.com/myhost"}, Status: 1}
func TestRegister(t *testing.T) {
r := model.NewRegistry()
instance := model.NewInstance(req)
app, _ := r.Register(instance, req.LatestTimestamp)
t.Log(app)
}
服務發現
**功能目標:**查找已註冊的服務獲取信息,可以指定條件查找,也可以全量查找。這裏以指定過濾條件 appid 、env 和 status 爲例。
r := model.NewRegistry()
fetchData, err := r.Fetch(req.Env, req.AppId, req.Status, 0)
根據 appid 和 env 組合成 key,然後從註冊表的 apps 中獲取應用 app,然後通過 app 獲取服務實例 GetInstance()
func (r *Registry) Fetch(env, appid string, status uint32, latestTime int64) (*FetchData, *errcode.Error) {
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
return app.GetInstance(status, latestTime)
}
func (r *Registry) getApplication(appid, env string) (*Application, bool) {
key := getKey(appid, env)
r.lock.RLock()
app, ok := r.apps[key]
r.lock.RUnlock()
return app, ok
}
根據 app 獲取所有應用實例,並用 status 過濾,這裏對返回結果 instances 中的 Addr 進行了拷貝返回一個新的切片。
func (app *Application) GetInstance(status uint32, latestTime int64) (*FetchData, *errcode.Error) {
app.lock.RLock()
defer app.lock.RUnlock()
if latestTime >= app.latestTimestamp {
return nil, errcode.NotModified
}
fetchData := FetchData{
Instances: make([]*Instance, 0),
LatestTimestamp: app.latestTimestamp,
}
var exists bool
for _, instance := range app.instances {
if status&instance.Status > 0 {
exists = true
newInstance := copyInstance(instance)
fetchData.Instances = append(fetchData.Instances, newInstance)
}
}
if !exists {
return nil, errcode.NotFound
}
return &fetchData, nil
}
//deep copy
func copyInstance(src *Instance) *Instance {
dst := new(Instance)
*dst = *src
//copy addrs
dst.Addrs = make([]string, len(src.Addrs))
for i, addr := range src.Addrs {
dst.Addrs[i] = addr
}
return dst
}
編寫測試用例,先註冊再獲取,看到可以正常獲取到信息。
服務下線
**功能目標:**接受服務的下線請求,並將服務從註冊信息列表中刪除。通過傳入 env, appid, hostname 三要素信息進行對應服務實例的取消。
r := model.NewRegistry()
r.Cancel(req.Env, req.AppId, req.Hostname, 0)
根據 appid 和 env 找到對象的 app,然後刪除 app 中對應的 hostname。如果 hostname 後 app.instances 爲空,那麼將 app 從註冊表中清除。
func (r *Registry) Cancel(env, appid, hostname string, latestTimestamp int64) (*Instance, *errcode.Error) {
log.Println("action cancel...")
//find app
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
instance, ok, insLen := app.Cancel(hostname, latestTimestamp)
if !ok {
return nil, errcode.NotFound
}
//if instances is empty, delete app from apps
if insLen == 0 {
r.lock.Lock()
delete(r.apps, getKey(appid, env))
r.lock.Unlock()
}
return instance, nil
}
func (app *Application) Cancel(hostname string, latestTimestamp int64) (*Instance, bool, int) {
newInstance := new(Instance)
app.lock.Lock()
defer app.lock.Unlock()
appIn, ok := app.instances[hostname]
if !ok {
return nil, ok, 0
}
//delete hostname
delete(app.instances, hostname)
appIn.LatestTimestamp = latestTimestamp
app.upLatestTimestamp(latestTimestamp)
*newInstance = *appIn
return newInstance, true, len(app.instances)
}
編寫測試用例先註冊,再取消,然後獲取信息,發現 404 not found。
服務續約
**功能目標:**實現服務的健康檢查機制,服務註冊後,如果沒有取消,那麼就應該在註冊表中,可以隨時查到,如果某個服務實例掛了,能否自動的從註冊表中刪除,保障註冊表中的服務實例都是正常的。
通常有兩種方式做法:註冊中心(服務端)主動探活,通過請求指定接口得到正常響應來確認;服務實例(客戶端)主動上報,調用續約接口進行續約,續約設有時效 TTL (time to live)。兩種方式各有優缺點,大家可以思考一下,不同的註冊中心也採用了不同的方式,這裏選型第二種方案。
r := model.NewRegistry()
r.Renew(req.Env, req.AppId, req.Hostname)
根據 appid 和 env 找到對象的 app,再根據 hostname 找到對應主機實例,更新其 RenewTimestamp 爲當前時間。
func (r *Registry) Renew(env, appid, hostname string) (*Instance, *errcode.Error) {
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
in, ok := app.Renew(hostname)
if !ok {
return nil, errcode.NotFound
}
return in, nil
}
func (app *Application) Renew(hostname string) (*Instance, bool) {
app.lock.Lock()
defer app.lock.Unlock()
appIn, ok := app.instances[hostname]
if !ok {
return nil, ok
}
appIn.RenewTimestamp = time.Now().UnixNano()
return copyInstance(appIn), true
}
服務剔除
**功能目標:**既然有服務定期續約,那麼對應的如果服務沒有續約呢?服務如果下線可以使用 Cancel 進行取消,但如果服務因爲網絡故障或掛了導致不能提供服務,那麼可以通過檢查它是否按時續約來判斷,把 TTL 達到閾值的服務實例剔除(Cancel),實現服務的被動下線。
首先在新建註冊表時開啓一個定時任務,新啓一個 goroutine 來實現。
func NewRegistry() *Registry {
++ go r.evictTask()
}
配置定時檢查的時間間隔,默認 60 秒,通過 Tick 定時器開啓 evict。
func (r *Registry) evictTask() {
ticker := time.Tick(configs.CheckEvictInterval)
for {
select {
case <-ticker:
r.evict()
}
}
}
遍歷註冊表的所有 apps,然後再遍歷其中的 instances,如果當前時間減去實例上一次續約時間 instance.RenewTimestamp 達到閾值(默認 90 秒),那麼將其加入過期隊列中。這裏並沒有直接將過期隊列所有實例都取消,考慮 GC 以及 本地時間漂移的因素,設定了一個剔除的上限 evictionLimit,隨機剔除一些過期實例。
func (r *Registry) evict() {
now := time.Now().UnixNano()
var expiredInstances []*Instance
apps := r.getAllApplications()
var registryLen int
for _, app := range apps {
registryLen += app.GetInstanceLen()
allInstances := app.GetAllInstances()
for _, instance := range allInstances {
if now-instance.RenewTimestamp > int64(configs.InstanceExpireDuration) {
expiredInstances = append(expiredInstances, instance)
}
}
}
evictionLimit := registryLen - int(float64(registryLen)*configs.SelfProtectThreshold)
expiredLen := len(expiredInstances)
if expiredLen > evictionLimit {
expiredLen = evictionLimit
}
if expiredLen == 0 {
return
}
for i := 0; i < expiredLen; i++ {
j := i + rand.Intn(len(expiredInstances)-i)
expiredInstances[i], expiredInstances[j] = expiredInstances[j], expiredInstances[i]
expiredInstance := expiredInstances[i]
r.Cancel(expiredInstance.Env, expiredInstance.AppId, expiredInstance.Hostname, now)
}
}
剔除上限數量,是通過當前註冊表大小(註冊表所有 instances 實例數)減去 觸發自我保護機制的閾值(當前註冊表大小 * 保護自我機制比例值),保護機制稍後會具體解釋。
剔除過期時,採用了 Knuth-Shuffle 算法,也叫公平洗牌算法來實現隨機剔除。當然如果 expiredLen <= evictionLimit,隨機剔除的意義不大,如果前者大於後者,隨機剔除能最大程度保障,剔除的實例均勻分散到所有應用實例中,降低某服務被全部清空的風險。公平洗牌算法實現也比較簡單,循環遍歷過期列表,將當前數與特定隨機數交換,和我們打牌時兩兩交換洗牌過程類似,它實現了 O(n) 的時間複雜度,由 Knuth 發明。
自我保護
**功能目標:**既然服務會定期剔除超時未續約的服務,那麼假設一種情況,網絡一段時間發生了異常,所有服務都沒成功續約,這時註冊中心是否將所有服務全部剔除?當然不行!所以,我們需要一個自我保護的機制防止此類事情的發生。
怎麼設計自我保護機制呢?按短時間內失敗的比例達到某特定閾值就開啓保護,保護模式下不進行服務剔除。所以我們需要一個統計模塊,續約成功 +1。默認情況下,服務剔除每 60 秒執行一次,服務續約每 30 秒執行一次,那麼一個服務實例在檢查時應該有 2 次續約。
type Guard struct {
renewCount int64
lastRenewCount int64
needRenewCount int64
threshold int64
lock sync.RWMutex
}
-
renewCount 記錄所有服務續約次數,每執行一次 renew 加 1
-
lastRenewCount 記錄上一次檢查週期(默認 60 秒)服務續約統計次數
-
needRenewCount 記錄一個週期總計需要的續約數,按一次續約 30 秒,一週期 60 秒,一個實例就需要 2 次,所以服務註冊時 + 2,服務取消時 - 2
-
threshold 通過 needRenewCount 和閾值比例 (0.85)確定觸發自我保護的值
func (gd *Guard) incrNeed() {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount += int64(configs.CheckEvictInterval / configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) decrNeed() {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount -= int64(configs.CheckEvictInterval / configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) setNeed(count int64) {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount = count * int64(configs.CheckEvictInterval/configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) incrCount() {
atomic.AddInt64(&gd.renewCount, 1)
}
在註冊表中增加 Guard 模塊並初始化,在服務註冊成功,服務取消,服務續約時操作統計。
type Registry struct {
++ gd *Guard
}
func NewRegistry() *Registry {
r := &Registry{
++ gd: new(Guard),
}
}
func (r *Registry) Register(...) {
if isNew {
++ r.gd.incrNeed()
}
}
func (r *Registry) Cancel(...) {
++ r.gd.decrNeed()
}
func (r *Registry) Renew(...) {
++ r.gd.incrCount()
}
在服務剔除前進行上一週期計數統計,並判斷是否達到自我保護開啓狀態。
func (gd *Guard) storeLastCount() {
atomic.StoreInt64(&gd.lastRenewCount, atomic.SwapInt64(&gd.needRenewCount, 0))
}
func (gd *Guard) selfProtectStatus() bool {
return atomic.LoadInt64(&gd.lastRenewCount) < atomic.LoadInt64(&gd.threshold)
}
如果開啓自我保護,那麼續約時間超過閾值(默認 90 秒)忽略不會剔除。但如果續約時間超過最大閾值(默認 3600 秒),那麼不管是否開啓保護都要剔除。因爲自我保護只是保護短時間由於網絡原因未續約的服務,長時間未續約大概率已經有問題了。
func (r *Registry) evictTask() {
case <-ticker:
++ r.gd.storeLastCount()
r.evict()
}
}
func (r *Registry) evict() {
delta := now - instance.RenewTimestamp
++ if !protectStatus && delta > int64(configs.InstanceExpireDuration) ||
delta > int64(configs.InstanceMaxExpireDuration) {
expiredInstances = append(expiredInstances, instance)
}
}
思考下,服務續約比例未達到 85% 就會觸發自我保護,還記不記得在服務剔除那塊有一個剔除數量上限不能超過 15%,這裏就 match 了,否則還沒來得及進入自我保護程序就把服務都剔除了。
最後增加一個定時器,如果超過一定時間(15 分鐘),重新計算下當前實例數,重置保護閾值,降低髒數據風險。
func (r *Registry) evictTask() {
resetTicker := time.Tick(configs.ResetGuardNeedCountInterval)
for {
select {
case <-resetTicker:
var count int64
for _, app := range r.getAllApplications() {
count += int64(app.GetInstanceLen())
}
r.gd.setNeed(count)
}
}
}
註冊中心對外提供服務
目前註冊中心基本功能已實現,需要對外提供服務了,我們採用 gin 來實現一個 web 服務,接受 http 請求進行服務的註冊、查找、續約、下線操作,這樣保障註冊中心可以方便的接受來自任何語言客戶端請求。
func main() {
//init config
c := flag.String("c", "", "config file path")
flag.Parse()
config, err := configs.LoadConfig(*c)
if err != nil {
log.Println("load config error:", err)
return
}
//global discovery
global.Discovery = model.NewDiscovery(config)
//init router and start server
router := api.InitRouter()
srv := &http.Server{
Addr: config.HttpServer,
Handler: router,
}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen:%s\n", err)
}
}()
}
增加一個 discovery 結構,並開啓一個全局變量 global.Discovery ,該結構中維護註冊表 Registry,然後就可以根據註冊表實現各種操作了。
type Discovery struct {
config *configs.GlobalConfig
protected bool
Registry *Registry
}
func NewDiscovery(config *configs.GlobalConfig) *Discovery {
dis := &Discovery{
protected: false,
config: config,
Registry: NewRegistry(), //init registry
}
return dis
}
//init discovery
var Discovery *model.Discovery
api.InitRouter() 綁定 url 路由和 Handler,以註冊爲例,接受請求入參,調用 global.Discovery.Registry.Register() 進行註冊,成功返回。
router.POST("api/register", handler.RegisterHandler)
func RegisterHandler(c *gin.Context) {
var req model.RequestRegister
if e := c.ShouldBindJSON(&req); e != nil {
err := errcode.ParamError
c.JSON(http.StatusOK, gin.H{
"code": err.Code(),
"message": err.Error(),
})
return
}
//bind instance
instance := model.NewInstance(&req)
if instance.Status == 0 || instance.Status > 2 {
err := errcode.ParamError
c.JSON(http.StatusOK, gin.H{
"code": err.Code(),
"message": err.Error(),
})
return
}
//dirtytime
if req.DirtyTimestamp > 0 {
instance.DirtyTimestamp = req.DirtyTimestamp
}
global.Discovery.Registry.Register(instance, req.LatestTimestamp)
c.JSON(http.StatusOK, gin.H{
"code": 200,
"message": "",
"data": "",
})
}
接着要實現平滑重啓,在 main 啓動時增加接收信號後關閉服務。
func main() {
//...
//graceful restart
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
log.Println("shutdown discovery server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("server shutdown error:", err)
}
select {
case <-ctx.Done():
log.Println("timeout of 5 seconds")
}
log.Println("server exiting")
}
實現效果如圖所示:
工程實踐
-
使用 go module 管理依賴的三方包 (gin 和 yaml)
-
api 存放 http 服務路由以及對應處理函數
-
cmd 存放編譯入口 main 文件
-
configs 存放全局配置和全局常量
-
global 存放全局結構變量
-
model 存放註冊表結構模型及主要邏輯
總結與問題
註冊中心功能實現
至此,一個單機版的註冊中心就可以工作了,但生產環境單點肯定是不能容忍的,因此有必要實現一個註冊中心集羣。那麼是否部署多個註冊中心實例就可以了,當然 .... 不行!這隻能保障有多個註冊中心節點,而每個節點中維護自己的註冊表,那麼就需要進行註冊表數據同步。多節點數據同步又會涉及著名的一致性問題,這時 Paxos、Raft、ZAB、Gossip 等算法名詞湧現,而我們將使用 P2P(Peer to Peer)對等網絡協議來實現。關於集羣設計與實現我們將在後續文章中展開。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WZ6HG-ZLFiBJLbE29vTr3g