微服務中的熔斷算法
雪崩效應
在微服務系統中,整個系統是以一系列功能獨立的微服務組成,如果某一個服務,因爲流量異常或者內部其他原因,導致響應異常,那麼該服務會影響到其下游服務, 從而發生一系列連鎖反應,最終導致整個系統崩潰,這就是微服務中的 雪崩效應
。
例如:當前系統中有 A
,B
,C
三個服務,服務 A
是上游,服務 B
是中游,服務 C
是下游。一旦下游服務 C
變得不可用,積壓了大量請求,服務 B
的請求也隨之阻塞,資源逐漸耗盡,使得服務 B
也變得不可用。最後,服務 A
也變爲不可用,整個系統鏈路崩潰。
熔斷
熔斷
機制是微服務調用鏈路中的的自我保護機制,當鏈路中某個服務響應時間過長甚至不可用時,會進行服務 熔斷
,快速返回錯誤響應,停止 級聯故障
,避免 雪崩效應
。
熔斷、限流、降級區別
限流
是針對服務請求數量的一種自我保護機制,當請求數量超出服務負載時,自動丟棄新的請求,是系統高可用架構的第一步。服務有了 限流
之後,爲什麼還需要 熔斷
呢?限流
面向的是上游的服務,而 熔斷
面向的是下游的服務。
降級
通過將不重要的服務暫停,提高系統負載能力。例如電商的 秒殺
場景中,可以暫停 用戶好友關係
, 用戶信息
等服務。
hystrix-go
Hystrix[1] 是 Netflix
開源的由 Java
開發的 熔斷器
組件,對應的 Go
版本爲 afex/hystrix-go[2], 筆者選擇該組件作爲研究 熔斷
算法代碼實現。
三個狀態
這裏需要注意的是: afex/hystrix-go[3] 的實現中並沒有 半打開
的狀態,也就是說,一旦 熔斷
開啓後, 只能等待配置的時間之後,才能去主動判定下游服務是否已經恢復,繼而恢復請求。筆者認爲這個不是重要的部分,如果讀者比較介意的話, 可以參考 引用部分 [4] 的另外兩個 熔斷
開源組件。
示例代碼
主流程代碼
package hystrix
import (
"fmt"
"net/http"
"sync/atomic"
"testing"
"time"
"github.com/afex/hystrix-go/hystrix"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
)
func server() {
r := gin.Default()
// 服務請求計數器
var count int64
r.GET("/ping", func(ctx *gin.Context) {
// 模擬服務故障,前兩次請求返回錯誤
if atomic.AddInt64(&count, 1) < 3 {
ctx.String(http.StatusInternalServerError, "pong")
return
}
// 後面的請求正常返回
ctx.String(http.StatusOK, "pong")
})
// 通過 http://localhost:8080/ping 訪問
_ = r.Run(":8080")
}
func TestQuickStart(t *testing.T) {
// 啓動服務
go server()
hystrix.ConfigureCommand("test-api-ping", hystrix.CommandConfig{
// 執行命令超時時間, 默認值 1 秒
Timeout: 0,
// 最大併發請求量, 默認值 10
MaxConcurrentRequests: 100,
// 熔斷開啓前需要達到的最小請求數量, 默認值 20
RequestVolumeThreshold: 5,
// 熔斷器開啓後,重試服務是否恢復的等待時間,默認值 5 秒
// 這裏修改爲 0.5 秒
SleepWindow: 500,
// 請求錯誤百分比閾值,超過閾值後熔斷開啓
ErrorPercentThreshold: 20,
})
for i := 0; i < 20; i++ {
_ = hystrix.Do("test-api-ping", func() error {
resp, _ := resty.New().R().Get("http://localhost:8080/ping")
if resp.IsError() {
return fmt.Errorf("err code: %s", resp.Status())
}
return nil
}, func(err error) error {
fmt.Println("fallback err: ", err)
return err
})
// 每次請求之間休眠 0.1 秒
time.Sleep(100 * time.Millisecond)
}
}
主流程代碼邏輯描述
我們通過修改 hystrix
的默認配置,期望達到以下的熔斷效果:
-
前兩個請求直接返回錯誤
-
此時錯誤百分比達到
100%
, 但是還未達到開啓熔斷
的最小請求數量 (5) -
繼續發出請求,接下來的
3
個請求全部成功,此時請求數量達到開啓熔斷
的最小請求數量 -
開啓
熔斷
-
繼續發出請求,接下來的
5
個請求全部返回fallback error
, 同時每次請求之間間隔0.1 秒
-
此時達到開啓
熔斷
後,重試服務是否恢復的等待時間 (0.5 秒) -
繼續發出請求,接下來的
5
個請求全部成功
運行測試
$ go test -v -count=1 .
# 輸出如下
=== RUN TestQuickStart
...
[GIN-debug] Listening and serving HTTP on :8080
# 前兩個請求錯誤
[GIN] 2023/03/03 - 12:02:03 | 500 | 16.7µs | 127.0.0.1 | GET "/ping"
fallback err: err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:03 | 500 | 22.6µs | 127.0.0.1 | GET "/ping"
fallback err: err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:04 | 200 | 33.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 20.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.6µs | 127.0.0.1 | GET "/ping"
# 熔斷開啓
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
# 熔斷重試
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.6µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.3µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 16.3µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 18.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 26.5µs | 127.0.0.1 | GET "/ping"
--- PASS: TestQuickStart (1.52s)
PASS
ok Golang-Patterns/hystrix 1.532s
通過測試的數據結果,我們可以看到,熔斷
執行流程和上面描述的邏輯一致,接下來,我們研究一下 hystrix-go
的內部實現。
算法實現
熔斷配置對象
各字段代表的含義,請參照剛纔示例代碼中的註釋。
type CommandConfig struct {
Timeout int `json:"timeout"`
MaxConcurrentRequests int `json:"max_concurrent_requests"`
RequestVolumeThreshold int `json:"request_volume_threshold"`
SleepWindow int `json:"sleep_window"`
ErrorPercentThreshold int `json:"error_percent_threshold"`
}
創建熔斷配置
ConfigureCommand
方法根據參數創建一個 熔斷
配置對象,如果對象中的某些字段參數未提供,則適用默認值替代。
func ConfigureCommand(name string, config CommandConfig) {
settingsMutex.Lock()
defer settingsMutex.Unlock()
timeout := DefaultTimeout
if config.Timeout != 0 {
timeout = config.Timeout
}
max := DefaultMaxConcurrent
...
volume := DefaultVolumeThreshold
...
sleep := DefaultSleepWindow
...
errorPercent := DefaultErrorPercentThreshold
...
circuitSettings[name] = &Settings{
...
}
}
熔斷器對象
CircuitBreaker
表示單個請求對應的 熔斷器
對象,對象可以驗證請求是否觸發了 熔斷
機制,以及是否應該拒絕該請求繼續訪問。
type CircuitBreaker struct {
Name string
open bool
forceOpen bool
mutex *sync.RWMutex
openedOrLastTestedTime int64
executorPool *executorPool
metrics *metricExchange
}
GetCircuit 函數
GetCircuit
函數根據名稱返回對應的 熔斷器
對象以及該名稱函數調用是否觸發了 熔斷
機制。
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
circuitBreakersMutex.RLock()
_, ok := circuitBreakers[name]
if !ok {
circuitBreakersMutex.RUnlock()
// 從讀寫鎖切換到寫鎖
circuitBreakersMutex.Lock()
defer circuitBreakersMutex.Unlock()
// 雙重檢測,防止在加鎖期間其他 goroutine 更新了對象
if cb, ok := circuitBreakers[name]; ok {
// 代碼執行到這裏,說明在加鎖期間其他 goroutine 觸發了熔斷機制
// 所以第二個返回值返回 false
return cb, false, nil
}
circuitBreakers[name] = newCircuitBreaker(name)
} else {
defer circuitBreakersMutex.RUnlock()
}
return circuitBreakers[name], !ok, nil
}
Do 函數
Do
函數以阻塞的方式運行參數函數,直到函數返回成功或者錯誤 (包括觸發了熔斷),具體的執行工作是由 DoC
函數和 GoC
函數完成的。
func Do(name string, run runFunc, fallback fallbackFunc) error {
// 包裝 run 參數函數
runC := func(ctx context.Context) error {
return run()
}
var fallbackC fallbackFuncC
if fallback != nil {
// 包裝 fallback 參數函數
fallbackC = func(ctx context.Context, err error) error {
return fallback(err)
}
}
return DoC(context.Background(), name, runC, fallbackC)
}
DoC 函數
DoC
函數負責參數的執行前封裝工作,Goc
函數負責參數的具體執行工作。
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
// 接收通道緩衝爲 1
// 函數的執行結果只可能是下面一種:
// 返回成功
// 返回錯誤 (包括觸發了熔斷)
done := make(chan struct{}, 1)
// 將 run 參數函數的執行過程包裝一下
// 如果函數返回成功,就發送信號給通道,否則返回錯誤
r := func(ctx context.Context) error {
err := run(ctx)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
// 將 fallback 參數函數的執行過程包裝一下
// 如果函數返回錯誤,就發送信號給通道
f := func(ctx context.Context, e error) error {
err := fallback(ctx, e)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
// 執行參數函數,委託給 GoC 函數執行
var errChan chan error
if fallback == nil {
errChan = GoC(ctx, name, r, nil)
} else {
errChan = GoC(ctx, name, r, f)
}
select {
case <-done:
// 代碼執行到這裏,說明函數返回成功
return nil
case err := <-errChan:
// 代碼執行到這裏,說明函數返回錯誤
return err
}
}
GoC 函數
GoC
運行參數函數,同時跟蹤該函數歷史運行情況。如果參數函數觸發了 熔斷
開啓,則必須等待服務恢復。
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
circuit, _, err := GetCircuit(name)
if err != nil {
// 獲取該請求對應的熔斷器對象發生錯誤時,直接返回
cmd.errChan <- err
return cmd.errChan
}
...
// 當請求返回時,歸還請求 ticket 到池中
returnTicket := func() {
...
cmd.circuit.executorPool.Return(cmd.ticket)
}
// 該對象由兩個 goroutine 共享
// 兩個 goroutine :
// 一個負責具體的函數執行 (熔斷有可能已經開啓了)
// 一個負責監聽函數執行結果
// 不管哪個先執行完成,報告熔斷器相關 metric
returnOnce := &sync.Once{}
go func() {
defer func() { cmd.finished <- true }()
// 熔斷已經開啓了
// 拒絕當前請求,等待服務恢復
if !cmd.circuit.AllowRequest() {
...
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrCircuitOpen)
})
return
}
// 當後端 (被請求方,一般指上游服務) 服務不穩定時,請求會花費更多時間,但不一定每次都會失敗
// 當請求變慢但 QPS 不變時,需要限制併發數量,降低後端服務的負載
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
// 請求量沒有超過併發限制
...
cmd.Unlock()
default:
// 請求量超過了併發限制,返回錯誤
...
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
})
return
}
runErr := run(ctx) // 執行參數函數
returnOnce.Do(func() {
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// 上面的 goroutine 已經上報數據,這裏無需再上報
case <-ctx.Done():
// 上下文錯誤
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ctx.Err())
})
case <-timer.C:
// 執行超時
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrTimeout)
})
}
}()
return cmd.errChan
}
Do 函數調用鏈路
Do 函數調用鏈路
最後,我們來看一下 熔斷
開啓與狀態判斷機制的內部實現。
AllowRequest 方法
AllowRequest
方法在具體的請求執行之前,先判斷 熔斷
是否已經開啓,當 熔斷
開啓時,熔斷
時間超過等待外部服務恢復時間時返回 ture
。
func (circuit *CircuitBreaker) AllowRequest() bool {
return !circuit.IsOpen() || circuit.allowSingleTest()
}
IsOpen 方法
IsOpen
方法在具體的請求執行之前,根據 熔斷
是否以開啓確定是否已經拒絕請求執行。
func (circuit *CircuitBreaker) IsOpen() bool {
circuit.mutex.RLock()
o := circuit.forceOpen || circuit.open
circuit.mutex.RUnlock()
if o {
// 熔斷已開啓
return true
}
if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
// 未達到開啓熔斷需要的最小請求數量
return false
}
if !circuit.metrics.IsHealthy(time.Now()) {
// 請求失敗百分比超過閾值,開啓熔斷
circuit.setOpen()
return true
}
return false
}
allowSingleTest 方法
allowSingleTest
方法判斷 熔斷
時間是否已經超過重試服務是否恢復的等待時間。
func (circuit *CircuitBreaker) allowSingleTest() bool {
...
now := time.Now().UnixNano()
openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
...
return true
}
return false
}
通過上面三個方法實現代碼可以看到: 熔斷
開啓與狀態判斷是每次請求到來時實時判斷的。
AllowRequest 方法調用鏈路
小結
本文描述了 熔斷
的基本概念以及 熔斷
和 限流
、降級
之間的區別,同時藉助開源的 afex/hystrix-go[5] 組件源代碼, 研究瞭如何使用 Go
語言實現一個 熔斷器
組件,感興趣的讀者可以閱讀下列文章,瞭解下其他開源 熔斷器
組件是如何實現的。
Reference
-
afex/hystrix-go[6]
-
sony/gobreaker[7]
-
alibaba/sentinel-golang[8]
-
Circuit Breaker Pattern
鏈接
[1] Hystrix: https://github.com/Netflix/Hystrix
[2] afex/hystrix-go: https://github.com/afex/hystrix-go
[3] afex/hystrix-go: https://github.com/afex/hystrix-go
[4] 引用部分: #reference
[5] afex/hystrix-go: https://github.com/afex/hystrix-go
[6] afex/hystrix-go: https://github.com/afex/hystrix-go
[7] sony/gobreaker: https://github.com/sony/gobreaker
[8] alibaba/sentinel-golang: https://github.com/alibaba/sentinel-golang
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/TMgMzLrlpI_nx8OzAw8O9w