熔斷原理分析與源碼解讀
熔斷機制(Circuit Breaker)指的是在股票市場的交易時間中,當價格的波動幅度達到某一個限定的目標(熔斷點)時,對其暫停交易一段時間的機制。此機制如同保險絲在電流過大時候熔斷,故而得名。熔斷機制推出的目的是爲了防範系統性風險,給市場更多的冷靜時間,避免恐慌情緒蔓延導致整個市場波動,從而防止大規模股價下跌現象的發生。
同樣的,在高併發的分佈式系統設計中,也應該有熔斷的機制。熔斷一般是在客戶端(調用端)進行配置,當客戶端向服務端發起請求的時候,服務端的錯誤不斷地增多,這時候就可能會觸發熔斷,觸發熔斷後客戶端的請求不再發往服務端,而是在客戶端直接拒絕請求,從而可以保護服務端不會過載。這裏說的服務端可能是 rpc 服務,http 服務,也可能是 mysql,redis 等。注意熔斷是一種有損的機制,當熔斷後可能需要一些降級的策略進行配合。
熔斷原理
現代微服務架構基本都是分佈式的,整個分佈式系統是由非常多的微服務組成。不同服務之間相互調用,組成複雜的調用鏈路。在複雜的調用鏈路中的某一個服務如果不穩定,就可能會層層級聯,最終可能導致整個鏈路全部掛掉。因此我們需要對不穩定的服務依賴進行熔斷降級,暫時切斷不穩定的服務調用,避免局部不穩定因素導致整個分佈式系統的雪崩。
說白了,我覺得熔斷就像是那些容易異常服務的一種代理,這個代理能夠記錄最近調用發生錯誤的次數,然後決定是繼續操作,還是立即返回錯誤。
熔斷器內部維護了一個熔斷器狀態機,狀態機的轉換關係如下圖所示:
熔斷器有三種狀態:
-
Closed 狀態:也是初始狀態,我們需要一個調用失敗的計數器,如果調用失敗,則使失敗次數加 1。如果最近失敗次數超過了在給定時間內允許失敗的閾值,則切換到 Open 狀態,此時開啓一個超時時鐘,當到達超時時鐘時間後,則切換到 Half Open 狀態,該超時時間的設定是給了系統一次機會來修正導致調用失敗的錯誤,以回到正常的工作狀態。在 Closed 狀態下,錯誤計數是基於時間的。在特定的時間間隔內會自動重置,這能夠防止由於某次的偶然錯誤導致熔斷器進入 Open 狀態,也可以基於連續失敗的次數。
-
Open 狀態:在該狀態下,客戶端請求會立即返回錯誤響應,而不調用服務端。
-
Half-Open 狀態:允許客戶端一定數量的去調用服務端,如果這些請求對服務的調用成功,那麼可以認爲之前導致調用失敗的錯誤已經修正,此時熔斷器切換到 Closed 狀態,同時將錯誤計數器重置。如果這一定數量的請求有調用失敗的情況,則認爲導致之前調用失敗的的問題仍然存在,熔斷器切回到斷開狀態,然後重置計時器來給系統一定的時間來修正錯誤。Half-Open 狀態能夠有效防止正在恢復中的服務被突然而來的大量請求再次打掛。
下圖是 Netflix 的開源項目 Hystrix 中的熔斷器的實現邏輯:
從這個流程圖中,可以看到:
-
有請求來了,首先 allowRequest() 函數判斷是否在熔斷中,如果不是則放行,如果是的話,還要看有沒有達到一個熔斷時間片,如果熔斷時間片到了,也放行,否則直接返回錯誤。
-
每次調用都有兩個函數 makeSuccess(duration) 和 makeFailure(duration) 來統計一下在一定的 duration 內有多少是成功還是失敗的。
-
判斷是否熔斷的條件 isOpen(),是計算 failure/(success+failure) 當前的錯誤率,如果高於一個閾值,那麼熔斷器打開,否則關閉。
-
Hystrix 會在內存中維護一個數據,其中記錄着每一個週期的請求結果的統計,超過時長長度的元素會被刪除掉。
熔斷器實現
瞭解了熔斷的原理後,我們來自己實現一套熔斷器。
熟悉 go-zero 的朋友都知道,在 go-zero 中熔斷沒有采用上面介紹的方式,而是參考了《Google Sre》 採用了一種自適應的熔斷機制,這種自適應的方式有什麼好處呢?下文會基於這兩種機制做一個對比。
下面我們基於上面介紹的熔斷原理,實現一套自己的熔斷器。
代碼路徑:go-zero/core/breaker/hystrixbreaker.go
熔斷器默認的狀態爲 Closed,當熔斷器打開後默認的冷卻時間是 5 秒鐘,當熔斷器處於 HalfOpen 狀態時默認的探測時間爲 200 毫秒,默認使用 rateTripFunc 方法來判斷是否觸發熔斷,規則是採樣大於等於 200 且錯誤率大於 50%,使用滑動窗口來記錄請求總數和錯誤數。
func newHystrixBreaker() *hystrixBreaker {
bucketDuration := time.Duration(int64(window) / int64(buckets))
stat := collection.NewRollingWindow(buckets, bucketDuration)
return &hystrixBreaker{
state: Closed,
coolingTimeout: defaultCoolingTimeout,
detectTimeout: defaultDetectTimeout,
tripFunc: rateTripFunc(defaultErrRate, defaultMinSample),
stat: stat,
now: time.Now,
}
}
func rateTripFunc(rate float64, minSamples int64) TripFunc {
return func(rollingWindow *collection.RollingWindow) bool {
var total, errs int64
rollingWindow.Reduce(func(b *collection.Bucket) {
total += b.Count
errs += int64(b.Sum)
})
errRate := float64(errs) / float64(total)
return total >= minSamples && errRate > rate
}
}
每次請求都會調用 doReq 方法,在該方法中,首先通過 accept() 方法判斷是否拒絕本次請求,拒絕則直接返回熔斷錯誤。否則執行 req() 真正的發起服務端調用,成功和失敗分別調用 b.markSuccess() 和 b.markFailure()
func (b *hystrixBreaker) doReq(req func() error, fallback func(error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
}
return err
}
defer func() {
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
err := req()
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
在 accept() 方法中,首先獲取當前熔斷器狀態,當熔斷器處於 Closed 狀態直接返回,表示正常處理本次請求。
當前狀態爲 Open 的時候,判斷冷卻時間是否過期,如果沒有過期的話則直接返回熔斷錯誤拒絕本次請求,如果過期的話則把熔斷器狀態更改爲 HalfOpen,冷卻時間的主要目的是給服務端一些時間進行故障恢復,避免持續請求把服務端打掛。
當前狀態爲 HalfOpen 的時候,首先判斷探測時間間隔,避免探測過於頻繁,默認使用 200 毫秒作爲探測間隔。
func (b *hystrixBreaker) accept() error {
b.mux.Lock()
switch b.getState() {
case Open:
now := b.now()
if b.openTime.Add(b.coolingTimeout).After(now) {
b.mux.Unlock()
return ErrServiceUnavailable
}
if b.getState() == Open {
atomic.StoreInt32((*int32)(&b.state), int32(HalfOpen))
atomic.StoreInt32(&b.halfopenSuccess, 0)
b.lastRetryTime = now
b.mux.Unlock()
} else {
b.mux.Unlock()
return ErrServiceUnavailable
}
case HalfOpen:
now := b.now()
if b.lastRetryTime.Add(b.detectTimeout).After(now) {
b.mux.Unlock()
return ErrServiceUnavailable
}
b.lastRetryTime = now
b.mux.Unlock()
case Closed:
b.mux.Unlock()
}
return nil
}
如果本次請求正常返回,則調用 markSuccess() 方法,如果當前熔斷器處於 HalfOpen 狀態,則判斷當前探測成功數量是否大於默認的探測成功數量,如果大於則把熔斷器的狀態更新爲 Closed。
func (b *hystrixBreaker) markSuccess() {
b.mux.Lock()
switch b.getState() {
case Open:
b.mux.Unlock()
case HalfOpen:
atomic.AddInt32(&b.halfopenSuccess, 1)
if atomic.LoadInt32(&b.halfopenSuccess) > defaultHalfOpenSuccesss {
atomic.StoreInt32((*int32)(&b.state), int32(Closed))
b.stat.Reduce(func(b *collection.Bucket) {
b.Count = 0
b.Sum = 0
})
}
b.mux.Unlock()
case Closed:
b.stat.Add(1)
b.mux.Unlock()
}
}
在 markFailure() 方法中,如果當前狀態是 Closed 通過執行 tripFunc 來判斷是否滿足熔斷條件,如果滿足則把熔斷器狀態更改爲 Open 狀態。
func (b *hystrixBreaker) markFailure() {
b.mux.Lock()
b.stat.Add(0)
switch b.getState() {
case Open:
b.mux.Unlock()
case HalfOpen:
b.openTime = b.now()
atomic.StoreInt32((*int32)(&b.state), int32(Open))
b.mux.Unlock()
case Closed:
if b.tripFunc != nil && b.tripFunc(b.stat) {
b.openTime = b.now()
atomic.StoreInt32((*int32)(&b.state), int32(Open))
}
b.mux.Unlock()
}
}
熔斷器的實現邏輯總體比較簡單,閱讀代碼基本都能理解,這部分代碼實現的比較倉促,可能會有 bug,如果大家發現 bug 可以隨時聯繫我進行修正。
hystrixBreaker 和 googlebreaker 對比
接下來對比一下兩種熔斷器的熔斷效果。
這部分示例代碼在:go-zero/example 下
分別定義了 user-api 和 user-rpc 服務,user-api 作爲客戶端對 user-rpc 進行請求,user-rpc 作爲服務端響應客戶端請求。
在 user-rpc 的示例方法中,有 20% 的幾率返回錯誤。
func (l *UserInfoLogic) UserInfo(in *user.UserInfoRequest) (*user.UserInfoResponse, error) {
ts := time.Now().UnixMilli()
if in.UserId == int64(1) {
if ts%5 == 1 {
return nil, status.Error(codes.Internal, "internal error")
}
return &user.UserInfoResponse{
UserId: 1,
Name: "jack",
}, nil
}
return &user.UserInfoResponse{}, nil
}
在 user-api 的示例方法中,對 user-rpc 發起請求,然後使用 prometheus 指標記錄正常請求的數量。
var metricSuccessReqTotal = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: "circuit_breaker",
Subsystem: "requests",
Name: "req_total",
Help: "test for circuit breaker",
Labels: []string{"method"},
})
func (l *UserInfoLogic) UserInfo() (resp *types.UserInfoResponse, err error) {
for {
_, err := l.svcCtx.UserRPC.UserInfo(l.ctx, &user.UserInfoRequest{UserId: int64(1)})
if err != nil && err == breaker.ErrServiceUnavailable {
fmt.Println(err)
continue
}
metricSuccessReqTotal.Inc("UserInfo")
}
return &types.UserInfoResponse{}, nil
}
啓動兩個服務,然後觀察在兩種熔斷策略下正常請求的數量。
googleBreaker 熔斷器的正常請求率如下圖所示:
hystrixBreaker 熔斷器的正常請求率如下圖所示:
從上面的實驗結果可以看出,go-zero 內置的 googleBreaker 的正常請求數是高於 hystrixBreaker 的。這是因爲 hystrixBreaker 維護了三種狀態,當進入 Open 狀態後爲了避免繼續對服務端發起請求造成壓力,會使用一個冷卻時鐘,而在這段時間裏是不會放過任何請求的,同時,從 HalfOpen 狀態變爲 Closed 狀態後,瞬間又會有大量的請求發往服務端,這時服務端很可能還沒恢復,從而導致熔斷器又變爲 Open 狀態。而 googleBreaker 採用的是一種自適應的熔斷策略,也不需要多種狀態,也不會像 hystrixBreaker 那樣一刀切,而是會盡可能多的處理請求,這不也是我們期望的嘛,畢竟熔斷對客戶來說是有損的。下面我們來一起學習下 go-zero 內置的熔斷器 googleBreaker。
源碼解讀
googleBreaker 的代碼路徑在:go-zero/core/breaker/googlebreaker.go
在 doReq() 方法中通過 accept() 方法判斷是否觸發熔斷,如果觸發熔斷則返回 error,這裏如果定義了回調函數的話可以執行回調,比如做一些降級數據的處理等。如果請求正常則通過 markSuccess() 給總請求數和正常請求數都加 1,如果請求失敗通過 markFailure 則只給總請求數加 1。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
}
return err
}
defer func() {
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
err := req()
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
在 accept() 方法中通過計算判斷是否觸發熔斷。
在該算法中,需要記錄兩個請求數,分別是:
-
請求總量(requests): 調用方發起請求的數量總和
-
正常處理的請求數量(accepts): 服務端正常處理的請求數量
在正常情況下,這兩個值是相等的,隨着被調用方服務出現異常開始拒絕請求,請求接受數量 (accepts) 的值開始逐漸小於請求數量(requests),這個時候調用方可以繼續發送請求,直到 requests = K * accepts,一旦超過這個限制,熔斷器就會打開,新的請求會在本地以一定的概率被拋棄直接返回錯誤,概率的計算公式如下:
max(0, (requests - K * accepts) / (requests + 1))
通過修改算法中的 K(倍值),可以調節熔斷器的敏感度,當降低該倍值會使自適應熔斷算法更敏感,當增加該倍值會使得自適應熔斷算法降低敏感度,舉例來說,假設將調用方的請求上限從 requests = 2 acceptst 調整爲 requests = 1.1 accepts 那麼就意味着調用方每十個請求之中就有一個請求會觸發熔斷。
func (b *googleBreaker) accept() error {
accepts, total := b.history()
weightedAccepts := b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
if dropRatio <= 0 {
return nil
}
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
history 從滑動窗口中統計當前的總請求數和正常處理的請求數。
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count
})
return
}
結束語
本篇文章介紹了服務治理中的一種客戶端節流機制 - 熔斷。在 hystrix 熔斷策略中需要實現三個狀態,分別是 Open、HalfOpen 和 Closed。不同狀態的切換時機在上文中也有詳細描述,大家可以反覆閱讀理解,最好是能自己動手實現一下。對於 go-zero 內置的熔斷器是沒有狀態的,如果非要說它的狀態的話,那麼也只有打開和關閉兩種情況,它是根據當前請求的成功率自適應的丟棄請求,是一種更彈性的熔斷策略,丟棄請求概率隨着正常處理的請求數不斷變化,正常處理的請求越多丟棄請求的概率就越低,反之丟棄請求的概率就越高。
雖然熔斷的原理都一樣,但實現機制不同導致的效果可能也不同,在實際生產中可以根據實際情況選擇符合業務場景的熔斷策略。
希望本篇文章對你有所幫助。
本篇文章代碼:https://github.com/zhoushuguang/go-zero/tree/circuit-breaker
參考
https://martinfowler.com/bliki/CircuitBreaker.html
https://github.com/Netflix/Hystrix/wiki/How-it-Works
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
並 star 支持我們!
微信交流羣
關注『微服務實踐』公衆號並點擊 交流羣 獲取社區羣二維碼。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/4JLDjEVoYnceKcbBH4zm5w