一文講透自適應熔斷的原理和實現

爲什麼需要熔斷

微服務集羣中,每個應用基本都會依賴一定數量的外部服務。有可能隨時都會遇到網絡連接緩慢,超時,依賴服務過載,服務不可用的情況,在高併發場景下如果此時調用方不做任何處理,繼續持續請求故障服務的話很容易引起整個微服務集羣雪崩。比如高併發場景的用戶訂單服務,一般需要依賴一下服務:

  1. 商品服務

  2. 賬戶服務

  3. 庫存服務

假如此時 賬戶服務 過載,訂單服務持續請求賬戶服務只能被動的等待賬戶服務報錯或者請求超時,進而導致訂單請求被大量堆積,這些無效請求依然會佔用系統資源:cpu,內存,數據連接... 導致訂單服務整體不可用。即使賬戶服務恢復了訂單服務也無法自我恢復。

這時如果有一個主動保護機制應對這種場景的話訂單服務至少可以保證自身的運行狀態, 等待賬戶服務恢復時訂單服務也同步自我恢復,這種自我保護機制在服務治理中叫熔斷機制。

熔斷

熔斷是調用方自我保護的機制(客觀上也能保護被調用方),熔斷對象是外部服務。

降級

降級是被調用方(服務提供者)的防止因自身資源不足導致過載的自我保護機制,降級對象是自身。

熔斷這一詞來源時我們日常生活電路里面的熔斷器,當負載過高時(電流過大)保險絲會自行熔斷防止電路被燒壞,很多技術都是來自生活場景的提煉。

工作原理

熔斷器一般具有三個狀態:

  1. 關閉:默認狀態,請求能被到達目標服務,同時統計在窗口時間成功和失敗次數,如果達到錯誤率閾值將會進入斷開狀態。

  2. 斷開:此狀態下將會直接返回錯誤,如果有 fallback 配置則直接調用 fallback 方法。

  3. 半斷開:進行斷開狀態會維護一個超市時間,到達超時時間開始進入 半斷開 狀態,嘗試允許一部門請求正常通過並統計成功數量,如果請求正常則認爲此時目標服務已恢復進入 關閉 狀態,否則進入 斷開 狀態。半斷開 狀態存在的目的在於實現了自我修復,同時防止正在恢復的服務再次被大量打垮。

使用較多的熔斷組件:

  1. hystrix circuit breaker(不再維護)

  2. hystrix-go

  3. resilience4j(推薦)

  4. sentinel(推薦)

什麼是自適應熔斷

基於上面提到的熔斷器原理,項目中我們要使用好熔斷器通常需要準備以下參數:

  1. 錯誤比例閾值:達到該閾值進入 斷開 狀態。

  2. 斷開狀態超時時間: 超時後進入 半斷開 狀態。

  3. 半斷開狀態允許請求數量。

  4. 窗口時間大小。

實際上可選的配置參數還有非常非常多,參考 https://resilience4j.readme.io/docs/circuitbreaker

對於經驗不夠豐富的開發人員而言,這些參數設置多少合適心裏其實並沒有底。

那麼有沒有一種自適應的熔斷算法能讓我們不關注參數,只要簡單配置就能滿足大部分場景?

其實是有的,google sre 提供了一種自適應熔斷算法來計算丟棄請求的概率:

算法參數:

  1. requests:窗口時間內的請求總數

  2. accepts:正常請求數量

  3. K:敏感度,K 越小越容易丟請求,一般推薦 1.5-2 之間

算法解釋:

  1. 正常情況下 requests=accepts,所以概率是 0。

  2. 隨着正常請求數量減少,當達到 requests == K* accepts 繼續請求時,概率 P 會逐漸比 0 大開始按照概率逐漸丟棄一些請求,如果故障嚴重則丟包會越來越多,假如窗口時間內 accepts==0 則完全熔斷。

  3. 當應用逐漸恢復正常時,accepts、requests 同時都在增加,但是 K*accepts 會比 requests 增加的更快,所以概率很快就會歸 0,關閉熔斷。

代碼實現

接下來思考一個熔斷器如何實現。

初步思路是:

  1. 無論什麼熔斷器都得依靠指標統計來轉換狀態,而統計指標一般要求是最近的一段時間內的數據(太久的數據沒有參考意義也浪費空間),所以通常採用一個 滑動時間窗口 數據結構 來存儲統計數據。同時熔斷器的狀態也需要依靠指標統計來實現可觀測性,我們實現任何系統第一步需要考慮就是可觀測性,不然系統就是一個黑盒。

  2. 外部服務請求結果各式各樣,所以需要提供一個自定義的判斷方法,判斷請求是否成功。可能是 http.code 、rpc.code、body.code,熔斷器需要實時收集此數據。

  3. 當外部服務被熔斷時使用者往往需要自定義快速失敗的邏輯,考慮提供自定義的 fallback() 功能。

