Rust 無畏併發 - 3

在本系列的《Rust 無畏併發 - 1》和《Rust 無畏併發 - 2》中,我們使用多線程去處理 cpu 的負載。這些工作負載使用了你可以提供的所有 cpu 時間,並且可以通過根據 cpu 的核數來劃分計算以加快速度。Rust 和 Rayon 爲這類工作負載提供了很好的解決方案。

在本部分中,我們將研究另一種併發形式:異步。

注意,異步代碼並不意味着多線程。在單個線程上運行異步程序且其性能顯著提升是完全有可能的。Node.js 就是這種方法的一個很好的例子。

爲什麼使用異步代碼?

許多工作負載——特別是在服務器程序中——花費大量時間等待其他事情。你的代碼可能正在等待數據庫向它發送結果,等待打開和處理一個文件,或者等待網絡請求到達。當然可以爲每個請求派生一個線程——並讓該線程等待結果到達——但這樣做可能非常低效。創建線程需要服務器資源,可能是一個相對緩慢的操作;成本是以毫秒爲單位的,但是在重負載的服務器上,線程會迅速增加。在 Linux 上,同時活動的線程不能超過 63,704 個。

異步代碼使用一種稱爲綠色線程的技術。綠色線程 (有時被稱爲 fibers) 不是一個完整的操作系統線程:它們表示一個任務,並存儲關於該任務的最小數量的信息——它應該將結果發送到哪裏。綠色線程可能 (而且經常) 分佈在操作系統線程之間——但是它們產生的開銷比系統線程低得多。

異步任務是一個帶有 async 標記的函數。當它執行時,它會一直運行,直到出現以下情況之一:

調用 async 函數不會運行它,它只是返回一個 Future。這代表着一種承諾,等待未來的結果。當你調用 await 時,future 將被添加到異步綠色線程調度器中,調用函數將處於空閒狀態,cpu 可以處理其他任務,直到 future 完成執行。

考慮 web 用戶從服務器請求 frontpage.html 的常見場景。如果被請求的頁面需要來自數據庫和模板的一些內容,請求將經過以下幾個步驟:

每個 await 表示調用者空閒的時間:在另一個進程完成之前,它們什麼都不做。如果 frontpage.html 訪問量很大,那麼用每個任務一個線程來管理它——每個線程在等待子任務時等待——很快就會生成大量的線程。更糟糕的是,大多數線程會一直處於空閒狀態。空閒線程仍然由操作系統調度器輪詢——大量的空閒線程可能消耗大量的 CPU 時間!

使用異步可以顯著降低開銷,因爲每個任務都處於等待狀態,直到收到結果。

構建一個簡單的異步 Web 服務

讓我們使用 Rocket 構建一個簡單的 web 服務,以演示異步的執行。

創建一個新的 Rust 項目:

cargo new rocket-async-medium

將 Rocket 添加到 Cargo.toml 的依賴項中:

[dependencies]
rocket = { version = "0.5.0-rc.2", features = [ "json" ] }

我們導入 json 特性來幫助我們模擬一些數據庫查詢。

現在打開 src/main.rs,我們將從 main 函數開始,啓動 Rocket:

1use rocket::{
 2    response::content::RawHtml,
 3    serde::Deserialize,
 4    tokio::fs,
 5};
 6
 7#[macro_use]
 8extern crate rocket;
 9
10#[launch]
11fn rocket() -> _ {
12    rocket::build().mount("/", routes![index])
13}

Rocket 提供了一個方便的 #[launch] 宏,它創建了一個 main() 函數,並在調用 Rocket 函數之前執行初始化。這就是使用 Rocket 啓動基本 web 服務器所需的全部內容。

加載 index 模板

我們需要定義 index 函數:

 1#[get("/")]
 2async fn index() -> RawHtml<String> {
 3    // Pauses execution while the filesystem fetches index.html
 4    let index_html = fs::read_to_string("src/index.html").await.unwrap();
 5
 6    // Pauses execution while the menu is constructed
 7    let menu = build_menu().await;
 8
 9    // Pauses execution while the news feed is constructed
10    let news = build_news().await;
11
12    // Does not pause - you can call non asynchronous functions normally
13    let footer = build_footer();
14
15    // Modify the template to include content
16    RawHtml(
17        index_html
18            .replace("!!MENU!!"&menu)
19            .replace("!!NEWS!!"&news)
20            .replace("!!FOOTER!!"&footer),
21    )
22}

這裏有一些需要學習的東西:

這是服務器端典型的通過模板渲染網站的過程,剩下的就是構建內容了。

