Go 防止流量過載的利器——限流組件
一、服務流量限制的重要性
隨着業務規模的增長, 服務的流量也會激增, 大流量可能會壓垮服務器, 導致服務癱瘓。因此需對服務的流量進行限制, 確保在大流量的情況下也能正常運行。
當流量激增時, 會佔用大量服務器資源和帶寬, 可能會壓垮整個系統。比如流量激增期間數據庫連接用盡, 會導致服務無法訪問數據庫而宕機。用限制流量可以有效防止流量暴增壓垮系統。
沒有限流時, 流量激增期間會啓動很多無用的任務佔用服務器資源, 造成不必要的浪費。適當限流可以排隊或拒絕無效請求, 有效控制資源消耗使用。
二、常見的限流算法
限流的基本思想是通過算法預先設置閾值, 當流量達到閾值時自動限制, 常見的限流算法有:
1. 計數器算法
計數器算法根據時間窗口內的請求數進行限制, 基本思路是設置一個計數器統計時間窗口內的請求數, 當請求數達到限流閾值時, 就拒絕服務或者排隊。
優點: 實現簡單, 資源消耗少。
缺點: 無法處理突發流量, 時間窗口比較難確定合適的值。
2. 漏桶算法
漏桶算法是限制請求通過的速率, 基本思路是設置桶的容量和流出速率, 如果請求流入速率過大會被桶阻止, 根據固定速率流出, 起到平滑調節速率的作用。
優點: 可以很好地應對突發流量。
缺點: 需要實時處理每一個請求, 對系統資源消耗較大。
3. 令牌桶算法
令牌桶算法按照固定速率向桶中放入令牌, 每次請求需要消耗一個令牌, 只有拿到令牌的請求才允許通過。放令牌的速率可以通過調節, 實現不同速率的請求限流。
優點: 擁堵控制能力更強, 可自定義速率。
缺點: 需要實時處理請求, 系統資源消耗依然比較大。
三、限流實戰
1. 使用計數器限制總請求數
計數器算法主要是基於時間窗口和計數器來進行限制的。下面來看一個基於計數器進行限流的實現:
package limit
import (
"sync/atomic"
"time"
)
// 計數器限流器
type CounterLimiter struct {
count uint64 // 當前計數
lastUpdate int64 // 上次更新的時間
limitPerSec uint64 // 每秒限制的請求數
}
// 創建計數器限流器
func NewCounterLimiter(limitPerSec uint64) *CounterLimiter {
return &CounterLimiter{
count: 0,
lastUpdate: time.Now().Unix(),
limitPerSec: limitPerSec,
}
}
// 實現限流器接口的Allow()
func (c *CounterLimiter) Allow() bool {
now := time.Now().Unix()
elapse := now - c.lastUpdate
c.lastUpdate = now
addedCount := elapse * c.limitPerSec
c.count += uint64(addedCount)
if c.count > c.limitPerSec {
c.count = c.limitPerSec
}
if c.count < c.limitPerSec {
c.count++
return true
}
return false
}
基於計數器的限流器 CounterLimiter,主要邏輯是
創建限流器時指定限流頻率 limitPerSec
Allow() 方法校驗是否限流
計算當前時間和上次更新時間差 elapsed
根據時間差計算這段時間新增的限流額度 addedCount
計數器統計值增加 addedCount
判斷計數器是否達到閾值
到達閾值則限流, 未到則計數 + 1 並放行
只需要創建一個限流器, 並在請求處理前調用 Allow() 方法判斷是否限流即可。
func httpHandler(w http.ResponseWriter, r *http.Request) {
limiter := NewCounterLimiter(10)
if !limiter.Allow() {
http.Error(w, http.StatusText(429),
http.StatusTooManyRequests)
return
}
// 核心邏輯
}
2. 使用漏桶算法限制請求通過速率
漏桶算法需要設置一個桶的容量 capacity, 和漏出流量的速率 flow per second。如果請求流入速度過快, 會被桶的容量限制而丟棄。
使用漏桶算法實現請求通過速率限流的示例:
package limit
import (
"sync"
"time"
)
type LeakyBucket struct {
capacity int64 // 桶容量
used int64 // 當前已使用
mu sync.Mutex
lastLeakTime time.Time // 上次漏水時間
flow int64 // 每秒流速
}
// 創建漏桶限流器
func NewLeakyBucket(capacity, flow int64) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
used: 0,
flow: flow,
}
}
// 實現限流器接口的Allow()
func (l *LeakyBucket) Allow() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
l.leak(now)
if l.used >= l.capacity {
return false
}
l.used += 1
return true
}
// 漏水處理
func (l *LeakyBucket) leak(now time.Time) {
delta := now.Sub(l.lastLeakTime)
leaked := delta.Seconds() * float64(l.flow)
// 計算這段間隔內的漏水量
leakedInt := int64(leaked)
if leakedInt > (l.used) {
// 漏出了全部水
l.used = 0
} else {
l.used -= leakedInt
}
l.lastLeakTime = now
}
漏桶算法的實現主要分爲兩個部分:
Allow() 方法檢查當前使用量是否達到桶容量, 未到則請求數 + 1 並返回 true, 已到則返回 false 限流
leak() 方法按照固定流速進行漏水, 對應速率進行的限流
可通過設置桶容量和流速, 來限制請求通過系統的速率了。
3. 使用令牌桶算法釋放固定數額的令牌
令牌桶算法的主要邏輯是按照一定速率往桶中放入令牌。
請求在處理前需要先獲取令牌, 如果沒有可用令牌則丟棄該請求或進入隊列等待。
下面是使用令牌桶算法實現的限流器:
package limit
import (
"sync"
"time"
)
// 令牌桶算法限流器
type TokenBucket struct {
capacity int64 // 桶容量
rate int64 // 令牌放入速率
tokens int64 // 當前令牌數
lastUpdate int64 // 上次添加令牌的時間
mu sync.Mutex
}
// 創建令牌桶限流器
func NewTokenBucket(capacity, rate int64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
rate: rate,
tokens: capacity,
lastUpdate: time.Now().Unix(),
}
}
// Allow 方法實現限流器接口
func (t *TokenBucket) Allow() bool {
t.addTokens()
t.mu.Lock()
defer t.mu.Unlock()
if t.tokens <= 0 {
return false
}
t.tokens--
return true
}
// 按速率添加令牌
func (t *TokenBucket) addTokens() {
now := time.Now().Unix()
elapse := now - t.lastUpdate
add := elapse * t.rate
t.lastUpdate = now
t.tokens += add
if t.tokens > t.capacity {
t.tokens = t.capacity
}
}
主要邏輯分爲兩部分:
Allow() 方法處理請求前獲取令牌
addTokens() 方法按照速率往桶中添加令牌
調整桶容量和添加令牌的速率, 來達到平滑限流的效果。
4. 封裝限流中間件, 便於業務複用
在上例中, 實現了通用的限流器接口, 包含創建限流器和 Allow() 校驗方法。可以基於這個接口, 進一步封裝成限流中間件。
// 中間件實現
func LimitMiddleware(handler http.Handler, limiter Limiter)
http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter,
r *http.Request) {
if !limiter.Allow() {
http.Error(w, http.StatusText(429),
http.StatusTooManyRequests)
return
}
handler.ServeHTTP(w, r)
})
}
// 使用方式
limiter := NewTokenBucket(capacity, rate)
http.Handle("/", LimitMiddleware(myHandler, limiter))
四、優化限流策略
很多時候業務訪問量會有周期性波動或突發變化, 需要能夠檢測實時流量, 動態調整限流參數。比如可以根據每分鐘的請求量實時調整下一分鐘的限流閾值。
根據業務需要可以設置自定義策略, 比如對重要接口限流預留更多資源, 對次要接口限流更嚴格等。允許更加細粒度地控制限流。
可以根據服務器負載、平均響應時間等指標, 動態決定是否需要限流以保護系統。在流量大幅增長時自動跟進限制。
五、使用 Redis 實現分佈式限流
之前的限流方式都是在單機上通過計數或時間實現, 存在一定的不足。可使用 Redis 實現分佈式限流。
Redis 的性能和擴展性優勢:
Redis 單機可以達到 10 萬 + QPS 的性能, pipeline 批量操作可以進一步提升這一指標。此外 Redis 很容易通過主從複製和分片來進行擴展。正是得益於這些優勢, 才使得它非常適合實現分佈式限流。
利用 Redis 的計數器和定時任務實現分佈式限制 ,主要的思想是:
對每個唯一請求路徑維護一個計數器
每次請求計數器 +1
當計數器達到閾值則返回限流
通過定時任務定期重置計數器計數
客戶端請求時通過 Lua 腳本 canRequest.lua 來進行判斷:
-- canRequest.lua 限流判斷腳本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then
return 0
else
redis.call("INCRBY", key, 1)
redis.call("expire", key, 1)
return 1
end
使用一個唯一鍵 key 存儲計數器, 並設置 key 的過期時間, 比如 1 秒。根據 key 的當前計數是否達到閾值來拒絕請求。可輕鬆通過 Redis 實現分佈式限流, 並可以橫向擴展提高效率。
六、總結
服務限流的重要性, 可以防止過載保護系統
Go 語言通過計數器、漏桶、令牌桶等算法實現了限流
通過限流中間件複用限流邏輯, 靈活配置不同路由的策略
Redis 的性能優勢可以實現分佈式限流
希望本文能對實現服務限流提供一定思路和幫助。限流是一個值得探討的重要話題, 還有很多值得研究的細節, 感興趣的讀者可以自行擴展和優化。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Rb6IoZBObdqYdWFBUYnU_A