【Go Web 開發】基於 IP 限流

上一篇文章我們介紹了全侷限流,當您希望嚴格對 API 的請求總速率進行限制,並且不關心請求來自何處時,使用全侷限流器可能很有用。但通常更常見的是爲每個客戶端單獨設置一個限流器,這樣可以防止單個客戶端發出太多請求,影響其他客戶端。

一種簡單的實現方法是創建一個 map 來爲每個客戶端創建一個限流器映射,使用每個客戶端的 IP 地址作爲 map 的鍵。當一個新客戶端向 API 發出請求時,我們將初始化一個新的限流器並將其添加到 map 中。對於任何後續請求,我們將從 map 中檢索客戶端的限流器,並通過調用其 Allow() 方法檢查請求是否允許,就像之前所做的那樣。

因爲可能會有多個 goroutine 併發地訪問 map,所以我們需要使用互斥鎖來防止競爭條件來保護對 map 的訪問。如果您正在跟隨本文操作,那麼一起開始編碼並更新上一篇文章中的 rateLimit() 中間件來實現這一點。

File: cmd/api/middleware.go

package main

...

func (app *application)rateLimit(next http.Handler) http.Handler{
    //申明一個mutex和map存放客戶端IP和限流器實例
    var (
        mu      sync.Mutex
        clients = make(map[string]*rate.Limiter)
    )
    return http.HandlerFunc(func(w http.ResponseWrite, r *http.Request){
    //從請求中提取出客戶端IP地址
    ip, _, err := net.SplitHostPort(r.RremoteAddr)
    if err != nil {
        app.serverErrorResponse(w, err)
        return
        }
     //上鎖放在併發寫入map
     mu.Lock()
     //檢查map中IP是否存在,不存在就創建限流器
     if _, found := clients[ip]; !found{
        clients[ip] = rate.NewLimiter(2, 4)
        }
     //調用Allow()方法,查看請求是否允許
     if !clients[ip].Allow(){
        mu.Unlock()
        app.rateLimitExceededResponse(w, r)
        return
        } 
     //即使請求允許也要釋放鎖
     mu.Unlock()
     next.ServerHTTP(w, r)
    })
}

刪除 Map 中舊限流器

上面的代碼可正常工作,但是有一個小問題,clients 這個 map 會無限的增加鍵值對,添加的每一個新的 IP 地址和限流器都會佔用越來越多的資源。

爲了防止這種情況,我們更新下代碼,記錄每個客戶端的最後一次訪問 API 時間。然後運行一個後臺 goroutine,定期從映射中刪除最近沒有訪問服務的客戶端。爲了做到這一點,我們需要創建一個自定義的客戶端結構體,它包含每個客戶端的限流器和最後一次訪問服務時間,並在初始化中間件時啓動後臺清理程序。

File:cmd/api/middleware.go

package main

...

func (app *application) rateLimit(next http.Handler) http.Handler {
        //定義client結構體,包含限流器和最後一次訪問API服務的時間
    type client struct {
        limiter  *rate.Limiter
        lastSeen time.Time
    }
    var (
        mu  sync.Mutex
        //更新map值爲client結構體指針
        clients = make(map[string]*client)
    )

        //啓動goroutine清除長時間沒有訪問服務的值
    go func() {
        for {
            time.Sleep(time.Minute)
            //更新map需要上鎖
            mu.Lock()
           //循環遍歷所有客戶端,如果3分鐘沒有再訪問服務,就清除
            for ip, client := range clients {
                if time.Since(client.lastSeen) > 3*time.Minute {
                    delete(clients, ip)
                }
            }
            mu.Unlock()
        }
    }()

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if app.config.limiter.enable {
            ip, _, err := net.SplitHostPort(r.RemoteAddr)
            if err != nil {
                app.serverErrorResponse(w, r, err)
                return
            }

            mu.Lock()

            if _, found := clients[ip]; !found {
                clients[ip] = &client{
               //如果map中不存在的IP鍵,就創建新的client結構體
              limiter: rate.NewLimiter(2 4)}
            }

            if !clients[ip].limiter.Allow() {
                mu.Unlock()
                app.rateLimitExceededResponse(w, r)
                return
            }
            mu.Unlock()
        }
        next.ServeHTTP(w, r)
    })
}

此時,如果您重新啓動服務並再次嘗試快速連續地發出一批請求,應該會發現,從單個客戶端的角度來看,限流器能正確工作,還是像以前一樣返回。

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}

附加內容

分佈式應用

使用此模式進行限流只在 API 應用程序運行在單機上時有效。如果您的基礎設施是分佈式的,並且您的應用程序運行在負載均衡器後面的多個服務器上,那麼您將需要使用另一種方法。

