Rust 異步組合器

今天,我們來看看如何使用 Rust 的異步組合器,以及它們如何讓你的代碼更實用、更乾淨。

你可能想知道: 組合器與異步有什麼關係?

Future 和 Stream 特徵有兩個朋友,FutureExt 和 StreamExt 特徵。這兩個 trait 分別爲 Future 和 Stream 類型添加了組合器。

FutureExt

then

在第一個異步函數完成後調用一個返回 Future 的函數:

 1async fn compute_a() -> i64 {
 2    40
 3}
 4
 5async fn compute_b(a: i64) -> i64 {
 6    a + 2
 7}
 8
 9let b = compute_a().then(compute_b).await;
10// b = 42

map

通過調用非異步函數將 Future 的輸出轉換爲不同的類型:

1async fn get_port() -> String {
2    // ...
3}
4
5fn parse_port() -> Result<u16, Error> {
6    // ...
7}
8
9let port: Result<u16, Error> = get_port().map(parse_port).await;

flatten

將一個 Future 的 Future(例如 Future<Output=Future<Output=String>>) 合併爲一個簡單的 Future (Future<Output=String>)。

1let nested_future = async { async { 42 } };
2
3let f = nested_future.flatten();
4let forty_two = f.await;

into_stream

將一個 future 轉換爲單個元素的 stream:

1let f = async { 42 };
2let stream = f.into_stream();

StreamExt

正如我們所看到的,流就像異步迭代器,這就是爲什麼你會發現相同的組合器,比如 filter, fold, for_each, map 等等。

此外,還有一些特定的組合器可用於併發處理元素:for_each_concurrent 和 buffer_unordered。

兩者之間的區別在於 buffer_unordered 產生一個需要被消費的流,而 for_each_concurrent 實際上要消費這個流。

這裏有一個簡單的例子:

 1use futures::{stream, StreamExt};
 2use rand::{thread_rng, Rng};
 3use std::time::Duration;
 4
 5#[tokio::main(flavor = "multi_thread")]
 6async fn main() {
 7    stream::iter(0..200u64)
 8        .for_each_concurrent(20, |number| async move {
 9            let mut rng = thread_rng();
10            let sleep_ms: u64 = rng.gen_range(0..20);
11            tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
12            println!("{}", number);
13        })
14        .await;
15}
8
17
7
0
1
10
16
6
9
12
......

打印的數字沒有順序表明作業是併發執行的。

在 Rust 異步編程中,流及其併發組合器取代了其他語言中的工作池。工作池通常用於併發處理作業,如 HTTP 請求、文件散列等。實際上,工作者池最常見的挑戰是收集工作的計算結果。

有三種方法可以使用 Streams 來替換工作池來收集結果。記住,始終要設置併發任務數量的上限。否則,可能會很快耗盡系統的資源,從而影響性能。

使用 buffer_unordered 和 collect

collect 也可以在 Streams 上使用,將它們轉換爲一個集合。

1// Concurrent stream method 1: Using buffer_unordered + collect
2let subdomains: Vec<Subdomain> = stream::iter(subdomains.into_iter())
3    .map(|subdomain| ports::scan_ports(ports_concurrency, subdomain))
4    .buffer_unordered(subdomains_concurrency)
5    .collect()
6    .await;

由於使用了 buffer_unordered,這些 Futures 可以併發執行。流最終通過. collect().await 轉換成 Vec。

使用 Arc<Mutex>

 1// Concurrent stream method 2: Using an Arc<Mutex<T>>
 2let res: Arc<Mutex<Vec<Subdomain>>> = Arc::new(Mutex::new(Vec::new()));
 3
 4stream::iter(subdomains.into_iter())
 5    .for_each_concurrent(subdomains_concurrency, |subdomain| {
 6        let res = res.clone();
 7        async move {
 8            let subdomain = ports::scan_ports(ports_concurrency, subdomain).await;
 9            res.lock().await.push(subdomain)
10        }
11    })
12    .await;

使用 Channels

 1// Concurrent stream method 3: using channels
 2let (input_tx, input_rx) = mpsc::channel(concurrency);
 3let (output_tx, output_rx) = mpsc::channel(concurrency);
 4
 5tokio::spawn(async move {
 6    for port in MOST_COMMON_PORTS_100 {
 7        let _ = input_tx.send(*port).await;
 8    }
 9});
10
11let input_rx_stream = tokio_stream::wrappers::ReceiverStream::new(input_rx);
12input_rx_stream
13    .for_each_concurrent(concurrency, |port| {
14        let subdomain = subdomain.clone();
15        let output_tx = output_tx.clone();
16        async move {
17            let port = scan_port(&subdomain.domain, port).await;
18            if port.is_open {
19                let _ = output_tx.send(port).await;
20            }
21        }
22    })
23    .await;
24
25// close channel
26drop(output_tx);
27
28let output_rx_stream = tokio_stream::wrappers::ReceiverStream::new(output_rx);
29let open_ports: Vec<Port> = output_rx_stream.collect().await;

本文翻譯自:

https://kerkour.com/rust-async-combinators

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ihUpOWIB6YUtI1fJh9-o4Q