使用 Rust 構建可以併發執行多個任務的線程池

在這篇文章中讓我們探討一下如何使用 Rust 構建線程池來併發地管理多個任務。

在開始實際的編碼之前,讓我們首先了解線程池是什麼以及它是如何工作的。

線程池

線程池是工作線程的集合,創建這些線程是爲了同時執行多個任務並等待新任務的到來。這意味着一開始創建了多個線程,並且所有線程都處於空閒狀態。

每當你的系統獲得任務時,它可以快速地將任務分配給這些線程,從而節省大量時間,而無需多次創建和刪除線程。

正如圖所看到的,線程池是等待從主線程接收任務以執行的多個線程的集合。

在該圖中,主線程中總共有 15 個任務,所有這些任務都被轉發給不同的工作線程併發執行。瞭解了線程池的概念後,讓我們來理解線程池的內部工作原理。

線程池是如何工作的?

在線程池體系結構中,主線程只有兩個任務:

1,接收所有的任務並將它們存儲在一個地方。

2,創建多個線程,並定期爲它們分配不同的任務。

在接收任務之前創建線程集,並使用 ID 存儲在某個地方,以便我們可以通過 ID 識別它們。

然後每個線程都在等待接收任務,如果它們得到任務,就開始處理任務。完成任務後,他們再次等待下一個任務。

當該線程忙於執行任務時,主線程將更多的任務分配給其他線程,這樣在主線程結束任務之前沒有線程空閒。在完成所有任務後,主線程終止所有線程並關閉線程池。

現在我們瞭解了線程池是如何工作的。接下來,讓我們使用 Rust 實現一個線程池。

使用 Rust 實現線程池

  1. 創建線程

我們需要一個函數來生成一個線程並返回它的 JoinHandle。

此外,我們需要知道線程的 ID,如果我們搞砸了,就可以用線程 ID 記錄錯誤,這樣我們就可以知道哪個線程出錯了。

可以看出,如果兩個相互關聯的數據需要組合,需要一個結構體。我們來創建一個:

struct Worker {
    id: usize,
    thread: JoinHandle<()>
}

現在我們實現一個可以返回新 Worker 的構造函數:

impl Worker {
    fn new(id: usize) -> Self {
        let thread = thread::spawn(|| {});

        Self {id, thread}
    }
}

現在,我們的函數已經準備好創建線程並將它們返回給調用者。

  1. 存放線程

我們需要一個結構來保存所有線程的所有 JoinHandles,我們還想控制線程池可以擁有多少線程。

這意味着,我們需要一個帶有構造函數的結構體,該函數指定一個數字來指示線程的數量,並且必須調用 Worker 來創建線程。

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            workers.push(Worker::new(i));
        }

        Self { workers }
    }
}

我們有了創建線程和管理線程的函數,現在是時候創建一個可以將任務分配給不同線程的函數了。

  1. 給線程分配任務

我們的線程池結構體必須有一個函數,該函數可以在線程內部分配和執行任務。但是有一個問題,我們如何將任務發送給線程,以便線程能夠執行任務?

爲此,我們需要一個 task 類型來表示我們需要完成的任務:

type task = Box<dyn FnOnce() + Send + 'static>;

在這裏,意味着我們的任務必須實現 Box 裏的這些 Trait:

1,實現 FnOnce() 意味着我們的任務是一個只能運行一次的函數。

2,實現 Send,因爲我們將任務從主線程發送到工作線程,所以將任務設置爲 Send 類型,以便它可以在線程之間安全地傳輸。

3,實現'static,意味着我們的任務必須和程序運行的時間一樣長。

現在是時候將任務發送給工作線程了,但要做到這一點,我們必須在主線程和所有工作線程之間建立一個通道,因此我們需要使用 Arc<Mutex<()>>。

讓我們來更新這兩個構造函數:

struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let (sender, reciever) = mpsc::channel();
        let reciever = Arc::new(Mutex::new(reciever));

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            workers.push(Worker::new(i, Arc::clone(&reciever)));
        }

        Self {
            workers,
            sender: Some(sender)
        }
    }
}

impl Worker {
    fn new(id: usize, reciever: Arc<Mutex<Receiver<Task>>>) -> Self {
        let thread = thread::spawn(move || {});

        Self {
            id,
            thread
        }
    }
}

在 ThreadPool 構造函數中,我們創建了一個新的通道,並在 Arc<Mutex<()>> 中封裝了接收器,我們把接收器發送給工作線程,以便主線程可以發送任務,工作線程可以接收任務。

此外,我們必須更新 ThreadPool 結構體,以包含一個發送者,它將被主線程用來向不同的線程發送任務。

現在,讓我們實現在工作線程中執行任務的邏輯:

fn new(id: usize, reciever: Arc<Mutex<Receiver<task>>>) -> Self {
    let thread = thread::spawn(move || {
        loop {
            let receiver = reciever.lock()
                .expect("Failed to grab the lock!")
                .recv();

            match receiver {
                Ok(task) ={
                    println!("Thread {} got the task& executing.", id);
                    task();
                    thread::sleep(Duration::from_millis(10));
                },

                Err(_) ={
                    println!("No got the task");
                    break;
                }
            }
        }
    });

    Self {
        id,
        thread
    }
}

這裏,在每個循環中,我們都試圖獲得鎖並調用鎖上的 recv(),以便我們可以獲得主線程發送的任務。

接下來,我們在 ThreadPool 中實現一個函數,將任務發送到不同的線程。

impl ThreadPool {
    fn new(size: usize) -> Self {
        // snip
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static
    {
        let job = Box::new(job);

        self.sender.send(job)
            .expect("Failed to send the job to workers!");
    }
}

我們還需要創建一個函數,在 ThreadPool 結束時動態終止所有線程。簡單地說,我們必須手動實現 ThreadPool 的 Drop 特性,在那裏我們將終止所有線程。

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join()..unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));}
        }
    }
}

這裏我們還必須刪除發送方,因爲如果我們不這樣做,那麼接收方將永遠循環。如果刪除發送者,那麼接收者也會自動刪除,我們就可以成功地退出這個程序。

測試

main 函數代碼如下:

fn main() {
    let pool = ThreadPool::new(5);

    for _ in 0..10 {
        pool.execute(|| println!("Doing something"));
    }
}

運行結果:

Thread 0 is shutting down.
Thread 0 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 1 got the job & executing.
Thread 2 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Doing something
Thread 0 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 2 got the job & executing.
Doing something
Thread 1 got the job & executing.
Doing something
No got the job
Thread 1 is shutting down.
No got the job
No got the job
No got the job
No got the job
Thread 2 is shutting down.
Thread 3 is shutting down.
Thread 4 is shutting down.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eQsajkmtyVY1UCLHnt6Nxw