Rust 併發編程 - 基本併發原語

本章主要介紹 Arc、Mutex、RWMutex、Once、Barrier、Convar、LazyCell、LazyLock、 Exclusive、mpsc Channel、atomic 原子操作,第三方庫實現的信號量、SingleFlight、Waitgroup 等在後面的章節中介紹。

同步是多線程程序中的一個重要概念。在多線程環境下, 多個線程可能同時訪問某個共享資源, 這就可能導致數據競爭或者數據不一致的問題。爲了保證數據安全, 需要進行同步操作。

常見的同步需求包括:

爲了實現這些同步需求, 就需要使用同步原語。常見的同步原語有互斥鎖、信號量、條件變量等。

互斥鎖可以保證同一時刻只有一個線程可以訪問共享資源。信號量可以限制同時訪問的線程數。條件變量可以實現線程間的通信和協調。這些同步原語的使用可以避免同步問題, 幫助我們正確有效地處理多線程之間的同步需求。

Go 併發編程電子書已經更新到第 5 章:

Arc

Arc已改放在前一章的,這一章補上。我這裏介紹的時候分類不一定精確,只是方便給大家介紹各種庫和併發原語,不用追求分類的準確性。

Rust 的 Arc 代表原子引用計數(Atomic Reference Counting),是一種用於多線程環境的智能指針。它允許在多個地方共享數據,同時確保線程安全性。Arc 的全稱是std::sync::Arc,屬於標準庫的一部分。

在 Rust 中,通常情況下,變量是被所有權管理的,但有時候我們需要在多個地方共享數據。這就是Arc的用武之地。它通過在堆上分配內存,並使用引用計數來跟蹤數據的所有者數量,確保在不需要的時候正確地釋放資源。

下面是一個簡單的例子,演示瞭如何使用Arc

use std::sync::Arc;
use std::thread;

fn main() {
    // 創建一個可共享的整數
    let data = Arc::new(46);

    // 創建兩個線程,共享對data的引用
    let thread1 = {
        let data = Arc::clone(&data);
        thread::spawn(move || {
            // 在線程中使用data
            println!("Thread 1: {}", data);
        })
    };

    let thread2 = {
        let data = Arc::clone(&data);
        thread::spawn(move || {
            // 在另一個線程中使用data
            println!("Thread 2: {}", data);
        })
    };

    // 等待兩個線程完成
    thread1.join().unwrap();
    thread2.join().unwrap();
}

Arc(原子引用計數)和 Rc(引用計數)都是 Rust 中用於多所有權的智能指針,但它們有一些關鍵的區別。

總之你記住在多線程的情況下使用Arc,單線程的情況下使用Rc就好了。

當你需要在多線程環境中共享可變數據時,常常會結合使用ArcMutexMutex(互斥鎖)用於確保在任意時刻只有一個線程能夠訪問被鎖定的數據。下面是一個簡單的例子,演示瞭如何使用ArcMutex來在多線程中共享可變數據:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 獲取鎖,確保只有一個線程能夠訪問計數器
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

ArcRefCell 結合使用的場景通常發生在多線程中需要共享可變狀態,但又不需要互斥鎖的場合。RefCell允許在運行時進行借用檢查,因此在單線程環境下使用時,它不會像 Mutex 那樣引入鎖的開銷。

以下是一個使用 ArcRefCell 的簡單例子,演示了在多線程環境中共享可變狀態, 注意這個例子只是用來演示,我們並不期望 num 的最終結果和上面的例子一樣:

use std::sync::{Arc};
use std::cell::RefCell;
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(RefCell::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 使用RefCell獲取可變引用,確保運行時借用檢查
            let mut num = counter.borrow_mut();
            *num += 1;
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.borrow());
}

互斥鎖 Mutex

互斥鎖歷史悠久,在很多編程語言中都有實現。

Mutex 是 Rust 中的互斥鎖,用於解決多線程併發訪問共享數據時可能出現的競態條件。Mutex 提供了一種機制,只有擁有鎖的線程才能訪問被鎖定的數據,其他線程必須等待鎖的釋放。

Lock

在標準庫中,Mutex 位於 std::sync 模塊下。下面是一個簡單的例子,演示瞭如何使用 Mutex:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 獲取鎖,確保只有一個線程能夠訪問計數器
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

在這個例子中,counter 是一個 Mutex 保護 (且包裝) 的可變整數,然後使用 Arc 來多線程共享。在每個線程中,通過 counter.lock().unwrap() 獲取鎖,確保一次只有一個線程能夠修改計數器的值。這樣可以確保在併發情況下不會發生競態條件。

需要注意的是,lock 方法返回一個 MutexGuard,它是一個智能指針,實現了 DerefDrop trait。當 MutexGuard 被銷燬時,會自動釋放鎖,確保在任何情況下都能正確釋放鎖。

這裏注意三個知識點:

目前 nightly 版本的 rust 提供了一個實驗性的方法unlock, 功能和drop一樣,也是釋放互斥鎖。

try_lock

Mutex 的 try_lock 方法嘗試獲取鎖,如果鎖已經被其他線程持有,則立即返回 Err 而不是阻塞線程。這對於在嘗試獲取鎖時避免線程阻塞很有用。

