Rust 中 channel 的使用
關於 Rust 中的 channel
Rust 的channel
是一種用於在不同線程間傳遞信息的通信機制,它實現了線程間的消息傳遞。
Channel 允許在 Rust 中創建一個消息傳遞渠道,它返回一個元組結構體,其中包含發送和接收端。發送端用於向通道發送數據,而接收端則用於從通道接收數據。
每個channel
由兩部分組成:發送端(Sender
)和接收端(Receiver
)。
發送端用於向channel
發送消息,而接收端則用於接收這些消息。這種機制允許線程之間的安全通信,避免了共享內存的複雜性和潛在的數據競爭問題。 (通過通信來共享內存, 而非通過共享內存來通信)
Rust 的channel
爲線程間通信提供了一種安全、簡單的方式,是構建併發應用的基礎工具之一。
channel
是 Rust 標準庫的一部分,自 Rust 1.0 版本以來就包含了這個功能。隨着 Rust 語言和標準庫的發展,channel
的實現和 API 可能會有所改進,但其基本概念和用法保持一致。
使用方式
基本步驟如下:
-
創建: 使用
std::sync::mpsc::channel()
函數創建一個新的channel
,這個函數返回一個包含發送端 (Sender
) 和接收端 (Receiver
) 的元組。 -
發送: 使用發送端的
send
方法發送消息。send
方法接受一個消息值,如果接收端已經被丟棄,會返回一個錯誤。 -
接收: 使用接收端的
recv
方法接收消息。recv
會阻塞當前線程直到一個消息可用,或者channel
被關閉。
示例
以下是一個使用channel
在兩個線程間發送和接收消息的簡單例子:
use std::sync::mpsc;
use std::thread;
fn main() {
// 創建一個channel
let (tx, rx) = mpsc::channel();
// 創建一個新線程,並向其中發送一個消息
thread::spawn(move || {
let msg = "Hello from the thread";
tx.send(msg).unwrap();
println!("Sent message: {}", msg);
});
// 在主線程中接收消息
let received = rx.recv().unwrap();
println!("Received message: {}", received);
}
上面例子展示了channel
的基本方法:先創建一個channel
,然後在一個新線程中發送一個字符串消息,並在主線程中接收這個消息。
注意: 發送端tx
通過move
關鍵字移動到新線程中,這是因爲 Rust 的所有權規則要求確保使用數據的線程擁有該數據的所有權。
關於 MPSC
其中 mpsc 是Multi producer, Single consumer FIFO queue
的縮寫, 即多生產者單消費者先入先出隊列
Rust 標準庫提供的 channel 是 MPSC(多生產者,單消費者)模型,這意味着可以有多個發送端(Sender
)向同一個接收端(Receiver
)發送消息。這種模式非常適用於工作隊列模型,其中多個生產者線程生成任務,而單個消費者線程處理這些任務。
除了 MPSC 之外, 還有如下幾種模型:
-
SPSC(Single Producer Single Consumer): 單生產者單消費者。
-
SPMC(Single Producer Multiple Consumer): 單生產者多消費者。
-
MPSC(Multi Producer Single Consumer): 多生產者單消費者, Rust 中標準的 mpsc 模型。
-
MPMC(Multi Producer Multi Consumer)*: 多生產者多消費者。
MPSC是標準庫中使用的模型
不需要阻塞嗎?
主線程是否會立馬結束退出程序?
在上面的示例中,如果主線程執行得太快,有可能在接收到 子線程發送消息之前就結束了,沒打印出接收到的內容程序就退出了.
但事實上, 並沒有發生這種現象. 即便在新進程段添加休眠 3s 的代碼,thread::sleep(std::time::Duration::from_secs(3));
, 程序也不會提早退出.
關於 Rust 中程序的休眠, 可參考 Rust 中程序休眠的幾種方式
這是因爲,recv
方法是阻塞的,即 它會阻塞當前線程, 直到從通道中接收到消息。
因此,在上面例子中,主線程在調用rx.recv().unwrap()
時會阻塞 等待消息的到來。一旦子線程通過tx.send(msg).unwrap();
發送了消息,主線程會接收到這個消息並繼續執行,之後程序纔會正常退出。
探索更多阻塞方式
可以使用join
方法,來確保主線程等待一個或多個子線程完成執行。這在處理多個線程時特別有用。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 創建一個新線程,並保存其句柄
let handle = thread::spawn(move || {
let msg = "Hello from the thread";
tx.send(msg).unwrap();
println!("Sent message: {}", msg);
});
// 在主線程中接收消息
let received = rx.recv().unwrap();
println!("Received message: {}", received);
// 使用join等待子線程完成
handle.join().unwrap();
}
thread::spawn
返回一個JoinHandle
,通過調用這個句柄的join
方法來確保主線程在子線程完成其執行之後才繼續執行
但是因爲recv方法
本身就是阻塞的,已經確保了主線程會等待至少一個消息的到來,這時再使用join
看起來沒有太大必要。
但當有多個線程執行獨立任務,且這些任務不一定涉及到主線程立即需要的通道通信時,join 的作用就變得非常明顯了, 如下示例展示瞭如何創建多個線程,並使用join
確保它們都完成了工作:
use std::thread;
use std::time::Duration;
fn main() {
// 創建一個向量來存儲子線程的句柄
let mut handles = vec![];
for i in 0..10 {
// 創建10個子線程
let handle = thread::spawn(move || {
println!("Thread {} is starting", i);
println!("--------------");
// 模擬工作負載,耗時1s
thread::sleep(Duration::from_secs(1));
println!("Thread {} has finished", i);
println!("~~~~~~~~~~~~~~");
});
handles.push(handle);
}
// 等待所有子線程完成
for handle in handles {
handle.join().unwrap();
}
println!("All threads have finished");
}
輸出:
Thread 0 is starting
--------------
Thread 1 is starting
--------------
Thread 3 is starting
--------------
Thread 2 is starting
--------------
Thread 4 is starting
--------------
Thread 5 is starting
--------------
Thread 6 is starting
--------------
Thread 7 is starting
--------------
Thread 9 is starting
--------------
Thread 8 is starting
-------------- (到此都是立刻打印出來; 下面的輸出等1s後一股腦打印出來)
Thread 0 has finished
~~~~~~~~~~~~~~
Thread 1 has finished
Thread 2 has finished
Thread 5 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 4 has finished
~~~~~~~~~~~~~~
Thread 6 has finished
~~~~~~~~~~~~~~
Thread 3 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 7 has finished
~~~~~~~~~~~~~~
Thread 8 has finished
~~~~~~~~~~~~~~
Thread 9 has finished
~~~~~~~~~~~~~~
All threads have finished
在這個例子中創建了 10 個子線程,每個子線程都模擬執行一些操作,然後在主線程中使用一個循環來join
這些線程。
通過這種方式,即使這些子線程並沒有向主線程發送任何消息,仍然能夠確保它們都完成了各自的工作,然後程序纔會退出。這就是join
在處理多個線程時的優勢所在。
使用join
確保主線程等待所有子線程完成其任務,這在處理並行計算、執行多個獨立任務時特別重要,因爲這些任務可能不會立即或根本不會向主線程報告其完成狀態。在這種情況下,如果沒有使用join
,主線程可能會在子線程完成它們的工作之前結束,導致程序提前退出,而且可能留下未完成的後臺工作。
Rust channel 的更多高階用法
Rust 中的 channel 不僅僅支持簡單的消息傳遞,還可以用於實現更復雜的併發模式和高級用法。這些用法可以增加程序的靈活性和性能,特別是在處理大量數據、多線程任務或需要高度並行的場景中。
選擇性接收(Select)
在處理多個 channel 時,可能希望能夠選擇性地接收多個來源的消息。
Rust 的標準庫目前並沒有直接支持 select 機制,但是crossbeam-channel
庫提供了這樣的功能,使得可以從多個 channel 中選擇性地接收消息。
use crossbeam_channel::{select, unbounded};
use std::thread;
fn main() {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
thread::spawn(move || {
tx1.send(1).unwrap();
});
thread::spawn(move || {
tx2.send(2).unwrap();
});
select! {
recv(rx1) -> msg => println!("Received {} from rx1", msg.unwrap()),
recv(rx2) -> msg => println!("Received {} from rx2", msg.unwrap()),
}
}
cargo add crossbeam_channel
添加依賴庫,
而後多次 cargo run
, 可以發現, 會在Received 1 from rx1
和和Received 2 from rx2
中隨機打印其中一個
如上代碼演示瞭如何在 Rust 中使用crossbeam-channel
庫實現選擇性接收(select)機制。該機制允許程序從多個不同的 channel 中接收消息,而不是被限制在單一的 channel 上等待。這是通過select!
宏來實現的,它可以監聽多個 channel,並在任一 channel 接收到消息時立即響應。
具體來說,代碼的功能如下:
-
引入庫:首先,引入了
crossbeam_channel
的select
和unbounded
,以及std::thread
。crossbeam_channel
是一個提供了高性能 channel 實現的外部庫,包括了 select 機制。unbounded
用於創建一個無界(unbounded)的 channel,即沒有容量限制的 channel。 -
創建無界 channel:通過調用
unbounded()
函數,創建了兩個無界 channel,分別是tx1/rx1
和tx2/rx2
。這裏,tx1
和tx2
是發送端(Sender
),而rx1
和rx2
是接收端(Receiver
)。 -
發送消息:接下來,創建了兩個線程,每個線程向各自的 channel 發送一個整數消息,第一個線程通過
tx1
發送1
,第二個線程通過tx2
發送2
。這兩個線程是並行執行的,因此發送操作是異步的。 -
選擇性接收消息:
select!
宏用於同時監聽rx1
和rx2
這兩個接收端。當任一 channel 接收到消息時,select!
宏會立即匹配到相應的分支並執行。這裏有兩個recv
調用,分別對應兩個接收端。一旦任一接收端接收到消息,對應的代碼塊就會執行,並打印出接收到的消息及其來源。msg.unwrap()
用於獲取Result
類型中的消息值,前提是沒有發生錯誤。
代碼中的select!
宏使得程序不必在單一的 channel 上阻塞等待,而是可以靈活地處理來自多個源的消息。這種模式在需要處理多個異步事件源時非常有用,例如在網絡服務器或併發系統中處理來自不同客戶端或任務的輸入。
有點類似 Go 的 select 語句
迭代器接收
Receiver
實現了Iterator
,這意味着可以使用迭代器的方式接收所有可用的消息,直到 channel 被關閉。這種方式簡化了接收端的代碼,特別是當需要處理所有消息而不必關心接收的具體時機時。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
for i in 1..=5 {
tx.send(i).unwrap();
}
});
// 通過迭代器接收消息
for received in rx {
println!("Received: {}", received);
}
}
輸出:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/arI9sip-5JH9YSu45XJ83w