在 Rust 中實現 API 限速

在這篇文章中,我們將討論如何在 Rust 中實現 API 限速。當涉及到生產中的服務時,是爲了確保不良行爲者不會濫用 API——這就是 API 限速的作用所在。

我們將實現 “滑動窗口” 算法,通過動態週期來檢查請求歷史,並使用基本的內存 hashmap 來存儲用戶 IP 及其請求時間。我們還將研究如何使用 tower-governor 庫來實現限速。

實現一個簡單的滑動窗口限速器

讓我們從頭開始編寫一個簡單的基於 IP 的滑動窗口限速器。

在項目中加入以下依賴項:

[dependencies]
chrono = { version = "0.4.34"features = ["serde""clock"] }
serde = { version = "1.0.196"features = ["derive"] }

我們將聲明一個新的結構體 RateLimiter,它保存 IP 地址作爲鍵的 HashMap,值爲 Vec<DateTime>(Utc 時區的時間戳)。

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::net::IpAddr;
use chrono::{DateTime, Utc};

// 這是用戶訪問端點的請求限制(每分鐘)
// 如果用戶試圖超過這個限制,返回一個錯誤
const REQUEST_LIMIT: usize = 120;

#[derive(Clone, Default)]
pub struct RateLimiter {
    requests: Arc<Mutex<HashMap<IpAddr, Vec<DateTime<Utc>>>>>,
}

首先,我們想要通過使用.lock() 來鎖定我們的 HashMap,這給了我們寫訪問權限。然後,我們需要檢查 hashmap 是否包含一個鍵,其中包含我們想要使用.entry() 函數檢查的 IP 地址,然後通過保留有效的時間戳來修改它,並根據長度是否在請求限制之下 push 一個新記錄。然後檢查記錄長度是否大於請求限制—如果是,則返回錯誤;如果不是,則返回 Ok(())。

impl RateLimiter {
    fn check_if_rate_limited(&self, ip_addr: IpAddr) -> Result<(), String> {
        // 我們只想保留60秒前的時間戳
        let throttle_time_limit = Utc::now() - std::time::Duration::from_secs(60);

        let mut requests_hashmap = self.requests.lock().unwrap();

        let mut requests_for_ip = requests_hashmap
            // 檢索記錄,並允許我們在適當的地方修改它
            .entry(ip_addr)
            // 如果記錄爲空,則插入帶有當前時間戳的vec
            .or_default();

        requests_for_ip.retain(|x| x.to_utc() > throttle_time_limit);
        requests_for_ip.push(Utc::now());

        if requests_for_ip.len() > REQUEST_LIMIT {
            return Err("IP is rate limited :(".to_string());
        }

        Ok(())
    }
}

測試如下:

fn main() {
    let rate_limiter = RateLimiter::default();

    let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    // 這裏我們請求120次
    for _ in 1..80 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

    // 等待30秒
    std::thread::sleep(std::time::Duration::from_secs(30));

    // 在這裏再做40個請求
    for _ in 1..40 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

    // 再等30秒
    std::thread::sleep(std::time::Duration::from_secs(30));

    // 現在我們可以再做80個請求
    for _ in 1..80 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }
}

然而,生產上的限速系統通常比這先進得多。我們將在下面討論如何利用 tower-governor crate 進行速率限制。

使用 tower-governor 實現限速器

提供速率限制的 tower 服務和層,後端是 governo 庫。tower-governor 很大程度上基於 actix-governor 所做的工作,可以與 Axum, Hyper, Tonic 和其他任何基於 tower 組件庫的框架一起使用!

tower-governor 的特點:

它是如何工作的?

每個調控器中間件都有一個存儲配額的配置,配額指定可以從一個 IP 地址發送多少請求,如果超過配額則阻止進一步請求。

例如,如果配額允許 10 個請求,客戶機可以在中間件開始阻塞之前的短時間內發送 10 個請求。

一旦使用了至少一個配額元素,配額元素將在指定時間後補充。

例如,如果週期爲 2 秒,並且配額爲空,則需要 2 秒來補充配額的一個元素。這意味着可以平均每兩秒鐘發送一個請求。

如果配額允許在同一時間段內發送 10 個請求,則客戶端可以再次發送 10 個請求的突發事件,然後必須等待 2 秒才能發送進一步的請求,或者在完整配額被補充之前等待 20 秒,他可以發送另一個突發事件。

下面我們來實現一個例子。

首先,在項目中加入以下依賴項:

[dependencies]
tokio = {version = "1.36.0"features = ["full"] }
tower_governor = "0.3.2"
axum = "0.7"
tracing = {version="0.1.37"features=["attributes"]}
tracing-subscriber = "0.3"
tower = "0.4.13"

然後,在 main.rs 文件中寫入以下代碼:

use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

async fn hello() -> &'static str {
   "Hello world"
}

#[tokio::main]
async fn main() {
   // 配置跟蹤
   // 構造一個訂閱者,將格式化的跟蹤信息打印到標準輸出
   let subscriber = tracing_subscriber::FmtSubscriber::new();
   // 使用subscriber處理在此點之後發出的跟蹤
   tracing::subscriber::set_global_default(subscriber).unwrap();

   // 允許每個IP地址最多有五個請求,每兩秒鐘補充一個
   // 我們將其裝箱是因爲Axum 0.6要求所有層都是克隆的,因此我們需要一個靜態引用
   let governor_conf = Box::new(
       GovernorConfigBuilder::default()
           .per_second(2)
           .burst_size(5)
           .finish()
           .unwrap(),
   );

   let governor_limiter = governor_conf.limiter().clone();
   let interval = Duration::from_secs(60);
   // 一個單獨的後臺任務
   std::thread::spawn(move || {
       loop {
           std::thread::sleep(interval);
           tracing::info!("rate limiting storage size: {}", governor_limiter.len());
           governor_limiter.retain_recent();
       }
   });

   // 構建路由
   let app = Router::new()
       // `GET /` 
       .route("/", get(hello))
       .layer(GovernorLayer {
           // 我們可以泄漏它,因爲它是一次性創建的
           config: Box::leak(governor_conf),
       });

   let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
   tracing::debug!("listening on {}", addr);
   let listener = TcpListener::bind(addr).await.unwrap();
   axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
       .await
       .unwrap();
}

運行服務器後,在瀏覽器中刷新請求,2 秒內超過 5 次會提示以下錯誤:

服務器後臺跟蹤日誌如下:

INFO tower_governor: rate limiting storage size: 0
INFO tower_governor: rate limiting storage size: 0
INFO tower_governor: rate limiting storage size: 1
INFO tower_governor: rate limiting storage size: 1
INFO tower_governor: rate limiting storage size: 1
INFO tower_governor: rate limiting storage size: 0
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KByb_hGTcCBcXpydo3F0cQ