文盤 Rust -- 用 Tokio 實現簡易任務池
Tokio 無疑是 Rust 世界中最優秀的異步 Runtime 實現。非阻塞的特性帶來了優異的性能,但是在實際的開發中我們往往需要在某些情況下阻塞任務來實現某些功能。
我們看看下面的例子
fn main(){
let max_task = 1;
let rt = runtime::Builder::new_multi_thread()
.worker_threads(max_task)
.build()
.unwrap();
rt.block_on(async {
println!("tokio_multi_thread ");
for i in 0..100 {
println!("run {}", i);
tokio::spawn(async move {
println!("spawn {}", i);
thread::sleep(Duration::from_secs(2));
});
}
});
}
我們期待的運行結構是通過異步任務打印出 99 個 “spawn i",但實際輸出的結果大概這樣
tokio_multi_thread
run 0
run 1
run 2
.......
run 16
spawn 0
run 17
......
run 99
spawn 1
spawn 2
......
spawn 29
......
spawn 58
spawn 59
59 執行完後面就沒有輸出了,如果把 max_task 設置爲 2,情況會好一點,但是也沒有執行完所有的異步操作,也就是說在資源不足的情況下,Tokio 會拋棄某些任務,這不符合我們的預期。那麼能不能再達到了某一閥值的情況下阻塞一下,不再給 Tokio 新的任務呢。這有點類似線程池,當達達最大線程數的時候阻塞後面的任務待有釋放的線程後再繼續。
我們看看下面的代碼。
fn main(){
let max_task = 2;
let rt = runtime::Builder::new_multi_thread()
.worker_threads(max_task)
.enable_time()
.build()
.unwrap();
let mut set = JoinSet::new();
rt.block_on(async {
for i in 0..100 {
println!("run {}", i);
while set.len() >= max_task {
set.join_next().await;
}
set.spawn(async move {
sleep().await;
println!("spawn {}", i);
});
}
while set.len() > 0 {
set.join_next().await;
}
});
}
我們使用 JoinSet 來管理派生出來的任務。set.join_next().await; 保證至少一個任務被執行完成。結合 set 的 len,我們可以在任務達到上限時阻塞任務派生。當循環結束,可能還有未完成的任務,所以只要 set.len() 大於 0 就等待任務結束。
輸出大概長這樣
running 1 test
tokio_multi_thread
run 0
run 1
spawn 0
run 2
spawn 1
......
run 31
spawn 30
run 32
spawn 31
run 33
......
run 96
spawn 95
run 97
spawn 96
run 98
spawn 97
run 99
spawn 98
spawn 99
符合預期,代碼不多,有興趣的同學可以動手嘗試一下。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/a6AiehfHp9RjPb0_zv6OiA