Go 可用性 -四- 限流 3: 漏桶算法

在前面兩篇文章當中我們學習了令牌桶算法的使用和實現,今天我們就一起來看一看另外一種常見的限流算法,漏桶算法

漏桶算法

原理

漏桶算法 (Leaky Bucket) 是網絡世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)時經常使用的一種算法,它的主要目的是控制數據注入到網絡的速率,平滑網絡上的突發流量。漏桶算法提供了一種機制,通過它,突發流量可以被整形以便爲網絡提供一個穩定的流量。--- 百度百科

漏桶算法其實非常形象,如下圖所示可以理解爲一個漏水的桶,當有突發流量來臨的時候,會先到桶裏面,桶下有一個洞,可以以固定的速率向外流水,如果水的從桶中外溢了出來,那麼這個請求就會被拒絕掉。具體的表現就會向下圖右側的圖表一樣,突發流量就被整形成了一個平滑的流量。

漏桶算法的主要作用就是避免出現有的時候流量很高,有的時候又很低,導致系統出現旱的旱死,澇的澇死的這種情況。

Go 中比較常用的漏桶算法的實現就是來自 uber 的 ratelimit,下面我們就會看一下這個庫的使用方式和源碼

API

type Clock
type Limiter
    func New(rate int, opts ...Option) Limiter
    func NewUnlimited() Limiter
type Option
    func Per(per time.Duration) Option
    func WithClock(clock Clock) Option
    func WithSlack(slack int) Option

Clock  是一個接口,計時器的最小實現,有兩個方法,分別是當前的時間和睡眠

type Clock interface {
 Now() time.Time
 Sleep(time.Duration)
}

Limiter  也是一個接口,只有一個 Take  方法,執行這個方法的時候如果觸發了 rps 限制則會阻塞住

type Limiter interface {
 // Take should block to make sure that the RPS is met.
 Take() time.Time
}

NewLimter  和 NewUnlimited  會分別初始化一個無鎖的限速器和沒有任何限制的限速器

Option  是在初始化的時候的額外參數,這種使用姿勢在之前 Go 工程化的文章《Go 工程化 (六) 配置管理》當中有講到,這裏我們就不再贅述了

Option  有三個方法

案例: 10 行代碼實現一個基於漏桶算法的 ip 限流中間件

案例我們使用和令牌桶類似的案例

func NewLimiter(rps int) gin.HandlerFunc {
 limiters := &sync.Map{}

 return func(c *gin.Context) {
  // 獲取限速器
  // key 除了 ip 之外也可以是其他的,例如 header,user name 等
  key := c.ClientIP()
  l, _ := limiters.LoadOrStore(key, ratelimit.New(rps))
  now := l.(ratelimit.Limiter).Take()
  fmt.Printf("now: %s\n", now)
  c.Next()
 }
}

使用上也是比較簡單的

func main() {
 e := gin.Default()
 // 新建一個限速器,允許突發 3 個併發
 e.Use(NewLimiter(3))
 e.GET("ping", func(c *gin.Context) {
  c.String(http.StatusOK, "pong")
 })
 e.Run(":8080")
}

我們用 go-stress-testing  進行壓測

go-stress-testing-linux -c 100 -u http://localhost:8080/ping

─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
 耗時  併發數  成功數 失敗數    qps  最長耗時最短耗時 平均耗時 下載字節 字節每秒  錯誤碼
─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
   1s     13     13      0  233.55  676.10    5.82   85.64      52      51200:13
   2s     16     16      0   62.25 1675.17    5.82  321.30      64      31200:16
   3s     19     19      0   31.24 2673.94    5.82  640.20      76      25200:19
   3s     20     20      0   26.37 3006.49    5.82  758.51      80      26200:20


*************************  結果 stat  ****************************
處理協程數量: 20
請求總數併發數*請求數 -c * -n: 20 總請求時間: 3.011  successNum: 20 failureNum: 0
*************************  結果 end   ****************************

查看結果發現爲什麼第一秒的時候完成了 13 個請求,不是限制的 3rps 麼?不要慌,我們看看它的實現就知道了

實現

這個庫有基於互斥鎖的實現和基於 CAS 的無鎖實現,默認使用的是無鎖實現版本,所以我們主要看無鎖實現的源碼

type state struct {
 last     time.Time
 sleepFor time.Duration
}

type atomicLimiter struct {
 state unsafe.Pointer
 //lint:ignore U1000 Padding is unused but it is crucial to maintain performance
 // of this rate limiter in case of collocation with other frequently accessed memory.
 padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

 perRequest time.Duration
 maxSlack   time.Duration
 clock      Clock
}

atomicLimiter  結構體

接下來看看最主要的 Take  方法

func (t *atomicLimiter) Take() time.Time {
 var (
  // 狀態
  newState state
  // 用於表示原子操作是否成功
  taken    bool
  // 需要 sleep 的時間
  interval time.Duration
 )

 // 如果 CAS 操作不成功就一直嘗試
 for !taken {
  // 獲取當前的時間
  now := t.clock.Now()

  // load 出上一次調用的時間
  previousStatePointer := atomic.LoadPointer(&t.state)
  oldState := (*state)(previousStatePointer)

  newState = state{
   last:     now,
   sleepFor: oldState.sleepFor,
  }

  // 如果 last 是零值的話,表示之前就沒用過,直接保存返回即可
  if oldState.last.IsZero() {
   taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
   continue
  }

  // sleepFor 是需要睡眠的時間,由於引入了鬆弛時間,所以 sleepFor 可能是一個
  // maxSlack ~ 0 之間的一個值,所以這裏需要將現在的需要 sleep 的時間和上一次
  // sleepFor 的值相加
  newState.sleepFor += t.perRequest - now.Sub(oldState.last)

  // 如果距離上一次調用已經很久了,sleepFor 可能會是一個很小的值
  // 最小值只能是 maxSlack 的大小
  if newState.sleepFor < t.maxSlack {
   newState.sleepFor = t.maxSlack
  }

  // 如果 sleepFor 大於 0  的話,計算出需要 sleep 的時間
  // 然後將 state.sleepFor 置零
  if newState.sleepFor > 0 {
   newState.last = newState.last.Add(newState.sleepFor)
   interval, newState.sleepFor = newState.sleepFor, 0
  }

  // 保存狀態
  taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
 }

 // sleep interval
 t.clock.Sleep(interval)
 return newState.last
}

總結

今天學習了漏桶的實現原理以及使用方式,漏桶和令牌桶的最大的區別就是,令牌桶是支持突發流量的,但是漏桶是不支持的。但是 uber 的這個庫通過引入彈性時間的方式也讓漏桶算法有了類似令牌桶能夠應對部分突發流量的能力,並且實現上還非常的簡單,值得學習。

多看看好的輪子的實現總會學到一些新姿勢,今天就學到了使用 padding 填充來避免 false sharing 提高性能的操作

參考文獻

  1. Go 進階訓練營 - 極客時間

  2. "帶你快速瞭解:限流中的漏桶和令牌桶算法"

  3. ratelimit · pkg.go.dev

  4. ratelimit/limiter_atomic.go at main · uber-go/ratelimit (github.com)

  5. 漏桶算法_百度百科 (baidu.com)

  6. 利用 CPU cache 特性優化 Go 程序

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/dJ3hiuA-8BdNF_ENL-WIUg