微服務中的熔斷算法

雪崩效應

在微服務系統中,整個系統是以一系列功能獨立的微服務組成,如果某一個服務,因爲流量異常或者內部其他原因,導致響應異常,那麼該服務會影響到其下游服務, 從而發生一系列連鎖反應,最終導致整個系統崩潰,這就是微服務中的 雪崩效應

例如:當前系統中有 ABC 三個服務,服務 A 是上游,服務 B 是中游,服務 C 是下游。一旦下游服務 C 變得不可用,積壓了大量請求,服務 B 的請求也隨之阻塞,資源逐漸耗盡,使得服務 B 也變得不可用。最後,服務 A 也變爲不可用,整個系統鏈路崩潰。

熔斷

熔斷 機制是微服務調用鏈路中的的自我保護機制,當鏈路中某個服務響應時間過長甚至不可用時,會進行服務 熔斷,快速返回錯誤響應,停止 級聯故障,避免 雪崩效應

熔斷、限流、降級區別

限流 是針對服務請求數量的一種自我保護機制,當請求數量超出服務負載時,自動丟棄新的請求,是系統高可用架構的第一步。服務有了 限流 之後,爲什麼還需要 熔斷 呢?限流 面向的是上游的服務,而 熔斷 面向的是下游的服務。

降級 通過將不重要的服務暫停,提高系統負載能力。例如電商的 秒殺 場景中,可以暫停 用戶好友關係, 用戶信息 等服務。

hiinsu

hystrix-go

Hystrix[1] 是 Netflix 開源的由 Java 開發的 熔斷器 組件,對應的 Go 版本爲 afex/hystrix-go[2], 筆者選擇該組件作爲研究 熔斷 算法代碼實現。

三個狀態

5MLR4a

這裏需要注意的是: afex/hystrix-go[3] 的實現中並沒有 半打開 的狀態,也就是說,一旦 熔斷 開啓後, 只能等待配置的時間之後,才能去主動判定下游服務是否已經恢復,繼而恢復請求。筆者認爲這個不是重要的部分,如果讀者比較介意的話, 可以參考 引用部分 [4] 的另外兩個 熔斷 開源組件。

示例代碼

主流程代碼

package hystrix

import (
 "fmt"
 "net/http"
 "sync/atomic"
 "testing"
 "time"

 "github.com/afex/hystrix-go/hystrix"
 "github.com/gin-gonic/gin"
 "github.com/go-resty/resty/v2"
)

func server() {
 r := gin.Default()

 // 服務請求計數器
 var count int64

 r.GET("/ping", func(ctx *gin.Context) {
  // 模擬服務故障,前兩次請求返回錯誤
  if atomic.AddInt64(&count, 1) < 3 {
   ctx.String(http.StatusInternalServerError, "pong")
   return
  }
  // 後面的請求正常返回
  ctx.String(http.StatusOK, "pong")
 })

 // 通過 http://localhost:8080/ping 訪問
 _ = r.Run(":8080")
}

func TestQuickStart(t *testing.T) {
 // 啓動服務
 go server()

 hystrix.ConfigureCommand("test-api-ping", hystrix.CommandConfig{
  // 執行命令超時時間, 默認值 1 秒
  Timeout: 0,

  // 最大併發請求量, 默認值 10
  MaxConcurrentRequests: 100,

  // 熔斷開啓前需要達到的最小請求數量, 默認值 20
  RequestVolumeThreshold: 5,

  // 熔斷器開啓後,重試服務是否恢復的等待時間,默認值 5 秒
  // 這裏修改爲 0.5 秒
  SleepWindow: 500,

  // 請求錯誤百分比閾值,超過閾值後熔斷開啓
  ErrorPercentThreshold: 20,
 })

 for i := 0; i < 20; i++ {
  _ = hystrix.Do("test-api-ping", func() error {
   resp, _ := resty.New().R().Get("http://localhost:8080/ping")
   if resp.IsError() {
    return fmt.Errorf("err code: %s", resp.Status())
   }
   return nil
  }, func(err error) error {
   fmt.Println("fallback err: ", err)
   return err
  })

  // 每次請求之間休眠 0.1 秒
  time.Sleep(100 * time.Millisecond)
 }
}

主流程代碼邏輯描述

我們通過修改 hystrix 的默認配置,期望達到以下的熔斷效果:

運行測試

$ go test -v -count=1 .

# 輸出如下
=== RUN   TestQuickStart

...

[GIN-debug] Listening and serving HTTP on :8080

# 前兩個請求錯誤
[GIN] 2023/03/03 - 12:02:03 | 500 |        16.7µs |       127.0.0.1 | GET      "/ping"
fallback err:  err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:03 | 500 |        22.6µs |       127.0.0.1 | GET      "/ping"
fallback err:  err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:04 | 200 |        33.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        20.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.6µs |       127.0.0.1 | GET      "/ping"

# 熔斷開啓
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open

# 熔斷重試
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.6µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.3µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        16.3µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        18.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        26.5µs |       127.0.0.1 | GET      "/ping"
--- PASS: TestQuickStart (1.52s)
PASS
ok      Golang-Patterns/hystrix 1.532s

