Rust 併發編程 - async-await 異步編程
異步編程綜述
異步編程是一種併發編程模型,通過在任務執行期間不阻塞線程的方式,提高系統的併發能力和響應性。相比於傳統的同步編程,異步編程可以更好地處理 I/O 密集型任務和併發請求,提高系統的吞吐量和性能。
異步編程具有以下優勢:
-
提高系統的併發能力和響應速度
-
減少線程等待時間,提高資源利用率
-
可以處理大量的併發請求或任務
-
支持高效的事件驅動編程風格
異步編程廣泛應用於以下場景:
-
網絡編程:處理大量的併發網絡請求
-
I/O 密集型任務:如文件操作、數據庫訪問等
-
用戶界面和圖形渲染:保持用戶界面的流暢響應
-
並行計算:加速複雜計算任務的執行
Rust 中的異步編程模型
Rust 作爲一門現代的系統級編程語言,旨在提供高效、安全和可靠的異步編程能力。Rust 異步編程的目標是實現高性能、無安全漏洞的異步應用程序,同時提供簡潔的語法和豐富的異步庫。
最值得一讀的是 Rust 官方的 Rust 異步編程書 [1]
中文版: Rust 異步編程指南 [2]
由於併發編程在現代社會非常重要,因此每個主流語言都對自己的併發模型進行過權衡取捨和精心設計,Rust 語言也不例外。下面的列表可以幫助大家理解不同併發模型的取捨:
-
OS 線程, 它最簡單,也無需改變任何編程模型 (業務 / 代碼邏輯),因此非常適合作爲語言的原生併發模型,我們在多線程章節 [3] 也提到過,Rust 就選擇了原生支持線程級的併發編程。但是,這種模型也有缺點,例如線程間的同步將變得更加困難,線程間的上下文切換損耗較大。使用線程池在一定程度上可以提升性能,但是對於 IO 密集的場景來說,線程池還是不夠看。
-
事件驅動 (Event driven), 這個名詞你可能比較陌生,如果說事件驅動常常跟回調 (Callback) 一起使用,相信大家就恍然大悟了。這種模型性能相當的好,但最大的問題就是存在回調地獄的風險:非線性的控制流和結果處理導致了數據流向和錯誤傳播變得難以掌控,還會導致代碼可維護性和可讀性的大幅降低,大名鼎鼎的 JS 曾經就存在回調地獄。
-
協程 (Coroutines) 可能是目前最火的併發模型,
Go
語言的協程設計就非常優秀,這也是Go
語言能夠迅速火遍全球的殺手鐧之一。協程跟線程類似,無需改變編程模型,同時,它也跟async
類似,可以支持大量的任務併發運行。但協程抽象層次過高,導致用戶無法接觸到底層的細節,這對於系統編程語言和自定義異步運行時是難以接受的 -
actor 模型是 erlang 的殺手鐧之一,它將所有併發計算分割成一個一個單元,這些單元被稱爲
actor
, 單元之間通過消息傳遞的方式進行通信和數據傳遞,跟分佈式系統的設計理念非常相像。由於actor
模型跟現實很貼近,因此它相對來說更容易實現,但是一旦遇到流控制、失敗重試等場景時,就會變得不太好用 -
async/await, 該模型性能高,還能支持底層編程,同時又像線程和協程那樣無需過多的改變編程模型,但有得必有失,
async
模型的問題就是內部實現機制過於複雜,對於用戶來說,理解和使用起來也沒有線程和協程簡單,好在前者的複雜性開發者們已經幫我們封裝好,而理解和使用起來不夠簡單,正是本章試圖解決的問題。
總之,Rust 經過權衡取捨後,最終選擇了同時提供多線程編程和 async 編程:
-
前者通過標準庫實現,當你無需那麼高的併發時,例如需要並行計算時,可以選擇它,優點是線程內的代碼執行效率更高、實現更直觀更簡單,這塊內容已經在多線程章節進行過深入講解,不再贅述
-
後者通過語言特性 + 標準庫 + 三方庫的方式實現,在你需要高併發、異步
I/O
時,選擇它就對了
異步運行時是 Rust 中支持異步編程的運行時環境,負責管理異步任務的執行和調度。它提供了任務隊列、線程池和事件循環等基礎設施,支持異步任務的併發執行和事件驅動的編程模型。Rust 沒有內置異步調用所必須的運行時, 主要的 Rust 異步運行時包括:
-
Tokio - Rust 異步運行時的首選, 擁有強大的性能和生態系統。Tokio 提供異步 TCP/UDP 套接字、線程池、定時器等功能。
-
async-std - 較新但功能完善的運行時, 提供與 Tokio 類似的異步抽象。代碼較簡潔, 易於上手。
-
smol - 一個輕量級的運行時, 側重 simplicity(簡單性)、ergonomics(易用性) 和小巧。
-
futures/futures-lite
還有 futuresust 異步編程的基礎抽象庫。大多數運行時都依賴 futures 提供異步原語。
今日頭條是國內使用 Rust 語言的知名公司之一,他們也開源了一個他們的運行時 bytedance/monoio[4]
Rust 異步編程模型包含了一些關鍵的組件和概念,包括:
-
異步函數和異步塊:使用 async 關鍵字定義的異步函數和異步代碼塊。
// `foo()`返回一個`Future<Output = u8>`, // 當調用`foo().await`時,該`Future`將被運行,當調用結束後我們將獲取到一個`u8`值 async fn foo() -> u8 { 5 } fn bar() -> impl Future<Output = u8> { // 下面的`async`語句塊返回`Future<Output = u8>` async { let x: u8 = foo().await; x + 5 } }
async 語句塊和 async fn 最大的區別就是前者無法顯式的聲明返回值,在大多數時候這都不是問題,但是當配合 ? 一起使用時,問題就有所不同:
async fn foo() -> Result<u8, String> {
Ok(1)
}
async fn bar() -> Result<u8, String> {
Ok(1)
}
pub fn main() {
let fut = async {
foo().await?;
bar().await?;
Ok(())
};
}
以上代碼編譯後會報錯:
error[E0282]: type annotations needed
--> src/main.rs:14:9
|
11 | let fut = async {
| --- consider giving `fut` a type
...
14 | Ok(1)
| ^^ cannot infer type for type parameter `E` declared on the enum `Result`
原因在於編譯器無法推斷出 Result<T, E>
中的 E
的類型, 而且編譯器的提示consider giving fut a type
你也別傻乎乎的相信,然後嘗試半天,最後無奈放棄:目前還沒有辦法爲 async
語句塊指定返回類型。
既然編譯器無法推斷出類型,那咱就給它更多提示,可以使用 ::< ... >
的方式來增加類型註釋:
let fut = async {
foo().await?;
bar().await?;
Ok::<(), String>(()) // 在這一行進行顯式的類型註釋
};
-
await 關鍵字:在異步函數內部使用 await 關鍵字等待異步操作完成。
async/.await
是 Rust 語法的一部分,它在遇到阻塞操作時 (例如 IO) 會讓出當前線程的所有權而不是阻塞當前線程,這樣就允許當前線程繼續去執行其它代碼,最終實現併發。async
是懶惰的,直到被執行器poll
或者.await
後纔會開始運行,其中後者是最常用的運行Future
的方法。 當.await
被調用時,它會嘗試運行Future
直到完成,但是若該Future
進入阻塞,那就會讓出當前線程的控制權。當Future
後面準備再一次被運行時 (例如從socket
中讀取到了數據),執行器會得到通知,並再次運行該Future
,如此循環,直到完成。 -
Future Trait:表示異步任務的 Future Trait,提供異步任務的執行和狀態管理。
pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; }
async/await 語法和用法
async 和 await 是 Rust 中用於異步編程的關鍵字。async 用於定義異步函數,表示函數體中包含異步代碼。await 用於等待異步操作完成,並返回異步操作的結果。
-
異步函數使用 async 關鍵字定義,並返回實現了 Future Trait 的類型。異步函數可以在其他異步函數中使用 await 關鍵字等待異步操作完成。調用異步函數時,會返回一個實現了 Future Trait 的對象,可以通過調用 .await 方法等待結果。
-
異步塊是一種在異步函數內部創建的臨時異步上下文,可以使用 async 關鍵字創建。異步閉包是一種將異步代碼封裝在閉包中的方式,可以使用 async 關鍵字創建。異步塊和異步閉包允許在同步上下文中使用 await 關鍵字等待異步操作。
異步函數的返回類型通常是實現了 Future Trait 的類型。Future Trait 表示一個異步任務,提供異步任務的執行和狀態管理。Rust 標準庫和第三方庫中提供了許多實現了 Future Trait 的類型,用於表示各種異步操作。
舉一個例子,下面這個例子是一個傳統的併發下載網頁的例子:
fn get_two_sites() {
// 創建兩個新線程執行任務
let thread_one = thread::spawn(|| download("https://course.rs"));
let thread_two = thread::spawn(|| download("https://fancy.rs"));
// 等待兩個線程的完成
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
如果是在一個小項目中簡單的去下載文件,這麼寫沒有任何問題,但是一旦下載文件的併發請求多起來,那一個下載任務佔用一個線程的模式就太重了,會很容易成爲程序的瓶頸。好在,我們可以使用async
的方式來解決:
async fn get_two_sites_async() {
// 創建兩個不同的`future`,你可以把`future`理解爲未來某個時刻會被執行的計劃任務
// 當兩個`future`被同時執行後,它們將併發的去下載目標頁面
let future_one = download_async("https://www.foo.com");
let future_two = download_async("https://www.bar.com");
// 同時運行兩個`future`,直至完成
join!(future_one, future_two);
}
注意上面的代碼必須在一個異步運行時在運行,以便異步運行時使用一定數量的線程來調度這些代碼的運行。
接下來我們就學習各種異步運行時庫和異步運行時的方法。
Tokio
Tokio 是 Rust 異步編程最重要的運行時庫, 提供了異步 IO、異步任務調度、同步原語等功能。
Tokio 的主要組件包括:
-
tokio - 核心運行時, 提供任務調度, IO 資源等。
-
tokio::net - 異步 TCP、UDP 的實現。
-
tokio::sync - 互斥量、信號量等併發原語。
-
tokio::time - 時間相關工具。
-
tokio::fs - 異步文件 IO。
可以看到 Tokio 庫包含了很多的功能,包括異步網絡編程、併發原語等,之後我們會花整個一章專門介紹它,這一節我們值介紹它的異步運行時的使用。
你可以如下定義 main 函數,它自動支持運行時的啓動:
#[tokio::main]
async fn main() {
// 在運行時中異步執行任務
tokio::spawn(async {
// do work
});
// 等待任務完成
other_task.await;
}
這個例子 main 函數前必須加 async 關鍵字,並且加#[tokio::main]
屬性,那麼這個 main 就會在異步運行時運行。
你也可以使用顯示創建運行時的方法:
pub fn tokio_async() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("Hello from tokio!");
rt.spawn(async {
println!("Hello from a tokio task!");
println!("in spawn")
})
.await
.unwrap();
});
rt.spawn_blocking(|| println!("in spawn_blocking"));
}
首先它創建了一個 Tokio 運行時rt
。block_on
方法在運行時上下文中執行一個異步任務, 這裏我們簡單地打印了一句話。
然後使用rt.spawn
在運行時中異步執行另一個任務。這個任務也打印了幾句話。spawn
返回一個JoinHandle
, 所以這裏調用.await
來等待任務結束。
最後, 使用spawn_blocking
在運行時中執行一個普通的阻塞任務。這個任務會在線程池中運行, 而不會阻塞運行時。
總結一下這個例子展示的要點:
-
在 Tokio 運行時中用
block_on
執行異步任務 -
用
spawn
在運行時中異步執行任務 -
用
spawn_blocking
在線程池中執行阻塞任務 -
可以
await
JoinHandle 來等待異步任務結束
Tokio 運行時提供了執行和調度異步任務所需的全部功能。通過正確地組合block_on
、spawn
和spawn_blocking
, 可以發揮 Tokio 的強大能力, 實現各種異步場景。
futures
futures 庫 futures 是 Rust 異步編程的基礎抽象庫, 爲編寫異步代碼提供了核心的 trait 和類型。
主要提供了以下功能:
-
Future trait - 表示一個異步計算的抽象, 可以
.await
獲取其結果。 -
Stream trait - 表示一個異步的數據流, 可以通過
.await
迭代獲取其元素。 -
Sink trait - 代表一個可以異步接收數據的目標。
-
Executor - 執行 futures 的運行時環境。
-
Utilities - 一些組合、創建 futures 的函數
pub fn futures_async() {
let pool = ThreadPool::new().expect("Failed to build pool");
let (tx, rx) = mpsc::unbounded::<i32>();
let fut_values = async {
let fut_tx_result = async move {
(0..100).for_each(|v| {
tx.unbounded_send(v).expect("Failed to send");
})
};
pool.spawn_ok(fut_tx_result);
let fut_values = rx.map(|v| v * 2).collect();
fut_values.await
};
let values: Vec<i32> = executor::block_on(fut_values);
println!("Values={:?}", values);
}
這個例子展示瞭如何使用 futures 和線程池進行異步編程:
-
創建一個線程池
pool
-
創建一個無邊界的通道
tx
和rx
用來在任務間傳遞數據 -
定義一個異步任務
fut_values
, 裏面首先用spawn_ok
在線程池中異步執行一個任務, 這個任務會通過通道發送 0-99 的數字。 -
然後通過
rx
用map
創建一個 Stream, 它會將收到的數字乘 2。 -
用
collect
收集 Stream 的結果到一個 Vec。 -
block_on
在主線程中執行這個異步任務並獲取結果。
這段代碼展示了 futures 和通道的組合使用 - 通過線程池併發地處理數據流。
block_on
運行 future 而不需要顯式運行時也很方便。
futures 通過異步處理數據流, 可以實現非阻塞併發程序, 這在諸如網絡服務端編程中很有用。與線程相比, futures 的抽象通常更輕量和高效。
futures_lite
這個庫是 futures 的一個子集, 它的編譯速度快了一個數量級, 修復了 futures API 中的一些小問題, 補充了一些明顯的空白, 並移除了絕大部分不安全的代碼。
簡而言之, 這個庫的目標是比 futures 更可易用, 同時仍然與其完全兼容。
讓我們從創建一個簡單的 Future 開始。在 Rust 中,Future 是一種表示異步計算的 trait。以下是一個示例:
use futures_lite::future;
async fn hello_async() {
println!("Hello, async world!");
}
fn main() {
future::block_on(hello_async());
}
在這個例子中,我們使用 futures-lite
中的 future::block_on
函數來運行異步函數 hello_async
。
async_std
async-std
是一個爲 Rust 提供異步標準庫的庫。它擴展了標準庫,使得在異步上下文中進行文件 I/O、網絡操作和任務管理等操作更加便捷。
它提供了你所習慣的所有接口, 但以異步的形式, 並且準備好用於 Rust 的async
/await
語法。
特性
-
現代: 從零開始針對
std::future
和async/await
構建, 編譯速度極快。 -
快速: 我們可靠的分配器和線程池設計提供了超高吞吐量和可預測的低延遲。
-
直觀: 與標準庫完全對等意味着你只需要學習一次 API。
-
清晰: 詳細的文檔和可訪問的指南意味着使用異步 Rust 從未如此簡單。
use async_std::task;
async fn hello_async() {
println!("Hello, async world!");
}
fn main() {
task::block_on(hello_async());
}
這個例子首先導入 async_std::task。
然後定義一個異步函數 hello_async, 其中只是簡單打印一句話。
在 main 函數中, 使用 task::block_on 來執行這個異步函數。block_on 會阻塞當前線程, 直到傳入的 future 運行完成。
這樣的效果就是, 儘管 hello_async 函數是異步的, 但我們可以用同步的方式調用它, 不需要手動處理 future。
async/await 語法隱藏了 future 的細節, 給異步編程帶來了極大的便利。藉助 async_std, 我們可以非常輕鬆地使用 async/await 來編寫異步 Rust 代碼。
smol
smol
是一個超輕量級的異步運行時(async runtime)庫,專爲簡化異步 Rust 代碼的編寫而設計。它提供了一個簡潔而高效的方式來管理異步任務。
特性
-
輕量級:
smol
的設計目標之一是輕量級,以便快速啓動和低資源開銷。 -
簡潔 API: 提供簡潔的 API,使得異步任務的創建、組合和運行變得直觀和簡單。
-
零配置: 無需複雜的配置,可以直接在現有的 Rust 項目中使用。
-
異步 I/O 操作: 支持異步文件 I/O、網絡操作等,使得異步編程更加靈活。
下面這個例子演示了使用 smol 異步運行時執行異步代碼塊的例子:
pub fn smol_async() {
smol::block_on(async { println!("Hello from smol") });
}
try_join、join、select 和 zip
在 Rust 中, 有兩個常見的宏可以用於同時等待多個 future:select 和 join。
select! 宏可以同時等待多個 future, 並只處理最先完成的那個 future:
use futures::future::{select, FutureExt};
let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };
let result = select! {
res1 = future1 => { /* handle result of future1 */ },
res2 = future2 => { /* handle result of future2 */ },
};
join! 宏可以同時等待多個 future, 並處理所有 future 的結果:
use futures::future::{join, FutureExt};
let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };
let (res1, res2) = join!(future1, future2);
join! 返回一個元組, 包含所有 future 的結果。
這兩個宏都需要 futures crate, 使代碼更加簡潔。不使用宏的話, 需要手動創建一個 Poll 來組合多個 future。
所以 select 和 join 在處理多個 future 時非常方便。select 用於只處理最先完成的, join 可以同時處理所有 future。
try_join! 宏也可以用於同時等待多個 future, 它與 join! 類似, 但是有一點不同:
try_join! 在任何一個 future 返回錯誤時, 就會提前返回錯誤, 而不會等待其他 future。
例如:
use futures::try_join;
let future1 = async {
Ok::<(), Error>(/*...*/)
};
let future2 = async {
Err(Error::SomethingBad)
};
let result = try_join!(future1, future2);
這裏因爲 future2 返回了錯誤, 所以 try_join! 也會返回這個錯誤, 不會等待 future1 完成。
這不同於 join!,join! 會等待所有 future 完成。
所以 try_join! 的用途是同時啓動多個 future, 但是遇到任何一個錯誤就立即返回, 避免不必要的等待。這在需要併發但不能容忍任何失敗的場景很有用。
而當需要等待所有 future 無論成功失敗, 獲取所有結果的時候, 再使用 join!。
所以 try_join! 和 join! 都可以組合多個 future, 但錯誤處理策略不同。選擇哪個要根據實際需要決定。
zip
函數會 join 兩個 future, 並等待他們完成。而try_zip
函數會 join 兩個函數,但是會等待兩個 future 都完成或者其中一個 Err 則返回:
pub fn smol_zip() {
smol::block_on(async {
use smol::future::{try_zip, zip, FutureExt};
let future1 = async { 1 };
let future2 = async { 2 };
let result = zip(future1, future2);
println!("smol_zip: {:?}", result.await);
let future1 = async { Ok::<i32, i32>(1) };
let future2 = async { Err::<i32, i32>(2) };
let result = try_zip(future1, future2).await;
println!("smol_try_zip: {:?}", result);
});
}
參考資料
[1]
Rust 異步編程書: https://rust-lang.github.io/async-book/
[2]
Rust 異步編程指南: https://github.com/rustlang-cn/async-book
[3]
多線程章節: https://github.com/rustlang-cn/async-book/blob/master/advance/concurrency-with-threads/concurrency-parallelism.md
[4]
bytedance/monoio: https://github.com/bytedance/monoio
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hnKNjqlYrdtI8ZLpJx5ufg