創建一個名爲 src/index.html 的新文件,並在其中插入以下代碼:

 1<!DOCTYPE html>
 2<html>
 3    <head>
 4        <title>Hello Async World</title>
 5        <style>
 6            #menuHead { background-color: navy; color: white; font-weight: bold; }
 7            .news { border: 1px solid #aaa; border-radius: 4px; background-color: #ddd; padding: 8px; }
 8            .news h1 { font-size: 14pt; margin: 2px; }
 9            #footer { font-size: smaller; font-style: italic; }
10        </style>
11    </head>
12    <body>
13        !!MENU!!
14        !!NEWS!!
15        !!FOOTER!!
16    </body>
17</html>

如果你熟悉 HTML,這是非常簡單的,有三個佔位符:!!MENU!!, ! ! NEWS! ! 和! !FOOTER ! ! 在上面的 index() 函數中,將這些標記替換爲站點內容。

加載菜單

讓我們創建我們的菜單。菜單將是一個簡單的欄,從文件加載。創建另一個文件,命名爲 src/menu.html:

1<div id="menuHead">
2    Main Menu
3</div>

沒有比這更簡單的了!在 src/main.rs 中,創建一個函數來加載它:

1async fn build_menu() -> String {
2    // Mocking a database request
3    fs::read_to_string("src/menu.html").await.unwrap()
4}

它從磁盤加載菜單並將其作爲字符串返回。注意它是異步的——我們使用 Tokio 的文件系統函數。這種方法確保如果訪問文件有任何延遲,函數將安靜地等待—可能是因爲服務器繁忙。

加載新聞提要

現在,讓我們添加一些站點內容。我們將假設正在從數據庫加載一個新聞提要。我們不要求安裝一個完整的數據庫系統,而是將當前新聞提要存儲在一個 JSON 文件中。創建一個名爲 src/news.json 的文件,並粘貼一些新聞:

 1[
 2    {
 3        "title" : "Rust Supports Async/Wait",
 4        "summary" : "Zero-cost async/await support has landed in Rust 1.39"
 5    },
 6    {
 7        "title" : "Tokio 1.0",
 8        "summary" : "Tokio 1.0 was released on December 22, 2020."
 9    }
10]

這是一個簡單的 JSON 文件:它包含一個對象數組,每個對象數組包含一個標題和一個摘要。

現在回到 src/main.rs,添加一個函數來異步讀取該數據:

 1#[derive(Deserialize)]
 2#[serde(crate = "rocket::serde")]
 3struct NewsItem {
 4    title: String,
 5    summary: String,
 6}
 7
 8async fn build_news() -> String {
 9    // Mocking another database request
10    let all_news = fs::read_to_string("src/news.json").await.unwrap();
11    let news_feed: Vec<NewsItem> = rocket::serde::json::from_str(&all_news).unwrap();
12    news_feed
13        .iter()
14        .map(|news| {
15            format!(
16                "<div class='news'><h1>{}</h1><p>{}</p></div>",
17                news.title, news.summary
18            )
19        })
20        .reduce(|cur: String, nxt: String| cur + &nxt)
21        .unwrap()
22}

我們首先定義一個與新聞提要格式匹配的 struct。使用 #[Deserialize] 允許 Rocket(嵌入 Serde) 將 JSON 解碼爲 NewsItem 結構。

build_news() 函數加載新聞提要文件,並調用 Serde 對其進行解碼。然後它使用 map 和 format! 迭代新聞數組,把它轉換成 HTML。

最後是 footer,我們將總是返回相同的字符串,因爲我們不等待任何其他東西,所以我們將它設置爲一個常規函數:

1fn build_footer() -> String {
2    "<p id='footer'>© Copyright My Cool Company</p>".to_string()
3}

恭喜,你現在已經擁有了運行 web 服務器所需的所有元素。輸入 cargo run 運行並導航到 http://localhost:8000/。你應該看到你的新聞網站:

使用 Join 提高併發性

你可能已經注意到 index 的某些部分可以並行運行。菜單和新聞並不相互依賴,爲什麼不同時等待兩者呢。

 1#[get("/")]
 2async fn index() -> RawHtml<String> {
 3    use rocket::tokio::join;
 4    // Run all sub-tasks concurrently
 5    let (index_html, menu, news) = join!(
 6        fs::read_to_string("src/index.html"),
 7        build_menu(),
 8        build_news(),
 9    );
10    let footer = build_footer();
11
12    // Modify the template to include content
13    RawHtml(index_html.unwrap()
14        .replace("!!MENU!!"&menu)
15        .replace("!!NEWS!!"&news)
16        .replace("!!FOOTER!!"&footer),
17    )
18}

join! 宏同時執行所有三個 futures(加載模板、構建菜單和新聞提要)—直到所有三個 future 都返回數據才喚醒。

當任務阻塞

我們調用的所有函數都是異步的——它們清晰地等待另一個任務完成。有時,你需要等待一個非異步的任務—可能是一個 CPU 綁定的任務,或者一個不提供異步接口的設備。

當你無法避免阻塞任務時,你可以使用 Tokio 的 spawn_blocking 函數,如下所示:

1task::spawn_blocking(move || {
2    // Blocking work
3}).await;

使用 spawn_blocking 會將你的任務轉到它自己的線程中——並暫停你的 future 直到線程返回。這種方法使其他任務在函數執行時平穩運行。

本文翻譯自:

https://medium.com/pragmatic-programmers/fearless-concurrency-with-rust-part-3-asynchronous-concurrency-e23bad856087

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wu50EFfg-ZzaJU5JVK15jA