淺析 rust 大明星 Tokio

Tokio 可以說是 rust 中最熱門的庫,對於異步與併發進行了很好的支持。大多數基於 rust 的開源框架都使用到了 Tokio,因此在介紹這些實現開源框架時經常會被問到:底層的異步和併發是怎麼實現的?我只能回答:底層的異步和併發都是由 Tokio 控制的。這顯然不是一個令人滿意的回答。因此本文章將對於 Tokio 的基本方法和底層邏輯進行分析。

概述

一句話概括

Tokio 可以理解成一個 “任務池” 和一個“調度器”,負責把所有在任務池中的任務調度運行起來。

更具體一點,Tokio 可以類比爲一個 “異步操作系統”:

Tokio 的優勢主要體現在以下方面:

通過內部的優化機制(調度算法、無鎖隊列與內存池管理等)與 rust 的語言優勢,Tokio 效率較高,在早期的實驗中,官方給出了性能對比圖:

在 rust 發展之初,社區出現了很多運行時庫,但是,大浪淘沙,隨着時間的流逝,Tokio 越來越亮眼,無論是性能、功能還是社區、文檔,它在各個方面都異常優秀,時至今日,可以說已成爲事實上的標準。新出現的 rust 運行時庫(例如 Bytedace 的 monoio)宣傳性能優於 Tokio,但還是雷聲大雨點小,沒有被廣泛應用。

Tokio 的身影遍佈在各種類型的 rust 庫中,例如 HTTP 庫(Hyper)、Web 框架(Axum / Warp)、gRPC(Tonic)、TLS 庫(Rustls)、數據庫支持(SeaORM)等。同時各大廠商也廣泛使用,例如 AWS、Azure、Google 等。

Tokio 更加適合頻繁切換的場景,例如網絡服務、微服務、代理、數據庫連接池、實時通信系統等。而不適合並行計算或密集計算等場景。

因此,如果需要使用 rust 中的高性能異步併發,且對於 Tokio 內部工作原理不敏感,看到這裏放心使用就好了。後文將從 rust 語言的異步來着手,分析 Tokio 的架構以及具體調度的生命週期,最後分析與 Nginx 的對比以未來的一些方向。

rust 語言的異步

對 Future/async/await 非常熟悉可以跳過。

Golang/Nodejs 等語言的異步內置於語言本身,做了很好的封裝且開箱即用,雖然能夠簡化使用但不靈活無法更改。rust 作爲系統級語言,並不想把異步的具體實現單一化與侷限化,因此在 rust std 中只實現了異步的基本功能與框架(例如 Future/async/await 等),而把異步調度進行了開放,由第三方庫來具體實現。

Future

Future 是 Rust 異步編程的核心抽象,它是一個狀態機,通過多次 poll 推進其狀態,直到完成。它與 Waker、Runtime、I/O 驅動緊密配合,構成了整個非阻塞異步系統的基礎。

Future 其實就是一個 trait,定義如下:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Future 與 任務(task)的區別: Future 是 rust 原生支持的異步 trait,許多第三方異步庫在此基礎上將 Future 封裝爲 task 用來完成調度。

async

通常不會用上述的 trait 來創建 Future,而是結合使用 async,編譯器會將被 async 修飾的函數或代碼塊轉化爲 Future。也就是說調用 async fn 的具體函數並不會立刻執行,而只是創建 Future 等待 poll 來推動狀態機。

// async fn
async fn fetch_data() -> Result<String, Error> {
    let resp = reqwest::get("https://example.com").await?;
    Ok(resp.text().await?)
}
// async 代碼塊
let future = async {
    // 異步邏輯
    let data = expensive_computation().await;
    format!("Result: {}", data)
};

await

上述 Future trait 中,poll 是核心方法,用於推進狀態機的進行。我們的代碼不會直接調用 poll,而是通過 Rust 的關鍵字 .await 來執行這個 Future,await 會被 rust 在編譯時生成代碼來調用 poll,返回 Poll(見下),如果是 Pending 則被 runtime 掛起(比如重新放到任務隊列中)。當有 event 產生時,掛起的 future 會被喚醒,Rust 會再次調用 future 的 poll,如果此時返回 Ready 就執行完成。

pub enumPoll<T> {
    Ready(T),
    Pending,
}

