Rust 無畏併發 - 2
在本系列的第 1 部分中,我們討論了 Rust 如何使用生命週期和借用檢查器來避免數據競爭。我們甚至將一個帶有線程錯誤的 c++ 程序與一個等價的 Rust 程序進行了比較,並看到 Rust 是如何不讓這個有錯誤的版本通過編譯的。
在本系列的第 2 部分中,我們將探討 Rust 如何實現一種常見的設計模式:分解大量的計算任務,在 cpu 之間切換,並組合結果。我們將繼續使用一個故意沒有效率的函數來尋找質數。我們使用的函數與上次相同:
fn is_prime(n: u32) -> bool {
(2..=n/2).all(|i| n % i != 0)
}
我們將要創建的程序包含下面的功能:
-
創建一個包含數字 2 到 2,000,000 的 Vec。
-
通過 is_prime 函數過濾 Vec 中的數字,使其只包含質數。
-
打印找到的質數的數量和執行計算所需的時間。
單線程負載
作爲基準,我們將從單線程版本開始。我們使用 Rust 的迭代器:
use std::time::Instant;
fn is_prime(n: u32) -> bool {
(2..=n / 2).all(|i| n % i != 0)
}
fn main() {
let now = Instant::now();
let primes_under_2_million: Vec<u32> = (2..2_000_000).filter(|n| is_prime(*n)).collect();
let elapsed = now.elapsed().as_secs_f32();
println!(
"Found {} primes in {:1} seconds",
primes_under_2_million.len(),
elapsed
);
}
使用單核,在 debug 模式下 (cargo run) 計算需要 712 秒,在 release 模式下 (cargo run - release) 計算需要 89 秒。
這是一個很好的基準,但我們絕對可以在此基礎上做得更好!
使用原生 Rust 線程進行拆分
只使用原生的 Rust(沒有外部依賴),我們可以利用一些 Rust 特性來幫助我們劃分工作負載。
use std::{
thread::{self, JoinHandle},
time::Instant,
};
fn is_prime(n: u32) -> bool {
(2..=n / 2).all(|i| n % i != 0)
}
fn main() {
let now = Instant::now();
let candidates: Vec<u32> = (2..2_000_000).collect();
// Build a vector of thread handles, each running 1/12th of the workload
let mut threads: Vec<JoinHandle<Vec<u32>>> = candidates
.chunks(2_000_000 / 12)
.into_iter()
.map(|chunk| {
// We have to take ownership of the chunk, otherwise the borrow
// checked complains that we are potentially borrowing references
// to candidates beyond the lifetime of the program.
let my_chunk: Vec<u32> = chunk.to_owned();
// Spawn the thread, moving our own copy of chunks into the thread.
thread::spawn(move || {
my_chunk
.iter()
.filter(|n| is_prime(**n))
.map(|n| *n)
.collect()
})
})
.collect();
// Combine the results
let primes_under_2_million: Vec<u32> = threads
.drain(0..)
.map(|t| t.join().unwrap())
.flatten()
.collect();
let elapsed = now.elapsed().as_secs_f32();
println!(
"Found {} primes in {:1} seconds",
primes_under_2_million.len(),
elapsed
);
}
這段代碼首先創建一個由數字 2 到 2,000,000 組成的 Vec。然後,它使用 chunks(Vec 的內置函數) 將 Vec 分成 12 個塊,我們可以使用 into_iter() 來遍歷每個塊。
每個塊我們生成一個線程,該線程將返回一個 Result<Vec>,其中包含來自該塊的質數。每個線程返回 JoinHandle,JoinHandle 是一個指向線程的指針,該線程在結束時將包含返回結果,可以在主線程上調用 join 來等待它完成。
最後,我們清空 Vec 併合並結果。(Drain 會在處理過程中從 vector 中刪除每個項,因爲我們不會重用向量,這是避免所有權問題的好方法)。
這個策略提供了一個顯著的加速。在 Debug 模式下只需 123 秒,而在 release 模式下只需 15 秒。
使用 Rayon
Rayon 是一個很受歡迎的 Rust crate,它實現了很多線程功能。提供了工作線程池、工作竊取 (空閒線程將從掛起的隊列中獲取工作)、並行迭代器和許多其他特性,從而簡化了多線程編程。缺點是將依賴項添加到程序中。
在 Cargo.toml 文件中加入依賴:
[dependencies]
rayon = "1.5"
使用 Rayon 的最大優勢是它極大地簡化了代碼,你可以對原始的單線程版本做一些微小的更改,就能立即受益於多線程帶來的加速。這是 Rayon 的版本:
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::time::Instant;
fn is_prime(n: u32) -> bool {
(2..=n / 2).all(|i| n % i != 0)
}
fn main() {
let now = Instant::now();
let primes_under_2_million: Vec<u32> = (2..2_000_000)
.into_par_iter()
.filter(|n| is_prime(*n))
.collect();
let elapsed = now.elapsed().as_secs_f32();
println!(
"Found {} primes in {:1} seconds",
primes_under_2_million.len(),
elapsed
);
}
注意,我們從 Rayon 導入了兩個特性:IntoParallelIterator 和 ParallelIterator。這些是將 2..2_000_000 轉換爲並行迭代器需要的。我們不需要將候選對象分割成塊,也不需要擔心線程連接的問題——Rayon 負責所有這些工作。
最重要的是,它更快。在 Debug 模式下用時 72 秒,在 Release 模式下用時 8 秒。
結果
讓我們比較一下我們測試的三種結果:
就複雜性而言,單線程版本是最短的,其次是 Rayon。分塊版本的代碼量是單線程版本的兩倍多。
令人驚訝的是,儘管增加了複雜性——分塊版本並不是最快的。最快的是 Rayon 版本。
令人驚訝的是,儘管增加了複雜性——分塊版本並不是最快的。最快的是 Rayon 版本。
總結
使用原生 Rust 將工作負載劃分爲多個線程有點複雜,但工作得非常好。與單線程模型相比,它提供了很好的性能提升。它有 47 行代碼 (包括註釋),比單線程模型複雜得多——但可以肯定它是快速和安全的。
如果你可以接受添加 Rayon 依賴,那麼你可以很容易地將單線程模型的簡單性與併發模型的強大功能結合起來。它甚至比原生的 Rust 版本還要快,並且保留了你期望的所有安全特性。
本文翻譯自:
https://medium.com/pragmatic-programmers/fearless-concurrency-with-rust-part-2-6cb215718ded
coding 到燈火闌珊 專注於技術分享,包括 Rust、Golang、分佈式架構、雲原生等。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/b3pKfZMXvOWvNiU6UPwxUg