CRust 學習筆記:手動實現 Channel-1

本系列文章是 Jon Gjengset 發佈的 CRust of Rust 系列視頻的學習筆記,CRust of Rust 是一系列持續更新的 Rust 中級教程。

在這篇文章中,我們將通過實現一個最基本的我們自己的 Channel。

標準庫的 std::sync::mpsc::channel 是一個異步通道,返回一對 sender/receiver,發送方發送的所有數據將在接收方上以與發送時相同的順序接收。可以將 Sender 克隆爲多個發送到同一通道,但只支持一個 Receiver。

讓我們開始構建自己的通道,命名爲 panama:

cargo new --lib panama

要實現 Channel 我們用到了三個標準庫的同步原語:std::sync::Mutex,std::sync::Arc,std::sync::Condvar。

use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};

pub struct Sender<T> {
    innerArc<Inner<T>>,
}

pub struct Receiver<T> {
    innerArc<Inner<T>>,
}

struct Inner<T> {
    // 環形緩衝區的雙端隊列
    queueMutex<VecDeque<T>>,
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Inner { queueMutex::default()};
    let inner = Arc::new(inner);
    (
        Sender {
            innerinner.clone(),
        },
        Receiver {
            innerinner.clone(),
        }
    )
}

這裏用到了 Rust 中常用的設計模式,對於需要共享的變量,通常都把它封裝成內部 struct,如:Inner。這裏存儲發送數據的隊列使用了 VecDeque,它是一個可增長的環形緩衝區實現的雙端隊列。這樣就可以在隊列的尾端 push 數據,在隊列的頭 pop 數據。

下面簡單實現發送數據和接收數據的方法:

impl<T> Sender<T> {
    pub fn send(&mut self, tT) {
        let mut queue = self.inner.queue.lock().unwrap();
        queue.push_back(t);
    }
}

impl<T> Receiver<T> {
    pub fn recv(&mut self) -> T {
        let mut queue = self.inner.queue.lock().unwrap();
        queue.pop_front().unwrap()
    }
}

接下來,我們深入思考一下,如果隊列中沒有數據,調用 recv() 方法的線程需要阻塞等待,直到隊列中有數據。而調用 send() 方法的線程需要去通知喚醒在 recv() 上阻塞等待的線程,隊列中有數據了,可以接收了。這裏就需要用到條件變量:std::sync::Condvar:

struct Inner<T> {
    // 環形緩衝區的雙端隊列
    queueMutex<VecDeque<T>>,

    // 隊列中是否有數據的條件變量 
    availableCondvar,
}
impl<T> Sender<T> {
    pub fn send(&mut self, tT) {
        let mut queue = self.inner.queue.lock().unwrap();
        queue.push_back(t);
        // 先drop鎖,然後通知喚醒阻塞的線程
        drop(queue);
        self.inner.available.notify_one();
    }
}

impl<T> Receiver<T> {
    pub fn recv(&mut self) -> T {
        let mut queue = self.inner.queue.lock().unwrap();
        loop {
            match queue.pop_front() {
                Some(t) => return t,
                None => {
                    // 隊列中沒有數據,阻塞等待
                    queue = self.inner.available.wait(queue).unwrap();
                }
            }
        }
    }
}

在第 7 行用到了 notify_one(),因爲我們實現的是 mpsc 隊列,即多個發送端,一個接收端隊列,只有一個 receiver 線程。

因爲可以有多個發送端,所以 Sender 還需要實現 Clone trait:

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender { 
            innerArc::clone(&self.inner),
        }
    }
}

下面我們寫個測試用例來檢驗一下:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ping_pong() {
        let (mut tx, mut rx) = channel();
        tx.send(42);
        assert_eq!(rx.recv(), 42);
    }
}

執行 cargo test,測試通過。

running 1 test
test panama::tests::ping_pong ... ok

這裏有個顯而易見的 Bug,當沒有任何 sender 或者 channel 關閉,這時執行 receiver 的 recv() 方法,線程會被永遠掛起。寫個測試用例來看看:

#[test]
fn closed() {
    let (tx, mut rx) = channel::<()>();
    drop(tx);
    let _ = rx.recv();
}

執行 cargo test 後,線程會被掛起。

在下一篇文章中,我們優化代碼來解決這個 Bug。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/miWf8MmoqaQ5zcGCdz6VXg