Rust 併發控制之 Condvar

上次提到的 Barrier 用到了 Rust 的 condvar 和 mutex,今天來看下 condvar 的用法。

文章目錄

condvar 即 condition variable(條件變量),是一種線程同步的方式,用於線程間的通信。它可以阻塞(wait)線程,期間不消耗 CPU,直到某個時間發生喚醒(notify)線程。

代碼舉例來說:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        // We notify the condvar that the value has changed.
        cvar.notify_one();
    });

    // Wait for the thread to start up.
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
}

代碼中,創建一個線程在修改 started 變量後喚醒等待的線程。main 中等待的的線程會一直阻塞(wait)直到 started 的值被修改。

其中 wait 會需要一個鎖的 MutexGuard 來配合,wait 會自動釋放鎖,並阻塞當前線程,直到被喚醒時重新獲取鎖,並返回鎖的 MutexGuard,來獲取鎖當前保護的值

Tips: MutexGuard 實現了銷燬時自動釋放鎖和可以通過解引用(deref)到它保護的值

這裏有兩個有意思的點:

這個要從 condvar 喚醒的機制說起。

喚醒順序不保證

先來看下喚醒的順序,我們起兩批同樣數目的線程,一批線程每個線程會修改一次變量並喚醒一個另一批等待的線程,爲了觀測喚醒順序,代碼如下:

use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self};

struct SharedData {
    counter: Mutex<usize>,
    condvar: Condvar,
}

fn main() {
    let shared_data = Arc::new(SharedData {
        counter: Mutex::new(0),
        condvar: Condvar::new(),
    });
    let thread_num = 5;
    let mut workers = Vec::new();
    let mut waits = Vec::new();

    for i in 0..thread_num {
        do_wait(i, Arc::clone(&shared_data)&mut waits);
    }
    for i in 0..thread_num {
        do_work(i, Arc::clone(&shared_data)&mut workers)
    }
    waits.into_iter().for_each(|w| w.join().unwrap());
    workers.into_iter().for_each(|w| w.join().unwrap());
}

fn do_work(i: i32, data: Arc<SharedData>, workers: &mut Vec<thread::JoinHandle<()>>) {
    workers.push(thread::spawn(move || {
        let SharedData { counter, condvar } = &*data;
        let mut data = counter.lock().unwrap();
        *data += 1;
        println!("Woker thread {} before notify: Counter {}", i, data);
        condvar.notify_one();
    }));
}
fn do_wait(i: i32, data: Arc<SharedData>, waits: &mut Vec<thread::JoinHandle<()>>) {
    waits.push(thread::spawn(move || {
        let SharedData { counter, condvar } = &*data;
        let mut data = counter.lock().unwrap();
        data = condvar.wait(data).unwrap();
        println!("   Wait thread {} after wake up: Counter {}", i, data);
    }));
}

運行結果不唯一,比如如下結果,五次修改觸發了五次喚醒,但是 wait 喚醒順序不一定是按照 worker 修改順序(而修改順序是符合預期的,因爲是加鎖保證的):

Woker thread 0 before notify: Counter 1
Woker thread 4 before notify: Counter 2
Woker thread 2 before notify: Counter 3
   Wait thread 1 after wake up: Counter 3
   Wait thread 3 after wake up: Counter 3
Woker thread 3 before notify: Counter 4
   Wait thread 0 after wake up: Counter 4
Woker thread 1 before notify: Counter 5
   Wait thread 4 after wake up: Counter 5
   Wait thread 2 after wake up: Counter 5

甚至有可能是喚醒次數少於五次,導致有些線程一直阻塞,比如如下結果,只有四次喚醒,導致有 1 個線程一直阻塞:

Woker thread 1 before notify: Counter 1
   Wait thread 2 after wake up: Counter 1
Woker thread 3 before notify: Counter 2
Woker thread 0 before notify: Counter 3
   Wait thread 4 after wake up: Counter 3
Woker thread 4 before notify: Counter 4
   Wait thread 3 after wake up: Counter 4
   Wait thread 1 after wake up: Counter 4
Woker thread 2 before notify: Counter 5
# 有一個線程一直阻塞在這裏

爲什麼順序不保證呢?condvar 實現是基於操作系統的條件變量實現,順序取決於操作系統調度時當前可喚醒的線程是哪個,要保證喚醒順序需要額外的開銷,而這個開銷是不必要的,因爲喚醒順序對於線程間的通信是沒有意義的,所以底層實現並不保證喚醒順序。這裏 [1] 有相關討論

所以多個線程等待同一條件變量時,notify_one 喚醒和等待也不是一定是一對一的調用,每次喚醒也不能保證都是不同的等待線程。

至於爲什麼會有線程一直阻塞的情況,是因爲喚醒次數少於等待次數,導致有些線程一直阻塞。 因爲是多線程併發構建的 notify_one 和 wait,存在調用 notify_one 時沒有線程在等待的可能,導致喚醒次數少於等待次數的情況。

虛假喚醒

還有就是虛假喚醒,即 wait 返回時,條件由於併發原因已經不滿足,還可能因爲喚醒並不是由於顯示的 notify 調用,這個聽起來很奇怪,但不是一個 bug,是底層操作系統實現導致的,具體看看 wiki[2] 上的說明吧。

綜上這兩點,condvar 喚醒時是需要重新檢查條件是否依舊滿足,而且需要和 mutex 一起使用,來確保條件值獲取的併發安全。

除此 condvar 還有一些方便的方法,比如提供了

官方文檔都有例子,就不展開了。

關於 condvar 比較實際的例子有 WaitGroup,不需要像 Barrier 一樣初始化時指定線程數量,而是在運行時動態增加線程數量,在 crossbeam-utils[3] 中有實現,代碼很精煉,感興趣可以看下

參考資料

[1]

這裏: https://www.reddit.com/r/C_Programming/comments/12itrvd/condition_variables_wakeup_ordering/

[2]

wiki: https://en.wikipedia.org/wiki/Spurious_wakeup

[3]

crossbeam-utils: https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-utils/src/sync/wait_group.rs


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/yiUQl7TI6lNwKg8eUFUNRg