通過測試的數據結果,我們可以看到,熔斷 執行流程和上面描述的邏輯一致,接下來,我們研究一下 hystrix-go 的內部實現。

算法實現

熔斷配置對象

各字段代表的含義,請參照剛纔示例代碼中的註釋。

type CommandConfig struct {
 Timeout                int `json:"timeout"`
 MaxConcurrentRequests  int `json:"max_concurrent_requests"`
 RequestVolumeThreshold int `json:"request_volume_threshold"`
 SleepWindow            int `json:"sleep_window"`
 ErrorPercentThreshold  int `json:"error_percent_threshold"`
}

創建熔斷配置

ConfigureCommand 方法根據參數創建一個 熔斷 配置對象,如果對象中的某些字段參數未提供,則適用默認值替代。

func ConfigureCommand(name string, config CommandConfig) {
    settingsMutex.Lock()
    defer settingsMutex.Unlock()

    timeout := DefaultTimeout
    if config.Timeout != 0 {
      timeout = config.Timeout
    }

    max := DefaultMaxConcurrent
      ...

    volume := DefaultVolumeThreshold
      ...

    sleep := DefaultSleepWindow
      ...

    errorPercent := DefaultErrorPercentThreshold
      ...

    circuitSettings[name] = &Settings{
      ...
    }
}

熔斷器對象

CircuitBreaker 表示單個請求對應的 熔斷器 對象,對象可以驗證請求是否觸發了 熔斷 機制,以及是否應該拒絕該請求繼續訪問。

type CircuitBreaker struct {
 Name                   string
 open                   bool
 forceOpen              bool
 mutex                  *sync.RWMutex
 openedOrLastTestedTime int64

 executorPool *executorPool
 metrics      *metricExchange
}

GetCircuit 函數

GetCircuit 函數根據名稱返回對應的 熔斷器 對象以及該名稱函數調用是否觸發了 熔斷 機制。

func GetCircuit(name string) (*CircuitBreaker, bool, error) {
 circuitBreakersMutex.RLock()
 _, ok := circuitBreakers[name]
 if !ok {
  circuitBreakersMutex.RUnlock()
  // 從讀寫鎖切換到寫鎖
  circuitBreakersMutex.Lock()
  defer circuitBreakersMutex.Unlock()
  // 雙重檢測,防止在加鎖期間其他 goroutine 更新了對象
  if cb, ok := circuitBreakers[name]; ok {
   // 代碼執行到這裏,說明在加鎖期間其他 goroutine 觸發了熔斷機制
   // 所以第二個返回值返回 false
   return cb, false, nil
  }
  circuitBreakers[name] = newCircuitBreaker(name)
 } else {
  defer circuitBreakersMutex.RUnlock()
 }

 return circuitBreakers[name], !ok, nil
}

Do 函數

Do 函數以阻塞的方式運行參數函數,直到函數返回成功或者錯誤 (包括觸發了熔斷),具體的執行工作是由 DoC 函數和 GoC 函數完成的。

func Do(name string, run runFunc, fallback fallbackFunc) error {
 // 包裝 run 參數函數
 runC := func(ctx context.Context) error {
  return run()
 }
 var fallbackC fallbackFuncC
 if fallback != nil {
  // 包裝 fallback 參數函數
  fallbackC = func(ctx context.Context, err error) error {
   return fallback(err)
  }
 }
 return DoC(context.Background(), name, runC, fallbackC)
}

DoC 函數

DoC 函數負責參數的執行前封裝工作,Goc 函數負責參數的具體執行工作。

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
 // 接收通道緩衝爲 1
 // 函數的執行結果只可能是下面一種:
 //  返回成功
 //  返回錯誤 (包括觸發了熔斷)
 done := make(chan struct{}, 1)

 // 將 run 參數函數的執行過程包裝一下
 // 如果函數返回成功,就發送信號給通道,否則返回錯誤
 r := func(ctx context.Context) error {
  err := run(ctx)
  if err != nil {
   return err
  }

  done <- struct{}{}
  return nil
 }

 // 將 fallback 參數函數的執行過程包裝一下
 // 如果函數返回錯誤,就發送信號給通道
 f := func(ctx context.Context, e error) error {
  err := fallback(ctx, e)
  if err != nil {
   return err
  }

  done <- struct{}{}
  return nil
 }

 // 執行參數函數,委託給 GoC 函數執行
 var errChan chan error
 if fallback == nil {
  errChan = GoC(ctx, name, r, nil)
 } else {
  errChan = GoC(ctx, name, r, f)
 }

 select {
 case <-done:
  // 代碼執行到這裏,說明函數返回成功
  return nil
 case err := <-errChan:
  // 代碼執行到這裏,說明函數返回錯誤
  return err
 }
}

GoC 函數

