RUST 支持 Sync 的最簡 Queue 實現
以下是 RUST 標準庫 mpsc 的 queue 代碼分析,
代碼路徑:library/std/src/mpsc/mpsc_queue.rs
幾乎是見到的最簡單的一個支持多線程寫,單線程讀的隊列數據結構了。
//以下是簡單的FIFO的隊列實現
pub enum PopResult<T> {
//返回隊列成員
Data(T),
//隊列爲空
Empty,
//隊列的一致性出錯
Inconsistent,
}
//節點結構
struct Node<T> {
//next指針,利用原子指針實現多線程的Sync,值得牢記
next: AtomicPtr<Node<T>>,
value: Option<T>,
}
/// 能夠被多個線程操作的隊列
pub struct Queue<T> {
//利用原子指針操作實現多線程的Sync,極大簡化了代碼
head: AtomicPtr<Node<T>>,
//從後面的代碼看,這裏實際上是隊列的頭部,這個Queue的代碼搞得奇怪
tail: UnsafeCell<*mut Node<T>>,
}
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Node<T> {
//申請堆內存後,將堆內存的指針提取出來
Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
}
}
impl<T> Queue<T> {
pub fn new() -> Queue<T> {
let stub = unsafe { Node::new(None) };
//生成一個空元素的節點列表
Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
}
//在頭部
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));
//換成C的話,就是head->next = n; head = n
//對於空隊列來說,是tail = head; head->next = n; head = n;
//現在tail實際上是隊列頭部,head是尾部。tail的next是第一個有意義的成員
let prev = self.head.swap(n, Ordering::AcqRel);
//要考慮在兩個賦值中間加入了其他線程的操作是否會出問題,
//這裏面有一個複雜的分析,
//假設原隊列爲head, 有兩個線程分別插入新節點n,m
//當n先執行,而m在這個代碼位置插入,則m插入前prev_n = pre_head, head = n
//m插入後,prev_m = n, head = m。如果n先執行下面的語句,執行完後
// pre_head->next = n, n->next = null,然後m執行完下面語句
// pre_head->next = n, n->next = m, head = m,隊列是正確的。
// 如果m先執行,執行完後 pre_head->next = null, n->next = m, head = m;
// 然後n執行,執行完成後 pre_head->next = n, n->next = m, head =m, 隊列是正確的。
// 換成多個線程實際上也一樣是正確的。這個地方處理十分巧妙,這是系統級編程語言的魅
//力, 當然,實際上是裸指針編程的魅力
(*prev).next.store(n, Ordering::Release);
}
}
//僅有一個線程在pop
pub fn pop(&self) -> PopResult<T> {
unsafe {
//tail實際上是隊列頭,value是None
let tail = *self.tail.get();
//tail的next是第一個有意義的成員
let next = (*tail).next.load(Ordering::Acquire);
//next如果爲空,說明隊列是空隊列
if !next.is_null() {
//此處原tail會被drop,tail被賦成next
//因爲push只可能改變next,所以這裏不會有線程衝突問題
//這個語句完成後,隊列是完整及一致的
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
//將value的所有權轉移出來,*next的value又重新置爲None
//當tail == head的時候 就又都是stub了
let ret = (*next).value.take().unwrap();
//恢復Box,以便以後釋放堆內存
let _: Box<Node<T>> = Box::from_raw(tail);
return Data(ret);
}
//判斷是否出錯
if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
//空隊列的stub也要釋放
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
//恢復Box並消費掉,釋放堆內存
let _: Box<Node<T>> = Box::from_raw(cur);
cur = next;
}
}
}
}
以上摘錄自:深入 RUST 標準庫內核
https://github.com/Warrenren/inside-rust-std-library/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/fmbu0jehcAz-ST_znDLnIw