RUST 支持 Sync 的最簡 Queue 實現

以下是 RUST 標準庫 mpsc 的 queue 代碼分析,
代碼路徑:library/std/src/mpsc/mpsc_queue.rs
幾乎是見到的最簡單的一個支持多線程寫,單線程讀的隊列數據結構了。

//以下是簡單的FIFO的隊列實現

pub enum PopResult<T> {
    //返回隊列成員
    Data(T),
    //隊列爲空
    Empty,
    //隊列的一致性出錯
    Inconsistent,
}

//節點結構
struct Node<T> {
    //next指針,利用原子指針實現多線程的Sync,值得牢記
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

///  能夠被多個線程操作的隊列
pub struct Queue<T> {
    //利用原子指針操作實現多線程的Sync,極大簡化了代碼
    head: AtomicPtr<Node<T>>,
    //從後面的代碼看,這裏實際上是隊列的頭部,這個Queue的代碼搞得奇怪
    tail: UnsafeCell<*mut Node<T>>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Node<T> {
    unsafe fn new(v: Option<T>) -> *mut Node<T> {
        //申請堆內存後,將堆內存的指針提取出來
        Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
    }
}

impl<T> Queue<T> {
    pub fn new() -> Queue<T> {
        let stub = unsafe { Node::new(None) };
        //生成一個空元素的節點列表
        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
    }

    //在頭部
    pub fn push(&self, t: T) {
        unsafe {
            let n = Node::new(Some(t));
            //換成C的話,就是head->next = n; head = n
            //對於空隊列來說,是tail = head; head->next = n; head = n; 
            //現在tail實際上是隊列頭部,head是尾部。tail的next是第一個有意義的成員 
            let prev = self.head.swap(n, Ordering::AcqRel);
            //要考慮在兩個賦值中間加入了其他線程的操作是否會出問題,
            //這裏面有一個複雜的分析,
            //假設原隊列爲head, 有兩個線程分別插入新節點n,m
            //當n先執行,而m在這個代碼位置插入,則m插入前prev_n = pre_head, head = n
            //m插入後,prev_m = n, head = m。如果n先執行下面的語句,執行完後 
            // pre_head->next = n, n->next = null,然後m執行完下面語句
            // pre_head->next = n, n->next = m, head = m,隊列是正確的。
            // 如果m先執行,執行完後 pre_head->next = null, n->next = m, head = m;
            // 然後n執行,執行完成後 pre_head->next = n, n->next = m, head =m, 隊列是正確的。
            // 換成多個線程實際上也一樣是正確的。這個地方處理十分巧妙,這是系統級編程語言的魅
            //力, 當然,實際上是裸指針編程的魅力            
            (*prev).next.store(n, Ordering::Release);
            
        }
    }

    //僅有一個線程在pop
    pub fn pop(&self) -> PopResult<T> {
        unsafe {
            //tail實際上是隊列頭,value是None
            let tail = *self.tail.get();
            //tail的next是第一個有意義的成員
            let next = (*tail).next.load(Ordering::Acquire);

            //next如果爲空,說明隊列是空隊列
            if !next.is_null() {
                //此處原tail會被drop,tail被賦成next
                //因爲push只可能改變next,所以這裏不會有線程衝突問題
                //這個語句完成後,隊列是完整及一致的 
                *self.tail.get() = next;
                assert!((*tail).value.is_none());
                assert!((*next).value.is_some());
                //將value的所有權轉移出來,*next的value又重新置爲None
                //當tail == head的時候 就又都是stub了
                let ret = (*next).value.take().unwrap();
                //恢復Box,以便以後釋放堆內存
                let _: Box<Node<T>> = Box::from_raw(tail);
                return Data(ret);
            }

            //判斷是否出錯
            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            //空隊列的stub也要釋放
            let mut cur = *self.tail.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                //恢復Box並消費掉,釋放堆內存
                let _: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}

以上摘錄自:深入 RUST 標準庫內核

https://github.com/Warrenren/inside-rust-std-library/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fmbu0jehcAz-ST_znDLnIw