如果你使用 HAProxy 或 Nginx 作爲負載均衡器或反向代理,這兩者都有內置的限流功能,使用其自帶限流器是明智的。或者,你可以使用像 Redis 這樣的緩存來維護客戶端的請求計數,運行在一個所有應用服務器都可以訪問的服務器上。

限流器配置

目前,限流器的每秒請求數和突發值被硬編碼到 rateLimit() 中間件中。這當然是可以的,但是如果它們在運行時可配置的話,將會更加靈活。同樣,如果有一種簡單的方法可以完全關閉限流器會非常有用 (當您想要運行基準測試或執行負載測試時,當所有請求可能來自少量 IP 地址時,這將非常有用)。

爲了使這些參數是可配置的,我們回到 cmd/api/main.go 文件,更新配置結構和命令行參數如下所示:

File:cmd/api/main.go

package main

...

// 定義一個配置結構體來保存應用程序的所有配置設置.
//目前,配置是服務器監聽的端口和應用程序的環境名稱 (開發, 預發, 生成等等)
//將從命令行參數中讀取這些配置信息
type config struct {
    port int
    env  string
    db   struct {
        dsn          string
        maxOpenConns int
        maxIdleConns int
        maxIdleTime  string
    }
    limiter struct {
        rps    float64
        burst  int
        enable bool
    }
}

// 修改logger字段,用*jsonlog.Logger代替
type application struct {
    config config
    logger *jsonlog.Logger
    models data.Models
}

func main() {
    // 聲明一個配置結構體實例
    var cfg config

    // 從命令行參數中將port和env讀取到配製結構體實例當中。
    //默認端口使用4000以及環境信息使用開發環境development
    flag.IntVar(&cfg.port, "port", 4000, "API server port")
    flag.StringVar(&cfg.env, "env", "development", "Environment (development|staging|production)")

    flag.StringVar(&cfg.db.dsn, "db-dsn", "postgres://greenlight:pa55word@localhost/greenlight?sslmode=disable", "PostgreSQL DSN")
    flag.IntVar(&cfg.db.maxOpenConns, "db-max-open-conns", 25, "PostgreSQL max open connections")
    flag.IntVar(&cfg.db.maxIdleConns, "db-max-idle-conns", 25, "PostgreSQL max idle connections")
    flag.StringVar(&cfg.db.maxIdleTime, "db-max-idle-time", "15m", "PostgreSQL max connection idle time")

    //限流參數
    flag.Float64Var(&cfg.limiter.rps, "limiter-rps", 2, "Rate limiter maximum request per secod")
    flag.IntVar(&cfg.limiter.burst, "limiter-burst", 4, "Rate limiter maximum burst")
    flag.BoolVar(&cfg.limiter.enable, "limiter-enable", true, "enable")





flag.Parse()

然後更新 rateLimiter() 中間件使用這些配置參數:

package main

...

func (app *application) rateLimit(next http.Handler) http.Handler {
        ...

    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                //只有配置打開限流功能才起作用
        if app.config.limiter.enable {
            ip, _, err := net.SplitHostPort(r.RemoteAddr)
            if err != nil {
                app.serverErrorResponse(w, r, err)
                return
            }

            mu.Lock()

            if _, found := clients[ip]; !found {
                        //從命令行參數讀取限流器配置
                clients[ip] = &client{
                    limiter: rate.NewLimiter(rate.Limit(app.config.limiter.rps), app.config.limiter.burst),
                }
            }

            if !clients[ip].limiter.Allow() {
                mu.Unlock()
                app.rateLimitExceededResponse(w, r)
                return
            }
            mu.Unlock()
        }
        next.ServeHTTP(w, r)
    })
}

一旦完成以上代碼,讓我們通過運行帶有 - limiter-burst 參數的 API 服務並將 burst 值減少爲 2 來嘗試一下:

$ go run . -limiter-burst=2
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"starting server","properties":{"addr":":4000","env":"development"}}

如果您再次快速連續地發出一批 6 個請求,您現在應該會發現只有前兩個成功:

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}
{
        "error": "rate limit exceeded"
}

類似地,你可以嘗試使用 limiter-enabled=false 來禁用限流器,如下所示:

$ go run . -limiter-enable=false
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"starting server","properties":{"addr":":4000","env":"development"}}

你會發現現在所有的請求都成功完成了,不管你發了多少請求。

$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
{
        "status": "available",
        "system_info": {
                "environment": "development",
                "version": "1.0.0"
        }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZmyRLKtxy9AnuVl5c249QQ