Rust 併發編程 - async-await 異步編程

異步編程綜述

異步編程是一種併發編程模型,通過在任務執行期間不阻塞線程的方式,提高系統的併發能力和響應性。相比於傳統的同步編程,異步編程可以更好地處理 I/O 密集型任務和併發請求,提高系統的吞吐量和性能。

異步編程具有以下優勢:

異步編程廣泛應用於以下場景:

Rust 中的異步編程模型

Rust 作爲一門現代的系統級編程語言,旨在提供高效、安全和可靠的異步編程能力。Rust 異步編程的目標是實現高性能、無安全漏洞的異步應用程序,同時提供簡潔的語法和豐富的異步庫。

最值得一讀的是 Rust 官方的 Rust 異步編程書 [1]

中文版: Rust 異步編程指南 [2]

由於併發編程在現代社會非常重要,因此每個主流語言都對自己的併發模型進行過權衡取捨和精心設計,Rust 語言也不例外。下面的列表可以幫助大家理解不同併發模型的取捨:

總之,Rust 經過權衡取捨後,最終選擇了同時提供多線程編程async 編程:

異步運行時是 Rust 中支持異步編程的運行時環境,負責管理異步任務的執行和調度。它提供了任務隊列、線程池和事件循環等基礎設施,支持異步任務的併發執行和事件驅動的編程模型。Rust 沒有內置異步調用所必須的運行時, 主要的 Rust 異步運行時包括:

還有 futuresust 異步編程的基礎抽象庫。大多數運行時都依賴 futures 提供異步原語。

今日頭條是國內使用 Rust 語言的知名公司之一,他們也開源了一個他們的運行時 bytedance/monoio[4]

Rust 異步編程模型包含了一些關鍵的組件和概念,包括:

async 語句塊和 async fn 最大的區別就是前者無法顯式的聲明返回值,在大多數時候這都不是問題,但是當配合 ? 一起使用時,問題就有所不同:

async fn foo() -> Result<u8, String> {
    Ok(1)
}
async fn bar() -> Result<u8, String> {
    Ok(1)
}
pub fn main() {
    let fut = async {
        foo().await?;
        bar().await?;
        Ok(())
    };
}

以上代碼編譯後會報錯:

error[E0282]type annotations needed
  --> src/main.rs:14:9
   |
11 |     let fut = async {
   |         --- consider giving `fut` a type
...
14 |         Ok(1)
   |         ^^ cannot infer type for type parameter `E` declared on the enum `Result`

原因在於編譯器無法推斷出  Result<T, E>中的  E  的類型, 而且編譯器的提示consider giving fut a type你也別傻乎乎的相信,然後嘗試半天,最後無奈放棄:目前還沒有辦法爲  async  語句塊指定返回類型。

既然編譯器無法推斷出類型,那咱就給它更多提示,可以使用  ::< ... >  的方式來增加類型註釋:

let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), String>(()) // 在這一行進行顯式的類型註釋
};

async/await 語法和用法

async 和 await 是 Rust 中用於異步編程的關鍵字。async 用於定義異步函數,表示函數體中包含異步代碼。await 用於等待異步操作完成,並返回異步操作的結果。

異步函數的返回類型通常是實現了 Future Trait 的類型。Future Trait 表示一個異步任務,提供異步任務的執行和狀態管理。Rust 標準庫和第三方庫中提供了許多實現了 Future Trait 的類型,用於表示各種異步操作。

舉一個例子,下面這個例子是一個傳統的併發下載網頁的例子:

fn get_two_sites() {
    // 創建兩個新線程執行任務
    let thread_one = thread::spawn(|| download("https://course.rs"));
    let thread_two = thread::spawn(|| download("https://fancy.rs"));

    // 等待兩個線程的完成
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

如果是在一個小項目中簡單的去下載文件,這麼寫沒有任何問題,但是一旦下載文件的併發請求多起來,那一個下載任務佔用一個線程的模式就太重了,會很容易成爲程序的瓶頸。好在,我們可以使用async的方式來解決:

async fn get_two_sites_async() {
    // 創建兩個不同的`future`,你可以把`future`理解爲未來某個時刻會被執行的計劃任務
    // 當兩個`future`被同時執行後,它們將併發的去下載目標頁面
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // 同時運行兩個`future`,直至完成
    join!(future_one, future_two);
}

注意上面的代碼必須在一個異步運行時在運行,以便異步運行時使用一定數量的線程來調度這些代碼的運行。

接下來我們就學習各種異步運行時庫和異步運行時的方法。

Tokio

Tokio 是 Rust 異步編程最重要的運行時庫, 提供了異步 IO、異步任務調度、同步原語等功能。

Tokio 的主要組件包括:

可以看到 Tokio 庫包含了很多的功能,包括異步網絡編程、併發原語等,之後我們會花整個一章專門介紹它,這一節我們值介紹它的異步運行時的使用。

你可以如下定義 main 函數,它自動支持運行時的啓動:

#[tokio::main]
async fn main() {
  // 在運行時中異步執行任務
  tokio::spawn(async {
    // do work
  });

  // 等待任務完成
  other_task.await;
}

這個例子 main 函數前必須加 async 關鍵字,並且加#[tokio::main]屬性,那麼這個 main 就會在異步運行時運行。

你也可以使用顯示創建運行時的方法:

pub fn tokio_async() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("Hello from tokio!");

        rt.spawn(async {
            println!("Hello from a tokio task!");
            println!("in spawn")
        })
        .await
        .unwrap();
    });

    rt.spawn_blocking(|| println!("in spawn_blocking"));
}

首先它創建了一個 Tokio 運行時rtblock_on方法在運行時上下文中執行一個異步任務, 這裏我們簡單地打印了一句話。

然後使用rt.spawn在運行時中異步執行另一個任務。這個任務也打印了幾句話。spawn返回一個JoinHandle, 所以這裏調用.await來等待任務結束。

最後, 使用spawn_blocking在運行時中執行一個普通的阻塞任務。這個任務會在線程池中運行, 而不會阻塞運行時。

總結一下這個例子展示的要點:

Tokio 運行時提供了執行和調度異步任務所需的全部功能。通過正確地組合block_onspawnspawn_blocking, 可以發揮 Tokio 的強大能力, 實現各種異步場景。

futures

futures 庫 futures 是 Rust 異步編程的基礎抽象庫, 爲編寫異步代碼提供了核心的 trait 和類型。

主要提供了以下功能:

pub fn futures_async() {
    let pool = ThreadPool::new().expect("Failed to build pool");
    let (tx, rx) = mpsc::unbounded::<i32>();

    let fut_values = async {
        let fut_tx_result = async move {
            (0..100).for_each(|v| {
                tx.unbounded_send(v).expect("Failed to send");
            })
        };
        pool.spawn_ok(fut_tx_result);

        let fut_values = rx.map(|v| v * 2).collect();

        fut_values.await
    };

    let values: Vec<i32> = executor::block_on(fut_values);

    println!("Values={:?}", values);
}

這個例子展示瞭如何使用 futures 和線程池進行異步編程:

  1. 創建一個線程池pool

  2. 創建一個無邊界的通道txrx用來在任務間傳遞數據

  3. 定義一個異步任務fut_values, 裏面首先用spawn_ok在線程池中異步執行一個任務, 這個任務會通過通道發送 0-99 的數字。

  4. 然後通過rxmap創建一個 Stream, 它會將收到的數字乘 2。

  5. collect收集 Stream 的結果到一個 Vec。

  6. block_on在主線程中執行這個異步任務並獲取結果。

這段代碼展示了 futures 和通道的組合使用 - 通過線程池併發地處理數據流。

block_on運行 future 而不需要顯式運行時也很方便。

futures 通過異步處理數據流, 可以實現非阻塞併發程序, 這在諸如網絡服務端編程中很有用。與線程相比, futures 的抽象通常更輕量和高效。

futures_lite

這個庫是 futures 的一個子集, 它的編譯速度快了一個數量級, 修復了 futures API 中的一些小問題, 補充了一些明顯的空白, 並移除了絕大部分不安全的代碼。

簡而言之, 這個庫的目標是比 futures 更可易用, 同時仍然與其完全兼容。

讓我們從創建一個簡單的 Future 開始。在 Rust 中,Future 是一種表示異步計算的 trait。以下是一個示例:

use futures_lite::future;

async fn hello_async() {
    println!("Hello, async world!");
}

fn main() {
    future::block_on(hello_async());
}

在這個例子中,我們使用 futures-lite 中的 future::block_on 函數來運行異步函數 hello_async

async_std

async-std 是一個爲 Rust 提供異步標準庫的庫。它擴展了標準庫,使得在異步上下文中進行文件 I/O、網絡操作和任務管理等操作更加便捷。

