Rust 併發控制之 Channel
Rust 官方sync
包中提供了mpsc
模式的 (多生產者,單消費者:multi-producer, single-consumer) channel,可以實現基於消息併發控制,而不是依賴控制內存共享(加鎖)。這正是 go 語言作者 R. Pike
所推崇的方式:
Don't communicate by sharing memory; share memory by communicating. (R. Pike)
今天就聊聊mpsc
提供的sync_channel
和channel
。
文章目錄
-
規則
-
sync_channel - spsc
-
sync_channel - mpsc
-
channel
-
併發安全
規則
首先一般 channel 機制都保證了
-
生產者(producer/sender) 可以發送(send)消息,消費者(consumer/receiver)可以接受(recv)消息,生產和消費的順序一致(一般都有消息隊列保證順序
FIFO
) -
消費者在沒有消息可接收前會阻塞等待,直到有消息或 channel 關閉
-
channel 可以限制同時可處理消息上限(buffer size)
-
生產者發送的消息累積到 buffer 上限時就要阻塞到有消息被消費
從這些規則中,可以看出,channel 保證了生產總是先於消費,消息處理總是先進先出(FIFO
)。
sync_channel - spsc
buffer size 最特別的情況就是 0,就是單生產者單消費者模式 (mpsc
):send 後會阻塞,直到有 recv 處理,才能再 send 下一個消息。
這就能很好的實現對併發順序的控制, 比如下邊代碼,用兩組 channel 實現 1 和 2 的交替打印。
不同 channel 的 send 和 recv 交叉等待,保證了打印的順序,就像這中間持有鎖一樣
use std::sync::mpsc::sync_channel;
use std::thread;
fn main() {
let (sender, receiver) = sync_channel(0);
let (sender2, receiver2) = sync_channel(0);
let cnt = 3;
let t1 = thread::spawn({
move || {
for _ in 0..cnt {
print!("1 ");
// t1打印完,通知t2的receiver打印
sender.send(2).unwrap();
// 阻塞,等待t2打印結束
receiver2.recv().unwrap();
}
}
});
let t2 = thread::spawn({
move || {
for _ in 0..cnt {
// 阻塞,等待t1 sender的已打印的消息
receiver.recv().unwrap();
print!("2 ");
// t2打印完, 給t1 receiver2通知可以進行下一次打印
sender2.send(1).unwrap();
}
}
});
t1.join().unwrap();
t2.join().unwrap();
}
sync_channel - mpsc
buffer size 增加,就是正常mpsc
摸式,可以控制同時能併發的上限(實際內部提前分配了數組來支持 buffer)。
達到上限,sender 就需要等待有 receiver 消費才能夠繼續發送消息。
當然沒消息的話,別忘了 drop 也是可以結束 recv 一直等待消息的。
如下邊代碼所示:
use std::sync::mpsc::sync_channel;
use std::thread;
fn main() {
let (sender, receiver) = sync_channel(3);
let sender2 = sender.clone();
let sender3 = sender.clone();
thread::spawn(move || sender.send(1).unwrap());
thread::spawn(move || sender2.clone().send(2).unwrap());
drop(sender3); // 這裏保證了第三個recv打印能成功
println!("{:?}", receiver.recv().unwrap());
println!("{:?}", receiver.recv().unwrap());
println!("{:?}", receiver.recv());
}
channel
明白了sync_channel
,channel
就簡單了,就是 buffer size 無限模式(實際是內部維護了一個鏈表自動擴容)。所有的 send 都不會阻塞,只有 recv 在沒消息時需要阻塞等待 channel 中產生新的消息。
use std::sync::mpsc::{channel, sync_channel};
use std::thread;
fn main() {
// let (sender, receiver) = sync_channel(1); // buffer爲1的話,不會打印send no block
let (sender, receiver) = channel(); // 使用channel,send不阻塞,會打印
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
println!("send no block");
});
println!("{:?}", receiver.recv().unwrap());
}
如果想及時 check 是否能 recv 消息時,可以用try_recv
-
TryRecvError::Empty
代表目前爲空,但 channel 連接還在 -
TryRecvError::Disconnected
則是連接已關閉,不可能再受到消息了
use std::sync::mpsc::{channel, Receiver, RecvError, TryRecvError};
fn main() {
let (sender, receiver) = channel();
fn try_recv_with_log(receiver: &Receiver<i32>) {
match receiver.try_recv() {
Ok(v) => println!("{:?}", v),
Err(TryRecvError::Empty) => println!("error: Empty"),
Err(TryRecvError::Disconnected) => println!("error: Disconnected"),
}
}
// error: Empty
try_recv_with_log(&receiver);
sender.send(1).unwrap();
receiver.recv().unwrap();
drop(sender);
// error: Disconnected
try_recv_with_log(&receiver);
}
併發安全
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
impl<T> !Sync for Receiver<T> {}
最後來看看 rust 如何保證 channel 的併發安全
Sender<T>
同時支持Send
和Sync
,其維護的消息隊列可以安全的在線程間傳遞所有權,也可以了共享引用,即可以被多個線程同時進行 send 操作。
其中T
需要實現Send
, 以確保消息可以在線程間安全傳遞所有權,避免競爭條件或使用已釋放的內存
而Receiver<T>
只支持 Send,只能在線程間傳遞自身所有權,但不能在線程間共享引用。同時只能有一個線程擁有其所有權,進而獨佔的去消費Sender<T>
的消息隊列。
依舊是巧妙的通過Send
和Sync
標記 trait 保證了併發的安全,輕鬆實現無畏併發。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/u4QGSskvGPv_iIV9nggikQ