Rust 的 async-await 語法是怎樣工作的

原文地址《Rust 的 async/await 語法是怎樣工作的》[1]

從最開始的宏到現在的 Rust 關鍵字,距離 async/await 語法的 rfc[2] 被提出已經過去將近 4 年了。相比於回調地獄,或者類似 CPS[3]-Style 的鐵索連環套娃(此處應有聖經傳唱:一個 Monad 說白了不過就是自函子範疇上的一個幺半羣而已),async/await 的存在無疑提供了一種良好的異步代碼編寫方式,它更像是把同步代碼寫法的異步化,讓代碼編寫者能夠最大限度的遵循同步代碼編寫方式,但同時提供異步的運行時表現。

不過,有言道:”哪有什麼歲月靜好,不過是有人替你負重前行 “。想要代碼寫的爽,編譯器一定會在背後做很多” 髒活累活“。Rust 的 async/await 語法具體是怎樣工作的?它又是如何將我們寫的代碼,轉化成異步執行的呢?

先來看一段代碼。

#[inline(never)]
async fn x() -> usize {
    5
}

再簡單不過的一個 async 函數,只會返回一個 5,爲了防止被編譯器優化掉,我們給它加上了一個 #[inline(never)] 屬性。這個異步函數的等價同步代碼長這樣:

#[inline(never)]
fn x() -> impl Future<Output = usize> {
    async { 5 }
}

async fn 其實就是會返回一個 Future trait[4] 的函數。不過這一步轉化並沒有幫助我們更深地理解 async 關鍵字到底做了什麼。爲了一探究竟,我們可以嘗試看看上述代碼的 HIR[5] 長什麼樣。HIR 是 Rust 在編譯過程中的一箇中間產物,在轉化成更爲晦澀難懂的 MIR[6] 之前,它可以幫助我們一窺編譯器的小小細節。

cargo rustc -- -Z unpretty=hir

輸出如下(爲了方便展示,我做了一些格式上的調整):

#[inline(never)]
async fn x()
 -> /*impl Trait*/ #[lang = "from_generator"](move |mut _task_context| { { let _t = { 5 }; _t } } "lang = "from_generator"")

此時我們終於看到了 Rust 中異步語義實現的核心:generator。不過上面這個函數的內容還是過於貧瘠了,甚至都沒有涉及到今天文章的另一個主角 await。所以我們先在 x() 的基礎上再加一個 y()

#[inline(never)]
async fn x() -> i32 {
    5
}

async fn y() -> i32 {
    x().await
}

y() 也是一個異步函數,它會在內部調用 x().await,即在 x() 返回結果前 block 住自己,不進行後續的操作。雖然在本例中 x() 並沒有任何需要等待的操作,會直接返回 5,但在實際開發中,await 可能作用在各種各樣的 Future 上,諸如鎖的爭用,網絡 I/O 等,能夠在此類操作不能被立馬完成時提前返回並稍後再看也是異步編程的一個核心思想。此時我們再次輸出 HIR,可以發現內容果然豐富了許多。

#[inline(never)]
async fn x()
 -> /*impl Trait*/ #[lang = "from_generator"](move |mut _task_context| { { let _t = { 5 }; _t } } "lang = "from_generator"")

async fn y()
 -> /*impl Trait*/ #[lang = "from_generator"](move |mut _task_context|
   {
     {
       let _t =
       {
         match #[lang = "into_future"](x( "lang = "into_future""))
         {
           mut pinned
           =>
           loop {
             match unsafe
             {
               #[lang = "poll"](#[lang = "new_unchecked"](&mut pinned),
               #[lang = "get_context"](_task_context "lang = "get_context""))
           }
           {
             #[lang = "Ready"] {
               0result
             }
             =>
             break
             result,
             #[lang = "Pending"] {}
             =>
             {
             }
           }
           _task_context
           =
           (yield
             ());
         },
       }
     };
     _t
   }
})

爲了方便講解,我嘗試把上述代碼轉化成 Rust 僞代碼:

#[inline(never)]
fn x() -> impl Future<Output = usize> {
    from_generator(move |mut _task_context| {
        let _t = 5;
        _t
    })
}

fn y() -> impl Future<Output = usize> {
    from_generator(move |mut _task_context| {
        let mut pinned = into_future(x());
        loop {
            match unsafe {
                Pin::new_unchecked(&mut pinned).poll(_task_context.get_context());
            } {
                Poll::Ready(result) => break result,
                Poll::Pending => {}
            }
            yield
        }
    })
}

可以看到整個轉化主要乾了兩件事情:

