CRust 學習筆記:手動實現 Channel-1
本系列文章是 Jon Gjengset 發佈的 CRust of Rust 系列視頻的學習筆記,CRust of Rust 是一系列持續更新的 Rust 中級教程。
在這篇文章中,我們將通過實現一個最基本的我們自己的 Channel。
標準庫的 std::sync::mpsc::channel 是一個異步通道,返回一對 sender/receiver,發送方發送的所有數據將在接收方上以與發送時相同的順序接收。可以將 Sender 克隆爲多個發送到同一通道,但只支持一個 Receiver。
讓我們開始構建自己的通道,命名爲 panama:
cargo new --lib panama
要實現 Channel 我們用到了三個標準庫的同步原語:std::sync::Mutex,std::sync::Arc,std::sync::Condvar。
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
struct Inner<T> {
// 環形緩衝區的雙端隊列
queue: Mutex<VecDeque<T>>,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Inner { queue: Mutex::default()};
let inner = Arc::new(inner);
(
Sender {
inner: inner.clone(),
},
Receiver {
inner: inner.clone(),
}
)
}
這裏用到了 Rust 中常用的設計模式,對於需要共享的變量,通常都把它封裝成內部 struct,如:Inner。這裏存儲發送數據的隊列使用了 VecDeque,它是一個可增長的環形緩衝區實現的雙端隊列。這樣就可以在隊列的尾端 push 數據,在隊列的頭 pop 數據。
下面簡單實現發送數據和接收數據的方法:
impl<T> Sender<T> {
pub fn send(&mut self, t: T) {
let mut queue = self.inner.queue.lock().unwrap();
queue.push_back(t);
}
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> T {
let mut queue = self.inner.queue.lock().unwrap();
queue.pop_front().unwrap()
}
}
接下來,我們深入思考一下,如果隊列中沒有數據,調用 recv() 方法的線程需要阻塞等待,直到隊列中有數據。而調用 send() 方法的線程需要去通知喚醒在 recv() 上阻塞等待的線程,隊列中有數據了,可以接收了。這裏就需要用到條件變量:std::sync::Condvar:
struct Inner<T> {
// 環形緩衝區的雙端隊列
queue: Mutex<VecDeque<T>>,
// 隊列中是否有數據的條件變量
available: Condvar,
}
impl<T> Sender<T> {
pub fn send(&mut self, t: T) {
let mut queue = self.inner.queue.lock().unwrap();
queue.push_back(t);
// 先drop鎖,然後通知喚醒阻塞的線程
drop(queue);
self.inner.available.notify_one();
}
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> T {
let mut queue = self.inner.queue.lock().unwrap();
loop {
match queue.pop_front() {
Some(t) => return t,
None => {
// 隊列中沒有數據,阻塞等待
queue = self.inner.available.wait(queue).unwrap();
}
}
}
}
}
在第 7 行用到了 notify_one(),因爲我們實現的是 mpsc 隊列,即多個發送端,一個接收端隊列,只有一個 receiver 線程。
因爲可以有多個發送端,所以 Sender 還需要實現 Clone trait:
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
inner: Arc::clone(&self.inner),
}
}
}
下面我們寫個測試用例來檢驗一下:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ping_pong() {
let (mut tx, mut rx) = channel();
tx.send(42);
assert_eq!(rx.recv(), 42);
}
}
執行 cargo test,測試通過。
running 1 test
test panama::tests::ping_pong ... ok
這裏有個顯而易見的 Bug,當沒有任何 sender 或者 channel 關閉,這時執行 receiver 的 recv() 方法,線程會被永遠掛起。寫個測試用例來看看:
#[test]
fn closed() {
let (tx, mut rx) = channel::<()>();
drop(tx);
let _ = rx.recv();
}
執行 cargo test 後,線程會被掛起。
在下一篇文章中,我們優化代碼來解決這個 Bug。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/miWf8MmoqaQ5zcGCdz6VXg