Rust 無畏併發 - 1
Rust 的一個特點就是無所畏懼的併發。併發編程常常充滿危險:
-
數據競爭問題:當多個線程在沒有原子類型或鎖機制保護的情況下寫入相同的數據。
-
生命週期問題:當線程的生命週期超過變量的生命週期時,就會發生生命週期問題。例:這個變量被父線程銷燬,而子線程仍然依賴於它。
讓我們比較一些 Rust 和 c++ 代碼,看看 Rust 是如何保護你避免出現這些問題的。
找質數
有許多方法可以分解一個數並檢測它是否是質數。有些算法非常快——但在本文中,我們將故意使用一種慢的方法。一個簡單的算法使用更多的 CPU 時間,以便更好地展示多線程的好處。我們將使用以下函數來確定一個數是否爲質數:
fn is_prime(n: u32) -> bool {
(2 ..= n/2).all(|i| n % i != 0 )
}
爲了確保它能工作,這裏有一個快速的單元測試,用於檢查 100 以下的質數列表:
#[test]
fn test_first_hundred_primes() {
let primes: Vec<u32> = (2..100).filter(|n| is_prime(*n)).collect();
assert_eq!(
primes,
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
);
}
這裏有一個等價的 c++ 函數:
bool is_prime(uint32_t n) {
for (uint32_t i = 2; i <= n / 2; ++i) {
if (n % i == 0) return false;
}
return true;
}
單線程
下面的 Rust 程序計算了它能在 2 到 20 萬之間找到多少個質數:
fn main() {
const MAX: u32 = 200_000;
let n_primes = (2..MAX).filter(|n| is_prime(*n)).count();
println!("Found {n_primes} prime numbers in the range 2..{MAX}");
}
這個程序在 2 到 20 萬之間找到 17984 個質數。
類似的 c++ 程序如下所示:
int main() {
const auto max = 200000;
uint32_t count = 0;
for (auto i=2; i < max; i++) {
if (is_prime(i)) ++count;
}
std::cout << "Found " << count << " prime numbers.\n";
}
這個程序也能找到 17984 個質數。到目前爲止,一切順利。讓我們來看一下數據競爭吧!
數據競爭
爲了加速我們的計算,我們啓動了 c++ std::thread 對象,將我們的工作負載分配給了兩個線程:
int main() {
const auto max = 200000;
uint32_t count = 0;
std::thread t1([&count, max]() {
for (auto i = 2; i < max/2; i++) {
if (is_prime(i)) ++count;
}
});
std::thread t2([&count, max]() {
for (auto i = max/2; i < max; i++) {
if (is_prime(i)) ++count;
}
});
t1.join();
t2.join();
std::cout << "Found " << count << " prime numbers.\n";
}
這個程序編譯並運行,Visual Studio(我的編譯器) 沒有顯示任何警告。代碼將工作負載拆分爲兩個,生成了兩個線程—每個線程覆蓋計算的一半。
不幸的是,這行不通。我第一次運行的時候,它統計出了 17983 個質數。第二次,它只數到 17981!線程併發訪問 count。增加一個變量分爲三個階段:讀取當前值,向其加 1,存儲結果。不能保證所有這些步驟都是在一個線程中完成的——因此產生了數據競爭。
也許用 Rust 實現的版本可以工作?
fn main() {
const MAX: u32 = 200_000;
let mut counter = &mut 0;
let t1 = std::thread::spawn(move || {
*counter += (2..MAX/2).filter(|n| is_prime(*n)).count();
});
let t2 = std::thread::spawn(move || {
*counter += (MAX/2..MAX).filter(|n| is_prime(*n)).count();
});
t1.join().unwrap();
t2.join().unwrap();
println!("Found {counter} prime numbers in the range 2..{MAX}");
}
同一個程序的 Rust 版本無法編譯。它給出了兩個錯誤:
-
計數器的可變借用不得超過一次。
-
計數器借用的生命週期可能超過其值的生命週期。
Rust 編譯器已經檢測到你的程序是不安全的,因此編譯不通過。
安全的多線程
下面是 Rust 的一個安全版本,它可以編譯成功並給出正確的結果:
fn main() {
const MAX: u32 = 200_000;
let counter = Arc::new(AtomicUsize::new(0));
let counter_1 = counter.clone();
let counter_2 = counter.clone();
let t1 = std::thread::spawn(move || {
counter_1.fetch_add(
(2 .. MAX/2).filter(|n| is_prime(*n)).count(),
std::sync::atomic::Ordering::SeqCst
);
});
let t2 = std::thread::spawn(move || {
counter_2.fetch_add(
(MAX/2 .. MAX).filter(|n| is_prime(*n)).count(),
std::sync::atomic::Ordering::SeqCst
);
});
t1.join();
t2.join();
println!("Found {} prime numbers in the range 2..{MAX}",
counter.load(Ordering::SeqCst));
}
在這個程序中有三個新的概念:
-
AtomicUSize 是一個特殊的 “原子” 類型。原子類型提供了針對數據競爭的自動保護。它們通常非常快,直接映射到等價的 CPU 指令。無論何時訪問原子類型,都必須指出所需的同步保證—在這種情況下,SeqCst 提供了最高級別的保護。
-
Arc 是一個 “原子引用計數器”。當你在 Arc 中封裝一個類型時,可以安全地克隆它並在線程之間移動。每個克隆都是指向所包含數據的指針,Arc 保證底層數據不會被刪除。
-
線程閉包 (內聯函數) 移動了它們捕獲的數據。他們在 Arc 的克隆上進行操作 - 所以他們都指向同一個地址。
結合這些概念可以讓我們避免數據競爭和生命週期問題:
-
Rust 的借用檢查器很高興,因爲數據不是在線程之間借用的:Arc 克隆,可以安全地共享一個指向計數器數據的指針。
-
使用 AtomicUsize 不會導致數據競爭。
總結
Rust 無所畏懼的併發性保證將我們從數據競爭中拯救出來!c++ 沒有警告錯誤,它只是給出了不正確的結果。Rust 通過拒絕編譯不正確的代碼挽救了局面。在大型程序中,這可以節省大量的調試時間。
Rust 的 Arc 和 AtomicUSize 類型使得代碼看起來很複雜,在本系列的下一部分中,我將向你展示如何降低複雜性並創建快速的、多線程的、易於閱讀的代碼。
本文翻譯自:
https://medium.com/pragmatic-programmers/fearless-concurrency-with-rust-683cef1acb6b
coding 到燈火闌珊 專注於技術分享,包括 Rust、Golang、分佈式架構、雲原生等。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/LPIHtFYA8bXXJpdA2cwwYg