Rust 併發控制之 Condvar
上次提到的 Barrier 用到了 Rust 的 condvar 和 mutex,今天來看下 condvar 的用法。
文章目錄
-
喚醒順序不保證
-
虛假喚醒
condvar 即 condition variable(條件變量),是一種線程同步的方式,用於線程間的通信。它可以阻塞(wait)線程,期間不消耗 CPU,直到某個時間發生喚醒(notify)線程。
代碼舉例來說:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
}
代碼中,創建一個線程在修改 started 變量後喚醒等待的線程。main 中等待的的線程會一直阻塞(wait)直到 started 的值被修改。
其中 wait 會需要一個鎖的 MutexGuard 來配合,wait 會自動釋放鎖,並阻塞當前線程,直到被喚醒時重新獲取鎖,並返回鎖的 MutexGuard,來獲取鎖當前保護的值
Tips: MutexGuard 實現了銷燬時自動釋放鎖和可以通過解引用(deref)到它保護的值
這裏有兩個有意思的點:
-
爲什麼要和 mutex 一起使用?
-
爲什麼喚醒時要檢查條件是否滿足?
這個要從 condvar 喚醒的機制說起。
喚醒順序不保證
先來看下喚醒的順序,我們起兩批同樣數目的線程,一批線程每個線程會修改一次變量並喚醒一個另一批等待的線程,爲了觀測喚醒順序,代碼如下:
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self};
struct SharedData {
counter: Mutex<usize>,
condvar: Condvar,
}
fn main() {
let shared_data = Arc::new(SharedData {
counter: Mutex::new(0),
condvar: Condvar::new(),
});
let thread_num = 5;
let mut workers = Vec::new();
let mut waits = Vec::new();
for i in 0..thread_num {
do_wait(i, Arc::clone(&shared_data), &mut waits);
}
for i in 0..thread_num {
do_work(i, Arc::clone(&shared_data), &mut workers)
}
waits.into_iter().for_each(|w| w.join().unwrap());
workers.into_iter().for_each(|w| w.join().unwrap());
}
fn do_work(i: i32, data: Arc<SharedData>, workers: &mut Vec<thread::JoinHandle<()>>) {
workers.push(thread::spawn(move || {
let SharedData { counter, condvar } = &*data;
let mut data = counter.lock().unwrap();
*data += 1;
println!("Woker thread {} before notify: Counter {}", i, data);
condvar.notify_one();
}));
}
fn do_wait(i: i32, data: Arc<SharedData>, waits: &mut Vec<thread::JoinHandle<()>>) {
waits.push(thread::spawn(move || {
let SharedData { counter, condvar } = &*data;
let mut data = counter.lock().unwrap();
data = condvar.wait(data).unwrap();
println!(" Wait thread {} after wake up: Counter {}", i, data);
}));
}
運行結果不唯一,比如如下結果,五次修改觸發了五次喚醒,但是 wait 喚醒順序不一定是按照 worker 修改順序(而修改順序是符合預期的,因爲是加鎖保證的):
Woker thread 0 before notify: Counter 1
Woker thread 4 before notify: Counter 2
Woker thread 2 before notify: Counter 3
Wait thread 1 after wake up: Counter 3
Wait thread 3 after wake up: Counter 3
Woker thread 3 before notify: Counter 4
Wait thread 0 after wake up: Counter 4
Woker thread 1 before notify: Counter 5
Wait thread 4 after wake up: Counter 5
Wait thread 2 after wake up: Counter 5
甚至有可能是喚醒次數少於五次,導致有些線程一直阻塞,比如如下結果,只有四次喚醒,導致有 1 個線程一直阻塞:
Woker thread 1 before notify: Counter 1
Wait thread 2 after wake up: Counter 1
Woker thread 3 before notify: Counter 2
Woker thread 0 before notify: Counter 3
Wait thread 4 after wake up: Counter 3
Woker thread 4 before notify: Counter 4
Wait thread 3 after wake up: Counter 4
Wait thread 1 after wake up: Counter 4
Woker thread 2 before notify: Counter 5
# 有一個線程一直阻塞在這裏
爲什麼順序不保證呢?condvar 實現是基於操作系統的條件變量實現,順序取決於操作系統調度時當前可喚醒的線程是哪個,要保證喚醒順序需要額外的開銷,而這個開銷是不必要的,因爲喚醒順序對於線程間的通信是沒有意義的,所以底層實現並不保證喚醒順序。這裏 [1] 有相關討論
所以多個線程等待同一條件變量時,notify_one 喚醒和等待也不是一定是一對一的調用,每次喚醒也不能保證都是不同的等待線程。
至於爲什麼會有線程一直阻塞的情況,是因爲喚醒次數少於等待次數,導致有些線程一直阻塞。 因爲是多線程併發構建的 notify_one 和 wait,存在調用 notify_one 時沒有線程在等待的可能,導致喚醒次數少於等待次數的情況。
虛假喚醒
還有就是虛假喚醒,即 wait 返回時,條件由於併發原因已經不滿足,還可能因爲喚醒並不是由於顯示的 notify 調用,這個聽起來很奇怪,但不是一個 bug,是底層操作系統實現導致的,具體看看 wiki[2] 上的說明吧。
綜上這兩點,condvar 喚醒時是需要重新檢查條件是否依舊滿足,而且需要和 mutex 一起使用,來確保條件值獲取的併發安全。
除此 condvar 還有一些方便的方法,比如提供了
-
notify_all 來廣播喚醒所有等待的線程;
-
wait_while 可以根據條件等待條件直到滿足;
-
wait_timeout 只等待一段時間如果不能及時被喚醒。
官方文檔都有例子,就不展開了。
關於 condvar 比較實際的例子有 WaitGroup,不需要像 Barrier 一樣初始化時指定線程數量,而是在運行時動態增加線程數量,在 crossbeam-utils[3] 中有實現,代碼很精煉,感興趣可以看下
參考資料
[1]
這裏: https://www.reddit.com/r/C_Programming/comments/12itrvd/condition_variables_wakeup_ordering/
[2]
wiki: https://en.wikipedia.org/wiki/Spurious_wakeup
[3]
crossbeam-utils: https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-utils/src/sync/wait_group.rs
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/yiUQl7TI6lNwKg8eUFUNRg