Rust 併發編程 - 基本併發原語
本章主要介紹 Arc、Mutex、RWMutex、Once、Barrier、Convar、LazyCell、LazyLock、 Exclusive、mpsc Channel、atomic 原子操作,第三方庫實現的信號量、SingleFlight、Waitgroup 等在後面的章節中介紹。
同步是多線程程序中的一個重要概念。在多線程環境下, 多個線程可能同時訪問某個共享資源, 這就可能導致數據競爭或者數據不一致的問題。爲了保證數據安全, 需要進行同步操作。
常見的同步需求包括:
-
互斥: 線程在使用共享資源時, 同一時刻只允許一個線程訪問共享資源, 在一個線程使用時, 其他線程需要等待, 不能同時訪問, 需要互斥訪問。
-
限制同時訪問線程數: 對某些共享資源, 可能需要限制同一時刻訪問的線程數。
-
線程間通信: 一個線程需要基於另一個線程的處理結果才能繼續執行, 需要線程間通信。
-
有序訪問: 對共享資源的訪問需要按某種順序進行。
爲了實現這些同步需求, 就需要使用同步原語。常見的同步原語有互斥鎖、信號量、條件變量等。
互斥鎖可以保證同一時刻只有一個線程可以訪問共享資源。信號量可以限制同時訪問的線程數。條件變量可以實現線程間的通信和協調。這些同步原語的使用可以避免同步問題, 幫助我們正確有效地處理多線程之間的同步需求。
Go 併發編程電子書已經更新到第 5 章:
Arc
Arc
已改放在前一章的,這一章補上。我這裏介紹的時候分類不一定精確,只是方便給大家介紹各種庫和併發原語,不用追求分類的準確性。
Rust 的 Arc 代表原子引用計數
(Atomic Reference Counting),是一種用於多線程環境的智能指針。它允許在多個地方共享數據,同時確保線程安全性。Arc 的全稱是std::sync::Arc
,屬於標準庫的一部分。
在 Rust 中,通常情況下,變量是被所有權管理的,但有時候我們需要在多個地方共享數據。這就是Arc
的用武之地。它通過在堆上分配內存,並使用引用計數來跟蹤數據的所有者數量,確保在不需要的時候正確地釋放資源。
下面是一個簡單的例子,演示瞭如何使用Arc
:
use std::sync::Arc;
use std::thread;
fn main() {
// 創建一個可共享的整數
let data = Arc::new(46);
// 創建兩個線程,共享對data的引用
let thread1 = {
let data = Arc::clone(&data);
thread::spawn(move || {
// 在線程中使用data
println!("Thread 1: {}", data);
})
};
let thread2 = {
let data = Arc::clone(&data);
thread::spawn(move || {
// 在另一個線程中使用data
println!("Thread 2: {}", data);
})
};
// 等待兩個線程完成
thread1.join().unwrap();
thread2.join().unwrap();
}
Arc(原子引用計數)和 Rc(引用計數)都是 Rust 中用於多所有權的智能指針,但它們有一些關鍵的區別。
-
線程安全性:
-
Arc
是線程安全的,可以安全地在多線程環境中共享。它使用原子操作來更新引用計數,確保併發訪問時的線程安全性。 -
Rc
不是線程安全的。它只適用於單線程環境,因爲它的引用計數操作不是原子的,可能導致在多線程中的競態條件和不安全行爲。 -
性能開銷:
-
由於 Arc 使用原子操作來更新引用計數,相對於 Rc,Arc 的性能開銷更大。原子操作通常比非原子操作更昂貴。
-
Rc 在單線程環境中性能更好,因爲它不需要進行原子操作。
-
可變性:
-
Arc
不能用於可變數據。如果需要在多線程環境中共享可變數據,通常會使用 Mutex、RwLock 等同步原語和Arc
。 -
Rc
也不能用於可變數據,因爲它無法提供併發訪問的安全性。 -
引用計數減少時的行爲:
-
當 Arc 的引用計數減少爲零時,由於它是原子的,它會正確地釋放底層資源(比如堆上的數據)。
-
Rc 在單線程中引用計數減少爲零時會正確釋放資源,但在多線程中可能存在問題,因爲它沒有考慮併發情況。
總之你記住在多線程的情況下使用Arc
,單線程的情況下使用Rc
就好了。
當你需要在多線程環境中共享可變數據時,常常會結合使用Arc
和Mutex
。Mutex
(互斥鎖)用於確保在任意時刻只有一個線程能夠訪問被鎖定的數據。下面是一個簡單的例子,演示瞭如何使用Arc
和Mutex
來在多線程中共享可變數據:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 獲取鎖,確保只有一個線程能夠訪問計數器
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
Arc
和 RefCell
結合使用的場景通常發生在多線程中需要共享可變狀態,但又不需要互斥鎖的場合。RefCell
允許在運行時進行借用檢查,因此在單線程環境下使用時,它不會像 Mutex
那樣引入鎖的開銷。
以下是一個使用 Arc
和 RefCell
的簡單例子,演示了在多線程環境中共享可變狀態, 注意這個例子只是用來演示,我們並不期望 num 的最終結果和上面的例子一樣:
use std::sync::{Arc};
use std::cell::RefCell;
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(RefCell::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 使用RefCell獲取可變引用,確保運行時借用檢查
let mut num = counter.borrow_mut();
*num += 1;
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.borrow());
}
互斥鎖 Mutex
互斥鎖歷史悠久,在很多編程語言中都有實現。
Mutex
是 Rust 中的互斥鎖,用於解決多線程併發訪問共享數據時可能出現的競態條件。Mutex 提供了一種機制,只有擁有鎖的線程才能訪問被鎖定的數據,其他線程必須等待鎖的釋放。
Lock
在標準庫中,Mutex 位於 std::sync
模塊下。下面是一個簡單的例子,演示瞭如何使用 Mutex:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 獲取鎖,確保只有一個線程能夠訪問計數器
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
在這個例子中,counter 是一個 Mutex
保護 (且包裝) 的可變整數,然後使用 Arc
來多線程共享。在每個線程中,通過 counter.lock().unwrap()
獲取鎖,確保一次只有一個線程能夠修改計數器的值。這樣可以確保在併發情況下不會發生競態條件。
需要注意的是,lock
方法返回一個 MutexGuard
,它是一個智能指針,實現了 Deref
和 Drop
trait。當 MutexGuard
被銷燬時,會自動釋放鎖,確保在任何情況下都能正確釋放鎖。
這裏注意三個知識點:
-
爲了跨線程支持,一般
Mutex
會和Arc
組合使用, 這樣Mutex
對象在每個線程中都能安全訪問 -
lock
方法返回實現了Deref
trait 的MutexGuard
對象,所以它會自動解引用,你可以直接調用被保護對象上的方法 -
MutexGuard
還實現了Drop
, 所以鎖會自動解鎖,一般你不需要主動調用drop
去解鎖
目前 nightly 版本的 rust 提供了一個實驗性的方法unlock
, 功能和drop
一樣,也是釋放互斥鎖。
try_lock
Mutex 的 try_lock
方法嘗試獲取鎖,如果鎖已經被其他線程持有,則立即返回 Err
而不是阻塞線程。這對於在嘗試獲取鎖時避免線程阻塞很有用。
以下是一個使用 try_lock
的簡單例子:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 嘗試獲取鎖,如果獲取失敗就繼續嘗試或者放棄
if let Ok(mut num) = counter.try_lock() {
*num += 1;
} else {
println!("Thread failed to acquire lock.");
}
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
在這個例子中,try_lock
方法被用於嘗試獲取鎖。如果獲取成功,線程就可以修改計數器的值,否則它會打印一條消息表示沒有獲取到鎖。
需要注意的是,try_lock
方法返回一個 Result
,如果獲取鎖成功,返回 Ok
包含 MutexGuard
,否則返回 Err
。這使得你可以根據獲取鎖的結果執行不同的邏輯。
Poisoning
在 Rust 中,poisoning
是一種用於處理線程 panic
導致的不可恢復的狀態的機制。這個概念通常與 Mutex
和 RwLock
相關。當一個線程在持有鎖的情況下 panic
時,這就會導致鎖進入一種不一致的狀態,因爲鎖的內部狀態可能已經被修改,而沒有機會進行清理。爲了避免這種情況,Rust 的標準庫使用 poisoning
機制 (形象的比喻)。具體來說,在 Mutex
和 RwLock
中,當一個線程在持有鎖的時候 panic
,鎖就會被標記爲 poisoned
。此後任何線程嘗試獲取這個鎖時,都會得到一個 PoisonError
,它包含一個標識鎖是否被 poisoned
的標誌。這樣,線程可以檢測到之前的 panic
,並進行相應的處理。
Mutex
通過在 LockResult
中包裝 PoisonError
來表示這種情況。具體來說,LockResult
的 Err
分支是一個 PoisonError
,其中包含一個 MutexGuard
。你可以通過 into_inner
方法來獲取 MutexGuard
,然後繼續操作。
以下是一個簡單的例子,演示了鎖的 "poisoning",以及如何處理:
use std::sync::{Mutex, Arc, LockResult, PoisonError};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 獲取鎖
let result: LockResult<_> = counter.lock();
// 嘗試獲取鎖,如果獲取失敗就打印錯誤信息
match result {
Ok(mut num) => {
*num += 1;
// 模擬 panic
if *num == 3 {
panic!("Simulated panic!");
}
}
Err(poisoned) => {
// 鎖被 "poisoned",處理錯誤
println!("Thread encountered a poisoned lock: {:?}", poisoned);
// 獲取 MutexGuard,繼續操作
let mut num = poisoned.into_inner();
*num += 1;
}
}
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
在這個例子中,當計數器的值達到 3 時,一個線程故意引發了 panic,其他線程在嘗試獲取鎖時就會得到一個 PoisonError
。在錯誤處理分支,我們打印錯誤信息,然後使用 into_inner
方法獲取 MutexGuard
,以確保鎖被正確釋放。這樣其他線程就能夠繼續正常地使用鎖。
更快的釋放互斥鎖
前面說了,因爲MutexGuard
實現了Drop
了,所以鎖可以自動釋放,可是如果鎖的 scope 太大,我們想盡快的釋放,該怎麼辦呢?
第一種方式你可以通過創建一個新的內部的作用域 (scope
) 來達到類似手動釋放 Mutex 的效果。在新的作用域中,MutexGuard
將在離開作用域時自動釋放鎖。這是通過作用域的離開而觸發的 Drop
trait 的實現。:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 進入一個新的作用域!!!!!!!!!!!!!!
{
// 獲取鎖
let mut num = counter.lock().unwrap();
*num += 1;
// MutexGuard 在這個作用域結束時自動釋放鎖
}
// 在這裏,鎖已經被釋放
// 這裏可以進行其他操作
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
第二種方法就是主動drop
或者unlock
, 以下是一個演示手動釋放 Mutex 的例子:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數
let counter = Arc::new(Mutex::new(0));
// 創建多個線程來增加計數器的值
let mut handles = vec![];
for _ in 0..5 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 獲取鎖
let mut num = counter.lock().unwrap();
*num += 1;
// 手動釋放鎖!!!!!!!!
drop(num);
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 打印最終的計數器值
println!("Final count: {}", *counter.lock().unwrap());
}
Mutex
是可重入鎖嗎?應該不是,但是官方文檔把它標記爲未定義的行爲 [1],所以不要試圖在同一個線程中獲取兩次鎖, 如果你想使用可重入鎖,請使用我將來要介紹的第三方併發庫。同樣需要注意的是讀寫鎖RWMutex
。
讀寫鎖 RWMutex
RWMutex
是 Rust 中的讀寫鎖(Read-Write Lock),允許多個線程同時獲取共享數據的讀取訪問權限,但在寫入時會排他。這意味着多個線程可以同時讀取數據,但只有一個線程能夠寫入數據,且寫入時不允許其他線程讀取或寫入。
讀寫鎖一般使用在下面的場景中:
-
讀多寫少的情況:當多個線程需要同時讀取共享數據而寫入操作較少時,使用 RWMutex 可以提高併發性能。多個線程可以同時獲取讀取鎖,而寫入操作會排他進行。
-
只讀訪問和寫入訪問不衝突的情況:如果在程序的邏輯中,讀取操作和寫入操作是獨立的,沒有衝突,那麼使用 RWMutex 可以更好地利用併發性能。
-
資源分配和釋放階段:當需要在一段時間內只允許讀取,然後在另一段時間內只允許寫入時,RWMutex 可以提供更靈活的控制
以下是使用 RWMutex
的例子 :
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數,使用RwLock包裝
let counter = Arc::new(RwLock::new(0));
// 創建多個線程來讀取計數器的值
let mut read_handles = vec![];
for _ in 0..3 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 獲取讀取鎖
let num = counter.read().unwrap();
println!("Reader {}: {}", thread::current().id(), *num);
});
read_handles.push(handle);
}
// 創建一個線程來寫入計數器的值
let write_handle = thread::spawn(move || {
// 獲取寫入鎖
let mut num = counter.write().unwrap();
*num += 1;
println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);
});
// 等待所有讀取線程完成
for handle in read_handles {
handle.join().unwrap();
}
// 等待寫入線程完成
write_handle.join().unwrap();
}
它的使用和互斥鎖類似,只不過需要調用read()
方法獲得讀鎖,使用write()
方法獲得寫鎖。
讀寫鎖有以下的性質:
-
多個線程可以同時獲取讀鎖, 實現併發讀
-
只有一個線程可以獲取寫鎖, 寫時會獨佔鎖
-
如果已經獲取了讀鎖, 則不能獲取寫鎖, 防止數據競爭
-
如果已經獲取了寫鎖, 則不能再獲取讀鎖或寫鎖, 寫獨佔時防止併發讀寫
如果一個線程已經持有讀鎖,而另一個線程請求寫鎖,它必須等待讀鎖被釋放。這確保在寫入操作進行時,沒有其他線程能夠同時持有讀鎖。寫鎖確保了對共享數據的寫入操作是獨佔的。
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數,使用RwLock包裝
let counter = Arc::new(RwLock::new(0));
// 創建一個線程持有讀鎖
let read_handle = {
let counter = Arc::clone(&counter);
thread::spawn(move || {
// 獲取讀鎖
let num = counter.read().unwrap();
println!("Reader {}: {}", thread::current().id(), *num);
// 休眠模擬讀取操作
thread::sleep(std::time::Duration::from_secs(10));
})
};
// 創建一個線程請求寫鎖
let write_handle = {
let counter = Arc::clone(&counter);
thread::spawn(move || {
// 休眠一小段時間,確保讀鎖已經被獲取
thread::sleep(std::time::Duration::from_secs(1));
// 嘗試獲取寫鎖
// 注意:這裏會等待讀鎖被釋放
let mut num = counter.write().unwrap();
*num += 1;
println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);
})
};
// 等待讀取線程和寫入線程完成
read_handle.join().unwrap();
write_handle.join().unwrap();
}
更進一步,在寫鎖請求後,再有新的讀鎖請求進來,它是在等待寫鎖釋放?還是直接獲得讀鎖?答案是等待寫鎖釋放,看下面的例子:
// 創建一個可共享的可變整數,使用RwLock包裝
let counter = Arc::new(RwLock::new(0));
// 創建一個線程持有讀鎖
let read_handle = {
let counter = counter.clone();
thread::spawn(move || {
// 獲取讀鎖
let num = counter.read().unwrap();
println!("Reader#1: {}", *num);
// 休眠模擬讀取操作
thread::sleep(std::time::Duration::from_secs(10));
})
};
// 創建一個線程請求寫鎖
let write_handle = {
let counter = counter.clone();
thread::spawn(move || {
// 休眠一小段時間,確保讀鎖已經被獲取
thread::sleep(std::time::Duration::from_secs(1));
// 嘗試獲取寫鎖
let mut num = counter.write().unwrap();
*num += 1;
println!("Writer : Incremented counter to {}", *num);
})
};
// 創建一個線程請求讀鎖
let read_handle_2 = {
let counter = counter.clone();
thread::spawn(move || {
// 休眠一小段時間,確保寫鎖已經被獲取
thread::sleep(std::time::Duration::from_secs(2));
// 嘗試獲取讀鎖
let num = counter.read().unwrap();
println!("Reader#2: {}", *num);
})
};
// 等待讀取線程和寫入線程完成
read_handle.join().unwrap();
write_handle.join().unwrap();
read_handle_2.join().unwrap();
死鎖是一種併發編程中的常見問題,可能發生在 RwLock 使用不當的情況下。一個典型的死鎖場景是,一個線程在持有讀鎖的情況下嘗試獲取寫鎖,而其他線程持有寫鎖並嘗試獲取讀鎖,導致彼此相互等待。
以下是一個簡單的例子,演示了可能導致 RwLock 死鎖的情況:
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
// 創建一個可共享的可變整數,使用RwLock包裝
let counter = Arc::new(RwLock::new(0));
// 創建一個線程持有讀鎖,嘗試獲取寫鎖
let read_and_write_handle = {
let counter = Arc::clone(&counter);
thread::spawn(move || {
// 獲取讀鎖
let num = counter.read().unwrap();
println!("Reader {}: {}", thread::current().id(), *num);
// 嘗試獲取寫鎖,這會導致死鎖
let mut num = counter.write().unwrap();
*num += 1;
println!("Reader {}: Incremented counter to {}", thread::current().id(), *num);
})
};
// 創建一個線程持有寫鎖,嘗試獲取讀鎖
let write_and_read_handle = {
let counter = Arc::clone(&counter);
thread::spawn(move || {
// 獲取寫鎖
let mut num = counter.write().unwrap();
*num += 1;
println!("Writer {}: Incremented counter to {}", thread::current().id(), *num);
// 嘗試獲取讀鎖,這會導致死鎖
let num = counter.read().unwrap();
println!("Writer {}: {}", thread::current().id(), *num);
})
};
// 等待線程完成
read_and_write_handle.join().unwrap();
write_and_read_handle.join().unwrap();
}
和 Mutex 一樣, RwLock 在 panic 時也會變爲中毒狀態。但是請注意, 只有在 RwLock 被獨佔式寫入鎖住時發生 panic, 它纔會中毒。如果在任意 reader 中發生 panic, 該鎖則不會中毒。
原因是:
-
RwLock 允許多個 reader 同時獲取讀鎖, 讀是非獨佔的。
-
如果任一個 reader panic, 其他讀者依然持有讀鎖, 所以不能將狀態標記爲中毒。
-
只有當前線程獨佔式擁有寫鎖時發生 panic, 由於沒有其他線程持有鎖, 這時可以安全地將狀態標記爲中毒。
所以綜上, RwLock 只會在獨佔式寫入時發生 panic 時中毒。而 reader panic 不會導致中毒。這是由 RwLock 讀寫鎖語義決定的。
這種機制可以避免不必要的中毒, 因爲非獨佔的讀鎖之間不會互相影響, 其中任一個鎖持有者 panic 不應影響其他讀者。只有獨佔寫鎖需要特殊處理。
一次初始化 Once
std::sync::Once
是 Rust 中的一種併發原語,用於確保某個操作在整個程序生命週期內只執行一次。Once
主要用於在多線程環境中執行初始化代碼,確保該代碼只被執行一次,即使有多個線程同時嘗試執行它。
以下是使用Once
的一個例子:
use std::sync::{Once, ONCE_INIT};
static INIT: Once = ONCE_INIT;
fn main() {
// 通過 call_once 方法確保某個操作只執行一次
INIT.call_once(|| {
// 這裏放置需要執行一次的初始化代碼
println!("Initialization code executed!");
});
// 之後再調用 call_once,初始化代碼不會再次執行
INIT.call_once(|| {
println!("This won't be printed.");
});
}
使用場景:
-
全局初始化:在程序啓動時執行一些全局初始化操作,例如初始化全局變量、加載配置等。
-
懶加載:在需要時進行一次性初始化,例如懶加載全局配置。
-
單例模式:通過
Once
可以實現線程安全的單例模式,確保某個對象在整個程序生命週期內只被初始化一次。
下面這個例子是帶返回值的例子,實現懶加載全局配置的場景:
use std::sync::{Once, ONCE_INIT};
static mut GLOBAL_CONFIG: Option<String> = None;
static INIT: Once = ONCE_INIT;
fn init_global_config() {
unsafe {
GLOBAL_CONFIG = Some("Initialized global configuration".to_string());
}
}
fn get_global_config() -> &'static str {
INIT.call_once(|| init_global_config());
unsafe {
GLOBAL_CONFIG.as_ref().unwrap()
}
}
fn main() {
println!("{}", get_global_config());
println!("{}", get_global_config()); // 不會重新初始化,只會輸出一次
}
在這個例子中,get_global_config 函數通過 Once 確保 init_global_config 函數只會被調用一次,從而實現了全局配置的懶加載。
上一章我們還介紹了OnceCell
和OnceLock
, 它們都是同一族的單次初始化的併發原語,主要區別是:
-
Once
是用於確保某個操作在整個程序生命週期內只執行一次的原語。它適用於全局初始化、懶加載和單例模式等場景。 -
OnceCell
是一個針對某種數據類型進行包裝的懶加載容器,可以在需要時執行一次性初始化,並在之後提供對初始化值的訪問。 -
OnceLock
是一個可用於線程安全的懶加載的原語,類似於OnceCell
,但是更簡單,只能存儲 Copy 類型的數據。
OnceCell
不是線程安全的,而OnceLock
是線程安全的,但是OnceLock
只能存儲 Copy 類型的數據,而OnceCell
可以存儲任意類型的數據。
還有一個被廣泛使用的第三方庫once_cell
, 它提供了線程安全和非線程安全的兩種類型的OnceCell
, 比如下面就是一個線程安全的例子:
use once_cell::sync::OnceCell;
static CELL: OnceCell<String> = OnceCell::new();
assert!(CELL.get().is_none());
std::thread::spawn(|| {
let value: &String = CELL.get_or_init(|| {
"Hello, World!".to_string()
});
assert_eq!(value, "Hello, World!");
}).join().unwrap();
let value: Option<&String> = CELL.get();
assert!(value.is_some());
assert_eq!(value.unwrap().as_str(), "Hello, World!");
屏障 / 柵欄 Barrier
Barrier 是 Rust 標準庫中的一種併發原語,用於在多個線程之間創建一個同步點。它允許多個線程在某個點上等待,直到所有線程都到達該點,然後它們可以同時繼續執行。
下面是一個使用Barrier
的例子:
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
// 創建一個 Barrier,指定參與同步的線程數量
let barrier = Arc::new(Barrier::new(3)); // 在這個例子中,有 3 個線程參與同步
// 創建多個線程
let mut handles = vec![];
for id in 0..3 {
let barrier = Arc::clone(&barrier);
let handle = thread::spawn(move || {
// 模擬一些工作
println!("Thread {} working", id);
thread::sleep(std::time::Duration::from_secs(id as u64));
// 線程到達同步點
barrier.wait();
// 執行同步後的操作
println!("Thread {} resumed", id);
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
}
在這個例子中,創建了一個 Barrier
,並指定了參與同步的線程數量爲 3。然後,創建了三個線程,每個線程模擬一些工作,然後調用 barrier.wait()
來等待其他線程。當所有線程都調用了 wait 後,它們同時繼續執行。
使用場景
-
並行計算:當需要確保多個線程在某個點上同步,以便執行某些計算或任務時,可以使用
Barrier
。 -
迭代步驟同步:在一些算法中,可能需要多個步驟,每個步驟的結果都依賴於其他步驟的完成。
Barrier
可以用於確保所有線程完成當前步驟後再繼續下一步。 -
協同工作的階段:在多階段的任務中,可以使用 Barrier 來同步各個階段。
Barrier
的靈活性使得它在協調多個線程的執行流程時非常有用。
那麼,Barrier 可以循環使用嗎?一旦所有線程都通過 wait 方法達到同步點後,Barrier 就被重置,可以再次使用。這種重置操作是自動的。
當所有線程都調用 wait 方法後,Barrier 的內部狀態會被重置,下一次調用 wait 方法時,線程會重新被阻塞,直到所有線程再次到達同步點。這樣,Barrier 可以被循環使用,用於多輪的同步。
以下是一個簡單的示例,演示了 Barrier 的循環使用:
let barrier = Arc::new(Barrier::new(10));
let mut handles = vec![];
for _ in 0..10 {
let barrier = barrier.clone();
handles.push(thread::spawn(move || {
println!("before wait1");
let dur = rand::thread_rng().gen_range(100..1000);
thread::sleep(std::time::Duration::from_millis(dur));
//step1
barrier.wait();
println!("after wait1");
thread::sleep(time::Duration::from_secs(1));
//step2
barrier.wait();
println!("after wait2");
}));
}
for handle in handles {
handle.join().unwrap();
}
條件變量 Condvar
Condvar
是 Rust 標準庫中的條件變量(Condition Variable),用於在多線程之間進行線程間的協調和通信。條件變量允許線程等待某個特定的條件成立,當條件滿足時,線程可以被喚醒並繼續執行。
以下是 Condvar 的一個例子:
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
// 創建一個 Mutex 和 Condvar,用於共享狀態和線程協調
let mutex = Arc::new(Mutex::new(false));
let condvar = Arc::new(Condvar::new());
// 創建多個線程
let mut handles = vec![];
for id in 0..3 {
let mutex = Arc::clone(&mutex);
let condvar = Arc::clone(&condvar);
let handle = thread::spawn(move || {
// 獲取 Mutex 鎖
let mut guard = mutex.lock().unwrap();
// 線程等待條件變量爲 true
while !*guard {
guard = condvar.wait(guard).unwrap();
}
// 條件滿足後執行的操作
println!("Thread {} woke up", id);
});
handles.push(handle);
}
// 模擬條件滿足後喚醒等待的線程
thread::sleep(std::time::Duration::from_secs(2));
// 修改條件,並喚醒等待的線程
{
let mut guard = mutex.lock().unwrap();
*guard = true;
condvar.notify_all();
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
}
在這個例子中,創建了一個 Mutex
和 Condvar
,其中 Mutex
用於保護共享狀態(條件),而 Condvar
用於等待和喚醒線程。多個線程在 Mutex
上加鎖後,通過 condvar.wait()
方法等待條件滿足,然後在主線程中修改條件,並通過 condvar.notify_all()
喚醒所有等待的線程。
使用場景
-
線程間同步:
Condvar
可以用於線程之間的同步,使得線程能夠等待某個條件的成立而不是輪詢檢查。 -
生產者 - 消費者模型:在多線程環境中,生產者線程可以通過條件變量通知消費者線程有新的數據產生。
-
線程池:在線程池中,任務線程可以等待條件變量,等待新的任務到達時被喚醒執行。
需要注意的是,使用 Condvar
時,通常需要配合 Mutex
使用,以確保在等待和修改條件時的線程安全性。
Condvar
可以通過調用 notify_one()
方法來發出信號。當 notify_one()
方法被調用時,Condvar
會隨機選擇一個正在等待信號的線程,並釋放該線程。Condvar
也可以通過調用 notify_all()
方法來發出信號。當 notify_all()
方法被調用時,Condvar
會釋放所有正在等待信號的線程。
LazyCell 和 LazyLock
我們介紹了OnceCell
和OnceLock
, 我們再介紹兩個類似的用於懶加載的併發原語LazyCell
和LazyLock
。
Rust 中的 LazyCell 和 LazyLock 都是用於懶惰初始化對象的工具。LazyCell 用於懶惰初始化值,LazyLock 用於懶惰初始化資源。
| 類型 | 用途 | 初始化時機 | 線程安全 | | --- | --- | --- | --- | | LazyCell | 懶惰初始化值 | 第一次訪問 | 否 | | LazyLock | 懶惰初始化資源 | 第一次獲取鎖 | 是 | | OnceCell | 懶惰初始化單例值 | 第一次調用 | get_or_init() 方法 | | OnceLock | 懶惰初始化互斥鎖 | 第一次調用 | lock() 方法 |
Exclusive
Rust 中的 Exclusive
是一個用於保證某個資源只被一個線程訪問的工具。Exclusive
可以通過導入 std::sync::Exclusive
來使用。
let mut exclusive = Exclusive::new(92);
println!("ready");
std::thread::spawn(move || {
let counter = exclusive.get_mut();
println!("{}", *counter);
*counter = 100;
}).join().unwrap();
和 Mutex 有什麼區別?Exclusive
僅提供對底層值的可變訪問,也稱爲對底層值的獨佔訪問。它不提供對底層值的不可變或共享訪問。
雖然這可能看起來不太有用,但它允許 Exclusive
無條件地實現 Sync
。事實上,Sync
的安全要求是,對於 Exclusive
而言,它必須可以安全地跨線程共享,也就是說,&Exclusive 跨越線程邊界時必須是安全的。出於設計考慮,&Exclusive 沒有任何 API,使其無用,因此無害,因此內存安全。
這個類型還是一個 nightly 的實驗性特性,所以我們不妨等它穩定後在學習和使用。
mpsc
mpsc
是 Rust 標準庫中的一個模塊,提供了多生產者、單消費者(Multiple Producers, Single Consumer)的消息傳遞通道。mpsc 是 multiple-producer, single-consumer 的縮寫。這個模塊基於 channel 的基於消息傳遞的通訊,具體定義了三個類型:
-
Sender
:發送者,用於異步發送消息。 -
SyncSender
:同步發送者,用於同步發送消息。 -
Receiver
:接收者,用於從同步 channel 或異步 channel 中接收消息,只能有一個線程訪問。
Sender
或 SyncSender
用於向 Receiver
發送數據。兩種 sender 都是可 clone 的 (多生產者), 這樣多個線程就可以同時向一個 receiver(單消費者) 發送。
這些通道有兩種類型:
-
異步的, 無限緩衝區的通道。
channel
函數將返回一個(Sender, Receiver)
元組, 其中所有發送將是異步的 (永不阻塞)。該通道在概念上具有無限的緩衝區。 -
同步的, 有界的通道。
sync_channel
函數將返回一個(SyncSender, Receiver)
元組, 待發送消息的存儲區是一個固定大小的預分配緩衝區。所有發送將是同步的, 通過阻塞直到有空閒的緩衝區空間。注意綁定大小爲 0 也是允許的, 這將使通道變成一個 “約定” 通道, 每個發送方原子地將一條消息交給接收方。
使用場景
-
併發消息傳遞:適用於多個線程(生產者)向一個線程(消費者)發送消息的場景。
-
任務協調:用於協調多個併發任務的執行流程。
每當看到 rust 的 mpsc, 我總是和 Go 的 channel 作比較,事實上 rust 的 channel 使用起來也非常的簡單。
一個簡單的 channel 例子如下:
use std::thread;
use std::sync::mpsc::channel;
// Create a simple streaming channel
let (tx, rx) = channel();
thread::spawn(move|| {
tx.send(10).unwrap();
});
assert_eq!(rx.recv().unwrap(), 10);
一個多生產者單消費者的例子:
use std::thread;
use std::sync::mpsc::channel;
// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = channel();
for i in 0..10 {
let tx = tx.clone();
thread::spawn(move|| {
tx.send(i).unwrap();
});
}
for _ in 0..10 {
let j = rx.recv().unwrap();
assert!(0 <= j && j < 10);
}
一個同步 channel 的例子:
use std::sync::mpsc::sync_channel;
use std::thread;
let (tx, rx) = sync_channel(3);
for _ in 0..3 {
// It would be the same without thread and clone here
// since there will still be one `tx` left.
let tx = tx.clone();
// cloned tx dropped within thread
thread::spawn(move || tx.send("ok").unwrap());
}
// Drop the last sender to stop `rx` waiting for message.
// The program will not complete if we comment this out.
// **All** `tx` needs to be dropped for `rx` to have `Err`.
drop(tx);
// Unbounded receiver waiting for all senders to complete.
while let Ok(msg) = rx.recv() {
println!("{msg}");
}
println!("completed");
發送端釋放的情況下,receiver 會收到 error:
use std::sync::mpsc::channel;
// The call to recv() will return an error because the channel has already
// hung up (or been deallocated)
let (tx, rx) = channel::<i32>();
drop(tx);
assert!(rx.recv().is_err());
在 Rust 標準庫中,目前沒有提供原生的 MPMC
(Multiple Producers, Multiple Consumers)通道。std::sync::mpsc
模塊提供的是單一消費者的通道,主要是出於設計和性能的考慮。
設計上,MPSC 通道的實現相對較簡單,可以更容易地滿足特定的性能需求,並且在很多情況下是足夠的。同時,MPSC 通道的使用場景更爲常見,例如在線程池中有一個任務隊列,多個生產者將任務推送到隊列中,而單個消費者負責執行這些任務。
未來我會在專門的章節中介紹更多的第三方庫提供的 channel 以及類似的同步原語,如oneshot
、broadcaster
、mpmc
等。
依照這個 mpmc[2] 的介紹,以前的 rust 標準庫應該是實現了mpmc
, 這個庫就是從老的標準庫中抽取出來的。
信號量 Semaphore
標準庫中沒有 Semaphore 的實現,單這個是在是非常通用的一個併發原語,理論上也應該在這裏介紹。
但是這一章內容也非常多了,而且我也會在 tokio 中介紹信號兩,在一個專門的特殊併發原語 (第十四章或者更多),所以不在這個章節專門介紹了。
這個章節還是偏重標準庫的併發原語的介紹。
原子操作 atomic
Rust 中的原子操作(Atomic Operation)是一種特殊的操作,可以在多線程環境中以原子方式進行,即不會被其他線程的操作打斷。原子操作可以保證數據的線程安全性,避免數據競爭。
在 Rust 中,std::sync::atomic
模塊提供了一系列用於原子操作的類型和函數。原子操作是一種特殊的操作,可以在多線程環境中安全地執行,而不需要使用額外的鎖。
atomic 可以用於各種場景,例如:
-
保證某個值的一致性。
-
防止多個線程同時修改某個值。
-
實現互斥鎖。
目前 Rust 原子類型遵循與 C++20 atomic[3] 相同的規則, 具體來說就是atomic_ref
。基本上, 創建 Rust 原子類型的一個共享引用, 相當於在 C++ 中創建一個atomic_ref
; 當共享引用的生命週期結束時,atomic_ref
也會被銷燬。(一個被獨佔擁有或者位於可變引用後面的 Rust 原子類型, 並不對應 C++ 中的 “原子對象”, 因爲它可以通過非原子操作被訪問。)
這個模塊爲一些基本類型定義了原子版本, 包括AtomicBool
、AtomicIsize
、AtomicUsize
、AtomicI8
、AtomicU16
等。原子類型提供的操作在正確使用時可以在線程間同步更新。
每個方法都帶有一個 Ordering[4] 參數, 表示該操作的內存屏障的強度。這些排序與 C++20 原子排序 [5] 相同。更多信息請參閱 nomicon[6]。
原子變量在線程間安全共享 (實現了 Sync), 但它本身不提供共享機制, 遵循 Rust 的線程模型。共享一個原子變量最常見的方式是把它放到一個Arc
中 (一個原子引用計數的共享指針)。
原子類型可以存儲在靜態變量中, 使用像AtomicBool::new
這樣的常量初始化器初始化。原子靜態變量通常用於懶惰的全局初始化。
我們已經說了,這個模塊爲一些基本類型定義了原子版本,包括AtomicBool
、AtomicIsize
、AtomicUsize
、AtomicI8
、AtomicU16
等,其實每一種類似的方法都比較類似的,所以我們以AtomicI64
介紹。可以通過pub const fn new(v: i64) -> AtomicI64
得到一個AtomicI64
對象, AtomicI64
定義了一些方法,用於對原子變量進行操作,例如:
// i64 和 AtomicI64的轉換,以及一組對象之間的轉換
pub unsafe fn from_ptr<'a>(ptr: *mut i64) -> &'a AtomicI64
pub const fn as_ptr(&self) -> *mut i64
pub fn get_mut(&mut self) -> &mut i64
pub fn from_mut(v: &mut i64) -> &mut AtomicI64
pub fn get_mut_slice(this: &mut [AtomicI64]) -> &mut [i64]
pub fn from_mut_slice(v: &mut [i64]) -> &mut [AtomicI64]
pub fn into_inner(self) -> i64
// 原子操作
pub fn load(&self, order: Ordering) -> i64
pub fn store(&self, val: i64, order: Ordering)
pub fn swap(&self, val: i64, order: Ordering) -> i64
pub fn compare_and_swap(&self, current: i64, new: i64, order: Ordering) -> i64 // 棄用
pub fn compare_exchange(
&self,
current: i64,
new: i64,
success: Ordering,
failure: Ordering
) -> Result<i64, i64>
pub fn compare_exchange_weak(
&self,
current: i64,
new: i64,
success: Ordering,
failure: Ordering
) -> Result<i64, i64>
pub fn fetch_add(&self, val: i64, order: Ordering) -> i64
pub fn fetch_sub(&self, val: i64, order: Ordering) -> i64
pub fn fetch_and(&self, val: i64, order: Ordering) -> i64
pub fn fetch_nand(&self, val: i64, order: Ordering) -> i64
pub fn fetch_or(&self, val: i64, order: Ordering) -> i64
pub fn fetch_xor(&self, val: i64, order: Ordering) -> i64
pub fn fetch_update<F>(
&self,
set_order: Ordering,
fetch_order: Ordering,
f: F
) -> Result<i64, i64>
where
F: FnMut(i64) -> Option<i64>,
pub fn fetch_max(&self, val: i64, order: Ordering) -> i64
pub fn fetch_min(&self, val: i64, order: Ordering) -> i64
如果你有一點原子操作的基礎,就不難理解這些原子操作以及它們的變種了:
-
store
: 原子寫入 -
load
: 原子讀取 -
swap
: 原子交換 -
compare_and_swap
: 原子比較並交換 -
fetch_add
: 原子加法後返回舊值
下面這個例子演示了AtomicI64
的基本原子操作:
use std::sync::atomic::{AtomicI64, Ordering};
let atomic_num = AtomicI64::new(0);
// 原子加載
let num = atomic_num.load(Ordering::Relaxed);
// 原子加法並返回舊值
let old = atomic_num.fetch_add(10, Ordering::SeqCst);
// 原子比較並交換
atomic_num.compare_and_swap(old, 100, Ordering::SeqCst);
// 原子交換
let swapped = atomic_num.swap(200, Ordering::Release);
// 原子存儲
atomic_num.store(1000, Ordering::Relaxed);
上面示例了:
-
load: 原子加載
-
fetch_add: 原子加法並返回舊值
-
compare_and_swap: 原子比較並交換
-
swap: 原子交換
-
store: 原子存儲
這些原子操作都可以確保線程安全, 不會出現數據競爭。
不同的Ordering
表示內存序不同強度的屏障, 可以根據需要選擇。
AtomicI64
提供了豐富的原子操作, 可以實現無鎖的併發算法和數據結構
原子操作的 Ordering
在 Rust 中,Ordering
枚舉用於指定原子操作時的內存屏障(memory ordering)。這與 C++ 的內存模型中的原子操作順序性有一些相似之處,但也有一些不同之處。下面是 Ordering 的三個主要成員以及它們與 C++ 中的內存順序的對應關係:
- Ordering::Relaxed
-
Rust(Ordering::Relaxed):最輕量級的內存屏障,沒有對執行順序進行強制排序。允許編譯器和處理器在原子操作周圍進行指令重排。\
-
C++(memory_order_relaxed):具有相似的語義,允許編譯器和處理器在原子操作周圍進行輕量級的指令重排。
- Ordering::Acquire
-
Rust(Ordering::Acquire):插入一個獲取內存屏障,防止後續的讀操作被重排序到當前操作之前。確保當前操作之前的所有讀取操作都在當前操作之前執行。
-
C++(memory_order_acquire):在 C++ 中,memory_order_acquire 表示獲取操作,確保當前操作之前的讀取操作都在當前操作之前執行。
- Ordering::Release
-
Rust(Ordering::Release):插入一個釋放內存屏障,防止之前的寫操作被重排序到當前操作之後。確保當前操作之後的所有寫操作都在當前操作之後執行。
-
C++(memory_order_release):在 C++ 中,memory_order_release 表示釋放操作,確保之前的寫操作都在當前操作之後執行。
- Ordering::AcqRel
-
Rust(Ordering::AcqRel):插入一個獲取釋放內存屏障,既確保當前操作之前的所有讀取操作都在當前操作之前執行,又確保之前的所有寫操作都在當前操作之後執行。這種內存屏障提供了一種平衡,適用於某些獲取和釋放操作交替進行的場景。
-
C++(memory_order_acq_rel):也表示獲取釋放操作,它是獲取和釋放的組合。確保當前操作之前的所有讀取操作都在當前操作之前執行,同時確保之前的所有寫操作都在當前操作之後執行。
- Ordering::SeqCst
-
Rust(Ordering::SeqCst):插入一個全序內存屏障,保證所有線程都能看到一致的操作順序。是最強的內存順序,用於實現全局同步。
-
C++(memory_order_seq_cst):在 C++ 中,memory_order_seq_cst 也表示全序操作,保證所有線程都能看到一致的操作順序。是 C++ 中的最強的內存順序。
合理選擇 Ordering 可以最大程度提高性能, 同時保證需要的內存序約束。
但是如何合理的選擇,這就依賴開發者的基本賬功了,使用原子操作時需要小心,確保正確地選擇適當的 Ordering,以及避免競態條件和數據競爭。
像 Go 語言,直接使用了Ordering::SeqCst
作爲它的默認的內存屏障,開發者使用起來就沒有心智負擔了,但是你如果想更精細化的使用Ordering
, 請確保你一定清晰的瞭解你的代碼邏輯和 Ordering 的意義。
Ordering::Relaxed
Ordering::Relaxed
是最輕量級的內存順序,允許編譯器和處理器在原子操作周圍進行指令重排,不提供強制的執行順序。這通常在對程序執行的順序沒有嚴格要求時使用,以獲得更高的性能。
以下是一個簡單的例子,演示了 Ordering::Relaxed 的用法:
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
// 創建一個原子布爾值
let atomic_bool = AtomicBool::new(false);
// 創建一個生產者線程,設置布爾值爲 true
let producer_thread = thread::spawn(move || {
// 這裏可能會有指令重排,因爲使用了 Ordering::Relaxed
atomic_bool.store(true, Ordering::Relaxed);
});
// 創建一個消費者線程,檢查布爾值的狀態
let consumer_thread = thread::spawn(move || {
// 這裏可能會有指令重排,因爲使用了 Ordering::Relaxed
let value = atomic_bool.load(Ordering::Relaxed);
println!("Received value: {}", value);
});
// 等待線程完成
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
Ordering::Acquire
Ordering::Acquire
在 Rust 中表示插入一個獲取內存屏障,確保當前操作之前的所有讀取操作都在當前操作之前執行。這個內存順序常常用於同步共享數據,以確保線程能夠正確地觀察到之前的寫入操作。
以下是一個使用 Ordering::Acquire
的簡單例子:
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
// 創建一個原子布爾值
let atomic_bool = AtomicBool::new(false);
// 創建一個生產者線程,設置布爾值爲 true
let producer_thread = thread::spawn(move || {
// 設置布爾值爲 true
atomic_bool.store(true, Ordering::Release);
});
// 創建一個消費者線程,讀取布爾值的狀態
let consumer_thread = thread::spawn(move || {
// 等待直到讀取到布爾值爲 true
while !atomic_bool.load(Ordering::Acquire) {
// 這裏可能進行自旋,直到獲取到 Acquire 順序的布爾值
// 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
}
println!("Received value: true");
});
// 等待線程完成
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
Ordering::Release
Ordering::Release
在 Rust 中表示插入一個釋放內存屏障,確保之前的所有寫入操作都在當前操作之後執行。這個內存順序通常用於同步共享數據,以確保之前的寫入操作對其他線程可見。
以下是一個使用 Ordering::Release
的簡單例子:
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
// 創建一個原子布爾值
let atomic_bool = AtomicBool::new(false);
// 創建一個生產者線程,設置布爾值爲 true
let producer_thread = thread::spawn(move || {
// 設置布爾值爲 true
atomic_bool.store(true, Ordering::Release);
});
// 創建一個消費者線程,讀取布爾值的狀態
let consumer_thread = thread::spawn(move || {
// 等待直到讀取到布爾值爲 true
while !atomic_bool.load(Ordering::Acquire) {
// 這裏可能進行自旋,直到獲取到 Release 順序的布爾值
// 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
}
println!("Received value: true");
});
// 等待線程完成
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
在這個例子中,生產者線程使用 store
方法將布爾值設置爲 true
,而消費者線程使用 load
方法等待並讀取布爾值的狀態。由於使用了 Ordering::Release
,在生產者線程設置布爾值之後,會插入釋放內存屏障,確保之前的所有寫入操作都在當前操作之後執行。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。
Ordering::AcqRel
Ordering::AcqRel
在 Rust 中表示插入一個獲取釋放內存屏障,即同時包含獲取和釋放操作。它確保當前操作之前的所有讀取操作都在當前操作之前執行,並確保之前的所有寫入操作都在當前操作之後執行。這個內存順序通常用於同步共享數據,同時提供了一些平衡,適用於需要同時執行獲取和釋放操作的場景。
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
// 創建一個原子布爾值
let atomic_bool = AtomicBool::new(false);
// 創建一個生產者線程,設置布爾值爲 true
let producer_thread = thread::spawn(move || {
// 設置布爾值爲 true
atomic_bool.store(true, Ordering::AcqRel);
});
// 創建一個消費者線程,讀取布爾值的狀態
let consumer_thread = thread::spawn(move || {
// 等待直到讀取到布爾值爲 true
while !atomic_bool.load(Ordering::AcqRel) {
// 這裏可能進行自旋,直到獲取到 AcqRel 順序的布爾值
// 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
}
println!("Received value: true");
});
// 等待線程完成
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
在這個例子中,生產者線程使用 store 方法將布爾值設置爲 true,而消費者線程使用 load 方法等待並讀取布爾值的狀態。由於使用了 Ordering::AcqRel,在生產者線程設置布爾值之後,會插入獲取釋放內存屏障,確保之前的所有讀取操作都在當前操作之前執行,同時確保之前的所有寫入操作都在當前操作之後執行。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。
Ordering::SeqCst
Ordering::SeqCst
在 Rust 中表示插入一個全序內存屏障,保證所有線程都能看到一致的操作順序。這是最強的內存順序,通常用於實現全局同步。
以下是一個使用 Ordering::SeqCst 的簡單例子:
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
// 創建一個原子布爾值
let atomic_bool = AtomicBool::new(false);
// 創建一個生產者線程,設置布爾值爲 true
let producer_thread = thread::spawn(move || {
// 設置布爾值爲 true
atomic_bool.store(true, Ordering::SeqCst);
});
// 創建一個消費者線程,讀取布爾值的狀態
let consumer_thread = thread::spawn(move || {
// 等待直到讀取到布爾值爲 true
while !atomic_bool.load(Ordering::SeqCst) {
// 這裏可能進行自旋,直到獲取到 SeqCst 順序的布爾值
// 注意:在實際應用中,可以使用更高級的同步原語而不是自旋
}
println!("Received value: true");
});
// 等待線程完成
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
在這個例子中,生產者線程使用 store
方法將布爾值設置爲 true,而消費者線程使用 load
方法等待並讀取布爾值的狀態。由於使用了 Ordering::SeqCst
,在生產者線程設置布爾值之後,會插入全序內存屏障,確保所有線程都能看到一致的操作順序。這確保了消費者線程能夠正確地觀察到生產者線程的寫入操作。SeqCst
是最強的內存順序,提供了最高級別的同步保證。
在 Rust 中,Ordering::Acquire
內存順序通常與Ordering::Release
配合使用。
Ordering::Acquire
和Ordering::Release
之間形成happens-before
關係, 可以實現不同線程之間的同步。
其典型用法是:
-
當一個線程使用 Ordering::Release 寫一個變量時, 這給寫操作建立一個釋放屏障。
-
其他線程使用 Ordering::Acquire 讀這個變量時, 這給讀操作建立一個獲取屏障。
-
獲取屏障確保讀操作必須發生在釋放屏障之後。
這樣就可以實現:
-
寫線程確保寫發生在之前的任何讀之前
-
讀線程可以看到最新的寫入值
此外, Ordering::AcqRel
也經常被用來同時具有兩者的語義。
如果用happens-before
描述這五種內存順序,那麼:
-
Relaxed: 沒有 happens-before 關係
-
Release: 對於給定的寫操作 A, 該釋放操作 happens-before 讀操作 B, 當 B 讀取的是 A 寫入的最新值。和
Acquire
配套使用。 -
Acquire: 對於給定的讀操作 A, 該獲取操作 happens-after 寫操作 B, 當 A 讀取的是 B 寫入的最新值。和
Release
配套使用。 -
AcqRel: 同時滿足 Acquire 和 Release 的 happens-before 關係。
-
SeqCst: 所有的 SeqCst 操作之間都存在 happens-before 關係, 形成一個全序。
happens-before
關係表示對給定兩個操作 A 和 B:
-
如果
A happens-before B
, 那麼 A 對所有線程可見, 必須先於 B 執行。 -
如果沒有
happens-before
關係, 則 A 和 B 之間可能存在重排序和可見性問題。
Release
建立寫之前的happens-before
關係,Acquire
建立讀之後的關係。兩者搭配可以實現寫入對其他線程可見。、SeqCst
強制一個全序, 所有操作都是有序的。
參考資料
[1]
未定義的行爲: https://github.com/rust-lang/rust/issues/32260
[2]
mpmc: https://crates.io/crates/mpmc
[3]
C++20 atomic: https://en.cppreference.com/w/cpp/atomic
[4]
Ordering: https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html
[5]
C++20 原子排序: https://en.cppreference.com/w/cpp/atomic/memory_order
[6]
nomicon: https://doc.rust-lang.org/nomicon/atomics.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/fV8cnzh9U3XmPWciQX1Ihg