在 Rust 中如何有效管理多個 Future?

在這篇文章中,我們將探索 futures::stream::FuturesUnordered,這是一個強大而有效的工具,可以同時處理多個異步任務。這個工具允許我們以非阻塞的方式輪詢多個 Future,自動處理完成的單個任務,並在它們可用時產生結果。

讓我們創建一些異步函數來模擬不同的任務,並使用 FuturesUnordered 來併發地運行它們。首先新建一個 Rust 項目:

cargo new futuresunordered_example

然後向 Cargo.toml 文件添加所需的依賴項:

[dependencies]
tokio = { version = "1.0"features = ["full"] }
futures = "0.3"

接下來,創建異步函數:

use std::time::Duration;
use tokio::time::sleep;

async fn task_one() -> String {
    sleep(Duration::from_secs(3)).await;
    "Task one completed".to_owned()
}
async fn task_two() -> String {
    sleep(Duration::from_secs(1)).await;
    "Task two completed".to_owned()
}
async fn task_three() -> String {
    sleep(Duration::from_secs(2)).await;
    "Task three completed".to_owned()
}

我們定義了三個異步函數 task_one、task_two 和 task_three,每個函數都使用 tokio::time::sleep 模擬一個具有不同持續時間的任務。現在,我們將使用 FuturesUnordered 來併發運行這些任務:

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::pin::Pin;
use std::future::Future;

#[tokio::main]
async fn main() {
    let mut tasks = FuturesUnordered::<Pin<Box<dyn Future<Output = String>>>>::new();
     tasks.push(Box::pin(task_one()));
    tasks.push(Box::pin(task_two()));
    tasks.push(Box::pin(task_three()));
    while let Some(result) = tasks.next().await {
        println!("{}", result);
    }
}

在代碼中使用 Box::pin 創建每個 future 的固定、堆分配版本。確保 future 的內存位置在它開始執行時不會改變。

當你運行這個示例時,你將看到任務是併發執行的,並且它們的結果在完成時打印出來。輸出的順序可能不同,因爲它取決於每個任務的完成時間:

Task two completed
Task three completed
Task one completed

在本例中,我們定義了三個異步函數 task_one、task_two 和 task_three,每個函數都使用 tokio::time::sleep 模擬一個具有不同持續時間的任務。我們創建了一個名爲 tasks 的 FuturesUnordered 實例,並將三個任務推入其中。

FuturesUnordered 是如何工作的

futures::stream::FuturesUnordered 背後的理論在於它的實現,它結合了高效的任務管理和 Rust 的異步編程模型。

下面是它的工作原理概述:

1,FuturesUnordered 是一個包含一組 Future 的數據結構。在內部,它維護着兩個 Future 鏈表——一個用於準備接受輪詢的 Future,另一個用於尚未準備好接受輪詢的 Future。

2,當你使用 push()將一個 Future 添加到 FuturesUnordered 時,它最初被放置在未準備好的 Future 列表中。然後,future 被註冊到一個異步運行時中,以便在它準備好進行調度時 (即當它準備好進行輪詢時) 得到通知。

3,當一個 Future 準備好了,它就從未準備好列表移到了準備好列表。這是通過異步運行時通知與 future 相關的喚醒器執行的。

4,FuturesUnordered 結構本身實現了 Stream 特徵。當你在 FuturesUnordered 實例上調用 next() 時,它將輪詢就緒列表中的 Future。

5,當就緒列表中有可用的 future 時,next().await 調用將解析到已完成的 future。如果沒有準備好的 future,它將異步地等待,直到 future 準備好。

6,當一個 future 被輪詢並完成時,它將從就緒列表中刪除,結果由 next().await 返回。如果 future 沒有完成,它仍然在準備清單上,與此同時,另一個 Future 可能會被輪詢。這個過程確保 FuturesUnordered 有效地同時處理多個 Future,利用可用資源並避免阻塞。

7,如果一個 future 被取消或刪除,它將從它各自的列表中刪除 (準備好或未準備好),確保與 future 相關的任何資源都被釋放。

8,只要在就緒或未就緒列表中有 Future,FuturesUnordered stream 就繼續返回值。一旦所有 Future 都完成或取消,FuturesUnordered stream 將返回 None,表明 stream 已經結束。

總結

FuturesUnordered 確保沒有一個 Future 被落下,每一個 Future 都能在時機成熟時成爲關注的焦點。

我們也分析了 FuturesUnordered 如何維護兩個列表,一個用於準備輪詢的 Futre,另一個用於仍在等待輪詢的 Future。

利用 FuturesUnordered 的功能,Rust 應用程序可以獲得更好的性能,有效地管理併發性,並最大限度地利用資源。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/CTckmj9osZFlDuWAJ8dS3w