GoC 運行參數函數,同時跟蹤該函數歷史運行情況。如果參數函數觸發了 熔斷 開啓,則必須等待服務恢復。

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
 cmd := &command{
  run:      run,
  fallback: fallback,
  start:    time.Now(),
  errChan:  make(chan error, 1),
  finished: make(chan bool, 1),
 }

 circuit, _, err := GetCircuit(name)
 if err != nil {
  // 獲取該請求對應的熔斷器對象發生錯誤時,直接返回
  cmd.errChan <- err
  return cmd.errChan
 }

 ...

 // 當請求返回時,歸還請求 ticket 到池中
 returnTicket := func() {
  ...
  cmd.circuit.executorPool.Return(cmd.ticket)
 }

 // 該對象由兩個 goroutine 共享
 // 兩個 goroutine :
 //  一個負責具體的函數執行 (熔斷有可能已經開啓了)
 //  一個負責監聽函數執行結果
 // 不管哪個先執行完成,報告熔斷器相關 metric
 returnOnce := &sync.Once{}

 go func() {
  defer func() { cmd.finished <- true }()

  // 熔斷已經開啓了
  // 拒絕當前請求,等待服務恢復
  if !cmd.circuit.AllowRequest() {
   ...
   returnOnce.Do(func() {
    cmd.errorWithFallback(ctx, ErrCircuitOpen)
   })
   return
  }

  // 當後端 (被請求方,一般指上游服務) 服務不穩定時,請求會花費更多時間,但不一定每次都會失敗
  // 當請求變慢但 QPS 不變時,需要限制併發數量,降低後端服務的負載
  cmd.Lock()
  select {
  case cmd.ticket = <-circuit.executorPool.Tickets:
   // 請求量沒有超過併發限制
   ...
   cmd.Unlock()
  default:
  // 請求量超過了併發限制,返回錯誤
   ...
   returnOnce.Do(func() {
    cmd.errorWithFallback(ctx, ErrMaxConcurrency)
   })
   return
  }

  runErr := run(ctx)  // 執行參數函數
  returnOnce.Do(func() {
   if runErr != nil {
    cmd.errorWithFallback(ctx, runErr)
    return
   }
   cmd.reportEvent("success")
  })
 }()

 go func() {
  timer := time.NewTimer(getSettings(name).Timeout)
  defer timer.Stop()

  select {
  case <-cmd.finished:
   // 上面的 goroutine 已經上報數據,這裏無需再上報
  case <-ctx.Done():
   // 上下文錯誤
   returnOnce.Do(func() {
    cmd.errorWithFallback(ctx, ctx.Err())
   })
  case <-timer.C:
   // 執行超時
   returnOnce.Do(func() {
    cmd.errorWithFallback(ctx, ErrTimeout)
   })
  }
 }()

 return cmd.errChan
}

Do 函數調用鏈路

Do 函數調用鏈路

最後,我們來看一下 熔斷 開啓與狀態判斷機制的內部實現。

AllowRequest 方法

AllowRequest 方法在具體的請求執行之前,先判斷 熔斷 是否已經開啓,當 熔斷 開啓時,熔斷 時間超過等待外部服務恢復時間時返回 ture

func (circuit *CircuitBreaker) AllowRequest() bool {
 return !circuit.IsOpen() || circuit.allowSingleTest()
}

IsOpen 方法

IsOpen 方法在具體的請求執行之前,根據 熔斷 是否以開啓確定是否已經拒絕請求執行。

func (circuit *CircuitBreaker) IsOpen() bool {
 circuit.mutex.RLock()
 o := circuit.forceOpen || circuit.open
 circuit.mutex.RUnlock()

 if o {
  // 熔斷已開啓
  return true
 }

 if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
  // 未達到開啓熔斷需要的最小請求數量
  return false
 }

 if !circuit.metrics.IsHealthy(time.Now()) {
  // 請求失敗百分比超過閾值,開啓熔斷
  circuit.setOpen()
  return true
 }

 return false
}

allowSingleTest 方法

allowSingleTest 方法判斷 熔斷 時間是否已經超過重試服務是否恢復的等待時間。

func (circuit *CircuitBreaker) allowSingleTest() bool {
 ...

 now := time.Now().UnixNano()
 openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
 if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
  ...

  return true
 }

 return false
}

通過上面三個方法實現代碼可以看到: 熔斷 開啓與狀態判斷是每次請求到來時實時判斷的。

AllowRequest 方法調用鏈路

小結

本文描述了 熔斷 的基本概念以及 熔斷限流降級 之間的區別,同時藉助開源的 afex/hystrix-go[5] 組件源代碼, 研究瞭如何使用 Go 語言實現一個 熔斷器 組件,感興趣的讀者可以閱讀下列文章,瞭解下其他開源 熔斷器 組件是如何實現的。

Reference

鏈接

[1] Hystrix: https://github.com/Netflix/Hystrix

[2] afex/hystrix-go: https://github.com/afex/hystrix-go

[3] afex/hystrix-go: https://github.com/afex/hystrix-go

[4] 引用部分: #reference

[5] afex/hystrix-go: https://github.com/afex/hystrix-go

[6] afex/hystrix-go: https://github.com/afex/hystrix-go

[7] sony/gobreaker: https://github.com/sony/gobreaker

[8] alibaba/sentinel-golang: https://github.com/alibaba/sentinel-golang

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/TMgMzLrlpI_nx8OzAw8O9w