這裏的大部分操作還是比較符合直覺的:因爲遇到了需要 await 完成的操作,所以運行一個循環去不停的獲取結果,完成後再繼續。注意到這裏,當 x 所代表的 Future 還沒有就緒時(即便在本例中並不會存在這種情況),loop 的運行會來到一個 yield 語句,而非 return。在開始闡述 generator 的 yield 之前,我們不妨先來思考一下,如果這裏使用了 break 或 return,會有什麼問題?

break 很好思考,loop 循環直接結束,如果 y 函數後續還有其它操作那麼就會被執行——這顯然不符合 await 的語義,我們需要 block 在當前的 Future 上,而不是忽略其結果繼續運行後續代碼。

那麼 return 呢?如果這個 Future 暫時不能 await 出結果,那麼我們爲了應該儘快完成上層函數的 poll 操作,不 block 當前 Executor 對其他 Future 的執行,直接返回一個 Poll::Pending——到目前爲止都沒什麼問題,但問題的關鍵在於,如果 y() 這個 Future 被 Waker 喚醒後,再次被 poll 的時候會發生什麼?它會把 await 之前的所有代碼都再運行一遍,這顯然也不是我們想要的。不論是操作系統的線程還是 Future 這種用戶態的 Task,我們想要的任務調度切換顯然是需要有一個 “斷點續傳” 的基本能力。對於系統線程來說,我們知道操作系統進行線程調度時,會將上下文信息保存好,以遍後續線程再次被運行時可以通過上下文切換再次恢復運行時的狀態。那麼 Rust 的異步是怎麼做到這一點的呢?答案就是 generator。

再來看一段代碼:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

fn main() {
    let mut generator = || {
       let mut val = 1;
        yield val;
       val += 1;
        yield val;
       val += 1;
        return val;
    };

    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Yielded(1) => {}
        _ => panic!("unexpected value from resume"),
    }
    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Yielded(2) => {}
        _ => panic!("unexpected value from resume"),
    }
    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Complete(3) => {}
        _ => panic!("unexpected value from resume"),
    }
}

可以看到 generator 擁有自己的狀態,當你在通過調用 resume() 方法來推進其執行狀態時,它不會從頭來過,而是從上一次 yield 的地方繼續向後執行,直到 return。上面的代碼會被轉換成類似下面的代碼:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

fn main() {
    let mut generator = {
        enum MyGenerator {
            Start,
            Yield1(i32),
            Yield2(i32),
            Done,
        }

        impl Generator for MyGenerator {
            type Yield = i32;
            type Return = i32;

            fn resume(mut selfPin<&mut Self>, _resume()) -> GeneratorState<Self::Yield, Self::Return> {
                match std::mem::replace(&mut *self, MyGenerator::Done) {
                    MyGenerator::Start => {
                        let val = 1;
                        *self = MyGenerator::Yield1(val);
                        GeneratorState::Yielded(val)
                    }

                    MyGenerator::Yield1(val) => {
                        let new_val = val + 1;
                        *self = MyGenerator::Yield2(new_val);
                        GeneratorState::Yielded(new_val)
                    }

                    MyGenerator::Yield2(val) => {
                        let new_val = val + 1;
                        *self = MyGenerator::Done;
                        GeneratorState::Complete(new_val)
                    }

                    MyGenerator::Done => {
                        panic!("generator resumed after completion")
                    }
                }
            }
        }

        MyGenerator::Start
    };
    
    
    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Yielded(1) => {}
        _ => panic!("unexpected value from resume"),
    }
    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Yielded(2) => {}
        _ => panic!("unexpected value from resume"),
    }
    match Pin::new(&mut generator).resume(()) {
        GeneratorState::Complete(3) => {}
        _ => panic!("unexpected value from resume"),
    }
}

以上代碼可以被正常編譯通過,有興趣的話可以到 Rust Playground[7] 親自試一試。可以看到整體思路其實就是一個狀態機,每次 yield 就是一次對 enum 實現的狀態進行推進,直到最終狀態被完成。過程中與狀態相關的數據還會被存儲到對應的枚舉類型裏,以遍下一次被推進時使用。你可能已經注意到一個 generator 的 resume() 方法和 Future 的 poll 似乎有幾分神似——都要求方法的調用對象是 Pin 住的,且都會返回一個表示當前狀態的枚舉類型。那麼回到我們最開始的 x 和 y 函數部分,對應的 generator 代碼在接下來的 Rust 編譯過程中,也正是會被變成一個狀態機,來表示 Future 的推進狀態。僞代碼如下:

struct GeneratorY {
    statei32,
    task_contextContext<'static>,
    futuredyn Future<Output = Vec<i32>>,
}

impl Generator for GeneratorY {
    type Yield = ();
    type Return = i32;

