淺析 rust 大明星 Tokio
Tokio 可以說是 rust 中最熱門的庫,對於異步與併發進行了很好的支持。大多數基於 rust 的開源框架都使用到了 Tokio,因此在介紹這些實現開源框架時經常會被問到:底層的異步和併發是怎麼實現的?我只能回答:底層的異步和併發都是由 Tokio 控制的。這顯然不是一個令人滿意的回答。因此本文章將對於 Tokio 的基本方法和底層邏輯進行分析。
概述
一句話概括
Tokio 可以理解成一個 “任務池” 和一個“調度器”,負責把所有在任務池中的任務調度運行起來。
更具體一點,Tokio 可以類比爲一個 “異步操作系統”:
Tokio 的優勢主要體現在以下方面:
- 高效
通過內部的優化機制(調度算法、無鎖隊列與內存池管理等)與 rust 的語言優勢,Tokio 效率較高,在早期的實驗中,官方給出了性能對比圖:
- 通用
在 rust 發展之初,社區出現了很多運行時庫,但是,大浪淘沙,隨着時間的流逝,Tokio 越來越亮眼,無論是性能、功能還是社區、文檔,它在各個方面都異常優秀,時至今日,可以說已成爲事實上的標準。新出現的 rust 運行時庫(例如 Bytedace 的 monoio)宣傳性能優於 Tokio,但還是雷聲大雨點小,沒有被廣泛應用。
Tokio 的身影遍佈在各種類型的 rust 庫中,例如 HTTP 庫(Hyper)、Web 框架(Axum / Warp)、gRPC(Tonic)、TLS 庫(Rustls)、數據庫支持(SeaORM)等。同時各大廠商也廣泛使用,例如 AWS、Azure、Google 等。
Tokio 更加適合頻繁切換的場景,例如網絡服務、微服務、代理、數據庫連接池、實時通信系統等。而不適合並行計算或密集計算等場景。
因此,如果需要使用 rust 中的高性能異步併發,且對於 Tokio 內部工作原理不敏感,看到這裏放心使用就好了。後文將從 rust 語言的異步來着手,分析 Tokio 的架構以及具體調度的生命週期,最後分析與 Nginx 的對比以未來的一些方向。
rust 語言的異步
對 Future/async/await 非常熟悉可以跳過。
Golang/Nodejs 等語言的異步內置於語言本身,做了很好的封裝且開箱即用,雖然能夠簡化使用但不靈活無法更改。rust 作爲系統級語言,並不想把異步的具體實現單一化與侷限化,因此在 rust std 中只實現了異步的基本功能與框架(例如 Future/async/await 等),而把異步調度進行了開放,由第三方庫來具體實現。
Future
Future 是 Rust 異步編程的核心抽象,它是一個狀態機,通過多次 poll 推進其狀態,直到完成。它與 Waker、Runtime、I/O 驅動緊密配合,構成了整個非阻塞異步系統的基礎。
Future 其實就是一個 trait,定義如下:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Future 與 任務(task)的區別: Future 是 rust 原生支持的異步 trait,許多第三方異步庫在此基礎上將 Future 封裝爲 task 用來完成調度。
async
通常不會用上述的 trait 來創建 Future,而是結合使用 async,編譯器會將被 async 修飾的函數或代碼塊轉化爲 Future。也就是說調用 async fn 的具體函數並不會立刻執行,而只是創建 Future 等待 poll 來推動狀態機。
// async fn
async fn fetch_data() -> Result<String, Error> {
let resp = reqwest::get("https://example.com").await?;
Ok(resp.text().await?)
}
// async 代碼塊
let future = async {
// 異步邏輯
let data = expensive_computation().await;
format!("Result: {}", data)
};
await
上述 Future trait 中,poll 是核心方法,用於推進狀態機的進行。我們的代碼不會直接調用 poll,而是通過 Rust 的關鍵字 .await 來執行這個 Future,await 會被 rust 在編譯時生成代碼來調用 poll,返回 Poll(見下),如果是 Pending 則被 runtime 掛起(比如重新放到任務隊列中)。當有 event 產生時,掛起的 future 會被喚醒,Rust 會再次調用 future 的 poll,如果此時返回 Ready 就執行完成。
pub enumPoll<T> {
Ready(T),
Pending,
}
多級 Future 嵌套時,只有遇到類似 .await 纔會推動執行,是協同式調度而不是搶佔式調度(Tokio 1.x 版本引入搶佔機制來緩解飢餓問題,但 rust 原生基礎是協同式調度)。因此 rust 無需提前爲 Future 分配獨立的棧或堆上內存,是一種零成本抽象。
總結
如下圖所示,rust std 中的異步只維護 Future 以及內部方法 poll,具體的任務隊列和調度方法由第三方的 runtime 來實現。每次代碼執行到 .await 時會進行一次 poll,poll 若 ready 則直接退出表示執行完成,poll 若遇到阻塞,則掛起等待事件池來喚醒。當有事件(例如 I/O 等)喚醒之後,會把該掛起 Future 封裝爲 task,加入到任務隊列中等待調度,runtime 會不斷地從任務隊列中拿出任務來執行。
Tokio 架構與構造過程
架構
承接上文,這一部分主要介紹 Tokio 實現的 runtime 架構,如下圖所示:
-
Tokio 會啓動很多 Worker,每個 Worker 對應一個線程,併發完成異步任務。Worker 內部又主要包括 任務隊列 和 Driver 引用。
-
Driver 是異步運行時中的 “驅動引擎”,它通過操作系統提供的 I/O 多路複用機制(如 Linux 上的
epoll、Windows 上的IOCP、macOS 上的kqueue)來監聽 I/O 事件和定時器事件,並在事件就緒後喚醒對應的 Future 繼續執行。Tokio 中的 Driver 又分爲 I/O 和 Time,I/O 負責監聽 socket 和文件讀寫,Time 負責 sleep 等定時任務。 -
Tokio 將 Future 封裝爲任務,其中主要包含了 waker,waker 掛載到 Driver 中,當 Driver 有新事件時會回調 waker 來停止阻塞,並加入到任務隊列中。
-
任務會加入到任務隊列中,任務隊列又分爲每個 Worker 內部的,以及全局的隊列。通常在 Worker 創建的任務即加入到內部隊列,全局創造的加入到全局隊列,全局隊列的任務會被任何 Worker 線程執行到。
構造(build)
Tokio 中的 Runtime 結構體如下:
pub structRuntime {
/// Task scheduler
scheduler: Scheduler,
/// Handle to runtime, also contains driver handles
handle: Handle,
/// Blocking pool handle, used to signal shutdown
blocking_pool: BlockingPool,
}
blocking 線程 和 worker 線程:worker 線程是我們要重點關注的運行時輕量級線程,負責調度和任務執行;blocking 線程是在這個過程中的所有的阻塞任務,其數量等於所有的 worker 線程數量 + 其他控制線程數量,原因是 worker 線程本身就是一個 blocking 任務,其他控制線程又包括信號與通道等。
其中 BlockingPool 是專門用來運行阻塞任務的線程池,上述解釋已簡單概括;Handle 維護了過程中各種 handler,本文不重點關注這兩項。Scheduler 是 “任務池” 和“調度器”的封裝,也是 Runtime 最核心的部分。
想要使用 Runtime 必須要經過初始化:
tokio::runtime::Builder::new_multi_thread()
.enable_all().worker_threads(threads).thread_name(name)
.build().unwrap(),
build() 即構造了 Runtime 結構,其中最重要的是 Driver 和 Worker 。
Driver 構造的過程
Driver 封裝了 I/O 和 Timer 的驅動,並加入了部分機制(例如內存 slab),下面以 I/O Driver 爲例,詳細說明:
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;
let driver = Driver {
signal_ready: false,
events: mio::Events::with_capacity(nevents),
poll,
};
let (registrations, synced) = RegistrationSet::new();
let handle = Handle {
registry,
registrations,
synced: Mutex::new(synced),
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
};
Ok((driver, handle))
}
-
初始化 mio 的 poll,底層就是 epoll/kqueue 對象。
-
初始化 waker,其是向 poll 註冊一個特殊的事件
TOKEN_WAKEUP -
初始化 driver 後同時創建 hadle,可以被線程共享,用於傳入 worker 中。
worker 構造的過程
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Create the local queues
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
let metrics = WorkerMetrics::from_config(&config);
let stats = Stats::new(&metrics);
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
is_shutdown: false,
is_traced: false,
park: Some(park),
global_queue_interval: stats.tuned_global_queue_interval(&config),
stats,
rand: FastRand::from_seed(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(metrics);
}
let (idle, idle_synced) = Idle::new(size);
let (inject, inject_synced) = inject::Shared::new();
let remotes_len = remotes.len();
let handle = Arc::new(Handle {
task_hooks: TaskHooks::from_config(&config),
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(size),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
}),
shutdown_cores: Mutex::new(vec![]),
trace_status: TraceStatus::new(remotes_len),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
_counters: Counters,
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
}
爲 Tokio 多線程運行時創建一組 Worker 線程,每個 Worker 都綁定了一個本地任務隊列(Local Queue)、I/O 和定時器驅動(Driver),並準備好參與異步任務調度。其中:
-
創建本地任務隊列和任務竊取隊列。本地任務隊列是一個無鎖雙端隊列,可以用於其他線程的竊取,有關任務竊取在後文詳細介紹。
-
也創建了 Handle,該 Handle 可以用來提交新任務、同步執行 Future 和提交阻塞任務等。
-
(隱式)通過參數傳入 Driver,在線程中即可以通過 Driver 和 Driver Handle 完成 Wake 掛載、I/O 事件、推進定時器等。
-
每個 Worker 會被傳入到 Launch,啓動調度循環(無限循環從隊列中拿出任務 poll)。
Tokio task 生命週期
構造出基本的 runtime 架構後,就等待有任務被加入到 runtime 中被調度與執行,這一部分詳細說明任務從被構造到執行完成的流程。
如上圖所示,Worker 在創建後執行調度循環,不斷地從任務隊列中取任務,執行 poll,若結果爲阻塞則註冊 waker 並掛起,之後取新任務 poll。當 Driver 有新事件時會調用 waker 來喚醒任務,重新加入到任務隊列中待新一輪調用。
任務隊列
-
全局隊列(global queue) 有且僅有一個,全局隊列是必要的,因爲有些任務不屬於某個 worker,只能加到全局隊列中。全局隊列是 FIFO 的設計架構。
-
只有全局隊列就會面臨所有 worker 都從全局隊列拿任務,需要加鎖影響性能。因此每個 worker 一個維護本地隊列(local run queue),在 worker 內產生的任務會直接加到自己的本地隊列中。
-
本地隊列也是 FIFO 的,保證了公平性,但是不同任務之間的資源會被割裂,調度會浪費 CPU 緩存,因此又多增了 LIFO slot。LIFO slot 可以理解爲本地隊列中的 “特權” 任務,例如某些任務 spawn 出新的任務,且該新任務對舊任務有大量的資源依賴,此時可以將新任務加入到 LIFO slot 中;或者某些任務通過信號通知其他任務可以被執行,此時該任務可以被加入到 LIFO slot 中,避免排隊帶來的延遲。由於 LIFO slot 會帶來不公平的問題,因此只有一個槽位,被佔就只能到本地隊列的末尾排隊。
-
任務竊取:當本地和全局隊列中都沒有要執行的任務時(此時該 worker 已經空閒),會嘗試竊取其他 worker 的本地隊列的任務。這一機制保障了線程的負載均衡,同時提高系統整體的運行效率。竊取時會從末端開始,因此不會干擾 worker 的正常進行,該隊列也無需加鎖。
最終,任務隊列的執行順序是:本地 LIFO slot --->> 本地任務隊列 --->> 全局任務隊列 --->> 竊取其他線程的任務。
任務飢餓問題
飢餓問題有以下兩種場景:
-
某任務是密集計算型任務,不斷佔據 cpu 而不釋放;
-
本地任務更新太快太頻繁,全局任務無法被執行到;
第一種場景,Tokio 從 1.x 開始,引入了搶佔式調度來緩解餓死問題,簡單來說就是會定期強制任務掛起來讓出資源。但是需要說明的是,這種場景本質上和 rust 的異步併發衝突,更加推薦使用 tokio::task::spawn_blocking,來將任務轉化爲並行計算任務。
第二種場景經常會遇到,本質上是由於三種不同的隊列有優先級,可能會導致低優先級的隊列被餓死。例如 I/O 頻繁的 TCP 連接會不斷地加入到本地隊列,而無法處理全局隊列任務。Tokio 的解決的方法是爲每個任務加入循環次數,當其循環加入隊列次數超過一定上限後會先擱置,優先處理低優先級(例如全局隊列)的任務。
總之,Tokio 設計了相關的機制來平衡公平性和效率,同時還有一些其他算法或異步 runtime 優化了部分過程,取得了更好的效果(例如 horaedb(Apache、Ant)、Monoio(bytedance))。雖然並非完美,但是各方都在努力完善公平性和性能,這或許就是 rust 設計開放 runtime 的初衷。
其他
與 Nginx 的調度對比
Nginx 採用 多 worker 進程單線程 + 非阻塞 I/O + 事件驅動 的模型。
-
多進程單線程:最開始時,Nginx 會創建多個 worker 進程,每個進程創建後即可以視爲是完全獨立的存在,後續的任務執行和資源佔用互不干預。所有的 worker 進程會統一綁定監聽某個或某些端口,當某端口有數據時,操作系統會分配給某個 worker。(分配方法又分爲 SO_REUSEPORT 和 accept_mutex 機制,這裏的詳細機制後續會出一篇文章來詳細說明,簡單來說就是 SO_REUSEPORT 減少了驚羣效應,去除了鎖,提高了 CPU 負載均衡,但是會增加時延不均衡性)。
-
事件驅動:當某個 worker 接收到數據後,會將其註冊到 epoll 事件中,並在相應的階段調用相應的回調函數,主進程(即線程)會輪詢執行這些事件回調。主進程是串行執行,無搶佔式調度和鎖的開銷。
-
非阻塞 I/O:系統採用非阻塞 I/O 與多路複用機制(即 epoll)來完成高併發處理。
兩者的對比如下:
-
在性能方面:Nginx 更有利於處理高併發 但是 簡單或靜態請求,獨立進程 + 事件驅動可以輕鬆處理高併發,但是進程間隔離不利於複雜處理;Tokio 更有利於處理複雜場景,且開發靈活,能夠支持許多新協議開發。
-
在開發方面:Nginx 基於回調函數開發,同一過程可能需要拆成多個同步函數,面向狀態編程較爲混亂容易遺忘細節,門檻較高;而 Tokio 可以基於過程開發,開發邏輯簡單,門檻較低。
綁核問題
Tokio 本身並不支持綁 CPU 核,要實現綁核有以下三種方法:
- 直接給進程綁核。不同於 Nginx 的多進程程序,Tokio 本身是單進程多線程程序,因此可以直接在系統層面給進程綁核。
taskset -c [CPU NUMBER] -p PID
- Docker 綁核。類似於上述方法,若 Tokio 運行於 docker 上可以直接 docker 綁核。
docker run --cpuset-cpus [CPU NUMBER]
- core_affinity_rs 第三方 rust 庫。該庫支持線程級別設置 cpu 核,可以在創建 tokio::runtime 時進行綁核設置
runtime::Builder::new_multi_thread()
.on_thread_start(move || {
core_affinity::set_for_current(core_id.clone());
})
Tokio 未來的一些方向
- IO Uring
io_uring是 Linux(>5.1) 上新一代的高性能異步 I/O 框架,其主要針對 epoll 進行了性能和功能上的提升。其設計了一種用戶態和內核的環形緩衝區,解決線程競爭,實現無鎖設計。減少了系統調用的次數,實現零拷貝。支持批量操作。
當前 Tokio 官方還沒有全面支持 io_uring,但是在社區中已經出現了不少支持:例如 tokio-uring,其基本思想是在 IO Driver 中的 mio 進行封裝,註冊在 Tokio(mio) 上一個 uring fd,而基於這個 fd 和一套自己的 Pending Op 管理系統又對外作爲 Reactor 暴露了事件源的能力。
- 擴展協議棧
首先需要明確,若想使用 IO 或 sleep 等操作,必須要使用 Tokio::net::TcpStream 來進行 Tokio 的封裝,主要原因是需要將 IO 或 Timer 事件註冊到 Reactor 上,因此協議的支持也是非常重要的部分。當前 Tokio 支持常見的 TCP、UDP 等協議,同時也在積極探索新協議以及更好的適配性,例如:更好的 TLS 支持(如整合 rustls);支持 HTTP/3、QUIC 等協議棧(底層支持);對 IPV6 多路複用的增強支持。
- 日誌與調試支持
Tokio 正在推動對測試工具的支持,例如:提供 mock I/O 接口;支持 determinism 測試;提供 tracing、instrumentation 集成,便於調試異步程序。相關的社區實踐包括 tokio-tesing 等。
- 內部調度算法更新
前文也說過,內部的調度算法決定了不同場景下的效果,當前已經有許多 runtime 庫提出了新的算法來優化某些過程,Tokio 也在積極探索和演進。
- 不同平臺、形式支持
Tokio 也在積極探索在不同平臺的支持,例如嵌入式平臺。同時也在開發 WASM 支持,可以在 WASM 中運行 Tokio。
聲明:Tokio 更新頻繁,本文主要針對 1.44.1 版本分析。
參考資料:
-
Tokio 源碼:https://github.com/tokio-rs/tokio
-
Tokio 官方文檔:https://tokio.rs/tokio/tutorial
-
字節 monoio:https://rustmagazine.github.io/rust_magazine_2021/chapter_12/monoio.html
-
apache-haoraedb:https://github.com/apache/horaedb
-
https://tony612.github.io/tokio-internals/01_intro_async.html
-
https://tidb.net/blog/18804515
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/c8_hIE2c-0l9bT0I73kjcQ