Rust 異步編程簡介

在這篇文章中將解釋一些用於在 Rust 中實現異步編程的底層結構,同時將其與高級 crate 功能聯繫起來。

什麼是異步編程?

簡而言之,它允許在不等待另一個操作完成的情況下進行多個不同的操作。例如,如果你想做雞蛋和吐司,你可以先煎雞蛋,但你不必等到它們煎好再開始烤麪包。

那麼 Rust 是如何實現異步編程的呢?

例子比講述更好,所以這裏有一個 rust 異步函數的例子。首先,讓我們創建一個新的項目:

cargo new asyncexamples --bin

接下來,在我們的 cargo.toml 文件中添加以下依賴項:

[dependencies]
futures = "0.3.28"
tokio = {version = "1.27.0",  features = ["full"]}

現在,編寫我們的 async 函數:

use tokio::time::{sleep, Duration};

async fn eggs() -> u8 {
    println!("Proceed with expensive computation");
    sleep(Duration::from_millis(1000)).await;
    println!("Expensive computation is finished!");
    20
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs();
    println!("The END!?");
    Ok(())
}

執行 cargo run

Our first Async Function!
The END!?

有兩件事引起了我的注意,第一個是爲什麼沒有任何 print 語句在異步函數運行?其次,你可能會注意到的另一件事是,eggs 實際上返回了一些東西,它實現了 Future trait,它的輸出類型是 u8。因爲 Async 實際上只是下面代碼的語法糖。

use futures::Future;
use tokio::time::{sleep, Duration};

fn eggs() -> impl Future<Output = u8> {
    println!("Proceed with expensive computation");
    async {
        sleep(Duration::from_millis(1000)).await;
        println!("Expensive computation is finished!");
        20
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs();
    println!("The END!?");
    Ok(())
}

這顯示了由 async 關鍵字封裝的塊,它實現了 Future trait 類型,這就是要返回的東西!

Rust 使用這個特性來表示一個值 (在本例中爲 u8 類型),這個值可能現在還不可用,但將來會可用。(因此他們稱之爲 “Future”)。

pub trait Future {
    type Output; // 這是trait返回的類型 
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

現在,Futures trait 有一個名爲 “poll” 的函數,它返回一個 Poll 枚舉,可以是:

Poll:Pending; // 表示任務仍在進行或等待中
Poll:Ready(Self::Output); // 表示完成
// 注意Self::Output包含我們的實際值!

poll 函數用於表示異步塊的狀態,如果它仍在進行中,則返回 Poll::Pending,否則返回 Poll::Ready,幷包含該異步塊的返回值!

Context 類型是什麼?它的意思是表示 “異步任務的上下文”,它被用來提供對 & Waker 類型對象的訪問,然後用於“喚醒” 當前任務!

那麼我們如何訪問函數要返回的值呢?我們使用 await,這意味着放棄對當前線程的執行控制,爲其他線程的執行騰出空間。

use tokio::time::{sleep, Duration};

async fn eggs() -> u8 {
    println!("Proceed with expensive computation");
    sleep(Duration::from_millis(1000)).await;
    println!("Expensive computation is finished!");
    20
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Our first Async Function!");
    let x = eggs().await;
    println!("x = {x}");
    println!("The END!?");
    Ok(())
}

現在我已經在 async eggs 函數中添加了 await,但是爲什麼我們一開始就需要添加它呢?這是因爲,在 Rust 中,future 是懶惰的,爲了讓它運行,我們必須調用 await,稍後會調用 poll 函數,但是調用這個 poll 函數的是什麼呢?

正如前面提到的,當我們調用. await 時,實際上會放棄對當前線程的控制,並將其發送到某個地方。所以,我們顯然需要一種方法來管理這些線程,這就是 Tokio 異步運行時發揮作用的地方。

Tokio 有一個全局執行器,它將幫助我們運行這些異步塊,並反過來做我們想要它做的計算。在此之前,我簡要介紹了 Waker 是什麼,以及他們做了些什麼。我想現在是時候介紹這些對象背後的原因了。注意,執行器和喚醒器有兩種實現方式。一種方法是使用分別由喚醒者和執行者調用的 thread::unpark 和 thread::park。Thread::park 意味着阻止當前線程運行,這樣我們就不用等待接收消息,而可以做其他事情。Thread::unpark 將被 Waker 對象調用,告訴執行者它已經等待了足夠長的時間。另一種方法你可以使用 Executor 中的 mspc 通道和事件隊列複製此功能。

假設我們現在把實現 Future 特性的異步塊交給一個執行器。爲了讓執行器開始運行這個異步塊直至完成,必須調用. await。一旦完成,執行器就會調用 poll 函數,並傳入一個 Context 類型的對象,該對象封裝了執行器創建的 Waker 對象。如果 poll 函數返回 poll::Pending,那麼該塊還沒有完成運行。因此,現在我們需要再次檢查,而不是從循環中強制調用輪詢函數,實際上可以使用 Waker API 向執行器發出任務完全完成或部分完成的信號。

我們從哪找這個 Future 的執行器?我們可以使用像 Tokio 這樣的異步運行時。Tokio 有一個運行時,它是一個強大的執行器,可以在單個運行時中管理各種 Tokio 組件。在下面的示例中,我們將使用一個多線程執行器,它允許運行時併發執行。注意,Tokio 還提供單線程運行時!

use tokio::time::{sleep, Duration};

async fn eggs() -> String {
    println!("Lets start cooking some eggs!");
    sleep(Duration::from_millis(1000)).await;
    println!("Eggs are finished!");
    "Eggs!".to_string()
}

fn simple_example() {
    println!("Lets make some eggs!");
    let mut egg = "Raw Eggs".to_string();
    // rt 是執行器
    let rt = tokio::runtime::Builder::new_multi_thread()
                                                .enable_all()
                                                .build()
                                                .unwrap();
    // 在Tokio運行時上運行future直到完成,將其視爲異步任務的起點。                                    
    rt.block_on(
        // 實現future的異步塊!
        async {
            // 調用.await將給執行器一個綠色信號,讓它運行直到完成!
            egg = eggs().await;
        }
    );
    println!("{}", egg);
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    simple_example();
    Ok(())
}

總結一下,我們有一個執行器,這個執行器爲每個給定的異步任務創建一個 Waker 類型對象,該對象被一些上下文類型包裝。執行器被賦予這些異步任務,它可以調用 poll 函數,該函數返回一個描述任務狀態的 enum。異步任務可以使用 Waker 類型通知執行者當前任務 (部分) 完成,所以執行者不會浪費任何時間調用 poll 函數來檢查異步任務的狀態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/p1Fi1FJhFAaKHNwGOz1HKw