    fn resume(mut selfPin<&mut Self>, resume()) -> GeneratorState<Self::Yield, Self::Return> {
        match self.state {
            0 => {
                self.task_context = Context::new();
                self.future = into_future(x());
                self.state = 1;
                self.resume(resume)
            }
            1 => {
                let result = loop {
                    if let Poll::Ready(result) =
                        Pin::new_unchecked(self.future.get_mut()).poll(self.task_context)
                    {
                        break result;
                    }
                    return GeneratorState::Yielded(());
                };
                self.state = 2;
                GeneratorState::Complete(result)
            }
            _ => panic!("GeneratorY polled with an invalid state"),
        }
    }
}

可以看到每一個 Future 的本質其實都是一個 Generator,兩者可以互相轉換,例如 x 函數其實也是一個 Generator,它的實現會比 y 函數簡單不少,畢竟只需要直接返回值,而沒有額外需要 await 進行 yield 的狀態。由於狀態機本身就實現了 Future 方法,所以 into_future 也只是簡單的進行了一個類型的轉化,代碼在這裏 [8]。具體的 Future trait 實現則在 from_generator 的過程中:

/// Wrap a generator in a future.
///
/// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
/// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
// This is `const` to avoid extra errors after we recover from `const async fn`
#[lang = "from_generator"]
#[doc(hidden)]
#[unstable(feature = "gen_future", issue = "50547")]
#[rustc_const_unstable(feature = "gen_future", issue = "50547")]
#[inline]
pub const fn from_generator<T>(genT) -> impl Future<Output = T::Return>
where
    TGenerator<ResumeTy, Yield = ()>,
{
    #[rustc_diagnostic_item = "gen_future"]
    struct GenFuture<TGenerator<ResumeTy, Yield = ()>>(T);

    // We rely on the fact that async/await futures are immovable in order to create
    // self-referential borrows in the underlying generator.
    impl<TGenerator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<TGenerator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        fn poll(selfPin<&mut Self>, cx&mut Context<'_>) -> Poll<Self::Output> {
            // SAFETY: Safe because we're !Unpin + !Drop, and this is just a field projection.
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };

            // Resume the generator, turning the `&mut Context` into a `NonNull` raw pointer. The
            // `.await` lowering will safely cast that back to a `&mut Context`.
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}

from_generator 的源代碼 [9] 如上,可以看到 Future 轉換成 Generator 後的 poll 的實現就等於進行一次 generator 的 resume,獲得 GeneratorState::Yielded 即返回 Poll::Pending,獲得 GeneratorState::Complete(result) 即返回 Poll::Ready(result) ,Context 則是作爲 resume 的參數透傳給狀態機內部,整體邏輯還是非常清晰的。其中關於 Pin 的相關細節則是另一個比較繁雜的話題了,可以參考這篇博客進行學習:Rust 的 Pin 與 Unpin[10]。

參考

參考資料

[1] 原文地址《Rust 的 async/await 語法是怎樣工作的》: https://ipotato.me/article/70

[2] rfc: https://github.com/rust-lang/rfcs/blob/master/text/2394-async_await.md

[3] CPS: https://en.wikipedia.org/wiki/Continuation-passing_style

[4] Future trait: https://docs.rs/rustc-std-workspace-std/latest/std/future/trait.Future.html

[5] HIR: https://rustc-dev-guide.rust-lang.org/hir.html

[6] MIR: https://rustc-dev-guide.rust-lang.org/mir/index.html

[7] Rust Playground: https://play.rust-lang.org/?version=nightly&mode=debug&edition=2021&gist=49c5d4da4a94b7b8538457c3e4891ec2

[8] 這裏: https://github.com/rust-lang/rust/blob/master/library/core/src/future/into_future.rs

[9] from_generator 的源代碼: https://github.com/rust-lang/rust/blob/42313dd29b3edb0ab453a0d43d12876ec7e48ce0/library/core/src/future/mod.rs#L65

[10] Rust 的 Pin 與 Unpin: https://folyd.com/blog/rust-pin-unpin

[11] Inside Rust's Async Transform: https://blag.nemo157.com/2018/12/09/inside-rusts-async-transform.html

[12] Generators and async/await: https://cfsamson.github.io/books-futures-explained/4_generators_async_await.html#generators-and-asyncawait

[13] generators: https://doc.rust-lang.org/beta/unstable-book/language-features/generators.html

[14] How Rust optimizes async/await I: https://tmandry.gitlab.io/blog/posts/optimizing-await-1

[15] How Rust optimizes async/await II: Program analysis: https://tmandry.gitlab.io/blog/posts/optimizing-await-2

[16] Xuanwo's Note: Rust std/Future: https://note.xuanwo.io/#/page/rust%2Fstd%20future

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZGuqqFOcoUERMnGMtpNuIA