Rust 併發編程番外篇: Mutex 內部實現

Mutex 是最常用的一種同步原語,它提供了互斥鎖的功能, 多線程可以互斥訪問共享數據以及通過鎖保護臨界區。Rust 標準庫提供了 Mutex 的實現,接下來我們看看它是怎麼實現的。

Mutex 的定義

Mutex包含三個字段。

一個是內部實現的鎖 (sys::Mutex),根據不同的操作系統,可能選擇不同的實現。 一個是poison,用來標記鎖是否被破壞,是否中毒了。 最後一個是data,用來存儲被保護的數據。

pub struct Mutex<T: ?Sized> {
    inner: sys::Mutex,
    poison: poison::Flag,
    data: UnsafeCell<T>,
}

impl<T> Mutex<T> {
    pub const fn new(t: T) -> Mutex<T> {
        Mutex { inner: sys::Mutex::new(), poison: poison::Flag::new(), data: UnsafeCell::new(t) }
    }
}

另外一個關聯的數據結構是MutexGuard, 它是Mutex的一個智能指針,用來管理鎖的生命週期。它實現了DerefDrop,所以可以通過*來訪問被保護的數據,當MutexGuard離開作用域時,會自動釋放鎖。

```rust
pub struct MutexGuard<'a, T: ?Sized + 'a> {
    lock: &'a Mutex<T>,
    poison: poison::Guard,
}

當請求鎖時,調用內部的sys::Mutex上鎖,並且返回一個MutexGuard,其實嚴格的說,返回一個LockResult,它是一個Result,當鎖中毒時,返回Err,否則返回OkOk中包含了MutexGuard

```rust
    pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
        unsafe {
            self.inner.lock();
            MutexGuard::new(self)
        }
    }

    pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;

poison如果你不太瞭解,可以看看這篇文章 [1],我們不進一步介紹它了。MutexGuard功能已經介紹了, 接下來我們看看它的實現:

impl<T: ?Sized> Deref for MutexGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.data.get() }
    }
}

MutexGuard的解引用返回被保護的數據,返回的是一個引用和可變引用。

MutexGuard離開作用域時,會自動釋放鎖,它的實現如下, 可以看到它調用了self.lock.inner.unlock():

impl<T: ?Sized> Drop for MutexGuard<'_, T> {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            self.lock.poison.done(&self.poison);
            self.lock.inner.unlock();
        }
    }
}

try_lock調用innertry_lock, 每什麼好說的:

    pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
        unsafe {
            if self.inner.try_lock() {
                Ok(MutexGuard::new(self)?)
            } else {
                Err(TryLockError::WouldBlock)
            }
        }
    }

目前 rust 還提供了一個不穩定的方法unlock,主動的立即釋放鎖, 其實就是調用了drop(guard):

pub fn unlock(guard: MutexGuard<'_, T>) {
        drop(guard);
}

那麼看起來,鎖的主要邏輯是inner實現的,接下來我們看看inner的實現。

inner的類型是sys::Mutex, 它位於 library/std/src/sys/mod.rs[2], 根據操作系統的不同,有不同的實現,我們主要看 linux(unix) 的實現。

unix 的實現位於 /library/std/src/sys/unix/locks[3], 根據具體的操作系統,也有兩種實現方案,我們主要看 linux 的實現。

if #[cfg(any(
        target_os = "linux",
        target_os = "android",
        all(target_os = "emscripten"target_feature = "atomics"),
        target_os = "freebsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod futex_mutex;
        mod futex_rwlock;
        mod futex_condvar;
        pub(crate) use futex_mutex::Mutex;
        pub(crate) use futex_rwlock::RwLock;
        pub(crate) use futex_condvar::Condvar;
    } ...

它有兩種實現futex_mutexpthread_mutex.rs, Linux 操作系統下,使用的是futex_mutex

