golang 源碼分析:juju-ratelimit

        https://github.com/juju/ratelimit 是一個基於令牌桶算法的限流器:令牌桶就是想象有一個固定大小的桶,系統會以恆定速率向桶中放 Token,桶滿則暫時不放。漏桶算法和令牌桶算法的主要區別在於,"漏桶算法" 能夠強行限制數據的傳輸速率 (或請求頻率),而 "令牌桶算法" 在能夠限制數據的平均傳輸速率外,還允許某種程度的突發傳輸。

        首先看下如何使用:

import "github.com/juju/ratelimit"
var tokenBucket ratelimit.Bucket = nil
func init() {
  // func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
  // fillInterval令牌填充的時間間隔
  // capacity令牌桶的最大容量
  tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)
}
func Handler() {
  available := tokenBucket.TakeAvailable(1)
  if available <= 0 {
    // 限流處理
  }
  // handling
}

        下面看下源碼實現,juju/ratelimit 實現很簡單,一共只有兩個源碼文件和一個測試文件:

ratelimit.go
ratelimit_test.go
reader.go

下面我們分析下常用的這兩個接口的實現:

1,ratelimit.NewBucket

傳入的兩個參數分別是產生令牌的的間隔和桶的容量。

func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
  return NewBucketWithClock(fillInterval, capacity, nil)
}
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
  return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}

默認一個間隔週期內就產生一個 token,如果是高併發情況下,可以通過參數 quantum 控制產生多個。第三個參數是一個 clock  interface,主要是方便 mock 測試,如果傳 nil 用的就是 realClock{}

// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
  // Now returns the current time.
  Now() time.Time
  // Sleep sleeps for at least the given duration.
  Sleep(d time.Duration)
}

realClock 是實現了上述接口的結構體:

// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
  return time.Now()
}
// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
  time.Sleep(d)
}

上面幾個函數僅僅是對這個函數的一個簡單包裝,加上默認參數,方便一般場景的使用,最終都是調用了這個函數

func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
  if clock == nil {
    clock = realClock{}
  }
  if fillInterval <= 0 {
    panic("token bucket fill interval is not > 0")
  }
  if capacity <= 0 {
    panic("token bucket capacity is not > 0")
  }
  if quantum <= 0 {
    panic("token bucket quantum is not > 0")
  }
  return &Bucket{
    clock:           clock,
    startTime:       clock.Now(),
    latestTick:      0,
    fillInterval:    fillInterval,
    capacity:        capacity,
    quantum:         quantum,
    availableTokens: capacity,
  }
}

出來參數檢驗外,最後生成了結構體 Bucket 的指針

type Bucket struct {
  clock Clock
  // startTime holds the moment when the bucket was
  // first created and ticks began.
  startTime time.Time
  // capacity holds the overall capacity of the bucket.
  capacity int64
  // quantum holds how many tokens are added on
  // each tick.
  quantum int64
  // fillInterval holds the interval between each tick.
  fillInterval time.Duration
  // mu guards the fields below it.
  mu sync.Mutex
  // availableTokens holds the number of available
  // tokens as of the associated latestTick.
  // It will be negative when there are consumers
  // waiting for tokens.
  availableTokens int64
  // latestTick holds the latest tick for which
  // we know the number of tokens in the bucket.
  latestTick int64
}

Bucket 裏面出了存儲初始化必要的參數外,多了兩個變量:

availableTokens:當前可用的令牌數量

latestTick:從程序運行到上一次訪問的時候,一共產生了多少次計數(如果 quantum 等於 1 的話 ,就是一共產生的令牌數量)

2,TakeAvailable

有一個參數,每次取的 token 數量,一般是一個,爲了併發安全,一般會加鎖:

func (tb *Bucket) TakeAvailable(count int64) int64 {
  tb.mu.Lock()
  defer tb.mu.Unlock()
  return tb.takeAvailable(tb.clock.Now(), count)
}

