Go 防止流量過載的利器——限流組件

一、服務流量限制的重要性

隨着業務規模的增長, 服務的流量也會激增, 大流量可能會壓垮服務器, 導致服務癱瘓。因此需對服務的流量進行限制, 確保在大流量的情況下也能正常運行。

當流量激增時, 會佔用大量服務器資源和帶寬, 可能會壓垮整個系統。比如流量激增期間數據庫連接用盡, 會導致服務無法訪問數據庫而宕機。用限制流量可以有效防止流量暴增壓垮系統。

沒有限流時, 流量激增期間會啓動很多無用的任務佔用服務器資源, 造成不必要的浪費。適當限流可以排隊或拒絕無效請求, 有效控制資源消耗使用。

二、常見的限流算法

限流的基本思想是通過算法預先設置閾值, 當流量達到閾值時自動限制, 常見的限流算法有:

1. 計數器算法

計數器算法根據時間窗口內的請求數進行限制, 基本思路是設置一個計數器統計時間窗口內的請求數, 當請求數達到限流閾值時, 就拒絕服務或者排隊。

優點: 實現簡單, 資源消耗少。

缺點: 無法處理突發流量, 時間窗口比較難確定合適的值。

2. 漏桶算法

漏桶算法是限制請求通過的速率, 基本思路是設置桶的容量和流出速率, 如果請求流入速率過大會被桶阻止, 根據固定速率流出, 起到平滑調節速率的作用。

優點: 可以很好地應對突發流量。

缺點: 需要實時處理每一個請求, 對系統資源消耗較大。

3. 令牌桶算法

令牌桶算法按照固定速率向桶中放入令牌, 每次請求需要消耗一個令牌, 只有拿到令牌的請求才允許通過。放令牌的速率可以通過調節, 實現不同速率的請求限流。

優點: 擁堵控制能力更強, 可自定義速率。

缺點: 需要實時處理請求, 系統資源消耗依然比較大。

三、限流實戰

1. 使用計數器限制總請求數

計數器算法主要是基於時間窗口和計數器來進行限制的。下面來看一個基於計數器進行限流的實現:

package limit
import (
    "sync/atomic"
    "time"
)
// 計數器限流器
type CounterLimiter struct {
    count      uint64 // 當前計數
    lastUpdate int64  // 上次更新的時間
    limitPerSec uint64 // 每秒限制的請求數
}
// 創建計數器限流器
func NewCounterLimiter(limitPerSec uint64) *CounterLimiter {
    return &CounterLimiter{
        count:      0,
        lastUpdate: time.Now().Unix(),
        limitPerSec: limitPerSec,
    }
}
// 實現限流器接口的Allow()
func (c *CounterLimiter) Allow() bool {
    now := time.Now().Unix()
    elapse := now - c.lastUpdate
    c.lastUpdate = now
    addedCount := elapse * c.limitPerSec
    c.count += uint64(addedCount)
    if c.count > c.limitPerSec {
        c.count = c.limitPerSec
    }
    if c.count < c.limitPerSec {
        c.count++
        return true
    }
    return false
}

基於計數器的限流器 CounterLimiter,主要邏輯是

創建限流器時指定限流頻率 limitPerSec

Allow() 方法校驗是否限流

  • 計算當前時間和上次更新時間差 elapsed

  • 根據時間差計算這段時間新增的限流額度 addedCount

  • 計數器統計值增加 addedCount

  • 判斷計數器是否達到閾值

  • 到達閾值則限流, 未到則計數 + 1 並放行

只需要創建一個限流器, 並在請求處理前調用 Allow() 方法判斷是否限流即可。

func httpHandler(w http.ResponseWriter, r *http.Request) {
    limiter := NewCounterLimiter(10) 
    if !limiter.Allow() {
        http.Error(w, http.StatusText(429), 
        http.StatusTooManyRequests)
        return
    }
    // 核心邏輯
}

2. 使用漏桶算法限制請求通過速率

漏桶算法需要設置一個桶的容量 capacity, 和漏出流量的速率 flow per second。如果請求流入速度過快, 會被桶的容量限制而丟棄。

使用漏桶算法實現請求通過速率限流的示例:

package limit
import (
    "sync"
    "time"
)
type LeakyBucket struct {
    capacity     int64 // 桶容量
    used         int64 // 當前已使用
    mu           sync.Mutex
    lastLeakTime time.Time // 上次漏水時間
    flow         int64 // 每秒流速
}
// 創建漏桶限流器
func NewLeakyBucket(capacity, flow int64) *LeakyBucket {
    return &LeakyBucket{
        capacity: capacity,
        used:     0,
        flow:     flow,
    }
}
// 實現限流器接口的Allow()
func (l *LeakyBucket) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    now := time.Now()
    l.leak(now)
    if l.used >= l.capacity {
        return false 
    }
    l.used += 1
    return true
}
// 漏水處理
func (l *LeakyBucket) leak(now time.Time) {
    delta := now.Sub(l.lastLeakTime)
    leaked := delta.Seconds() * float64(l.flow)
    // 計算這段間隔內的漏水量 
    leakedInt := int64(leaked) 
    if leakedInt > (l.used) {
        // 漏出了全部水
        l.used = 0
    } else {
        l.used -= leakedInt
    }
    l.lastLeakTime = now
}