多級 Future 嵌套時,只有遇到類似 .await 纔會推動執行,是協同式調度而不是搶佔式調度(Tokio 1.x 版本引入搶佔機制來緩解飢餓問題,但 rust 原生基礎是協同式調度)。因此 rust 無需提前爲 Future 分配獨立的棧或堆上內存,是一種零成本抽象。

總結

如下圖所示,rust std 中的異步只維護 Future 以及內部方法 poll,具體的任務隊列和調度方法由第三方的 runtime 來實現。每次代碼執行到 .await 時會進行一次 poll,poll 若 ready 則直接退出表示執行完成,poll 若遇到阻塞,則掛起等待事件池來喚醒。當有事件(例如 I/O 等)喚醒之後,會把該掛起 Future 封裝爲 task,加入到任務隊列中等待調度,runtime 會不斷地從任務隊列中拿出任務來執行。

Tokio 架構與構造過程

架構

承接上文,這一部分主要介紹 Tokio 實現的 runtime 架構,如下圖所示:

構造(build)

Tokio 中的 Runtime 結構體如下:

pub structRuntime {
    /// Task scheduler
    scheduler: Scheduler,
    /// Handle to runtime, also contains driver handles
    handle: Handle,
    /// Blocking pool handle, used to signal shutdown
    blocking_pool: BlockingPool,
}

blocking 線程 和 worker 線程:worker 線程是我們要重點關注的運行時輕量級線程,負責調度和任務執行;blocking 線程是在這個過程中的所有的阻塞任務,其數量等於所有的 worker 線程數量 + 其他控制線程數量,原因是 worker 線程本身就是一個 blocking 任務,其他控制線程又包括信號與通道等。

其中 BlockingPool 是專門用來運行阻塞任務的線程池,上述解釋已簡單概括;Handle 維護了過程中各種 handler,本文不重點關注這兩項。Scheduler 是 “任務池” 和“調度器”的封裝,也是 Runtime 最核心的部分。

想要使用 Runtime 必須要經過初始化:

tokio::runtime::Builder::new_multi_thread()
                .enable_all().worker_threads(threads).thread_name(name)
                .build().unwrap(),

build() 即構造了 Runtime 結構,其中最重要的是 Driver 和 Worker 。

Driver 構造的過程

Driver 封裝了 I/O 和 Timer 的驅動,並加入了部分機制(例如內存 slab),下面以 I/O Driver 爲例,詳細說明:

pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle){
    let poll = mio::Poll::new()?;
    #[cfg(not(target_os = "wasi"))]
    let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
    let registry = poll.registry().try_clone()?;
    let driver = Driver {
        signal_ready: false,
        events: mio::Events::with_capacity(nevents),
        poll,
    };
    let (registrations, synced) = RegistrationSet::new();
    let handle = Handle {
        registry,
        registrations,
        synced: Mutex::new(synced),
        #[cfg(not(target_os = "wasi"))]
        waker,
        metrics: IoDriverMetrics::default(),
    };
    Ok((driver, handle))
}

worker 構造的過程

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);
    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();
        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);
        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            is_traced: false,
            park: Some(park),
            global_queue_interval: stats.tuned_global_queue_interval(&config),
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));
        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }
    let (idle, idle_synced) = Idle::new(size);
    let (inject, inject_synced) = inject::Shared::new();
    let remotes_len = remotes.len();
    let handle = Arc::new(Handle {
        task_hooks: TaskHooks::from_config(&config),
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(size),
            synced: Mutex::new(Synced {
                idle: idle_synced,
                inject: inject_synced,
            }),
            shutdown_cores: Mutex::new(vec![]),
            trace_status: TraceStatus::new(remotes_len),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });
    let mut launch = Launch(vec![]);
    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }
    (handle, launch)
}

爲 Tokio 多線程運行時創建一組 Worker 線程,每個 Worker 都綁定了一個本地任務隊列(Local Queue)、I/O 和定時器驅動(Driver),並準備好參與異步任務調度。其中:

Tokio task 生命週期

構造出基本的 runtime 架構後,就等待有任務被加入到 runtime 中被調度與執行,這一部分詳細說明任務從被構造到執行完成的流程。

