Rust:深入瞭解線程池

在某些情況下,你需要併發地執行許多短期任務。創建和銷燬執行這些任務線程的開銷可能會抑制程序的性能。解決這個問題的一個辦法是建立一個任務池,並在需要時將它們從這個任務池中取出。

任務池的另一個優點是,可用線程的數量可以根據可用的計算資源進行調整,即處理器內核的數量或內存量。

這些任務的約束之一是它們不是相互依賴的,也就是說,一個任務的結果不依賴於前一個任務的結果,或者下一個任務不應依賴於當前任務的結果。這使任務保持隔離,並且易於存儲在池中。

典型的用例包括:

**Web 服務和 api 服務:**請求通常非常小且生命週期很短,因此非常適合於線程池,實際上許多 web 服務器都實現了線程池。

**批量處理圖像、視頻文件或音頻文件:**例如,調整圖像大小也是非常適合線程池的小型且定義良好的任務。

**數據處理管道:**數據處理管道中的每個階段都可以由線程池處理。如前所述,任務不應該相互依賴,以提高線程池的效率。

使用 Rust 實現線程池

在這個例子中,我們將構建一個簡單的線程池,但這可以很容易地擴展到一個真正的線程池。

在開始之前,需要添加一個庫到 Cargo.toml 文件中:

[dependencies]
fstrings = "0.2.3"

我們將使用這個 crate 以類似 python 的方式格式化字符串。

接下來在 src/main.rs 文件中添加以下幾行:

use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;

定義任務

在 main.rs 中定義一個 WebRequest 結構體:

struct WebRequest {
    work: Box<dyn FnOnce(&str) + Send + Sync>,
}

impl WebRequest {
    fn new<F>(f: F) -> Self
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        WebRequest { work: Box::new(f) }
    }
}

在這段代碼中,WebRequest 包含一個字段 work,它是一個 Box 封裝的閉包。爲什麼要使用 Box?因爲閉包的大小是動態的,換句話說,它的大小在編譯時是未知的,所以我們需要將它存儲在像 Box 這樣的堆分配容器中。Send 和 Sync 特性向編譯器表明,這個特定的閉包可以安全地跨多個線程發送和訪問。

構造函數接受閉包作爲它的唯一參數。當然,它必須滿足與結構體中字段相同的約束。靜態生命週期是必需的,因爲閉包可能比定義它的作用域活得更長。

實現線程池

在 main.rs 中定義一個 ThreadPool 結構體:

struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    queue: Arc<Mutex<Vec<WebRequest>>>,
}

現在我們看一下實現。首先是構造函數:

impl ThreadPool {
    fn new(num_threads: usize) -> ThreadPool {
        let mut workers = Vec::with_capacity(num_threads);
        let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));

        for i in 0..num_threads {
            let number = f!("Request {i}");
            let queue_clone = Arc::clone(&queue);
            workers.push(thread::spawn(move || loop {
                let task = queue_clone.lock().unwrap().pop();
                if let Some(task) = task {
                    (task.work)(&number);
                } else {
                    break;
                }
            }));
        }
        ThreadPool { workers, queue }
    }
}

此方法使用指定的線程數初始化池,創建隊列。之後,構造函數生成工作線程。這些線程進入一個循環,彈出隊列的任務,並執行它們。如果隊列恰好爲空,則工作線程中斷循環。

然後,我們看一下 execute() 方法:

impl ThreadPool {
    ......

    fn execute<F>(&self, f: F)
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        let task = WebRequest::new(f);
        self.queue.lock().unwrap().push(task);
    }
}

這個方法只是用指定的閉包創建一個新的 WebRequest,並將其 push 到任務隊列中。

接下來,我們看一下 join() 方法:

impl ThreadPool {
    ......

    fn join(self) {
        for worker in self.workers {
            worker.join().unwrap();
        }
    }
}

這是一個阻塞操作,等待線程完成。

測試

使用如下代碼測試線程池:

fn main() {
    let pool = ThreadPool::new(6);
    for i in 0..6 {
        pool.execute(move |message| {
            println!("Task: {} prints  {}",i, message);
        });
    }
    pool.join();
}

執行結果如下:

Task: 3 prints  Request 3
Task: 1 prints  Request 3
Task: 5 prints  Request 1
Task: 2 prints  Request 5
Task: 0 prints  Request 3
Task: 4 prints  Request 4

總結

正如你所看到的,這種模式非常靈活,但是使用時,請考慮以下影響性能和資源因素:

總而言之,找到正確的線程數有時可能非常清楚,有時需要使用一些嘗試和錯誤來找到最佳數量。更高級的做法是可以根據需求動態地增加和減少可用線程的數量。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YW_6g5gVf0RqO0vSxjZ_Lg