一文掌握常見的限流算法:計數器、漏桶、令牌桶等

限流(Rate Limiting),也稱流量控制。是指系統在面臨高併發,或者大流量請求的情況下,限制新的請求對系統的訪問,從而保證系統的穩定性。限流會導致部分用戶請求處理不及時或者被拒,這就影響了用戶體驗。所以一般需要在系統穩定和用戶體驗之間平衡一下。

常見的限流算法包括固定窗口計數器算法滑動窗口計數器算法漏桶算法令牌桶算法基於用戶的限流動態限流。其中固定窗口計數器算法、滑動窗口計數器算法由都屬於計數器算法。

以下將逐一介紹這些算法並附上 Go 語言的代碼示例。

  1. 固定窗口計數器算法

固定窗口計數器算法屬於計數器算法中的一種。固定窗口計數器算法通過對請求進行計數,限制在固定時間窗口內的請求數。每個窗口開始時計數器被重置,遍歷時間限制內的請求次數。

假設一個接口 1s 中最多請求 100 次。最開始設置一個計數器 count=0,來一個請求 count+1,1s 之內 count<=100 的請求可以正常訪問,count>100 的請求則被拒絕,1s 之後 count 被重置爲 0,重新開始計數。

固定窗口的問題是容易出現 “突刺現象”,例如在 1s 內,100 個請求都是在前 100ms 過來的,那麼後面的 900ms 的請求都會被拒絕,而系統此時是空閒的。另外還有 “臨界問題”,如果 100 個請求是在後 100ms 過來的,而下一個 1s 的 100 個請求在前 100ms 過來,此時系統在這 200ms 內就需要處理 200 個請求,跟我們想要的不符合。

到這裏我們很容易想到,1s 這個範圍太大了,縮小一些就更好了,這種把固定窗口拆成更多個小窗口的做法就是滑動窗口算法了。

Go 代碼示例

package main

import (
    "sync"
    "time"
)

type FixedWindowCounter struct {
    mu        sync.Mutex
    requestCount int
    limit     int
    window    time.Duration
    resetTime time.Time
}

func NewFixedWindowCounter(limit int, window time.Duration) *FixedWindowCounter {
    return &FixedWindowCounter{
        limit: limit,
        window: window,
        resetTime: time.Now(),
    }
}

func (fw *FixedWindowCounter) Allow() bool {
    fw.mu.Lock()
    defer fw.mu.Unlock()

    now := time.Now()
    if now.Sub(fw.resetTime) >= fw.window {
        fw.requestCount = 0
        fw.resetTime = now
    }

    if fw.requestCount < fw.limit {
        fw.requestCount++
        return true
    }
    return false
}
  1. 滑動窗口計數器算法

滑動窗口計數器算法屬於計數器算法中的一種。滑動窗口的思想是將固定窗口拆成更多個小窗口,隨着時間的推移,窗口不斷的滑動,統計也在不斷的變化。窗口拆分的越多,滑動就會越平滑,統計就會越精確,所消耗的資源就會越多。滑動窗口如果只拆爲 1 個窗口,就退化爲固定窗口。

Go 代碼示例

package main

import (
    "container/list"
    "sync"
    "time"
)

type SlidingWindowCounter struct {
    mu        sync.Mutex
    events    *list.List
    limit     int
    window    time.Duration
}

func NewSlidingWindowCounter(limit int, window time.Duration) *SlidingWindowCounter {
    return &SlidingWindowCounter{
        events: list.New(),
        limit: limit,
        window: window,
    }
}

func (sw *SlidingWindowCounter) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now()
    // 清理過期的事件
    for sw.events.Len() > 0 {
        if sw.events.Front().Value.(time.Time).Add(sw.window).Before(now) {
            sw.events.Remove(sw.events.Front())
        } else {
            break
        }
    }

    if sw.events.Len() < sw.limit {
        sw.events.PushBack(now)
        return true
    }

    return false
}
  1. 漏桶算法

漏桶算法通過一個固定速率的漏桶完成請求,任何超出桶容量的請求將被拒絕。請求以固定速率從桶中出桶。

漏桶算法的思想是將請求先放到一個桶中,然後像滴水一樣不斷的從中取出請求執行,桶滿則溢,後面的請求會被拒絕。當漏斗滿了,多餘的水就被直接丟棄了。

漏桶算法的特點是流入速度不確定,但是流出速度是確定的,漏桶可以很平滑,均衡的處理請求,但是無法應對短暫的突發流量。

Go 代碼示例

package main

import (
    "sync"
    "time"
)

type LeakyBucket struct {
    mu        sync.Mutex
    capacity  int
    available int
    rate      time.Duration
    lastTime  time.Time
}

func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
    return &LeakyBucket{
        capacity:  capacity,
        available: capacity,
        rate:      rate,
        lastTime:  time.Now(),
    }
}

func (lb *LeakyBucket) Allow() bool {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(lb.lastTime)

    // 更新可用令牌
    lb.available += int(elapsed / lb.rate)
    if lb.available > lb.capacity {
        lb.available = lb.capacity
    }
    lb.lastTime = now

    if lb.available > 0 {
        lb.available--
        return true
    }

    return false
}
  1. 令牌桶算法

