Rust 無畏併發 - 3
在本系列的《Rust 無畏併發 - 1》和《Rust 無畏併發 - 2》中,我們使用多線程去處理 cpu 的負載。這些工作負載使用了你可以提供的所有 cpu 時間,並且可以通過根據 cpu 的核數來劃分計算以加快速度。Rust 和 Rayon 爲這類工作負載提供了很好的解決方案。
在本部分中,我們將研究另一種併發形式:異步。
注意,異步代碼並不意味着多線程。在單個線程上運行異步程序且其性能顯著提升是完全有可能的。Node.js 就是這種方法的一個很好的例子。
爲什麼使用異步代碼?
許多工作負載——特別是在服務器程序中——花費大量時間等待其他事情。你的代碼可能正在等待數據庫向它發送結果,等待打開和處理一個文件,或者等待網絡請求到達。當然可以爲每個請求派生一個線程——並讓該線程等待結果到達——但這樣做可能非常低效。創建線程需要服務器資源,可能是一個相對緩慢的操作;成本是以毫秒爲單位的,但是在重負載的服務器上,線程會迅速增加。在 Linux 上,同時活動的線程不能超過 63,704 個。
異步代碼使用一種稱爲綠色線程的技術。綠色線程 (有時被稱爲 fibers) 不是一個完整的操作系統線程:它們表示一個任務,並存儲關於該任務的最小數量的信息——它應該將結果發送到哪裏。綠色線程可能 (而且經常) 分佈在操作系統線程之間——但是它們產生的開銷比系統線程低得多。
異步任務是一個帶有 async 標記的函數。當它執行時,它會一直運行,直到出現以下情況之一:
-
任務調用 await 另一個異步任務。
-
任務完成,產生一個結果。
調用 async 函數不會運行它,它只是返回一個 Future。這代表着一種承諾,等待未來的結果。當你調用 await 時,future 將被添加到異步綠色線程調度器中,調用函數將處於空閒狀態,cpu 可以處理其他任務,直到 future 完成執行。
考慮 web 用戶從服務器請求 frontpage.html 的常見場景。如果被請求的頁面需要來自數據庫和模板的一些內容,請求將經過以下幾個步驟:
每個 await 表示調用者空閒的時間:在另一個進程完成之前,它們什麼都不做。如果 frontpage.html 訪問量很大,那麼用每個任務一個線程來管理它——每個線程在等待子任務時等待——很快就會生成大量的線程。更糟糕的是,大多數線程會一直處於空閒狀態。空閒線程仍然由操作系統調度器輪詢——大量的空閒線程可能消耗大量的 CPU 時間!
使用異步可以顯著降低開銷,因爲每個任務都處於等待狀態,直到收到結果。
構建一個簡單的異步 Web 服務
讓我們使用 Rocket 構建一個簡單的 web 服務,以演示異步的執行。
創建一個新的 Rust 項目:
cargo new rocket-async-medium
將 Rocket 添加到 Cargo.toml 的依賴項中:
[dependencies]
rocket = { version = "0.5.0-rc.2", features = [ "json" ] }
我們導入 json 特性來幫助我們模擬一些數據庫查詢。
現在打開 src/main.rs,我們將從 main 函數開始,啓動 Rocket:
1use rocket::{
2 response::content::RawHtml,
3 serde::Deserialize,
4 tokio::fs,
5};
6
7#[macro_use]
8extern crate rocket;
9
10#[launch]
11fn rocket() -> _ {
12 rocket::build().mount("/", routes![index])
13}
Rocket 提供了一個方便的 #[launch] 宏,它創建了一個 main() 函數,並在調用 Rocket 函數之前執行初始化。這就是使用 Rocket 啓動基本 web 服務器所需的全部內容。
加載 index 模板
我們需要定義 index 函數:
1#[get("/")]
2async fn index() -> RawHtml<String> {
3 // Pauses execution while the filesystem fetches index.html
4 let index_html = fs::read_to_string("src/index.html").await.unwrap();
5
6 // Pauses execution while the menu is constructed
7 let menu = build_menu().await;
8
9 // Pauses execution while the news feed is constructed
10 let news = build_news().await;
11
12 // Does not pause - you can call non asynchronous functions normally
13 let footer = build_footer();
14
15 // Modify the template to include content
16 RawHtml(
17 index_html
18 .replace("!!MENU!!", &menu)
19 .replace("!!NEWS!!", &news)
20 .replace("!!FOOTER!!", &footer),
21 )
22}
這裏有一些需要學習的東西:
-
我們使用 #[get("/")] 將函數映射到網站的根目錄。
-
這個函數返回一個 RawHtml 類型——Rocket 將接受一個字符串,並確保 HTTP 頭包含正確的內容類型。
-
該函數加載一個名爲 index.html 的文件 (我們稍後將編寫該文件),用 build_menu()、build_news() 和 build_footer()函數構建網站的各部分。然後,它將 index 文件中的佔位符替換爲從這些函數返回的字符串。
-
build_menu() 和 build_news() 是異步的; build_footer 不是。只要調用函數本身是異步的,就可以混合使用。
這是服務器端典型的通過模板渲染網站的過程,剩下的就是構建內容了。
創建一個名爲 src/index.html 的新文件,並在其中插入以下代碼:
1<!DOCTYPE html>
2<html>
3 <head>
4 <title>Hello Async World</title>
5 <style>
6 #menuHead { background-color: navy; color: white; font-weight: bold; }
7 .news { border: 1px solid #aaa; border-radius: 4px; background-color: #ddd; padding: 8px; }
8 .news h1 { font-size: 14pt; margin: 2px; }
9 #footer { font-size: smaller; font-style: italic; }
10 </style>
11 </head>
12 <body>
13 !!MENU!!
14 !!NEWS!!
15 !!FOOTER!!
16 </body>
17</html>
如果你熟悉 HTML,這是非常簡單的,有三個佔位符:!!MENU!!, ! ! NEWS! ! 和! !FOOTER ! ! 在上面的 index() 函數中,將這些標記替換爲站點內容。
加載菜單
讓我們創建我們的菜單。菜單將是一個簡單的欄,從文件加載。創建另一個文件,命名爲 src/menu.html:
1<div id="menuHead">
2 Main Menu
3</div>
沒有比這更簡單的了!在 src/main.rs 中,創建一個函數來加載它:
1async fn build_menu() -> String {
2 // Mocking a database request
3 fs::read_to_string("src/menu.html").await.unwrap()
4}
它從磁盤加載菜單並將其作爲字符串返回。注意它是異步的——我們使用 Tokio 的文件系統函數。這種方法確保如果訪問文件有任何延遲,函數將安靜地等待—可能是因爲服務器繁忙。
加載新聞提要
現在,讓我們添加一些站點內容。我們將假設正在從數據庫加載一個新聞提要。我們不要求安裝一個完整的數據庫系統,而是將當前新聞提要存儲在一個 JSON 文件中。創建一個名爲 src/news.json 的文件,並粘貼一些新聞:
1[
2 {
3 "title" : "Rust Supports Async/Wait",
4 "summary" : "Zero-cost async/await support has landed in Rust 1.39"
5 },
6 {
7 "title" : "Tokio 1.0",
8 "summary" : "Tokio 1.0 was released on December 22, 2020."
9 }
10]
這是一個簡單的 JSON 文件:它包含一個對象數組,每個對象數組包含一個標題和一個摘要。
現在回到 src/main.rs,添加一個函數來異步讀取該數據:
1#[derive(Deserialize)]
2#[serde(crate = "rocket::serde")]
3struct NewsItem {
4 title: String,
5 summary: String,
6}
7
8async fn build_news() -> String {
9 // Mocking another database request
10 let all_news = fs::read_to_string("src/news.json").await.unwrap();
11 let news_feed: Vec<NewsItem> = rocket::serde::json::from_str(&all_news).unwrap();
12 news_feed
13 .iter()
14 .map(|news| {
15 format!(
16 "<div class='news'><h1>{}</h1><p>{}</p></div>",
17 news.title, news.summary
18 )
19 })
20 .reduce(|cur: String, nxt: String| cur + &nxt)
21 .unwrap()
22}
我們首先定義一個與新聞提要格式匹配的 struct。使用 #[Deserialize] 允許 Rocket(嵌入 Serde) 將 JSON 解碼爲 NewsItem 結構。
build_news() 函數加載新聞提要文件,並調用 Serde 對其進行解碼。然後它使用 map 和 format! 迭代新聞數組,把它轉換成 HTML。
最後是 footer,我們將總是返回相同的字符串,因爲我們不等待任何其他東西,所以我們將它設置爲一個常規函數:
1fn build_footer() -> String {
2 "<p id='footer'>© Copyright My Cool Company</p>".to_string()
3}
恭喜,你現在已經擁有了運行 web 服務器所需的所有元素。輸入 cargo run 運行並導航到 http://localhost:8000/。你應該看到你的新聞網站:
使用 Join 提高併發性
你可能已經注意到 index 的某些部分可以並行運行。菜單和新聞並不相互依賴,爲什麼不同時等待兩者呢。
1#[get("/")]
2async fn index() -> RawHtml<String> {
3 use rocket::tokio::join;
4 // Run all sub-tasks concurrently
5 let (index_html, menu, news) = join!(
6 fs::read_to_string("src/index.html"),
7 build_menu(),
8 build_news(),
9 );
10 let footer = build_footer();
11
12 // Modify the template to include content
13 RawHtml(index_html.unwrap()
14 .replace("!!MENU!!", &menu)
15 .replace("!!NEWS!!", &news)
16 .replace("!!FOOTER!!", &footer),
17 )
18}
join! 宏同時執行所有三個 futures(加載模板、構建菜單和新聞提要)—直到所有三個 future 都返回數據才喚醒。
當任務阻塞
我們調用的所有函數都是異步的——它們清晰地等待另一個任務完成。有時,你需要等待一個非異步的任務—可能是一個 CPU 綁定的任務,或者一個不提供異步接口的設備。
當你無法避免阻塞任務時,你可以使用 Tokio 的 spawn_blocking 函數,如下所示:
1task::spawn_blocking(move || {
2 // Blocking work
3}).await;
使用 spawn_blocking 會將你的任務轉到它自己的線程中——並暫停你的 future 直到線程返回。這種方法使其他任務在函數執行時平穩運行。
本文翻譯自:
https://medium.com/pragmatic-programmers/fearless-concurrency-with-rust-part-3-asynchronous-concurrency-e23bad856087
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/wu50EFfg-ZzaJU5JVK15jA