Rust 併發編程 - 線程池
線程池是一種併發編程的設計模式,它由一組預先創建的線程組成,用於執行多個任務。線程池的主要作用是在任務到達時,重用已創建的線程,避免頻繁地創建和銷燬線程,從而提高系統的性能和資源利用率。線程池通常用於需要處理大量短期任務或併發請求的應用程序。
線程池的優勢包括:
-
減少線程創建和銷燬的開銷:線程的創建和銷燬是一項昂貴的操作,線程池通過重用線程減少了這些開銷,提高了系統的響應速度和效率。
-
控制併發度:線程池可以限制同時執行的線程數量,從而有效控制系統的併發度,避免資源耗盡和過度競爭。
-
任務調度和負載均衡:線程池使用任務隊列和調度算法來管理和分配任務,確保任務按照合理的方式分配給可用的線程,實現負載均衡和最優的資源利用。
2.1 rayon 線程池
Rayon 是 Rust 中的一個並行計算庫,它可以讓你更容易地編寫並行代碼,以充分利用多核處理器。Rayon 提供了一種簡單的 API,允許你將迭代操作並行化,從而加速處理大規模數據集的能力。除了這些核心功能外,它還提供構建線程池的能力。
rayon::ThreadPoolBuilder
是 Rayon 庫中的一個結構體,用於自定義和配置 Rayon 線程池的行爲。線程池是 Rayon 的核心部分,它管理並行任務的執行。通過使用 ThreadPoolBuilder
,你可以根據你的需求定製 Rayon 線程池的行爲,以便更好地適應你的並行計算任務。在創建線程池之後,你可以使用 Rayon 提供的方法來並行執行任務,利用多核處理器的性能優勢。
ThreadPoolBuilder 是以設計模式中的構建者模式設計的,以下是一些 ThreadPoolBuilder
的主要方法:
new()
方法:創建一個新的ThreadPoolBuilder
實例。
use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new();
}
num_threads()
方法:設置線程池的線程數量。你可以通過這個方法指定線程池中的線程數,以控制並行度。默認情況下,Rayon 會根據 CPU 內核數量自動設置線程數。
use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new().num_threads(4); // 設置線程池有 4 個線程
}
thread_name()
方法:爲線程池中的線程設置一個名稱,這可以幫助你在調試時更容易識別線程。
use rayon::ThreadPoolBuilder;
fn main() {
let builder = ThreadPoolBuilder::new().thread_name(|i| format!("worker-{}", i));
}
build()
方法:通過build
方法來創建線程池。這個方法會將之前的配置應用於線程池並返回一個rayon::ThreadPool
實例。
use rayon::ThreadPoolBuilder;
fn main() {
let pool = ThreadPoolBuilder::new()
.num_threads(4)
.thread_name(|i| format!("worker-{}", i))
.build()
.unwrap(); // 使用 unwrap() 來處理潛在的錯誤
}
build_global
方法 通過build_global
方法創建一個全局的線程池。不推薦你主動調用這個方法初始化全局的線程池,使用默認的配置就好,記得全局的線程池只會初始化一次。
rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
-
其他方法:
ThreadPoolBuilder
還提供了其他一些方法,用於配置線程池的行爲,如stack_size()
用於設置線程棧的大小。 -
它還提供了一些回調函數的設置,
start_handler()
用於設置線程啓動時的回調函數等。spawn_handler
實現定製化的函數來產生線程。panic_handler
提供對 panic 處理的回調函數。exit_handler
提供線程退出時的回調。
下面這個例子演示了使用 rayon 線程池計算斐波那契數列:
fn fib(n: usize) -> usize {
if n == 0 || n == 1 {
return n;
}
let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // 運行在rayon線程池中
return a + b;
}
pub fn rayon_threadpool() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build()
.unwrap();
let n = pool.install(|| fib(20));
println!("{}", n);
}
-
rayon::ThreadPoolBuilder 用來創建一個線程池。設置使用 8 個線程
-
pool.install() 在線程池中運行 fib
-
rayon::join 用於並行執行兩個函數並等待它們的結果。它使得你可以同時執行兩個獨立的任務,然後等待它們都完成,以便將它們的結果合併到一起。
通過在 join 中傳入 fib 遞歸任務, 實現並行計算 fib 數列
與直接 spawn thread 相比, 使用 rayon 的線程池有以下優點:
-
線程可重用, 避免頻繁創建 / 銷燬線程的開銷
-
線程數可配置, 一般根據 CPU 核心數設置
-
避免大量線程造成資源競爭問題
接下來在看一段使用build_scoped
的代碼:
scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
pub fn rayon_threadpool2() {
let pool_data = vec![1, 2, 3];
// We haven't assigned any TLS data yet.
assert!(!POOL_DATA.is_set());
rayon::ThreadPoolBuilder::new()
.build_scoped(
// Borrow `pool_data` in TLS for each thread.
|thread| POOL_DATA.set(&pool_data, || thread.run()),
// Do some work that needs the TLS data.
|pool| pool.install(|| assert!(POOL_DATA.is_set())),
).unwrap();
// Once we've returned, `pool_data` is no longer borrowed.
drop(pool_data);
}
這段 Rust 代碼使用了一些 Rust 庫來演示線程池的使用以及如何在線程池中共享線程本地存儲(TLS,Thread-Local Storage)。
-
scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
這一行代碼使用了scoped_tls
庫的宏scoped_thread_local!
來創建一個靜態的線程本地存儲變量POOL_DATA
,其類型是Vec<i32>
。這意味着每個線程都可以擁有自己的POOL_DATA
值,而這些值在不同線程之間是相互獨立的。 -
let pool_data = vec![1, 2, 3];
在main
函數內,創建了一個Vec<i32>
類型的變量pool_data
,其中包含了整數 1、2 和 3。 -
assert!(!POOL_DATA.is_set());
這一行代碼用來檢查在線程本地存儲中是否已經設置了POOL_DATA
。在此初始階段,我們還沒有爲它的任何線程分配值,因此應該返回false
。 -
rayon::ThreadPoolBuilder::new()
這一行開始構建一個 Rayon 線程池。 -
.build_scoped
在線程池建立之後,這裏使用.build_scoped
方法來定義線程池的行爲。這個方法需要兩個閉包作爲參數。
-
第一個閉包
|thread| POOL_DATA.set(&pool_data, || thread.run())
用於定義每個線程在啓動時要執行的操作。它將pool_data
的引用設置爲POOL_DATA
的線程本地存儲值,並在一個新的線程中運行thread.run()
,這個閉包的目的是爲每個線程設置線程本地存儲數據。 -
第二個閉包
|pool| pool.install(|| assert!(POOL_DATA.is_set()))
定義了線程池啓動後要執行的操作。它使用pool.install
方法來確保在線程池中的每個線程中都能夠訪問到線程本地存儲的值,並且執行了一個斷言來驗證POOL_DATA
在這個線程的線程本地存儲中已經被設置。
drop(pool_data);
在線程池的作用域結束後,這一行代碼用來釋放pool_data
變量。這是因爲線程本地存儲中的值是按線程管理的,所以在這個作用域結束後,我們需要手動釋放pool_data
,以確保它不再被任何線程訪問。
2.2 threadpool 庫
threadpool
是一個 Rust 庫,用於創建和管理線程池,使並行化任務變得更加容易。線程池是一種管理線程的機制,它可以在應用程序中重用線程,以減少線程創建和銷燬的開銷,並允許您有效地管理並行任務。下面是關於 threadpool
庫的一些基本介紹:
-
創建線程池:
threadpool
允許您輕鬆創建線程池,可以指定線程池的大小(即同時運行的線程數量)。這可以確保您不會創建過多的線程,從而避免不必要的開銷。 -
提交任務: 一旦創建了線程池,您可以將任務提交給線程池進行執行。這可以是任何實現了
FnOnce()
特質的閉包,通常用於表示您想要並行執行的工作單元。 -
任務調度: 線程池會自動將任務分發給可用線程,並在任務完成後回收線程,以便其他任務可以使用。這種任務調度可以減少線程創建和銷燬的開銷,並更好地利用系統資源。
-
等待任務完成: 您可以等待線程池中所有任務完成,以確保在繼續執行後續代碼之前,所有任務都已完成。這對於需要等待並行任務的結果的情況非常有用。
-
錯誤處理:
threadpool
提供了一些錯誤處理機制,以便您可以檢測和處理任務執行期間可能發生的錯誤。
下面是一個簡單的示例,演示如何使用 threadpool
庫創建一個線程池並提交任務:
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
fn main() {
// 創建一個線程池,其中包含 4 個線程
let pool = threadpool::ThreadPool::new(4);
// 創建一個通道,用於接收任務的結果
let (sender, receiver) = channel();
// 提交一些任務給線程池
for i in 0..8 {
let sender = sender.clone();
pool.execute(move || {
let result = i * 2;
sender.send(result).expect("發送失敗");
});
}
// 等待所有任務完成,並接收它們的結果
for _ in 0..8 {
let result = receiver.recv().expect("接收失敗");
println!("任務結果: {}", result);
}
}
上述示例創建了一個包含 4 個線程的線程池,並向線程池提交了 8 個任務,每個任務計算一個數字的兩倍並將結果發送到通道。最後,它等待所有任務完成並打印結果。
接下來我們再看一個 threadpool + barrier 的例子。併發執行多個任務,並且使用 barrier 等待所有的任務完成。注意任務數一定不能大於 worker 的數量,否則會導致死鎖:
// create at least as many workers as jobs or you will deadlock yourself
let n_workers = 42;
let n_jobs = 23;
let pool = threadpool::ThreadPool::new(n_workers);
let an_atomic = Arc::new(AtomicUsize::new(0));
assert!(n_jobs <= n_workers, "too many jobs, will deadlock");
// 創建一個barrier,等待所有的任務完成
let barrier = Arc::new(Barrier::new(n_jobs + 1));
for _ in 0..n_jobs {
let barrier = barrier.clone();
let an_atomic = an_atomic.clone();
pool.execute(move || {
// 執行一個很重的任務
an_atomic.fetch_add(1, Ordering::Relaxed);
// 等待其他線程完成
barrier.wait();
});
}
// 等待線程完成
barrier.wait();
assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);
2.3 rusty_pool 庫
這是基於 crossbeam 多生產者多消費者通道實現的自適應線程池。它具有以下特點:
-
核心線程池和最大線程池兩種大小
-
核心線程持續存活, 額外線程有空閒回收機制
-
支持等待任務結果和異步任務
-
首次提交任務時才創建線程, 避免資源佔用
-
當核心線程池滿了時纔會創建額外線程
-
提供了 JoinHandle 來等待任務結果
-
如果任務 panic,JoinHandle 會收到一個取消錯誤
-
開啓 asyncfeature 時可以作爲 futures executor 使用
-
spawn 和 try_spawn 來提交 future, 會自動 polling
-
否則可以通過 complete 直接阻塞執行 future
總之, 該線程池實現了自動擴縮容、空閒回收、異步任務支持等功能。
其自適應控制和異步任務的支持使其可以很好地應對突發大流量, 而平時也可以節省資源。
從實現來看, 作者運用了 crossbeam 通道等 Rust 併發編程地道的方式, 代碼質量很高。
所以這是一個非常先進實用的線程池實現, 值得深入學習借鑑。可以成爲我們編寫彈性伸縮的 Rust 併發程序的很好選擇
pub fn rusty_pool_example() {
let pool = rusty_pool::ThreadPool::default();
for _ in 1..10 {
pool.execute(|| {
println!("Hello from a rusty_pool!");
});
}
pool.join();
}
這個例子展示瞭如何使用另一個線程池 rusty_pool 來實現併發。
主要步驟包括:
-
創建 rusty_pool 線程池, 默認配置
-
循環提交 10 個打印任務到線程池
-
在主線程中調用 join, 等待線程池內所有任務完成
與之前的 threadpool 類似, rusty_pool 也提供了一個方便的線程池抽象, 使用起來更簡單些。
下面這段代碼是提交一個任務給線程池運行後,等到結果返回的例子:
let handle = pool.evaluate(|| {
thread::sleep(Duration::from_secs(5));
return 4;
});
let result = handle.await_complete();
assert_eq!(result, 4);
下面這個例子展示瞭如何在 rusty_pool 線程池中執行異步任務。
主要包含兩個處理方式:
a1、創建默認的 rusty_pool 線程池
a2、使用 pool.complete 來同步執行一個 async 塊
-
在 async 塊中可以使用 await 運行異步函數
-
complete 會阻塞直到整個 async 塊完成
-
可以獲取 async 塊的返回值
b1、使用 pool.spawn 來異步執行 async 塊
-
spawn 會立即返回一個 JoinHandle
-
async 塊會在線程池中異步執行
-
這裏通過 Atomics 變量來保存結果
b2、在主線程中調用 join, 等待異步任務完成
b3、檢驗異步任務的結果
通過 complete 和 spawn 的結合, 可以靈活地在線程池中同步或異步地執行 Future 任務。
rusty_pool 通過內置的 async 運行時, 很好地支持了 Future based 的異步編程。
我們可以利用這種方式來實現複雜的異步業務, 而不需要自己管理線程和 Future。
pub fn rusty_pool_example2() {
let pool = rusty_pool::ThreadPool::default();
let handle = pool.complete(async {
let a = some_async_fn(4, 6).await; // 10
let b = some_async_fn(a, 3).await; // 13
let c = other_async_fn(b, a).await; // 3
some_async_fn(c, 5).await // 8
});
assert_eq!(handle.await_complete(), 8);
let count = Arc::new(AtomicI32::new(0));
let clone = count.clone();
pool.spawn(async move {
let a = some_async_fn(3, 6).await; // 9
let b = other_async_fn(a, 4).await; // 5
let c = some_async_fn(b, 7).await; // 12
clone.fetch_add(c, Ordering::SeqCst);
});
pool.join();
assert_eq!(count.load(Ordering::SeqCst), 12);
}
接下來是等待超時以及關閉線程池的例子:
pub fn rusty_pool_example3() {
let pool = ThreadPool::default();
for _ in 0..10 {
pool.execute(|| thread::sleep(Duration::from_secs(10)))
}
// 等待所有線程變得空閒,即所有任務都完成,包括此線程調用join()後由其他線程添加的任務,或者等待超時
pool.join_timeout(Duration::from_secs(5));
let count = Arc::new(AtomicI32::new(0));
for _ in 0..15 {
let clone = count.clone();
pool.execute(move || {
thread::sleep(Duration::from_secs(5));
clone.fetch_add(1, Ordering::SeqCst);
});
}
// 關閉並刪除此“ ThreadPool”的唯一實例(無克隆),導致通道被中斷,從而導致所有worker在完成當前工作後退出
pool.shutdown_join();
assert_eq!(count.load(Ordering::SeqCst), 15);
}
2.4 fast_threadpool 庫
這個線程池實現經過優化以獲取最小化延遲。特別是,保證你在執行你的任務之前不會支付線程生成的成本。新線程僅在工作線程的 "閒置時間"(例如,在返回作業結果後)期間生成。
唯一可能導致延遲的情況是 "可用" 工作線程不足。爲了最小化這種情況的發生概率,這個線程池會不斷保持一定數量的可用工作線程(可配置)。
這個實現允許你以異步方式等待任務的執行結果,因此你可以將其用作替代異步運行時的 spawn_blocking
函數。
pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected>{
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler();
assert_eq!(4, threadpool.execute(|_| { 2 + 2 })?);
Ok(())
}
這個例子展示了 fast_threadpool crate 的用法。
主要步驟包括:
-
使用 default 配置創建線程池
-
將線程池轉換爲 sync_handler, 用於同步提交任務
-
提交一個簡單的計算任務到線程池
-
主線程中收集結果並驗證
下面這個例子異步執行任務的例子,這裏我們使用了 tokio 的異步運行時:
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_async_handler();
assert_eq!(4, threadpool.execute(|_| { 2 + 2 }).await.unwrap());
});
2.5 scoped_threadpool 庫
在 Rust 多線程編程中, scoped 是一個特定的概念, 指的是一種限定作用域的線程。
scoped 線程的主要特徵是:
-
線程的生命週期限定在一個代碼塊中, 離開作用域自動停止
-
線程可以直接訪問外部狀態而無需 channel 或 mutex
-
借用檢查器自動確保線程安全
一個典型的 scoped 線程池用法如下:
pool.scoped(|scope| {
scope.execute(|| {
// 可以直接訪問外部狀態
});
}); // 作用域結束時,線程被Join
scoped 線程的優點是:
-
代碼簡潔, 無需手動同步線程
-
作用域控制自動管理線程 lifetime
-
借用檢查確保安全
scoped 線程適用於:
-
需要訪問共享狀態的短任務
-
難以手動管理線程 lifetime 的場景
-
對代碼安全性要求高的場景
總之, scoped 線程在 Rust 中提供了一種更安全便捷的多線程模式, 值得我們在多線程編程中考慮使用。
這一節我們就介紹一個專門的 scoped_threadpool 庫。
pub fn scoped_threadpool() {
let mut pool = scoped_threadpool::Pool::new(4);
let mut vec = vec![0, 1, 2, 3, 4, 5, 6, 7];
// Use the threads as scoped threads that can reference anything outside this closure
pool.scoped(|s| {
// Create references to each element in the vector ...
for e in &mut vec {
// ... and add 1 to it in a seperate thread
s.execute(move || {
*e += 1;
});
}
});
assert_eq!(vec, vec![1, 2, 3, 4, 5, 6, 7, 8]);
}
這個例子展示瞭如何使用 scoped_threadpool 庫創建一個 scoped 線程池。
-
首先創建一個 scoped 線程池, 指定使用 4 個線程
-
定義一個向量 vec 作爲外部共享狀態
-
在 pool.scoped 中啓動線程, 在閉包中可以訪問外部狀態 vec
-
每個線程讀取 vec 的一個元素, 並在線程內修改它
-
所有線程執行完成後, vec 的元素全部 + 1
scoped 線程池的主要特點:
-
線程可以直接訪問外部狀態, 不需要 channel 或 mutex
-
外部狀態的借用檢查自動進行
-
線程池作用域結束時, 自動等待所有線程完成
相比全局線程池, scoped 線程池的優勢在於:
-
代碼更簡潔, 無需手動同步外部狀態
-
借用檢查確保線程安全
-
作用域控制自動管理線程 lifetime
接下來可以擴展介紹:
-
在線程間共享不同類型的狀態
-
scoped 線程池的配置選項
-
與其他線程池的比較
-
使用案例, 如並行計算等
總之, scoped 線程池提供了一種更安全方便的併發模式, 很適合在 Rust 中使用。
2.6 scheduled_thread_pool 庫
scheduled-thread-pool 是一個 Rust 庫, 它提供了一個支持任務調度的線程池實現。下面我來介紹其主要功能和用法:
-
支持定時執行任務, 無需自己實現調度器
-
提供一次性和重複調度兩種方式
-
基於線程池模型, 避免線程重複創建銷燬
-
任務可隨時取消
pub fn scheduled_thread_pool() {
let (sender, receiver) = channel();
let pool = scheduled_thread_pool::ScheduledThreadPool::new(4);
let handle = pool.execute_after(Duration::from_millis(1000), move ||{
println!("Hello from a scheduled thread!");
sender.send("done").unwrap();
});
let _ = handle;
receiver.recv().unwrap();
}
這個例子展示瞭如何使用 scheduled_thread_pool crate 創建一個可調度的線程池。
-
創建一個包含 4 個線程的 scheduled 線程池
-
使用 pool.execute_after 在 1 秒後調度一個任務
-
任務中打印消息並向 channel 發送完成信號
-
主線程在 channel 中接收信號, 阻塞等待任務完成
scheduled 線程池的主要功能:
-
可以調度任務在未來的某時間點執行
-
提供一次性調度和定期調度兩種方式
-
採用工作線程池模型, 避免線程重複創建銷燬
相比普通線程池, scheduled 線程池的優勢在於:
-
可以將任務延遲或定期執行, 無需自己實現定時器
-
調度功能內置線程池, 無需自己管理線程
-
可以直接使用調度語義, 代碼更簡潔
2.7 poolite 庫
poolite 是一個非常輕量級的 Rust 線程池庫, 主要有以下特性:
- API 簡單易用
提供了基礎的創建池子、添加任務等接口:
let pool = poolite::Pool::new()?;
pool.push(|| println!("hello"));
- 支持 scoped 作用域線程
scoped 可以自動等待任務完成:
pool.scoped(|scope| {
scope.push(|| println!("hello"));
});
- 默認線程數爲 CPU 核數
可以通過 Builder 自定義線程數:
let pool = poolite::Pool::builder().thread_num(8).build()?;
- 和 arc、mutex 結合
對於我們常見的共享資源的訪問,poollite 也提供了很好的支持。下面的例子是計算斐波那契數列的併發版本:
use poolite::Pool;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
/// `cargo run --example arc_mutex`
fn main() {
let pool = Pool::new().unwrap();
// You also can use RwLock instead of Mutex if you read more than write.
let map = Arc::new(Mutex::new(BTreeMap::<i32, i32>::new()));
for i in 0..10 {
let map = map.clone();
pool.push(move || test(i, map));
}
pool.join(); //wait for the pool
for (k, v) in map.lock().unwrap().iter() {
println!("key: {}\tvalue: {}", k, v);
}
}
fn test(msg: i32, map: Arc<Mutex<BTreeMap<i32, i32>>>) {
let res = fib(msg);
let mut maplock = map.lock().unwrap();
maplock.insert(msg, res);
}
fn fib(msg: i32) -> i32 {
match msg {
0...2 => 1,
x => fib(x - 1) + fib(x - 2),
}
}
- 和 mpsc 的配合
fn main() {
let pool = Pool::new().unwrap();
let (mp, sc) = channel();
for i in 0..38 {
let mp = mp.clone();
pool.push(move || test(i, mp));
}
pool.join(); // wait for the pool
println!("{:?}", pool);
while let Ok((k, v)) = sc.try_recv() {
println!("key: {}\tvalue: {}", k, v);
}
}
- 可以使用 builder 定製化 pool
fn main() {
let pool = Builder::new()
.min(1)
.max(9)
.daemon(None) // Close
.timeout(None) //Close
.name("Worker")
.stack_size(1024*1024*2) //2Mib
.build()
.unwrap();
for i in 0..38 {
pool.push(move || test(i));
}
pool.join(); //wait for the pool
println!("{:?}", pool);
}
poolite 整個庫只有約 500 多行代碼, 非常精簡。
poolite 提供了一個簡單實用的線程池實現,適合對性能要求不高, 但需要穩定和易用的場景, 如腳本語言的運行時等。
如果需要一個小而精的 Rust 線程池, poolite 是一個很不錯的選擇。
2.8 executor_service 庫
executor_service 是一個提供線程池抽象的 Rust 庫, 模仿 Java 的 ExecutorService, 主要特徵如下:
executor_service 是一個提供線程池抽象的 Rust 庫, 主要特徵如下:
- 支持固定和緩存線程池
可以按需創建不同類型的線程池:
// 固定線程數線程池
let pool = Executors::new_fixed_thread_pool(4)?;
// 緩存線程池
let pool = Executors::new_cached_thread_pool()?;
固定線程數的線程池顧名思義,也就是創建固定數量的線程,線程數量不會變化。
緩存線程池會按需創建線程,創建的新線程會被緩存起來。默認初始化 10 個線程,最多 150 個線程。最大線程值是個常量,看起來不能修改,但是初始化的線程數可以在初始化的時候設置,但也不能超過 150。
- 提供執行任務的接口
支持閉包、Future 等任務形式:
// 執行閉包
pool.execute(|| println!("hello"));
// 提交future
pool.spawn(async {
// ...
});
- 支持獲取任務結果
submit_sync 可以同步提交任務並獲取返回值:
let result = pool.submit_sync(|| {
// run task
return result;
})?;
- 提供方便的線程池構建器
可以自定義線程池參數:
ThreadPoolExecutor::builder()
.core_threads(4)
.max_threads(8)
.build()?;
這個例子展示瞭如何使用 executor_service 這個線程池庫:
pub fn executor_service_example() {
use executor_service::Executors;
let mut executor_service =
Executors::new_fixed_thread_pool(10).expect("Failed to create the thread pool");
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..10 {
let counter = counter.clone();
executor_service.execute(move || {
thread::sleep(Duration::from_millis(100));
counter.fetch_add(1, Ordering::SeqCst);
});
}
thread::sleep(Duration::from_millis(1000));
assert_eq!(counter.load(Ordering::SeqCst), 10);
let mut executor_service = Executors::new_fixed_thread_pool(2).expect("Failed to create the thread pool");
let some_param = "Mr White";
let res = executor_service.submit_sync(move || {
sleep(Duration::from_secs(5));
println!("Hello {:}", some_param);
println!("Long computation finished");
2
}).expect("Failed to submit function");
println!("Result: {:#?}", res);
assert_eq!(res, 2);
}
示例中做了以下幾件事:
-
創建一個固定 10 線程的線程池
-
提交 10 個任務, 每個任務暫停一段時間然後對計數器加 1
-
主線程暫停後驗證計數器的值
-
創建一個固定 2 線程的線程池
-
提交一個任務, 在任務內打印消息和暫停
-
主線程使用 submit_sync 同步執行任務並獲取返回值
2.9 threadpool_executor 庫
threadpool_executor 是一個功能豐富的 Rust 線程池庫, 提供了高度可配置的線程池實現。主要特性如下:
- 線程池構建器
通過構建器可以自定義線程池所有方面的參數:
ThreadPool::builder()
.core_threads(4)
.max_threads(8)
.keep_alive(Duration::from_secs(30))
.build();
- 支持不同的任務提交方式
閉包、async 塊、回調函數等:
// 閉包
pool.execute(|| println!("hello"));
// 異步任務
pool.execute(async {
// ...
});
- 任務返回 Result 類型用於錯誤處理
所有任務執行後返回 Result<T, E>:
let result = pool.execute(|| {
Ok(1 + 2)
})?;
let res = result.unwrap().get_result_timeout(std::time::Duration::from_secs(3));
assert!(res.is_err());
if let Err(err) = res {
matches!(err.kind(), threadpool_executor::error::ErrorKind::TimeOut);
}
- 提供任務取消接口
可以隨時取消已提交的任務:
let mut task = pool.execute(|| {}).unwrap();
task.cancel();
- 實現線程池擴容和空閒回收
按需創建線程, 自動回收空閒線程。
threadpool_executor 提供了完整可控的線程池實現, 適合對線程管理要求較高的場景。它的配置能力非常強大, 值得深入研究和使用。
這個例子展示瞭如何使用 threadpool_executor 這個線程池庫:
pub fn threadpool_executor_example() {
let pool = threadpool_executor::ThreadPool::new(1);
let mut expectation = pool.execute(|| "hello, thread pool!").unwrap();
assert_eq!(expectation.get_result().unwrap(), "hello, thread pool!");
let pool = threadpool_executor::threadpool::Builder::new()
.core_pool_size(1)
.maximum_pool_size(3)
.keep_alive_time(std::time::Duration::from_secs(300))
.exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Wait)
.build();
pool.execute(|| {
std::thread::sleep(std::time::Duration::from_secs(3));
})
.unwrap();
let mut exp = pool.execute(|| {}).unwrap();
exp.cancel();
}
示例中做了以下幾件事:
-
創建一個單線程線程池, 提交一個任務並獲取結果
-
使用 Builder 創建一個可配置的線程池
-
設置核心線程數爲 1, 最大線程數爲 3
-
設置空閒線程存活時間爲 300 秒
-
任務溢出策略爲等待
-
提交一個長時間任務到線程池
-
提交一個任務後立即取消它
threadpool_executor 的一些關鍵特性:
-
提供線程池構建器進行細粒度配置
-
支持回調和閉包形式的任務提交
-
任務返回 Result 便於錯誤處理
-
實現了線程池擴縮容和空閒回收策略
-
提供任務取消和關閉線程池接口
後續可以擴展介紹:
-
各種配置參數的細節
-
任務優先級和執行策略
-
與其他線程池的性能比較
-
在線程池內使用通信機制
-
線程池的優化建議
threadpool_executor 提供了功能完備的線程池實現, 適合需要細粒度控制的場景。
參考資料
[1]
pdf 下載: https://github.com/smallnest/concurrency-programming-via-rust/blob/master/book_cn/rust_concurrency_cookbook.pdf
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GHd3Kp2tUTgoylY-RCsHMw