Golang 實現熔斷機制

【導讀】一些場景下,爲了保障服務穩定性會引入熔斷機制。本文介紹了用 Go 語言自己實現熔斷需要什麼操作。

什麼是熔斷?

熔斷是指在下游發生錯誤時上游主動關閉或限制對下游的請求。

原理

  1. 通常熔斷器分爲三個時期:CLOSED,OPEN,HALFOPEN

  2. RPC 正常時,爲 CLOSED;

  3. 當 RPC 錯誤增多時,熔斷器會被觸發,進入 OPEN;

  4. OPEN 後經過一定的冷卻時間,熔斷器變爲 HALFOPEN;

  5. HALFOPEN 時會對下游進行一些有策略的訪問,然後根據結果決定是變爲 CLOSED,還是 OPEN;

總得來說三個狀態的轉換大致如下圖:

Go 實現

https://github.com/rubyist/circuitbreaker

IsAllowed 是否允許請求,根據當前狀態判斷

CLOSE 允許
OPEN
HALFOPEN
atomic.StoreInt32((*int32)(&b.state), int32(HALFOPEN))

trip 判斷是否達到熔斷限額(可以自定義)

type TripFunc func(Metricser) bool

Metricser 訪問統計,包括成功數、失敗數、超時數、錯誤率、採樣數、連續錯誤數

type Metricser interface {
   Fail()    // records a failure
   Succeed() // records a success
   Timeout() // records a timeout

   Failures() int64    // return the number of failures
   Successes() int64   // return the number of successes
   Timeouts() int64    // return the number of timeouts
   ConseErrors() int64 // return the consecutive errors recently
   ErrorRate() float64 // rate = (timeouts + failures) / (timeouts + failures + successes)
   Samples() int64     // (timeouts + failures + successes)
   Counts() (successes, failures, timeouts int64)

   Reset()
}
window 實現類
type window struct {
   sync.RWMutex
   oldest  int32     // oldest bucket index
   latest  int32     // latest bucket index
   buckets []bucket // buckets this window holds

   bucketTime time.Duration // time each bucket holds
   bucketNums int32         // the numbe of buckets
   inWindow   int32         // the number of buckets in the window

   allSuccess int64
   allFailure int64
   allTimeout int64

   conseErr int64
}

type bucket struct {
   failure int64
   success int64
   timeout int64
}

用環形隊列實現動態統計。把一個連續的時間切成多個小份,每一個 bucket 保存 BucketTime 的統計數據,BucketTime * BucketNums 是統計的時間區間。

每 BucketTime,會有一個 bucket 過期

if w.inWindow == w.bucketNums {
   // the lastest covered the oldest(latest == oldest)
   oldBucket := &w.buckets[w.oldest]
   atomic.AddInt64(&w.allSuccess, -oldBucket.Successes())
   atomic.AddInt64(&w.allFailure, -oldBucket.Failures())
   atomic.AddInt64(&w.allTimeout, -oldBucket.Timeouts())
   w.oldest++
   if w.oldest >= w.bucketNums {
      w.oldest = 0
   }
} else {
   w.inWindow++
}

w.latest++
if w.latest >= w.bucketNums {
   w.latest = 0
}
(&w.buckets[w.latest]).Reset()

Panel Metricser 的容器

PanelStateChangeHandler 熔斷事件

type PanelStateChangeHandler func(key string, oldState, newState State, m Metricser)

缺陷

  1. 所有 breaker 公用同一個 BucketTime,統計週期不支持更新

  2. 冷卻時間不支持動態更新

轉自:

blog.cyeam.com/framework/2020/03/01/circuitbreaker

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bVYburXMpRJZi1bx4uFqkg