golang 熔斷器的實現

熔斷器像是一個保險絲。當我們依賴的服務出現問題時,可以及時容錯。一方面可以減少依賴服務對自身訪問的依賴,防止出現雪崩效應;另一方面降低請求頻率以方便上游儘快恢復服務。

熔斷器的應用也非常廣泛。除了在我們應用中,爲了請求服務時使用熔斷器外,在 web 網關、微服務中,也有非常廣泛的應用。本文將從源碼角度學習 sony 開源的一個熔斷器實現 github/sony/gobreaker。(代碼註釋可以從github/lpflpf/gobreaker 查看)

熔斷器的模式

gobreaker 是基於《微軟雲設計模式》一書中的熔斷器模式的 Golang 實現。有 sony 公司開源,目前 star 數有 1.2K。使用人數較多。

下面是模式定義的一個狀態機:

熔斷器有三種狀態,四種狀態轉移的情況:

三種狀態

四種狀態轉移

gobreaker 的實現

gobreaker 是在上述狀態機的基礎上,實現的一個熔斷器。

熔斷器的定義

type CircuitBreaker struct {
  name          string
  maxRequests   uint32  // 最大請求數 (半開啓狀態會限流)
  interval      time.Duration   // 統計週期
  timeout       time.Duration   // 進入熔斷後的超時時間
  readyToTrip   func(counts Counts) bool // 通過 Counts 判斷是否開啓熔斷。需要自定義
  onStateChange func(name string, from State, to State) // 狀態修改時的鉤子函數

  mutex      sync.Mutex // 互斥鎖,下面數據的更新都需要加鎖
  state      State  // 記錄了當前的狀態
  generation uint64 // 標記屬於哪個週期
  counts     Counts // 計數器,統計了 成功、失敗、連續成功、連續失敗等,用於決策是否進入熔斷
  expiry     time.Time // 進入下個週期的時間
}

其中,如下參數是我們可以自定義的:

請求的執行

熔斷器的執行操作,主要包括三個階段;①請求之前的判定;②服務的請求執行;③請求後的狀態和計數的更新

// 熔斷器的調用
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {

  // ①請求之前的判斷
  generation, err := cb.beforeRequest()
  if err != nil {
    return nil, err
  }

  defer func() {
    e := recover()
    if e != nil {
      // ③ panic 的捕獲
      cb.afterRequest(generation, false)
      panic(e)
    }
  }()

  // ② 請求和執行
  result, err := req()

  // ③ 更新計數
  cb.afterRequest(generation, err == nil)
  return result, err
}

請求之前的判定操作

請求之前,會判斷當前熔斷器的狀態。如果熔斷器以開啓,則不會繼續請求。如果熔斷器半開,並且已達到最大請求閾值,也不會繼續請求。

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
  cb.mutex.Lock()
  defer cb.mutex.Unlock()

  now := time.Now()
  state, generation := cb.currentState(now)

  if state == StateOpen { // 熔斷器開啓,直接返回
    return generation, ErrOpenState
  } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { // 如果是半打開的狀態,並且請求次數過多了,則直接返回
    return generation, ErrTooManyRequests
  }

  cb.counts.onRequest()
  return generation, nil
}

其中當前狀態的計算,是依據當前狀態來的。如果當前狀態爲已開啓,則判斷是否已經超時,超時就可以變更狀態到半開;如果當前狀態爲關閉狀態,則通過週期判斷是否進入下一個週期。

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
  switch cb.state {
  case StateClosed:
    if !cb.expiry.IsZero() && cb.expiry.Before(now) { // 是否需要進入下一個計數週期
      cb.toNewGeneration(now)
    }
  case StateOpen:
    if cb.expiry.Before(now) {
      // 熔斷器由開啓變更爲半開
      cb.setState(StateHalfOpen, now)
    }
  }
  return cb.state, cb.generation
}

週期長度的設定,也是以據當前狀態來的。如果當前正常(熔斷器關閉),則設置爲一個 interval 的週期;如果當前熔斷器是開啓狀態,則設置爲超時時間(超時後,才能變更爲半開狀態)。

請求之後的處理操作

每次請求之後,會通過請求結果是否成功,對熔斷器做計數。

func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
  cb.mutex.Lock()
  defer cb.mutex.Unlock()

  now := time.Now()

  // 如果不在一個週期,就不再計數
  state, generation := cb.currentState(now)
  if generation != before {
    return
  }

  if success {
    cb.onSuccess(state, now)
  } else {
    cb.onFailure(state, now)
  }
}

如果在半開的狀態下:

如果在關閉狀態下:

總結

轉自:

segmentfault.com/a/1190000023033343

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cWYSGon6LssmiC023f0cmw