它提供了你所習慣的所有接口, 但以異步的形式, 並且準備好用於 Rust 的async/await語法。

特性

use async_std::task;

async fn hello_async() {
    println!("Hello, async world!");
}

fn main() {
    task::block_on(hello_async());
}

這個例子首先導入 async_std::task。

然後定義一個異步函數 hello_async, 其中只是簡單打印一句話。

在 main 函數中, 使用 task::block_on 來執行這個異步函數。block_on 會阻塞當前線程, 直到傳入的 future 運行完成。

這樣的效果就是, 儘管 hello_async 函數是異步的, 但我們可以用同步的方式調用它, 不需要手動處理 future。

async/await 語法隱藏了 future 的細節, 給異步編程帶來了極大的便利。藉助 async_std, 我們可以非常輕鬆地使用 async/await 來編寫異步 Rust 代碼。

smol

smol 是一個超輕量級的異步運行時(async runtime)庫,專爲簡化異步 Rust 代碼的編寫而設計。它提供了一個簡潔而高效的方式來管理異步任務。

特性

下面這個例子演示了使用 smol 異步運行時執行異步代碼塊的例子:

pub fn smol_async() {
    smol::block_on(async { println!("Hello from smol") });
}

try_join、join、select 和 zip

在 Rust 中, 有兩個常見的宏可以用於同時等待多個 future:select 和 join。

select! 宏可以同時等待多個 future, 並只處理最先完成的那個 future:

use futures::future::{select, FutureExt};

let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };

let result = select{
    res1 = future1 ={ /* handle result of future1 */ },
    res2 = future2 ={ /* handle result of future2 */ },
};

join! 宏可以同時等待多個 future, 並處理所有 future 的結果:

use futures::future::{join, FutureExt};

let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };

let (res1, res2) = join!(future1, future2);

join! 返回一個元組, 包含所有 future 的結果。

這兩個宏都需要 futures crate, 使代碼更加簡潔。不使用宏的話, 需要手動創建一個 Poll 來組合多個 future。

所以 select 和 join 在處理多個 future 時非常方便。select 用於只處理最先完成的, join 可以同時處理所有 future。

try_join! 宏也可以用於同時等待多個 future, 它與 join! 類似, 但是有一點不同:

try_join! 在任何一個 future 返回錯誤時, 就會提前返回錯誤, 而不會等待其他 future。

例如:

use futures::try_join;

let future1 = async {
  Ok::<(), Error>(/*...*/)
};

let future2 = async {
  Err(Error::SomethingBad)
};

let result = try_join!(future1, future2);

這裏因爲 future2 返回了錯誤, 所以 try_join! 也會返回這個錯誤, 不會等待 future1 完成。

這不同於 join!,join! 會等待所有 future 完成。

所以 try_join! 的用途是同時啓動多個 future, 但是遇到任何一個錯誤就立即返回, 避免不必要的等待。這在需要併發但不能容忍任何失敗的場景很有用。

而當需要等待所有 future 無論成功失敗, 獲取所有結果的時候, 再使用 join!。

所以 try_join! 和 join! 都可以組合多個 future, 但錯誤處理策略不同。選擇哪個要根據實際需要決定。

zip函數會 join 兩個 future, 並等待他們完成。而try_zip函數會 join 兩個函數,但是會等待兩個 future 都完成或者其中一個 Err 則返回:

pub fn smol_zip() {
    smol::block_on(async {
        use smol::future::{try_zip, zip, FutureExt};

        let future1 = async { 1 };
        let future2 = async { 2 };

        let result = zip(future1, future2);
        println!("smol_zip: {:?}", result.await);

        let future1 = async { Ok::<i32, i32>(1) };
        let future2 = async { Err::<i32, i32>(2) };

        let result = try_zip(future1, future2).await;
        println!("smol_try_zip: {:?}", result);
    });
}

參考資料

[1]

Rust 異步編程書: https://rust-lang.github.io/async-book/

[2]

Rust 異步編程指南: https://github.com/rustlang-cn/async-book

[3]

多線程章節: https://github.com/rustlang-cn/async-book/blob/master/advance/concurrency-with-threads/concurrency-parallelism.md

[4]

bytedance/monoio: https://github.com/bytedance/monoio

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hnKNjqlYrdtI8ZLpJx5ufg