漏桶算法的實現主要分爲兩個部分:

  • Allow() 方法檢查當前使用量是否達到桶容量, 未到則請求數 + 1 並返回 true, 已到則返回 false 限流

  • leak() 方法按照固定流速進行漏水, 對應速率進行的限流

可通過設置桶容量和流速, 來限制請求通過系統的速率了。

3. 使用令牌桶算法釋放固定數額的令牌

令牌桶算法的主要邏輯是按照一定速率往桶中放入令牌。

請求在處理前需要先獲取令牌, 如果沒有可用令牌則丟棄該請求或進入隊列等待。

下面是使用令牌桶算法實現的限流器:

package limit
import (
    "sync"
    "time"
)
// 令牌桶算法限流器
type TokenBucket struct {
    capacity   int64 // 桶容量
    rate       int64 // 令牌放入速率
    tokens     int64 // 當前令牌數
    lastUpdate int64 // 上次添加令牌的時間
    mu         sync.Mutex
}
// 創建令牌桶限流器
func NewTokenBucket(capacity, rate int64) *TokenBucket {
    return &TokenBucket{
        capacity: capacity,
        rate:     rate,
        tokens:   capacity,
        lastUpdate: time.Now().Unix(),
    }
}
// Allow 方法實現限流器接口
func (t *TokenBucket) Allow() bool {
    t.addTokens()
    t.mu.Lock()
    defer t.mu.Unlock()
    if t.tokens <= 0 {
        return false
    }
    t.tokens--
    return true
}
// 按速率添加令牌 
func (t *TokenBucket) addTokens() {
    now := time.Now().Unix()
    elapse := now - t.lastUpdate
    add := elapse * t.rate
    t.lastUpdate = now
    t.tokens += add
    if t.tokens > t.capacity {
        t.tokens = t.capacity 
    }
}

主要邏輯分爲兩部分:

  • Allow() 方法處理請求前獲取令牌

  • addTokens() 方法按照速率往桶中添加令牌

調整桶容量和添加令牌的速率, 來達到平滑限流的效果。

4. 封裝限流中間件, 便於業務複用

在上例中, 實現了通用的限流器接口, 包含創建限流器和 Allow() 校驗方法。可以基於這個接口, 進一步封裝成限流中間件。

// 中間件實現
func LimitMiddleware(handler http.Handler, limiter Limiter)
 http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, 
    r *http.Request) {
        if !limiter.Allow() {
            http.Error(w, http.StatusText(429),
             http.StatusTooManyRequests)
            return
        }
        handler.ServeHTTP(w, r)
    })
}
// 使用方式
limiter := NewTokenBucket(capacity, rate)
http.Handle("/", LimitMiddleware(myHandler, limiter))

四、優化限流策略

很多時候業務訪問量會有周期性波動或突發變化, 需要能夠檢測實時流量, 動態調整限流參數。比如可以根據每分鐘的請求量實時調整下一分鐘的限流閾值。

根據業務需要可以設置自定義策略, 比如對重要接口限流預留更多資源, 對次要接口限流更嚴格等。允許更加細粒度地控制限流。

可以根據服務器負載、平均響應時間等指標, 動態決定是否需要限流以保護系統。在流量大幅增長時自動跟進限制。

五、使用 Redis 實現分佈式限流

之前的限流方式都是在單機上通過計數或時間實現, 存在一定的不足。可使用 Redis 實現分佈式限流。

Redis 的性能和擴展性優勢:

Redis 單機可以達到 10 萬 + QPS 的性能, pipeline 批量操作可以進一步提升這一指標。此外 Redis 很容易通過主從複製和分片來進行擴展。正是得益於這些優勢, 才使得它非常適合實現分佈式限流。

利用 Redis 的計數器和定時任務實現分佈式限制 ,主要的思想是:

  • 對每個唯一請求路徑維護一個計數器

  • 每次請求計數器 +1

  • 當計數器達到閾值則返回限流

  • 通過定時任務定期重置計數器計數

客戶端請求時通過 Lua 腳本 canRequest.lua 來進行判斷:

-- canRequest.lua 限流判斷腳本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then 
    return 0
else
    redis.call("INCRBY", key, 1)
    redis.call("expire", key, 1) 
    return 1
end

使用一個唯一鍵 key 存儲計數器, 並設置 key 的過期時間, 比如 1 秒。根據 key 的當前計數是否達到閾值來拒絕請求。可輕鬆通過 Redis 實現分佈式限流, 並可以橫向擴展提高效率。

六、總結

  • 服務限流的重要性, 可以防止過載保護系統

  • Go 語言通過計數器、漏桶、令牌桶等算法實現了限流

  • 通過限流中間件複用限流邏輯, 靈活配置不同路由的策略

  • Redis 的性能優勢可以實現分佈式限流

希望本文能對實現服務限流提供一定思路和幫助。限流是一個值得探討的重要話題, 還有很多值得研究的細節, 感興趣的讀者可以自行擴展和優化。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Rb6IoZBObdqYdWFBUYnU_A