下面來逐步分析 go-zero 的源碼實現:

core/breaker/breaker.go

熔斷器接口定義

兵馬未動,糧草先行,明確了需求後就可以開始規劃定義接口了,接口是我們編碼思維抽象的第一步也是最重要的一步。

核心定義包含兩種類型的方法:

Allow():需要手動回調請求結果至熔斷器,相當於手動擋。

DoXXX():自動回調請求結果至熔斷器,相當於自動擋,實際上 DoXXX() 類型方法最後都是調用DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error

// 自定義判定執行結果
Acceptable func(err error) bool
 
 // 手動回調
Promise interface {
  // Accept tells the Breaker that the call is successful.
  // 請求成功
  Accept()
  // Reject tells the Breaker that the call is failed.
  // 請求失敗
  Reject(reason string)
} 

Breaker interface {
  // 熔斷器名稱
  Name() string

  // 熔斷方法,執行請求時必須手動上報執行結果
  // 適用於簡單無需自定義快速失敗,無需自定義判定請求結果的場景
  // 相當於手動擋。。。
  Allow() (Promise, error)

  // 熔斷方法,自動上報執行結果
  // 自動擋。。。
  Do(req func() error) error

  // 熔斷方法
  // acceptable - 支持自定義判定執行結果
  DoWithAcceptable(req func() error, acceptable Acceptable) error

  // 熔斷方法
  // fallback - 支持自定義快速失敗
  DoWithFallback(req func() error, fallback func(err error) error) error

  // 熔斷方法
  // fallback - 支持自定義快速失敗
  // acceptable - 支持自定義判定執行結果
  DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

熔斷器實現

circuitBreaker 繼承 throttle,實際上這裏相當於靜態代理,代理模式可以在不改變原有對象的基礎上增強功能,後面我們會看到 go-zero 這樣做的原因是爲了收集熔斷器錯誤數據,也就是爲了實現可觀測性。

熔斷器實現採用靜態代理模式,看起來稍微有點繞腦。

// 熔斷器結構體
circuitBreaker struct {
  name string
  // 實際上 circuitBreaker熔斷功能都代理給 throttle來實現
  throttle
}
// 熔斷器接口
throttle interface {
  // 熔斷方法
  allow() (Promise, error)
  // 熔斷方法
  // DoXXX()方法最終都會該方法
  doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
 
func (cb *circuitBreaker) Allow() (Promise, error) {
  return cb.throttle.allow()
}
    
func (cb *circuitBreaker) Do(req func() error) error {
  return cb.throttle.doReq(req, nil, defaultAcceptable)
}
    
func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
  return cb.throttle.doReq(req, nil, acceptable)
}
    
func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
  return cb.throttle.doReq(req, fallback, defaultAcceptable)
}
    
func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
    acceptable Acceptable) error {
  return cb.throttle.doReq(req, fallback, acceptable)
}

throttle 接口實現類:

loggedThrottle 增加了爲了收集錯誤日誌的滾動窗口,目的是爲了收集當請求失敗時的錯誤日誌。

// 帶日誌功能的熔斷器
type loggedThrottle struct {
  // 名稱
  name string
  // 代理對象
  internalThrottle
  // 滾動窗口,滾動收集數據,相當於環形數組
  errWin *errorWindow
}

// 熔斷方法
func (lt loggedThrottle) allow() (Promise, error) {
  promise, err := lt.internalThrottle.allow()
  return promiseWithReason{
    promise: promise,
    errWin:  lt.errWin,
  }, lt.logError(err)
}

// 熔斷方法
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
    accept := acceptable(err)
    if !accept {
      lt.errWin.add(err.Error())
    }
    return accept
  }))
}

