Go 可用性 -四- 限流 3: 漏桶算法
序
在前面兩篇文章當中我們學習了令牌桶算法的使用和實現,今天我們就一起來看一看另外一種常見的限流算法,漏桶算法
漏桶算法
原理
漏桶算法 (Leaky Bucket) 是網絡世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)時經常使用的一種算法,它的主要目的是控制數據注入到網絡的速率,平滑網絡上的突發流量。漏桶算法提供了一種機制,通過它,突發流量可以被整形以便爲網絡提供一個穩定的流量。--- 百度百科
漏桶算法其實非常形象,如下圖所示可以理解爲一個漏水的桶,當有突發流量來臨的時候,會先到桶裏面,桶下有一個洞,可以以固定的速率向外流水,如果水的從桶中外溢了出來,那麼這個請求就會被拒絕掉。具體的表現就會向下圖右側的圖表一樣,突發流量就被整形成了一個平滑的流量。
漏桶算法的主要作用就是避免出現有的時候流量很高,有的時候又很低,導致系統出現旱的旱死,澇的澇死的這種情況。
Go 中比較常用的漏桶算法的實現就是來自 uber 的 ratelimit,下面我們就會看一下這個庫的使用方式和源碼
API
type Clock
type Limiter
func New(rate int, opts ...Option) Limiter
func NewUnlimited() Limiter
type Option
func Per(per time.Duration) Option
func WithClock(clock Clock) Option
func WithSlack(slack int) Option
Clock
是一個接口,計時器的最小實現,有兩個方法,分別是當前的時間和睡眠
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
Limiter
也是一個接口,只有一個 Take
方法,執行這個方法的時候如果觸發了 rps 限制則會阻塞住
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
NewLimter
和 NewUnlimited
會分別初始化一個無鎖的限速器和沒有任何限制的限速器
Option
是在初始化的時候的額外參數,這種使用姿勢在之前 Go 工程化的文章《Go 工程化 (六) 配置管理》當中有講到,這裏我們就不再贅述了
Option
有三個方法
-
Per
可以修改時間單位,默認是秒所以我們默認限制的是 rps,如果改成分鐘那麼就是 rpm 了 -
WithClock
可以修改時鐘,這個用於在測試的時候可以 mock 掉不使用真實的時間 -
WithSlack
用於修改鬆弛時間,也就是可以允許的突發流量的大小,默認是Pre / 10
,這個後面會講到
案例: 10 行代碼實現一個基於漏桶算法的 ip 限流中間件
案例我們使用和令牌桶類似的案例
func NewLimiter(rps int) gin.HandlerFunc {
limiters := &sync.Map{}
return func(c *gin.Context) {
// 獲取限速器
// key 除了 ip 之外也可以是其他的,例如 header,user name 等
key := c.ClientIP()
l, _ := limiters.LoadOrStore(key, ratelimit.New(rps))
now := l.(ratelimit.Limiter).Take()
fmt.Printf("now: %s\n", now)
c.Next()
}
}
使用上也是比較簡單的
func main() {
e := gin.Default()
// 新建一個限速器,允許突發 3 個併發
e.Use(NewLimiter(3))
e.GET("ping", func(c *gin.Context) {
c.String(http.StatusOK, "pong")
})
e.Run(":8080")
}
我們用 go-stress-testing
進行壓測
go-stress-testing-linux -c 100 -u http://localhost:8080/ping
─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
耗時 │ 併發數│ 成功數│ 失敗數 │ qps │最長耗時│最短耗時 │平均耗時 │下載字節 │字節每秒 │ 錯誤碼
─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
1s│ 13│ 13│ 0│ 233.55│ 676.10│ 5.82│ 85.64│ 52│ 51│200:13
2s│ 16│ 16│ 0│ 62.25│ 1675.17│ 5.82│ 321.30│ 64│ 31│200:16
3s│ 19│ 19│ 0│ 31.24│ 2673.94│ 5.82│ 640.20│ 76│ 25│200:19
3s│ 20│ 20│ 0│ 26.37│ 3006.49│ 5.82│ 758.51│ 80│ 26│200:20
************************* 結果 stat ****************************
處理協程數量: 20
請求總數(併發數*請求數 -c * -n): 20 總請求時間: 3.011 秒 successNum: 20 failureNum: 0
************************* 結果 end ****************************
查看結果發現爲什麼第一秒的時候完成了 13 個請求,不是限制的 3rps 麼?不要慌,我們看看它的實現就知道了
實現
這個庫有基於互斥鎖的實現和基於 CAS 的無鎖實現,默認使用的是無鎖實現版本,所以我們主要看無鎖實現的源碼
type state struct {
last time.Time
sleepFor time.Duration
}
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
atomicLimiter
結構體
-
state
是一個狀態的指針,用於存儲上一次的執行的時間,以及需要sleep
的時間 -
padding
是一個無意義的填充數據,爲了提高性能,避免 cpu 緩存的 false sharing -
之前在講 Go 併發編程 (二) Go 內存模型 的時候有講到,爲了能夠最大限度的利用 CPU 的能力,會做很多喪心病狂的優化,其中一種就是 cpu cache
-
cpu cache 一般是以 cache line 爲單位的,在 64 位的機器上一般是 64 字節
-
所以如果我們高頻併發訪問的數據小於 64 字節的時候就可能會和其他數據一起緩存,其他數據如果出現改變就會導致 cpu 認爲緩存失效,這就是 false sharing
-
所以在這裏爲了儘可能提高性能,填充了 56 字節的無意義數據,因爲 state 是一個指針佔用了 8 個字節,所以
64 - 8 = 56
-
剩下三個字段和
Option
中的三個方法意義對應 -
perRequest
就是單位,默認是秒 -
maxSlack
鬆弛時間,也就是可以允許的突發流量的大小,默認是Pre / 10
,這個後面會講到 -
clock
時鐘,這個用於在測試的時候可以 mock 掉不使用真實的時間
接下來看看最主要的 Take
方法
func (t *atomicLimiter) Take() time.Time {
var (
// 狀態
newState state
// 用於表示原子操作是否成功
taken bool
// 需要 sleep 的時間
interval time.Duration
)
// 如果 CAS 操作不成功就一直嘗試
for !taken {
// 獲取當前的時間
now := t.clock.Now()
// load 出上一次調用的時間
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// 如果 last 是零值的話,表示之前就沒用過,直接保存返回即可
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor 是需要睡眠的時間,由於引入了鬆弛時間,所以 sleepFor 可能是一個
// maxSlack ~ 0 之間的一個值,所以這裏需要將現在的需要 sleep 的時間和上一次
// sleepFor 的值相加
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// 如果距離上一次調用已經很久了,sleepFor 可能會是一個很小的值
// 最小值只能是 maxSlack 的大小
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果 sleepFor 大於 0 的話,計算出需要 sleep 的時間
// 然後將 state.sleepFor 置零
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
// 保存狀態
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
// sleep interval
t.clock.Sleep(interval)
return newState.last
}
總結
今天學習了漏桶的實現原理以及使用方式,漏桶和令牌桶的最大的區別就是,令牌桶是支持突發流量的,但是漏桶是不支持的。但是 uber 的這個庫通過引入彈性時間的方式也讓漏桶算法有了類似令牌桶能夠應對部分突發流量的能力,並且實現上還非常的簡單,值得學習。
多看看好的輪子的實現總會學到一些新姿勢,今天就學到了使用 padding 填充來避免 false sharing 提高性能的操作
參考文獻
-
Go 進階訓練營 - 極客時間
-
"帶你快速瞭解:限流中的漏桶和令牌桶算法"
-
ratelimit · pkg.go.dev
-
ratelimit/limiter_atomic.go at main · uber-go/ratelimit (github.com)
-
漏桶算法_百度百科 (baidu.com)
-
利用 CPU cache 特性優化 Go 程序
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/dJ3hiuA-8BdNF_ENL-WIUg