Rust 中 channel 的使用

關於 Rust 中的 channel

Rust 的channel是一種用於在不同線程間傳遞信息的通信機制,它實現了線程間的消息傳遞。

Channel 允許在 Rust 中創建一個消息傳遞渠道,它返回一個元組結構體,其中包含發送和接收端。發送端用於向通道發送數據,而接收端則用於從通道接收數據。

每個channel由兩部分組成:發送端(Sender)和接收端(Receiver)。

發送端用於向channel發送消息,而接收端則用於接收這些消息。這種機制允許線程之間的安全通信,避免了共享內存的複雜性和潛在的數據競爭問題。 (通過通信來共享內存, 而非通過共享內存來通信)

Rust 的channel爲線程間通信提供了一種安全、簡單的方式,是構建併發應用的基礎工具之一。

channel是 Rust 標準庫的一部分,自 Rust 1.0 版本以來就包含了這個功能。隨着 Rust 語言和標準庫的發展,channel的實現和 API 可能會有所改進,但其基本概念和用法保持一致。

使用方式

基本步驟如下:

  1. 創建: 使用std::sync::mpsc::channel()函數創建一個新的channel,這個函數返回一個包含發送端 (Sender) 和接收端 (Receiver) 的元組。

  2. 發送: 使用發送端的send方法發送消息。send方法接受一個消息值,如果接收端已經被丟棄,會返回一個錯誤。

  3. 接收: 使用接收端的recv方法接收消息。recv會阻塞當前線程直到一個消息可用,或者channel被關閉。

示例

以下是一個使用channel在兩個線程間發送和接收消息的簡單例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 創建一個channel
    let (tx, rx) = mpsc::channel();

    // 創建一個新線程,並向其中發送一個消息
    thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主線程中接收消息
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);
}

上面例子展示了channel的基本方法:先創建一個channel,然後在一個新線程中發送一個字符串消息,並在主線程中接收這個消息。

注意: 發送端tx通過move關鍵字移動到新線程中,這是因爲 Rust 的所有權規則要求確保使用數據的線程擁有該數據的所有權。

關於 MPSC

其中 mpsc 是Multi producer, Single consumer FIFO queue的縮寫, 即多生產者單消費者先入先出隊列

Rust 標準庫提供的 channel 是 MPSC(多生產者,單消費者)模型,這意味着可以有多個發送端(Sender)向同一個接收端(Receiver)發送消息。這種模式非常適用於工作隊列模型,其中多個生產者線程生成任務,而單個消費者線程處理這些任務。

除了 MPSC 之外, 還有如下幾種模型:

MPSC是標準庫中使用的模型

不需要阻塞嗎?

主線程是否會立馬結束退出程序?

在上面的示例中,如果主線程執行得太快,有可能在接收到 子線程發送消息之前就結束了,沒打印出接收到的內容程序就退出了.

但事實上, 並沒有發生這種現象. 即便在新進程段添加休眠 3s 的代碼,thread::sleep(std::time::Duration::from_secs(3));, 程序也不會提早退出.

關於 Rust 中程序的休眠, 可參考 Rust 中程序休眠的幾種方式

這是因爲,recv方法是阻塞的,即 它會阻塞當前線程, 直到從通道中接收到消息。

因此,在上面例子中,主線程在調用rx.recv().unwrap()時會阻塞 等待消息的到來。一旦子線程通過tx.send(msg).unwrap();發送了消息,主線程會接收到這個消息並繼續執行,之後程序纔會正常退出。

探索更多阻塞方式

可以使用join方法,來確保主線程等待一個或多個子線程完成執行。這在處理多個線程時特別有用。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 創建一個新線程,並保存其句柄
    let handle = thread::spawn(move || {
        let msg = "Hello from the thread";
        tx.send(msg).unwrap();
        println!("Sent message: {}", msg);
    });

    // 在主線程中接收消息
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);

    // 使用join等待子線程完成
    handle.join().unwrap();
}

thread::spawn返回一個JoinHandle,通過調用這個句柄的join方法來確保主線程在子線程完成其執行之後才繼續執行

但是因爲recv方法本身就是阻塞的,已經確保了主線程會等待至少一個消息的到來,這時再使用join看起來沒有太大必要。

但當有多個線程執行獨立任務,且這些任務不一定涉及到主線程立即需要的通道通信時,join 的作用就變得非常明顯了, 如下示例展示瞭如何創建多個線程,並使用join確保它們都完成了工作:

use std::thread;
use std::time::Duration;

fn main() {
    // 創建一個向量來存儲子線程的句柄
    let mut handles = vec![];

    for i in 0..10 {
        // 創建10個子線程
        let handle = thread::spawn(move || {
            println!("Thread {} is starting", i);
            println!("--------------");
            // 模擬工作負載,耗時1s
            thread::sleep(Duration::from_secs(1));
            println!("Thread {} has finished", i);
            println!("~~~~~~~~~~~~~~");
        });
        handles.push(handle);
    }


    // 等待所有子線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("All threads have finished");
}

輸出:

Thread 0 is starting
--------------
Thread 1 is starting
--------------
Thread 3 is starting
--------------
Thread 2 is starting
--------------
Thread 4 is starting
--------------
Thread 5 is starting
--------------
Thread 6 is starting
--------------
Thread 7 is starting
--------------
Thread 9 is starting
--------------
Thread 8 is starting
-------------- (到此都是立刻打印出來; 下面的輸出等1s後一股腦打印出來)
Thread 0 has finished
~~~~~~~~~~~~~~
Thread 1 has finished
Thread 2 has finished
Thread 5 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 4 has finished
~~~~~~~~~~~~~~
Thread 6 has finished
~~~~~~~~~~~~~~
Thread 3 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 7 has finished
~~~~~~~~~~~~~~
Thread 8 has finished
~~~~~~~~~~~~~~
Thread 9 has finished
~~~~~~~~~~~~~~
All threads have finished

在這個例子中創建了 10 個子線程,每個子線程都模擬執行一些操作,然後在主線程中使用一個循環來join這些線程。

通過這種方式,即使這些子線程並沒有向主線程發送任何消息,仍然能夠確保它們都完成了各自的工作,然後程序纔會退出。這就是join在處理多個線程時的優勢所在。

使用join確保主線程等待所有子線程完成其任務,這在處理並行計算、執行多個獨立任務時特別重要,因爲這些任務可能不會立即或根本不會向主線程報告其完成狀態。在這種情況下,如果沒有使用join,主線程可能會在子線程完成它們的工作之前結束,導致程序提前退出,而且可能留下未完成的後臺工作。

Rust channel 的更多高階用法

Rust 中的 channel 不僅僅支持簡單的消息傳遞,還可以用於實現更復雜的併發模式和高級用法。這些用法可以增加程序的靈活性和性能,特別是在處理大量數據、多線程任務或需要高度並行的場景中。

選擇性接收(Select)

在處理多個 channel 時,可能希望能夠選擇性地接收多個來源的消息。

Rust 的標準庫目前並沒有直接支持 select 機制,但是crossbeam-channel庫提供了這樣的功能,使得可以從多個 channel 中選擇性地接收消息。

use crossbeam_channel::{select, unbounded};
use std::thread;

fn main() {
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    thread::spawn(move || {
        tx1.send(1).unwrap();
    });

    thread::spawn(move || {
        tx2.send(2).unwrap();
    });

    select{
        recv(rx1) -> msg => println!("Received {} from rx1", msg.unwrap()),
        recv(rx2) -> msg => println!("Received {} from rx2", msg.unwrap()),
    }
}

cargo add crossbeam_channel 添加依賴庫,

而後多次 cargo run, 可以發現, 會在Received 1 from rx1和Received 2 from rx2中隨機打印其中一個

如上代碼演示瞭如何在 Rust 中使用crossbeam-channel庫實現選擇性接收(select)機制。該機制允許程序從多個不同的 channel 中接收消息,而不是被限制在單一的 channel 上等待。這是通過select!宏來實現的,它可以監聽多個 channel,並在任一 channel 接收到消息時立即響應。

具體來說,代碼的功能如下:

  1. 引入庫:首先,引入了crossbeam_channelselectunbounded,以及std::threadcrossbeam_channel是一個提供了高性能 channel 實現的外部庫,包括了 select 機制。unbounded用於創建一個無界(unbounded)的 channel,即沒有容量限制的 channel。

  2. 創建無界 channel:通過調用unbounded()函數,創建了兩個無界 channel,分別是tx1/rx1tx2/rx2。這裏,tx1tx2是發送端(Sender),而rx1rx2是接收端(Receiver)。

  3. 發送消息:接下來,創建了兩個線程,每個線程向各自的 channel 發送一個整數消息,第一個線程通過tx1發送1,第二個線程通過tx2發送2。這兩個線程是並行執行的,因此發送操作是異步的。

  4. 選擇性接收消息select!宏用於同時監聽rx1rx2這兩個接收端。當任一 channel 接收到消息時,select!宏會立即匹配到相應的分支並執行。這裏有兩個recv調用,分別對應兩個接收端。一旦任一接收端接收到消息,對應的代碼塊就會執行,並打印出接收到的消息及其來源。msg.unwrap()用於獲取Result類型中的消息值,前提是沒有發生錯誤。

代碼中的select!宏使得程序不必在單一的 channel 上阻塞等待,而是可以靈活地處理來自多個源的消息。這種模式在需要處理多個異步事件源時非常有用,例如在網絡服務器或併發系統中處理來自不同客戶端或任務的輸入。

有點類似 Go 的 select 語句

迭代器接收

Receiver實現了Iterator,這意味着可以使用迭代器的方式接收所有可用的消息,直到 channel 被關閉。這種方式簡化了接收端的代碼,特別是當需要處理所有消息而不必關心接收的具體時機時。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for i in 1..={
            tx.send(i).unwrap();
        }
    });

    // 通過迭代器接收消息
    for received in rx {
        println!("Received: {}", received);
    }
}

輸出:

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/arI9sip-5JH9YSu45XJ83w