以下是一個使用 try_lock 的簡單例子:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 嘗試獲取鎖,如果獲取失敗就繼續嘗試或者放棄
            if let Ok(mut num) = counter.try_lock() {
                *num += 1;
            } else {
                println!("Thread failed to acquire lock.");
            }
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

在這個例子中,try_lock 方法被用於嘗試獲取鎖。如果獲取成功,線程就可以修改計數器的值,否則它會打印一條消息表示沒有獲取到鎖。

需要注意的是,try_lock 方法返回一個 Result,如果獲取鎖成功,返回 Ok 包含 MutexGuard,否則返回 Err。這使得你可以根據獲取鎖的結果執行不同的邏輯。

Poisoning

在 Rust 中,poisoning 是一種用於處理線程 panic 導致的不可恢復的狀態的機制。這個概念通常與 MutexRwLock 相關。當一個線程在持有鎖的情況下 panic 時,這就會導致鎖進入一種不一致的狀態,因爲鎖的內部狀態可能已經被修改,而沒有機會進行清理。爲了避免這種情況,Rust 的標準庫使用 poisoning 機制 (形象的比喻)。具體來說,在 MutexRwLock 中,當一個線程在持有鎖的時候 panic,鎖就會被標記爲 poisoned。此後任何線程嘗試獲取這個鎖時,都會得到一個 PoisonError,它包含一個標識鎖是否被 poisoned 的標誌。這樣,線程可以檢測到之前的 panic,並進行相應的處理。

Mutex 通過在 LockResult 中包裝 PoisonError 來表示這種情況。具體來說,LockResultErr 分支是一個 PoisonError,其中包含一個 MutexGuard。你可以通過 into_inner 方法來獲取 MutexGuard,然後繼續操作。

以下是一個簡單的例子,演示了鎖的 "poisoning",以及如何處理:

use std::sync::{Mutex, Arc, LockResult, PoisonError};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 獲取鎖
            let result: LockResult<_> = counter.lock();

            // 嘗試獲取鎖,如果獲取失敗就打印錯誤信息
            match result {
                Ok(mut num) ={
                    *num += 1;
                    // 模擬 panic
                    if *num == 3 {
                        panic!("Simulated panic!");
                    }
                }
                Err(poisoned) ={
                    // 鎖被 "poisoned",處理錯誤
                    println!("Thread encountered a poisoned lock: {:?}", poisoned);

                    // 獲取 MutexGuard,繼續操作
                    let mut num = poisoned.into_inner();
                    *num += 1;
                }
            }
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

在這個例子中,當計數器的值達到 3 時,一個線程故意引發了 panic,其他線程在嘗試獲取鎖時就會得到一個 PoisonError。在錯誤處理分支,我們打印錯誤信息,然後使用 into_inner 方法獲取 MutexGuard,以確保鎖被正確釋放。這樣其他線程就能夠繼續正常地使用鎖。

更快的釋放互斥鎖

前面說了,因爲MutexGuard實現了Drop了,所以鎖可以自動釋放,可是如果鎖的 scope 太大,我們想盡快的釋放,該怎麼辦呢?

第一種方式你可以通過創建一個新的內部的作用域 (scope) 來達到類似手動釋放 Mutex 的效果。在新的作用域中,MutexGuard 將在離開作用域時自動釋放鎖。這是通過作用域的離開而觸發的 Drop trait 的實現。:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 進入一個新的作用域!!!!!!!!!!!!!!
            {
                // 獲取鎖
                let mut num = counter.lock().unwrap();
                *num += 1;
                // MutexGuard 在這個作用域結束時自動釋放鎖
            }

            // 在這裏,鎖已經被釋放
            // 這裏可以進行其他操作
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

第二種方法就是主動drop或者unlock, 以下是一個演示手動釋放 Mutex 的例子:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數
    let counter = Arc::new(Mutex::new(0));

    // 創建多個線程來增加計數器的值
    let mut handles = vec![];

    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 獲取鎖
            let mut num = counter.lock().unwrap();
            *num += 1;

            // 手動釋放鎖!!!!!!!!
            drop(num);
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 打印最終的計數器值
    println!("Final count: {}", *counter.lock().unwrap());
}

Mutex是可重入鎖嗎?應該不是,但是官方文檔把它標記爲未定義的行爲 [1],所以不要試圖在同一個線程中獲取兩次鎖, 如果你想使用可重入鎖,請使用我將來要介紹的第三方併發庫。同樣需要注意的是讀寫鎖RWMutex

讀寫鎖 RWMutex

RWMutex 是 Rust 中的讀寫鎖(Read-Write Lock),允許多個線程同時獲取共享數據的讀取訪問權限,但在寫入時會排他。這意味着多個線程可以同時讀取數據,但只有一個線程能夠寫入數據,且寫入時不允許其他線程讀取或寫入。

讀寫鎖一般使用在下面的場景中:

以下是使用 RWMutex的例子 :

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數,使用RwLock包裝
    let counter = Arc::new(RwLock::new(0));

    // 創建多個線程來讀取計數器的值
    let mut read_handles = vec![];

    for _ in 0..3 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // 獲取讀取鎖
            let num = counter.read().unwrap();
            println!("Reader {}: {}", thread::current().id(), *num);
        });
        read_handles.push(handle);
    }

    // 創建一個線程來寫入計數器的值
    let write_handle = thread::spawn(move || {
        // 獲取寫入鎖
        let mut num = counter.write().unwrap();
        *num += 1;
        println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);
    });

    // 等待所有讀取線程完成
    for handle in read_handles {
        handle.join().unwrap();
    }

    // 等待寫入線程完成
    write_handle.join().unwrap();
}

它的使用和互斥鎖類似,只不過需要調用read()方法獲得讀鎖,使用write()方法獲得寫鎖。

讀寫鎖有以下的性質:

如果一個線程已經持有讀鎖,而另一個線程請求寫鎖,它必須等待讀鎖被釋放。這確保在寫入操作進行時,沒有其他線程能夠同時持有讀鎖。寫鎖確保了對共享數據的寫入操作是獨佔的。

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數,使用RwLock包裝
    let counter = Arc::new(RwLock::new(0));

    // 創建一個線程持有讀鎖
    let read_handle = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // 獲取讀鎖
            let num = counter.read().unwrap();
            println!("Reader {}: {}", thread::current().id(), *num);

            // 休眠模擬讀取操作
            thread::sleep(std::time::Duration::from_secs(10));
        })
    };

    // 創建一個線程請求寫鎖
    let write_handle = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // 休眠一小段時間,確保讀鎖已經被獲取
            thread::sleep(std::time::Duration::from_secs(1));

            // 嘗試獲取寫鎖
            // 注意:這裏會等待讀鎖被釋放
            let mut num = counter.write().unwrap();
            *num += 1;
            println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);
        })
    };

    // 等待讀取線程和寫入線程完成
    read_handle.join().unwrap();
    write_handle.join().unwrap();
}

更進一步,在寫鎖請求後,再有新的讀鎖請求進來,它是在等待寫鎖釋放?還是直接獲得讀鎖?答案是等待寫鎖釋放,看下面的例子:

    // 創建一個可共享的可變整數,使用RwLock包裝
    let counter = Arc::new(RwLock::new(0));

    // 創建一個線程持有讀鎖
    let read_handle = {
        let counter = counter.clone();
        thread::spawn(move || {
            // 獲取讀鎖
            let num = counter.read().unwrap();
            println!("Reader#1: {}", *num);

            // 休眠模擬讀取操作
            thread::sleep(std::time::Duration::from_secs(10));
        })
    };

    // 創建一個線程請求寫鎖
    let write_handle = {
        let counter = counter.clone();
        thread::spawn(move || {
            // 休眠一小段時間,確保讀鎖已經被獲取
            thread::sleep(std::time::Duration::from_secs(1));

            // 嘗試獲取寫鎖
            let mut num = counter.write().unwrap();
            *num += 1;
            println!("Writer : Incremented counter to {}",  *num);
        })
    };

    // 創建一個線程請求讀鎖
    let read_handle_2 = {
        let counter = counter.clone();
        thread::spawn(move || {
            // 休眠一小段時間,確保寫鎖已經被獲取
            thread::sleep(std::time::Duration::from_secs(2));

            // 嘗試獲取讀鎖
            let num = counter.read().unwrap();
            println!("Reader#2: {}", *num);
        })
    };

    // 等待讀取線程和寫入線程完成
    read_handle.join().unwrap();
    write_handle.join().unwrap();
    read_handle_2.join().unwrap();

死鎖是一種併發編程中的常見問題,可能發生在 RwLock 使用不當的情況下。一個典型的死鎖場景是,一個線程在持有讀鎖的情況下嘗試獲取寫鎖,而其他線程持有寫鎖並嘗試獲取讀鎖,導致彼此相互等待。

以下是一個簡單的例子,演示了可能導致 RwLock 死鎖的情況:

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    // 創建一個可共享的可變整數,使用RwLock包裝
    let counter = Arc::new(RwLock::new(0));

    // 創建一個線程持有讀鎖,嘗試獲取寫鎖
    let read_and_write_handle = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // 獲取讀鎖
            let num = counter.read().unwrap();
            println!("Reader {}: {}", thread::current().id(), *num);

            // 嘗試獲取寫鎖,這會導致死鎖
            let mut num = counter.write().unwrap();
            *num += 1;
            println!("Reader {}: Incremented counter to {}", thread::current().id(), *num);
        })
    };

    // 創建一個線程持有寫鎖,嘗試獲取讀鎖
    let write_and_read_handle = {
        let counter = Arc::clone(&counter);
        thread::spawn(move || {
            // 獲取寫鎖
            let mut num = counter.write().unwrap();
            *num += 1;
            println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);

            // 嘗試獲取讀鎖,這會導致死鎖
            let num = counter.read().unwrap();
            println!("Writer {}: {}", thread::current().id(), *num);
        })
    };

    // 等待線程完成
    read_and_write_handle.join().unwrap();
    write_and_read_handle.join().unwrap();
}

和 Mutex 一樣, RwLock 在 panic 時也會變爲中毒狀態。但是請注意, 只有在 RwLock 被獨佔式寫入鎖住時發生 panic, 它纔會中毒。如果在任意 reader 中發生 panic, 該鎖則不會中毒。

原因是:

所以綜上, RwLock 只會在獨佔式寫入時發生 panic 時中毒。而 reader panic 不會導致中毒。這是由 RwLock 讀寫鎖語義決定的。

這種機制可以避免不必要的中毒, 因爲非獨佔的讀鎖之間不會互相影響, 其中任一個鎖持有者 panic 不應影響其他讀者。只有獨佔寫鎖需要特殊處理。

一次初始化 Once

std::sync::Once 是 Rust 中的一種併發原語,用於確保某個操作在整個程序生命週期內只執行一次。Once 主要用於在多線程環境中執行初始化代碼,確保該代碼只被執行一次,即使有多個線程同時嘗試執行它。

以下是使用Once的一個例子:

use std::sync::{Once, ONCE_INIT};

static INIT: Once = ONCE_INIT;

fn main() {
    // 通過 call_once 方法確保某個操作只執行一次
    INIT.call_once(|| {
        // 這裏放置需要執行一次的初始化代碼
        println!("Initialization code executed!");
    });

    // 之後再調用 call_once,初始化代碼不會再次執行
    INIT.call_once(|| {
        println!("This won't be printed.");
    });
}

使用場景:

下面這個例子是帶返回值的例子,實現懶加載全局配置的場景:

use std::sync::{Once, ONCE_INIT};

static mut GLOBAL_CONFIG: Option<String> = None;
static INIT: Once = ONCE_INIT;

fn init_global_config() {
    unsafe {
        GLOBAL_CONFIG = Some("Initialized global configuration".to_string());
    }
}

fn get_global_config() -> &'static str {
    INIT.call_once(|| init_global_config());
    unsafe {
        GLOBAL_CONFIG.as_ref().unwrap()
    }
}

fn main() {
    println!("{}", get_global_config());
    println!("{}", get_global_config()); // 不會重新初始化,只會輸出一次
}

在這個例子中,get_global_config 函數通過 Once 確保 init_global_config 函數只會被調用一次,從而實現了全局配置的懶加載。

上一章我們還介紹了OnceCellOnceLock, 它們都是同一族的單次初始化的併發原語,主要區別是:

OnceCell不是線程安全的,而OnceLock是線程安全的,但是OnceLock只能存儲 Copy 類型的數據,而OnceCell可以存儲任意類型的數據。

還有一個被廣泛使用的第三方庫once_cell, 它提供了線程安全和非線程安全的兩種類型的OnceCell, 比如下面就是一個線程安全的例子:

use once_cell::sync::OnceCell;

static CELL: OnceCell<String> = OnceCell::new();
assert!(CELL.get().is_none());

std::thread::spawn(|| {
    let value: &String = CELL.get_or_init(|| {
        "Hello, World!".to_string()
    });
    assert_eq!(value, "Hello, World!");
}).join().unwrap();

let value: Option<&String> = CELL.get();
assert!(value.is_some());
assert_eq!(value.unwrap().as_str()"Hello, World!");

屏障 / 柵欄 Barrier

Barrier 是 Rust 標準庫中的一種併發原語,用於在多個線程之間創建一個同步點。它允許多個線程在某個點上等待,直到所有線程都到達該點,然後它們可以同時繼續執行。

下面是一個使用Barrier的例子:

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    // 創建一個 Barrier,指定參與同步的線程數量
    let barrier = Arc::new(Barrier::new(3)); // 在這個例子中,有 3 個線程參與同步

    // 創建多個線程
    let mut handles = vec![];

    for id in 0..3 {
        let barrier = Arc::clone(&barrier);
        let handle = thread::spawn(move || {
            // 模擬一些工作
            println!("Thread {} working", id);
            thread::sleep(std::time::Duration::from_secs(id as u64));

            // 線程到達同步點
            barrier.wait();

            // 執行同步後的操作
            println!("Thread {} resumed", id);
        });

        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }
}

在這個例子中,創建了一個 Barrier,並指定了參與同步的線程數量爲 3。然後,創建了三個線程,每個線程模擬一些工作,然後調用 barrier.wait() 來等待其他線程。當所有線程都調用了 wait 後,它們同時繼續執行。

使用場景

Barrier 的靈活性使得它在協調多個線程的執行流程時非常有用。

那麼,Barrier 可以循環使用嗎?一旦所有線程都通過 wait 方法達到同步點後,Barrier 就被重置,可以再次使用。這種重置操作是自動的。

當所有線程都調用 wait 方法後,Barrier 的內部狀態會被重置,下一次調用 wait 方法時,線程會重新被阻塞,直到所有線程再次到達同步點。這樣,Barrier 可以被循環使用,用於多輪的同步。

以下是一個簡單的示例,演示了 Barrier 的循環使用:

    let barrier = Arc::new(Barrier::new(10));
    let mut handles = vec![];

    for _ in 0..10 {
        let barrier = barrier.clone();
        handles.push(thread::spawn(move || {
            println!("before wait1");
            let dur = rand::thread_rng().gen_range(100..1000);
            thread::sleep(std::time::Duration::from_millis(dur));

            //step1
            barrier.wait();
            println!("after wait1");
            thread::sleep(time::Duration::from_secs(1));

            //step2
            barrier.wait();
            println!("after wait2");
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

條件變量 Condvar

Condvar 是 Rust 標準庫中的條件變量(Condition Variable),用於在多線程之間進行線程間的協調和通信。條件變量允許線程等待某個特定的條件成立,當條件滿足時,線程可以被喚醒並繼續執行。

以下是 Condvar 的一個例子:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

fn main() {
    // 創建一個 Mutex 和 Condvar,用於共享狀態和線程協調
    let mutex = Arc::new(Mutex::new(false));
    let condvar = Arc::new(Condvar::new());

    // 創建多個線程
    let mut handles = vec![];

    for id in 0..3 {
        let mutex = Arc::clone(&mutex);
        let condvar = Arc::clone(&condvar);

        let handle = thread::spawn(move || {
            // 獲取 Mutex 鎖
            let mut guard = mutex.lock().unwrap();

            // 線程等待條件變量爲 true
            while !*guard {
                guard = condvar.wait(guard).unwrap();
            }

            // 條件滿足後執行的操作
            println!("Thread {} woke up", id);
        });

        handles.push(handle);
    }

    // 模擬條件滿足後喚醒等待的線程
    thread::sleep(std::time::Duration::from_secs(2));

    // 修改條件,並喚醒等待的線程
    {
        let mut guard = mutex.lock().unwrap();
        *guard = true;
        condvar.notify_all();
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }
}

在這個例子中,創建了一個 MutexCondvar,其中 Mutex 用於保護共享狀態(條件),而 Condvar 用於等待和喚醒線程。多個線程在 Mutex 上加鎖後,通過 condvar.wait() 方法等待條件滿足,然後在主線程中修改條件,並通過 condvar.notify_all() 喚醒所有等待的線程。

使用場景

需要注意的是,使用 Condvar 時,通常需要配合 Mutex 使用,以確保在等待和修改條件時的線程安全性。

Condvar 可以通過調用 notify_one() 方法來發出信號。當 notify_one() 方法被調用時,Condvar 會隨機選擇一個正在等待信號的線程,並釋放該線程。Condvar 也可以通過調用 notify_all() 方法來發出信號。當 notify_all() 方法被調用時,Condvar 會釋放所有正在等待信號的線程。

LazyCell 和 LazyLock

我們介紹了OnceCellOnceLock, 我們再介紹兩個類似的用於懶加載的併發原語LazyCellLazyLock

Rust 中的 LazyCell 和 LazyLock 都是用於懶惰初始化對象的工具。LazyCell 用於懶惰初始化值,LazyLock 用於懶惰初始化資源。

| 類型 | 用途 | 初始化時機 | 線程安全 | | --- | --- | --- | --- | | LazyCell | 懶惰初始化值 | 第一次訪問 | 否 | | LazyLock | 懶惰初始化資源 | 第一次獲取鎖 | 是 | | OnceCell | 懶惰初始化單例值 | 第一次調用 | get_or_init() 方法 | | OnceLock | 懶惰初始化互斥鎖 | 第一次調用 | lock() 方法 |

Exclusive

Rust 中的 Exclusive 是一個用於保證某個資源只被一個線程訪問的工具。Exclusive 可以通過導入 std::sync::Exclusive 來使用。

    let mut exclusive = Exclusive::new(92);
    println!("ready");
    std::thread::spawn(move || {
        let counter = exclusive.get_mut();
        println!("{}", *counter);
        *counter = 100;
    }).join().unwrap();

和 Mutex 有什麼區別?Exclusive 僅提供對底層值的可變訪問,也稱爲對底層值的獨佔訪問。它不提供對底層值的不可變或共享訪問。

雖然這可能看起來不太有用,但它允許 Exclusive 無條件地實現 Sync。事實上,Sync 的安全要求是,對於 Exclusive 而言,它必須可以安全地跨線程共享,也就是說,&Exclusive 跨越線程邊界時必須是安全的。出於設計考慮,&Exclusive 沒有任何 API,使其無用,因此無害,因此內存安全。

這個類型還是一個 nightly 的實驗性特性,所以我們不妨等它穩定後在學習和使用。

mpsc

mpsc 是 Rust 標準庫中的一個模塊,提供了多生產者、單消費者(Multiple Producers, Single Consumer)的消息傳遞通道。mpsc 是 multiple-producer, single-consumer 的縮寫。這個模塊基於 channel 的基於消息傳遞的通訊,具體定義了三個類型:

SenderSyncSender 用於向 Receiver 發送數據。兩種 sender 都是可 clone 的 (多生產者), 這樣多個線程就可以同時向一個 receiver(單消費者) 發送。

這些通道有兩種類型:

使用場景

每當看到 rust 的 mpsc, 我總是和 Go 的 channel 作比較,事實上 rust 的 channel 使用起來也非常的簡單。

一個簡單的 channel 例子如下:

use std::thread;
use std::sync::mpsc::channel;

// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
    tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);

一個多生產者單消費者的例子:

use std::thread;
use std::sync::mpsc::channel;

// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = channel();
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move|| {
        tx.send(i).unwrap();
    });
}

for _ in 0..10 {
    let j = rx.recv().unwrap();
    assert!(0 <= j && j < 10);
}

一個同步 channel 的例子:

use std::sync::mpsc::sync_channel;
use std::thread;

let (tx, rx) = sync_channel(3);

for _ in 0..3 {
    // It would be the same without thread and clone here
    // since there will still be one `tx` left.
    let tx = tx.clone();
    // cloned tx dropped within thread
    thread::spawn(move || tx.send("ok").unwrap());
}

// Drop the last sender to stop `rx` waiting for message.
// The program will not complete if we comment this out.
// **All** `tx` needs to be dropped for `rx` to have `Err`.
drop(tx);

// Unbounded receiver waiting for all senders to complete.
while let Ok(msg) = rx.recv() {
    println!("{msg}");
}

println!("completed");

發送端釋放的情況下,receiver 會收到 error:

use std::sync::mpsc::channel;

// The call to recv() will return an error because the channel has already
// hung up (or been deallocated)
let (tx, rx) = channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());

在 Rust 標準庫中,目前沒有提供原生的 MPMC(Multiple Producers, Multiple Consumers)通道。std::sync::mpsc 模塊提供的是單一消費者的通道,主要是出於設計和性能的考慮。

設計上,MPSC 通道的實現相對較簡單,可以更容易地滿足特定的性能需求,並且在很多情況下是足夠的。同時,MPSC 通道的使用場景更爲常見,例如在線程池中有一個任務隊列,多個生產者將任務推送到隊列中,而單個消費者負責執行這些任務。

未來我會在專門的章節中介紹更多的第三方庫提供的 channel 以及類似的同步原語,如oneshotbroadcastermpmc等。

依照這個 mpmc[2] 的介紹,以前的 rust 標準庫應該是實現了mpmc, 這個庫就是從老的標準庫中抽取出來的。

信號量 Semaphore

標準庫中沒有 Semaphore 的實現,單這個是在是非常通用的一個併發原語,理論上也應該在這裏介紹。

但是這一章內容也非常多了,而且我也會在 tokio 中介紹信號兩,在一個專門的特殊併發原語 (第十四章或者更多),所以不在這個章節專門介紹了。

這個章節還是偏重標準庫的併發原語的介紹。

原子操作 atomic

Rust 中的原子操作(Atomic Operation)是一種特殊的操作,可以在多線程環境中以原子方式進行,即不會被其他線程的操作打斷。原子操作可以保證數據的線程安全性,避免數據競爭。

在 Rust 中,std::sync::atomic 模塊提供了一系列用於原子操作的類型和函數。原子操作是一種特殊的操作,可以在多線程環境中安全地執行,而不需要使用額外的鎖。

atomic 可以用於各種場景,例如:

目前 Rust 原子類型遵循與 C++20 atomic[3] 相同的規則, 具體來說就是atomic_ref。基本上, 創建 Rust 原子類型的一個共享引用, 相當於在 C++ 中創建一個atomic_ref; 當共享引用的生命週期結束時,atomic_ref也會被銷燬。(一個被獨佔擁有或者位於可變引用後面的 Rust 原子類型, 並不對應 C++ 中的 “原子對象”, 因爲它可以通過非原子操作被訪問。)

這個模塊爲一些基本類型定義了原子版本, 包括AtomicBoolAtomicIsizeAtomicUsizeAtomicI8AtomicU16等。原子類型提供的操作在正確使用時可以在線程間同步更新。

每個方法都帶有一個 Ordering[4] 參數, 表示該操作的內存屏障的強度。這些排序與 C++20 原子排序 [5] 相同。更多信息請參閱 nomicon[6]。

原子變量在線程間安全共享 (實現了 Sync), 但它本身不提供共享機制, 遵循 Rust 的線程模型。共享一個原子變量最常見的方式是把它放到一個Arc中 (一個原子引用計數的共享指針)。

原子類型可以存儲在靜態變量中, 使用像AtomicBool::new這樣的常量初始化器初始化。原子靜態變量通常用於懶惰的全局初始化。

我們已經說了,這個模塊爲一些基本類型定義了原子版本,包括AtomicBoolAtomicIsizeAtomicUsizeAtomicI8AtomicU16等,其實每一種類似的方法都比較類似的,所以我們以AtomicI64介紹。可以通過pub const fn new(v: i64) -> AtomicI64得到一個AtomicI64對象, AtomicI64定義了一些方法,用於對原子變量進行操作,例如:

// i64 和 AtomicI64的轉換,以及一組對象之間的轉換
pub unsafe fn from_ptr<'a>(ptr: *mut i64) -> &'a AtomicI64
pub const fn as_ptr(&self) -> *mut i64
pub fn get_mut(&mut self) -> &mut i64
pub fn from_mut(v: &mut i64) -> &mut AtomicI64
pub fn get_mut_slice(this: &mut [AtomicI64]) -> &mut [i64]
pub fn from_mut_slice(v: &mut [i64]) -> &mut [AtomicI64]
pub fn into_inner(self) -> i64

// 原子操作
pub fn load(&self, order: Ordering) -> i64
pub fn store(&self, val: i64, order: Ordering)
pub fn swap(&self, val: i64, order: Ordering) -> i64
pub fn compare_and_swap(&self, current: i64, new: i64, order: Ordering) -> i64 //   棄用
pub fn compare_exchange(
    &self,
    current: i64,
    new: i64,
    success: Ordering,
    failure: Ordering
) -> Result<i64, i64>
pub fn compare_exchange_weak(
    &self,
    current: i64,
    new: i64,
    success: Ordering,
    failure: Ordering
) -> Result<i64, i64>
pub fn fetch_add(&self, val: i64, order: Ordering) -> i64
pub fn fetch_sub(&self, val: i64, order: Ordering) -> i64
pub fn fetch_and(&self, val: i64, order: Ordering) -> i64
pub fn fetch_nand(&self, val: i64, order: Ordering) -> i64
pub fn fetch_or(&self, val: i64, order: Ordering) -> i64
pub fn fetch_xor(&self, val: i64, order: Ordering) -> i64
pub fn fetch_update<F>(
    &self,
    set_order: Ordering,
    fetch_order: Ordering,
    f: F
) -> Result<i64, i64>
where
    F: FnMut(i64) -> Option<i64>,
pub fn fetch_max(&self, val: i64, order: Ordering) -> i64
pub fn fetch_min(&self, val: i64, order: Ordering) -> i64

如果你有一點原子操作的基礎,就不難理解這些原子操作以及它們的變種了:

下面這個例子演示了AtomicI64的基本原子操作:

use std::sync::atomic::{AtomicI64, Ordering};

let atomic_num = AtomicI64::new(0);

// 原子加載
let num = atomic_num.load(Ordering::Relaxed);

// 原子加法並返回舊值
let old = atomic_num.fetch_add(10, Ordering::SeqCst);

// 原子比較並交換
atomic_num.compare_and_swap(old, 100, Ordering::SeqCst);

// 原子交換
let swapped = atomic_num.swap(200, Ordering::Release);

// 原子存儲
atomic_num.store(1000, Ordering::Relaxed);

上面示例了:

這些原子操作都可以確保線程安全, 不會出現數據競爭。

不同的Ordering表示內存序不同強度的屏障, 可以根據需要選擇。

AtomicI64提供了豐富的原子操作, 可以實現無鎖的併發算法和數據結構

原子操作的 Ordering

在 Rust 中,Ordering 枚舉用於指定原子操作時的內存屏障(memory ordering)。這與 C++ 的內存模型中的原子操作順序性有一些相似之處,但也有一些不同之處。下面是 Ordering 的三個主要成員以及它們與 C++ 中的內存順序的對應關係:

  1. Ordering::Relaxed
  1. Ordering::Acquire
  1. Ordering::Release
  1. Ordering::AcqRel
  1. Ordering::SeqCst

合理選擇 Ordering 可以最大程度提高性能, 同時保證需要的內存序約束。

但是如何合理的選擇,這就依賴開發者的基本賬功了,使用原子操作時需要小心,確保正確地選擇適當的 Ordering,以及避免競態條件和數據競爭。

像 Go 語言,直接使用了Ordering::SeqCst作爲它的默認的內存屏障,開發者使用起來就沒有心智負擔了,但是你如果想更精細化的使用Ordering, 請確保你一定清晰的瞭解你的代碼邏輯和 Ordering 的意義。

Ordering::Relaxed

Ordering::Relaxed 是最輕量級的內存順序,允許編譯器和處理器在原子操作周圍進行指令重排,不提供強制的執行順序。這通常在對程序執行的順序沒有嚴格要求時使用,以獲得更高的性能。

以下是一個簡單的例子,演示了 Ordering::Relaxed 的用法:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    // 創建一個原子布爾值
    let atomic_bool = AtomicBool::new(false);

    // 創建一個生產者線程,設置布爾值爲 true
    let producer_thread = thread::spawn(move || {
        // 這裏可能會有指令重排,因爲使用了 Ordering::Relaxed
        atomic_bool.store(true, Ordering::Relaxed);
    });

    // 創建一個消費者線程,檢查布爾值的狀態
    let consumer_thread = thread::spawn(move || {
        // 這裏可能會有指令重排,因爲使用了 Ordering::Relaxed
        let value = atomic_bool.load(Ordering::Relaxed);
        println!("Received value: {}", value);
    });

    // 等待線程完成
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

Ordering::Acquire

Ordering::Acquire 在 Rust 中表示插入一個獲取內存屏障,確保當前操作之前的所有讀取操作都在當前操作之前執行。這個內存順序常常用於同步共享數據,以確保線程能夠正確地觀察到之前的寫入操作。

以下是一個使用 Ordering::Acquire 的簡單例子:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    // 創建一個原子布爾值
    let atomic_bool = AtomicBool::new(false);

    // 創建一個生產者線程,設置布爾值爲 true
    let producer_thread = thread::spawn(move || {
        // 設置布爾值爲 true
        atomic_bool.store(true, Ordering::Release);
    });

    // 創建一個消費者線程,讀取布爾值的狀態
    let consumer_thread = thread::spawn(move || {
        // 等待直到讀取到布爾值爲 true
        while !atomic_bool.load(Ordering::Acquire) {
            // 這裏可能進行自旋,直到獲取到 Acquire 順序的布爾值
            // 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
        }

        println!("Received value: true");
    });

    // 等待線程完成
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

Ordering::Release

Ordering::Release 在 Rust 中表示插入一個釋放內存屏障,確保之前的所有寫入操作都在當前操作之後執行。這個內存順序通常用於同步共享數據,以確保之前的寫入操作對其他線程可見。

以下是一個使用 Ordering::Release 的簡單例子:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    // 創建一個原子布爾值
    let atomic_bool = AtomicBool::new(false);

    // 創建一個生產者線程,設置布爾值爲 true
    let producer_thread = thread::spawn(move || {
        // 設置布爾值爲 true
        atomic_bool.store(true, Ordering::Release);
    });

    // 創建一個消費者線程,讀取布爾值的狀態
    let consumer_thread = thread::spawn(move || {
        // 等待直到讀取到布爾值爲 true
        while !atomic_bool.load(Ordering::Acquire) {
            // 這裏可能進行自旋,直到獲取到 Release 順序的布爾值
            // 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
        }

        println!("Received value: true");
    });

    // 等待線程完成
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

在這個例子中,生產者線程使用 store 方法將布爾值設置爲 true,而消費者線程使用 load 方法等待並讀取布爾值的狀態。由於使用了 Ordering::Release,在生產者線程設置布爾值之後,會插入釋放內存屏障,確保之前的所有寫入操作都在當前操作之後執行。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。

Ordering::AcqRel

Ordering::AcqRel 在 Rust 中表示插入一個獲取釋放內存屏障,即同時包含獲取和釋放操作。它確保當前操作之前的所有讀取操作都在當前操作之前執行,並確保之前的所有寫入操作都在當前操作之後執行。這個內存順序通常用於同步共享數據,同時提供了一些平衡,適用於需要同時執行獲取和釋放操作的場景。

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    // 創建一個原子布爾值
    let atomic_bool = AtomicBool::new(false);

    // 創建一個生產者線程,設置布爾值爲 true
    let producer_thread = thread::spawn(move || {
        // 設置布爾值爲 true
        atomic_bool.store(true, Ordering::AcqRel);
    });

    // 創建一個消費者線程,讀取布爾值的狀態
    let consumer_thread = thread::spawn(move || {
        // 等待直到讀取到布爾值爲 true
        while !atomic_bool.load(Ordering::AcqRel) {
            // 這裏可能進行自旋,直到獲取到 AcqRel 順序的布爾值
            // 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
        }

        println!("Received value: true");
    });

    // 等待線程完成
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

在這個例子中,生產者線程使用 store 方法將布爾值設置爲 true,而消費者線程使用 load 方法等待並讀取布爾值的狀態。由於使用了 Ordering::AcqRel,在生產者線程設置布爾值之後,會插入獲取釋放內存屏障,確保之前的所有讀取操作都在當前操作之前執行,同時確保之前的所有寫入操作都在當前操作之後執行。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。

Ordering::SeqCst

Ordering::SeqCst 在 Rust 中表示插入一個全序內存屏障,保證所有線程都能看到一致的操作順序。這是最強的內存順序,通常用於實現全局同步。

以下是一個使用 Ordering::SeqCst 的簡單例子:

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    // 創建一個原子布爾值
    let atomic_bool = AtomicBool::new(false);

    // 創建一個生產者線程,設置布爾值爲 true
    let producer_thread = thread::spawn(move || {
        // 設置布爾值爲 true
        atomic_bool.store(true, Ordering::SeqCst);
    });

    // 創建一個消費者線程,讀取布爾值的狀態
    let consumer_thread = thread::spawn(move || {
        // 等待直到讀取到布爾值爲 true
        while !atomic_bool.load(Ordering::SeqCst) {
            // 這裏可能進行自旋,直到獲取到 SeqCst 順序的布爾值
            // 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
        }

        println!("Received value: true");
    });

    // 等待線程完成
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

在這個例子中,生產者線程使用 store 方法將布爾值設置爲 true,而消費者線程使用 load 方法等待並讀取布爾值的狀態。由於使用了 Ordering::SeqCst,在生產者線程設置布爾值之後,會插入全序內存屏障,確保所有線程都能看到一致的操作順序。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。SeqCst 是最強的內存順序,提供了最高級別的同步保證。

在 Rust 中,Ordering::Acquire內存順序通常與Ordering::Release配合使用。

Ordering::AcquireOrdering::Release之間形成happens-before關係, 可以實現不同線程之間的同步。

其典型用法是:

這樣就可以實現:

此外, Ordering::AcqRel也經常被用來同時具有兩者的語義。

如果用happens-before描述這五種內存順序,那麼:

happens-before關係表示對給定兩個操作 A 和 B:

Release建立寫之前的happens-before關係,Acquire建立讀之後的關係。兩者搭配可以實現寫入對其他線程可見。、SeqCst強制一個全序, 所有操作都是有序的。

參考資料

[1]

未定義的行爲: https://github.com/rust-lang/rust/issues/32260

[2]

mpmc: https://crates.io/crates/mpmc

[3]

C++20 atomic: https://en.cppreference.com/w/cpp/atomic

[4]

Ordering: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html

[5]

C++20 原子排序: https://en.cppreference.com/w/cpp/atomic/memory_order

[6]

nomicon: https://doc.rust-lang.org/nomicon/atomics.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fV8cnzh9U3XmPWciQX1Ihg