func (lt loggedThrottle) logError(err error) error {
  if err == ErrServiceUnavailable {
    // if circuit open, not possible to have empty error window
    stat.Report(fmt.Sprintf(
      "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
      proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
  }

  return err
}

錯誤日誌收集 errorWindow

errorWindow 是一個環形數組,新數據不斷滾動覆蓋最舊的數據,通過取餘實現。

// 滾動窗口
type errorWindow struct {
  reasons [numHistoryReasons]string
  index   int
  count   int
  lock    sync.Mutex
}

// 添加數據
func (ew *errorWindow) add(reason string) {
  ew.lock.Lock()
  // 添加錯誤日誌
  ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
  // 更新index,爲下一次寫入數據做準備
  // 這裏用的取模實現了滾動功能
  ew.index = (ew.index + 1) % numHistoryReasons
  // 統計數量
  ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
  ew.lock.Unlock()
}

// 格式化錯誤日誌
func (ew *errorWindow) String() string {
  var reasons []string

  ew.lock.Lock()
  // reverse order
  for i := ew.index - 1; i >= ew.index-ew.count; i-- {
    reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
  }
  ew.lock.Unlock()

  return strings.Join(reasons, "\n")
}

看到這裏我們還沒看到實際的熔斷器實現,實際上真正的熔斷操作被代理給了 internalThrottle 對象。

internalThrottle interface {
  allow() (internalPromise, error)
  doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

internalThrottle 接口實現 googleBreaker 結構體定義

type googleBreaker struct {
  // 敏感度,go-zero中默認值爲1.5
  k float64
  // 滑動窗口,用於記錄最近一段時間內的請求總數,成功總數
  stat *collection.RollingWindow
  // 概率生成器
  // 隨機產生0.0-1.0之間的雙精度浮點數
  proba *mathx.Proba
}

可以看到熔斷器屬性其實非常簡單,數據統計採用的是滑動時間窗口來實現。

RollingWindow 滑動窗口

滑動窗口屬於比較通用的數據結構,常用於最近一段時間內的行爲數據統計。

它的實現非常有意思,尤其是如何模擬窗口滑動過程。

先來看滑動窗口的結構體定義:

RollingWindow struct {
  // 互斥鎖
  lock sync.RWMutex
  // 滑動窗口數量
  size int
  // 窗口,數據容器
  win *window
  // 滑動窗口單元時間間隔
  interval time.Duration
  // 遊標,用於定位當前應該寫入哪個bucket
  offset int
  // 彙總數據時,是否忽略當前正在寫入桶的數據
  // 某些場景下因爲當前正在寫入的桶數據並沒有經過完整的窗口時間間隔
  // 可能導致當前桶的統計並不準確
  ignoreCurrent bool
  // 最後寫入桶的時間
  // 用於計算下一次寫入數據間隔最後一次寫入數據的之間
  // 經過了多少個時間間隔
  lastTime      time.Duration 
}

window 是數據的實際存儲位置,其實就是一個數組,提供向指定 offset 添加數據與清除操作。數組裏面按照 internal 時間間隔分隔成多個 bucket。

// 時間窗口
type window struct {
  // 桶
  // 一個桶標識一個時間間隔
  buckets []*Bucket
  // 窗口大小
  size int
}

// 添加數據
// offset - 遊標,定位寫入bucket位置
// v - 行爲數據
func (w *window) add(offset int, v float64) {
  w.buckets[offset%w.size].add(v)
}

// 彙總數據
// fn - 自定義的bucket統計函數
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
  for i := 0; i < count; i++ {
    fn(w.buckets[(start+i)%w.size])
  }
}

// 清理特定bucket
func (w *window) resetBucket(offset int) {
  w.buckets[offset%w.size].reset()
}

// 桶
type Bucket struct {
  // 當前桶內值之和
  Sum float64
  //當前桶的add總次數
  Count int64
}

// 向桶添加數據
func (b *Bucket) add(v float64) {
  // 求和
  b.Sum += v
  // 次數+1
  b.Count++
}

// 桶數據清零
func (b *Bucket) reset() {
  b.Sum = 0
  b.Count = 0
}

window 添加數據:

  1. 計算當前時間距離上次添加時間經過了多少個 時間間隔,實際上就是過期了幾個 bucket。

  2. 清理過期桶的數據

  3. 更新 offset,更新 offset 的過程實際上就是在模擬窗口滑動

  4. 添加數據

// 添加數據
func (rw *RollingWindow) Add(v float64) {
  rw.lock.Lock()
  defer rw.lock.Unlock()
  // 獲取當前寫入的下標
  rw.updateOffset()
  // 添加數據
  rw.win.add(rw.offset, v)
}

// 計算當前距離最後寫入數據經過多少個單元時間間隔
// 實際上指的就是經過多少個桶
func (rw *RollingWindow) span() int {
  offset := int(timex.Since(rw.lastTime) / rw.interval)
  if 0 <= offset && offset < rw.size {
    return offset
  }
  // 大於時間窗口時 返回窗口大小即可
  return rw.size
}

// 更新當前時間的offset
// 實現窗口滑動
func (rw *RollingWindow) updateOffset() {
  // 經過span個桶的時間
  span := rw.span()
  // 還在同一單元時間內不需要更新
  if span <= 0 {
    return
  }
  offset := rw.offset
  // 既然經過了span個桶的時間沒有寫入數據
  // 那麼這些桶內的數據就不應該繼續保留了,屬於過期數據清空即可
  // 可以看到這裏全部用的 % 取餘操作,可以實現按照下標週期性寫入
  // 如果超出下標了那就從頭開始寫,確保新數據一定能夠正常寫入
  // 類似循環數組的效果
  for i := 0; i < span; i++ {
    rw.win.resetBucket((offset + i + 1) % rw.size)
  }
  // 更新offset
  rw.offset = (offset + span) % rw.size
  now := timex.Now()
  // 更新操作時間
  // 這裏很有意思
  rw.lastTime = now - (now-rw.lastTime)%rw.interval
}

window 統計數據:

// 歸納彙總數據
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
  rw.lock.RLock()
  defer rw.lock.RUnlock()

  var diff int
  span := rw.span()
  // 當前時間截止前,未過期桶的數量
  if span == 0 && rw.ignoreCurrent {
   diff = rw.size - 1
  } else {
    diff = rw.size - span
  }
  if diff > 0 {
    // rw.offset - rw.offset+span之間的桶數據是過期的不應該計入統計
    offset := (rw.offset + span + 1) % rw.size
    // 彙總數據
    rw.win.reduce(offset, diff, fn)
  }
}

googleBreaker 判斷是否應該熔斷

  1. 收集滑動窗口內的統計數據

  2. 計算熔斷概率

// 按照最近一段時間的請求數據計算是否熔斷
func (b *googleBreaker) accept() error {
  // 獲取最近一段時間的統計數據
  accepts, total := b.history()
  // 計算動態熔斷概率
  weightedAccepts := b.k * float64(accepts)
  // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
  dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
  // 概率爲0,通過
  if dropRatio <= 0 {
    return nil
  }
  // 隨機產生0.0-1.0之間的隨機數與上面計算出來的熔斷概率相比較
  // 如果隨機數比熔斷概率小則進行熔斷
  if b.proba.TrueOnProba(dropRatio) {
    return ErrServiceUnavailable
  }

  return nil
}

googleBreaker 熔斷邏輯實現

熔斷器對外暴露兩種類型的方法

  1. 簡單場景直接判斷對象是否被熔斷,執行請求後必須需手動上報執行結果至熔斷器。

func (b *googleBreaker) allow() (internalPromise, error)

  1. 複雜場景下支持自定義快速失敗,自定義判定請求是否成功的熔斷方法,自動上報執行結果至熔斷器。

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

Acceptable 參數目的是自定義判斷請求是否成功。

Acceptable func(err error) bool
// 熔斷方法
// 返回一個promise異步回調對象,可由開發者自行決定是否上報結果到熔斷器
func (b *googleBreaker) allow() (internalPromise, error) {
  if err := b.accept(); err != nil {
    return nil, err
  }

  return googlePromise{
    b: b,
  }, nil
}

// 熔斷方法
// req - 熔斷對象方法
// fallback - 自定義快速失敗函數,可對熔斷產生的err進行包裝後返回
// acceptable - 對本次未熔斷時執行請求的結果進行自定義的判定,比如可以針對http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
  // 判定是否熔斷
  if err := b.accept(); err != nil {
    // 熔斷中,如果有自定義的fallback則執行
    if fallback != nil {
      return fallback(err)
    }

    return err
  }
  // 如果執行req()過程發生了panic,依然判定本次執行失敗上報至熔斷器
  defer func() {
    if e := recover(); e != nil {
      b.markFailure()
      panic(e)
    }
  }()
  // 執行請求
  err := req()
  // 判定請求成功
  if acceptable(err) {
    b.markSuccess()
  } else {
    b.markFailure()
  }

  return err
}

// 上報成功
func (b *googleBreaker) markSuccess() {
  b.stat.Add(1)
}

// 上報失敗
func (b *googleBreaker) markFailure() {
  b.stat.Add(0)
}

// 統計數據
func (b *googleBreaker) history() (accepts, total int64) {
  b.stat.Reduce(func(b *collection.Bucket) {
    accepts += int64(b.Sum)
    total += b.Count
  })

  return
}

資料

微軟 azure 關於熔斷器設計模式:https://docs.microsoft.com/en-us/previous-versions/msp-n-p/dn589784(v=pandp.10

索尼參考微軟的文檔開源的熔斷器實現:https://github.com/sony/gobreaker

go-zero 自適應熔斷器文檔:https://go-zero.dev/cn/breaker-algorithms.html

項目地址

https://github.com/zeromicro/go-zero

歡迎使用 go-zerostar 支持我們!

微信交流羣

關注『微服務實踐』公衆號並點擊 交流羣 獲取社區羣二維碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DDtFvFuJ1DMTBW-ViD7XTw