令牌桶算法的思想是不斷的生成令牌放到一個桶中,請求到來時到桶中申請令牌,申請得到就執行,申請不到就拒絕。如果桶中的令牌滿了,新生成的令牌也會丟棄。

與漏桶不同的是,令牌桶是流入速度確定(生成令牌的速度),流出速度不確定,所以它不像漏桶一樣可以均衡的處理請求,但是由於有令牌桶這個緩衝,一旦有突增的流量,令牌桶裏已有的令牌可以短暫的應對突發流量。

由於流出速度是不限制的,此時桶中已有的令牌都可以被申請到,請求一下子就會到我們的服務,給系統帶來一定的壓力,所以桶的大小需要合適,不宜過大。

舉個例子:令牌桶的大小是 1000,每秒放 100 個令牌,經過一段時間後,請求有空閒時,桶裏的令牌就會積壓,最終保存了滿 1000 個令牌,由於某刻流量突增,有 1000 個請求到來,此時能申請到 1000 個令牌,所有請求都會放行,最終達到我們的系統,如果令牌桶過大,系統可能會承受不了這波請求。

令牌桶算法可以說是對漏桶算法的改進。漏桶算法能限制請求的速率。而令牌桶算法在限制請求速率的同時還允許一定程度的突發調用。

過程如下:

一直放令牌,如果令牌桶達到上限則丟棄令牌,假設每秒放 10 個,可以應對一定程度的流量激增,如此時令牌桶有 100 個令牌,突然發生 200 次調用,則此時最開始的 100 次請求可以正常調用,後續的請求才會以 10 個 / s 的速率來調用。

Go 代碼示例

package main

import (
    "sync"
    "time"
)

type TokenBucket struct {
    mu       sync.Mutex
    capacity int
    tokens   int
    rate     time.Duration
    lastTime time.Time
}

func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket {
    return &TokenBucket{
        capacity: capacity,
        tokens:   capacity,
        rate:     rate,
        lastTime: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(tb.lastTime)

    // 計算可用令牌數
    tb.tokens += int(elapsed / tb.rate)
    if tb.tokens > tb.capacity {
        tb.tokens = tb.capacity
    }
    tb.lastTime = now

    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    
    return false
}
  1. 基於用戶的限流

基於用戶的限流策略允許對不同用戶設置不同的請求頻率限制。可以使用上述任意算法作爲基礎,根據用戶身份進行控制。

Go 代碼示例

package main

import (
    "sync"
    "time"
)

type UserRateLimiter struct {
    mu         sync.Mutex
    userLimits map[string]int
    userCounts map[string]int
    limit      int
    window     time.Duration
    resetTime  map[string]time.Time
}

func NewUserRateLimiter(limit int, window time.Duration) *UserRateLimiter {
    return &UserRateLimiter{
        userLimits: make(map[string]int),
        userCounts: make(map[string]int),
        limit:      limit,
        window:     window,
        resetTime:  make(map[string]time.Time),
    }
}

func (url *UserRateLimiter) Allow(userId string) bool {
    url.mu.Lock()
    defer url.mu.Unlock()

    now := time.Now()
    if _, exists := url.resetTime[userId]; !exists {
        url.resetTime[userId] = now
    }

    if now.Sub(url.resetTime[userId]) >= url.window {
        url.userCounts[userId] = 0
        url.resetTime[userId] = now
    }

    if url.userCounts[userId] < url.limit {
        url.userCounts[userId]++
        return true
    }
    
    return false
}
  1. 動態限流

動態限流算法根據系統的實時性能和負載情況動態調整限流策略。具體實現可以結合上述算法,尤其是令牌桶算法。

Go 代碼示例

package main

import (
    "sync"
    "time"
)

type DynamicRateLimiter struct {
    mu         sync.Mutex
    currentRate int
    maxRate    int
    minRate    int
    rateChange time.Duration
    lastChange time.Time
}

func NewDynamicRateLimiter(maxRate, minRate int, rateChange time.Duration) *DynamicRateLimiter {
    return &DynamicRateLimiter{
        currentRate: maxRate,
        maxRate:     maxRate,
        minRate:     minRate,
        rateChange:  rateChange,
        lastChange:  time.Now(),
    }
}

func (dr *DynamicRateLimiter) AdjustRate(load int) {
    dr.mu.Lock()
    defer dr.mu.Unlock()

    now := time.Now()
    if now.Sub(dr.lastChange) < dr.rateChange {
        return
    }

    if load > dr.currentRate {
        dr.currentRate--
        if dr.currentRate < dr.minRate {
            dr.currentRate = dr.minRate
        }
    } else {
        dr.currentRate++
        if dr.currentRate > dr.maxRate {
            dr.currentRate = dr.maxRate
        }
    }

    dr.lastChange = now
}

func (dr *DynamicRateLimiter) Allow() bool {
    // 這裏可以使用任意一種算法實現,根據dr.currentRate來限制請求
    // 簡單示例,返回 true 表示請求被允許
    return true
}

總結

儘管限流算法在實現上各有不同,但它們的核心目標是確保系統在高併發情況下能夠高效、穩定地運行。選擇合適的限流算法需要根據具體業務需求、流量特徵及系統架構來進行相應評估。如需深入瞭解某種算法或是具體應用,歡迎隨時詢問!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/uP5WT7NHXBBHWyn_PrEv_Q