調用了令牌桶計算的核心函數 takeAvailable,第一個參數表示是當前時間,用於計算一共產生了多少個 token:

func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
  if count <= 0 {
    return 0
  }
  tb.adjustavailableTokens(tb.currentTick(now))
  if tb.availableTokens <= 0 {
    return 0
  }
  if count > tb.availableTokens {
    count = tb.availableTokens
  }
  tb.availableTokens -= count
  return count
}

其中 tb.adjustavailableTokens(tb.currentTick(now)) 用於計算修改可用 token 數量 availableTokens,如果 availableTokens<=0, 說明限流了;如果輸入的 count 比 availableTokens,我麼最多隻能獲取 availableTokens 個 token,獲取後,我們把 availableTokens 減去已經使用的 token 數量。

func (tb *Bucket) currentTick(now time.Time) int64 {
  return int64(now.Sub(tb.startTime) / tb.fillInterval)
}

計算出了從開始運行到,當前時間內時間一共跳變了多少次,也就是一共產生了多少次令牌。

func (tb *Bucket) adjustavailableTokens(tick int64) {
  lastTick := tb.latestTick
  tb.latestTick = tick
  if tb.availableTokens >= tb.capacity {
    return
  }
  tb.availableTokens += (tick - lastTick) * tb.quantum
  if tb.availableTokens > tb.capacity {
    tb.availableTokens = tb.capacity
  }
  return
}

1,如果可用 token 數量大於等於令牌桶的容量,說明很長時間沒有流量來獲取 token 了,不用處理。

2,計算上一次獲取 token 到現時刻,產生的 token 數量,把它加到 availableTokens 上

3,如果 availableTokens 數量比 capacity 大,說明溢出了,修改 availableTokens 爲 capacity。

以上就是令牌桶算法的核心邏輯。當然,這個包還封裝了一些其他的靈活的取令牌的接口,比如

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
  tb.mu.Lock()
  defer tb.mu.Unlock()
  return tb.take(tb.clock.Now(), count, maxWait)
}

這個函數就是獲取,在 maxWait time.Duration 超時的前提下,產生 count 個 token,需要等待的時間間隔。

func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
  if count <= 0 {
    return 0, true
  }
  tick := tb.currentTick(now)
  tb.adjustavailableTokens(tick)
  avail := tb.availableTokens - count
  if avail >= 0 {
    tb.availableTokens = avail
    return 0, true
  }
  // Round up the missing tokens to the nearest multiple
  // of quantum - the tokens won't be available until
  // that tick.
  // endTick holds the tick when all the requested tokens will
  // become available.
  endTick := tick + (-avail+tb.quantum-1)/tb.quantum
  endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
  waitTime := endTime.Sub(now)
  if waitTime > maxWait {
    return 0, false
  }
  tb.availableTokens = avail
  return waitTime, true
}

函數的前半部分和 takeAvailable 一模一樣,後面邏輯表示,如果令牌不夠的情況下:

1,計算還缺多少個令牌

2,計算缺這麼多令牌需要跳變多少次

3,計算跳變這些次數需要的時間

4,判斷需要的時間是否超時

還有一個 wait 接口,用來計算,獲取 count 個令牌需要的時間,然後 sleep 這麼長時間。

func (tb *Bucket) Wait(count int64) {
  if d := tb.Take(count); d > 0 {
    tb.clock.Sleep(d)
  }
}

以上就是令牌桶算法的核心源碼實現,

ratelimit/reader.go

裏面實現了基於上述限流器實現的讀限速和寫限速,原理是通過讀寫 buff 的長度來控制 Wait 函數的等待時間,實現讀寫限速的

func (r *reader) Read(buf []byte) (int, error) {
  n, err := r.r.Read(buf)
  if n <= 0 {
    return n, err
  }
  r.bucket.Wait(int64(n))
  return n, err
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9So1ER5mTSI9_wdvR0YFpA