如上圖所示,Worker 在創建後執行調度循環,不斷地從任務隊列中取任務,執行 poll,若結果爲阻塞則註冊 waker 並掛起,之後取新任務 poll。當 Driver 有新事件時會調用 waker 來喚醒任務,重新加入到任務隊列中待新一輪調用。

任務隊列

最終,任務隊列的執行順序是:本地 LIFO slot --->> 本地任務隊列 --->> 全局任務隊列 --->> 竊取其他線程的任務。

任務飢餓問題

飢餓問題有以下兩種場景:

  1. 某任務是密集計算型任務,不斷佔據 cpu 而不釋放;

  2. 本地任務更新太快太頻繁,全局任務無法被執行到;

第一種場景,Tokio 從 1.x 開始,引入了搶佔式調度來緩解餓死問題,簡單來說就是會定期強制任務掛起來讓出資源。但是需要說明的是,這種場景本質上和 rust 的異步併發衝突,更加推薦使用 tokio::task::spawn_blocking,來將任務轉化爲並行計算任務。

第二種場景經常會遇到,本質上是由於三種不同的隊列有優先級,可能會導致低優先級的隊列被餓死。例如 I/O 頻繁的 TCP 連接會不斷地加入到本地隊列,而無法處理全局隊列任務。Tokio 的解決的方法是爲每個任務加入循環次數,當其循環加入隊列次數超過一定上限後會先擱置,優先處理低優先級(例如全局隊列)的任務。

總之,Tokio 設計了相關的機制來平衡公平性和效率,同時還有一些其他算法或異步 runtime 優化了部分過程,取得了更好的效果(例如 horaedb(Apache、Ant)、Monoio(bytedance))。雖然並非完美,但是各方都在努力完善公平性和性能,這或許就是 rust 設計開放 runtime 的初衷。

其他

與 Nginx 的調度對比

Nginx 採用 多 worker 進程單線程 + 非阻塞 I/O + 事件驅動 的模型。

兩者的對比如下:

綁核問題

Tokio 本身並不支持綁 CPU 核,要實現綁核有以下三種方法:

taskset -c [CPU NUMBER] -p PID
docker run --cpuset-cpus [CPU NUMBER]
runtime::Builder::new_multi_thread()
.on_thread_start(move || {
    core_affinity::set_for_current(core_id.clone());
})

Tokio 未來的一些方向

io_uring是 Linux(>5.1) 上新一代的高性能異步 I/O 框架,其主要針對 epoll 進行了性能和功能上的提升。其設計了一種用戶態和內核的環形緩衝區,解決線程競爭,實現無鎖設計。減少了系統調用的次數,實現零拷貝。支持批量操作。

當前 Tokio 官方還沒有全面支持 io_uring,但是在社區中已經出現了不少支持:例如 tokio-uring,其基本思想是在 IO Driver 中的 mio 進行封裝,註冊在 Tokio(mio) 上一個 uring fd,而基於這個 fd 和一套自己的 Pending Op 管理系統又對外作爲 Reactor 暴露了事件源的能力。

首先需要明確,若想使用 IO 或 sleep 等操作,必須要使用 Tokio::net::TcpStream 來進行 Tokio 的封裝,主要原因是需要將 IO 或 Timer 事件註冊到 Reactor 上,因此協議的支持也是非常重要的部分。當前 Tokio 支持常見的 TCP、UDP 等協議,同時也在積極探索新協議以及更好的適配性,例如:更好的 TLS 支持(如整合 rustls);支持 HTTP/3、QUIC 等協議棧(底層支持);對 IPV6 多路複用的增強支持。

Tokio 正在推動對測試工具的支持,例如:提供 mock I/O 接口;支持 determinism 測試;提供 tracing、instrumentation 集成,便於調試異步程序。相關的社區實踐包括 tokio-tesing 等。

前文也說過,內部的調度算法決定了不同場景下的效果,當前已經有許多 runtime 庫提出了新的算法來優化某些過程,Tokio 也在積極探索和演進。

Tokio 也在積極探索在不同平臺的支持,例如嵌入式平臺。同時也在開發 WASM 支持,可以在 WASM 中運行 Tokio。

聲明:Tokio 更新頻繁,本文主要針對 1.44.1 版本分析。

參考資料:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/c8_hIE2c-0l9bT0I73kjcQ