futex_mutex會使用 Linux 操作系統的futex_waitfutex_wake系統調用。 它只包含一個futex字段,它是一個AtomicU32,用來表示鎖的狀態,它有三種狀態:

use crate::sync::atomic::{
    AtomicU32,
    Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake};

pub struct Mutex {
    futex: AtomicU32,
}

new創建一個未加鎖的 Muext, 而try_lock會嘗試將futex從 0 改爲 1,如果成功,表示加鎖成功,否則失敗。 注意這裏使用了 AcquireRelaxed Ordering,交換成功新值內存可見 (Acquire),失敗無所謂了 (Relaxed):

impl Mutex {
    #[inline]
    pub const fn new() -> Self {
        Self { futex: AtomicU32::new(0) }
    }

    #[inline]
    pub fn try_lock(&self) -> bool {
        self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
    }

    ...
}

接下來是重頭戲`lock`:

```rust
impl Mutex {
    ...
    #[inline]
    pub fn lock(&self) {
        if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
            self.lock_contended(); // 如果第一次搶不到鎖,進入鎖競爭場景的處理
        }
    }

    #[cold]
    fn lock_contended(&self) {
        // 獲取鎖的狀態
        let mut state = self.spin();

        // 如果未加速,去搶!
        if state == 0 {
            match self.futex.compare_exchange(0, 1, Acquire, Relaxed) {
                Ok(_) =return, // 搶成功,返回
                Err(s) =state = s, // 失敗,把當前的狀態賦值給state
            }
        }

        loop {
            // 運氣來了,在多個人搶一個未加鎖的鎖時,你,命中天子搶到了鎖,並且把鎖狀態從0改爲2
            if state != 2 && self.futex.swap(2, Acquire) == 0 {
                // 成功把鎖的狀態從0改成了2,獲取了鎖,返回
                return;
            }

            // 等待鎖狀態變化,如果鎖狀態不是2,就不等了
            futex_wait(&self.futex, 2, None);

            // 別的線程釋放了鎖,spin檢查鎖的狀態,再集中精力繼續搶
            state = self.spin();
        }
    }

    fn spin(&self) -> u32 {
        let mut spin = 100;
        loop {
            // 獲取當前鎖的狀態, 可能是0,1,2三種狀態
            let state = self.futex.load(Relaxed);

            // 如果是未加鎖,或者是有其它線程等待,則返回
            // 如果spin次數用完了,也返回
            if state != 1 || spin == 0 {
                return state;
            }

            crate::hint::spin_loop();
            spin -= 1;
        }
    }
    ...

}

總得來說,搶鎖的線程通過 spin, 避免上下文切換,能夠提高性能。如果 spin 次數用完了,就進入等待狀態,等待其它線程釋放鎖,然後再搶。

剩下一個方法就是解鎖了。解鎖比較簡單,就是把鎖的狀態從 1 或者 2 改爲 0,如果原來是 2,表示還有其它線程等待,就喚醒一個。

impl Mutex {
    ...
    pub unsafe fn unlock(&self) {
        if self.futex.swap(0, Release) == 2 {
            // 如果還有等待的線程,就喚醒一個
            self.wake();
        }
    }

    #[cold]
    fn wake(&self) {
        futex_wake(&self.futex);
    }
    ...
}

可以看到,Rust 的 Mutex 的實現還是比較簡單的,它的核心是利用操作系統的futex相關方法,加一個AtomicU32標誌來實現。

參考資料

[1]

這篇文章: https://nomicon.purewhite.io/poisoning.html

[2]

library/std/src/sys/mod.rs: https://github.com/rust-lang/rust/blob/8acf40bd546258a70953187c950ea56d1a8f5bf2/library/std/src/sys/mod.rs

[3]

/library/std/src/sys/unix/locks: https://github.com/rust-lang/rust/blob/8acf40bd546258a70953187c950ea56d1a8f5bf2/library/std/src/sys/unix/locks/futex_mutex.rs

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ieIeEN6kwhwFEagk2bK39Q