Rust 併發編程 - 併發集合
集合類型是我們編程中常用的數據類型,Rust 中提供了一些集合類型,比如Vec<T>
、HashMap<K, V>
、HashSet<T>
、VecDeque<T>
、LinkedList<T>
、BTreeMap<K, V>
、BTreeSet<T>
等,它們的特點如下:
-
Vec- 這是一種可變大小的數組, 允許在頭部或尾部高效地添加和刪除元素。它類似於 C++ 的 vector 或 Java 的 ArrayList。
-
HashMap<K,V> - 這是一個哈希映射, 允許通過鍵快速查找值。它類似於 C++ 的 unordered_map 或 Java 的 HashMap。
-
HashSet- 這是一個基於哈希的集, 可以快速判斷一個值是否在集合中。它類似於 C++ 的 unordered_set 或 Java 的 HashSet。
-
VecDeque- 這是一個雙端隊列, 允許在頭部或尾部高效地添加和刪除元素。它類似於 C++ 的 deque 或 Java 的 ArrayDeque。
-
LinkedList - 這是一個鏈表數據結構, 允許在頭部或尾部快速添加和刪除元素。
-
BTreeMap<K,V> - 這是一個有序的映射, 可以通過鍵快速查找, 同時保持元素的排序。它使用 B 樹作爲底層數據結構。
-
BTreeSet- 這是一個有序的集合, 元素會自動排序。它使用 B 樹作爲底層數據結構。
不幸的是這些類型都不是線程安全的,沒有辦法在線程中共享使用,有幸的是,我們可以使用前面介紹的併發原語,對這些類型進行包裝,使之成爲線程安全的。
線程安全的 Vec
要實現線程安全的 Vec,可以使用 Arc(原子引用計數)和 Mutex(互斥鎖)的組合。Arc 允許多個線程共享擁有相同數據的所有權,而 Mutex 用於在訪問數據時進行同步,確保只有一個線程能夠修改數據。
以下是一個簡單的例子,演示如何創建線程安全的 Vec:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 使用 Arc 和 Mutex 包裝 Vec
let shared_vec = Arc::new(Mutex::new(Vec::new()));
// 創建一些線程,共同向 Vec 中添加元素
let mut handles = vec![];
for i in 0..5 {
let shared_vec = Arc::clone(&shared_vec);
let handle = thread::spawn(move || {
// 獲取鎖
let mut vec = shared_vec.lock().unwrap();
// 修改 Vec
vec.push(i);
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 獲取 Vec,並輸出結果
let final_vec = shared_vec.lock().unwrap();
println!("Final Vec: {:?}", *final_vec);
}
在這個例子中,shared_vec 是一個 Mutex 包裝的 Arc,使得多個線程能夠共享對 Vec 的所有權。每個線程在修改 Vec 之前需要先獲取鎖,確保同一時刻只有一個線程能夠修改數據。
線程安全的 HashMap
要實現線程安全的 HashMap,可以使用 Arc(原子引用計數)和 Mutex(互斥鎖)的組合,或者使用 RwLock(讀寫鎖)來提供更細粒度的併發控制。以下是使用 Arc 和 Mutex 的簡單示例:
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 使用 Arc 和 Mutex 包裝 HashMap
let shared_map = Arc::new(Mutex::new(HashMap::new()));
// 創建一些線程,共同向 HashMap 中添加鍵值對
let mut handles = vec![];
for i in 0..5 {
let shared_map = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
// 獲取鎖
let mut map = shared_map.lock().unwrap();
// 修改 HashMap
map.insert(i, i * i);
});
handles.push(handle);
}
// 等待所有線程完成
for handle in handles {
handle.join().unwrap();
}
// 獲取 HashMap,並輸出結果
let final_map = shared_map.lock().unwrap();
println!("Final HashMap: {:?}", *final_map);
}
你發覺處理的套路都是一樣的,就是使用Arc<Mutex<T>>
實現。使用 Arc<Mutex<T>>
組合是一種常見的方式來實現線程安全的集合類型,但不是唯一的選擇。這種組合的基本思想是使用 Arc(原子引用計數)來實現多線程間的所有權共享,而 Mutex 則提供了互斥鎖,確保在任何時刻只有一個線程能夠修改數據。
後面的幾種集合類型都可以這麼去實現。
有些場景下你可能使用Arc<RwLock<T>>
更合適,允許多個線程同時讀取數據,但只有一個線程能夠寫入數據。適用於讀操作頻繁、寫操作較少的場景。
dashmap
dashmap 是極快的 Rust 併發 map 實現。
DashMap
是一個 Rust 中併發關聯 array/hashmap 的實現。
DashMap
試圖實現一個類似於std::collections::HashMap
的簡單易用的 API, 並做了一些細微的改變來處理併發。
DashMap
的目標是變得非常簡單易用, 並可直接替代 RwLock<HashMap<K, V>>
。爲實現這些目標, 所有方法採用&self
而不是修改方法採用&mut self
。這允許您將一個 DashMap 放入一個 Arc 中, 並在線程之間共享它, 同時仍然能夠修改它。
DashMap
非常注重性能, 並旨在儘可能快。
let map = Arc::new(DashMap::new());
let mut handles = vec![];
for i in 0..10 {
let map = Arc::clone(&map);
handles.push(std::thread::spawn(move || {
map.insert(i, i);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("DashMap: {:?}", map);
基於DashMap
, 它還實現了DashSet
lockfree
還有一個lockfree
庫,也提供了豐富的線程安全的集合類,因爲已經五年沒有維護了,我就不想介紹它了。
-
Per-Object Thread-Local Storage
-
Channels (SPSC, MPSC, SPMC, MPMC)
-
Map
-
Set
-
Stack
-
Queue
cuckoofilter
Cuckoo Filter 是一種基於哈希的數據結構,用於實現高效的近似集合成員檢查。它的設計靈感來自於 Cuckoo Hashing,但在性能和內存佔用方面有更好的表現。Cuckoo Filter 主要用於解決布隆過濾器的一些問題,如高內存佔用和不支持刪除操作。
下面是一個使用 cuckoofilter 庫的例子:
let value: &str = "hello world";
// Create cuckoo filter with default max capacity of 1000000 items
let mut cf = CuckooFilter::new();
// Add data to the filter
cf.add(value).unwrap();
// Lookup if data is in the filter
let success = cf.contains(value);
assert!(success);
// Test and add to the filter (if data does not exists then add)
let success = cf.test_and_add(value).unwrap();
assert!(!success);
// Remove data from the filter.
let success = cf.delete(value);
assert!(success);
evmap
evmap
(eventual map)是一個 Rust 庫,提供了一個併發的、基於事件的映射(Map)實現。它允許多個線程併發地讀取和寫入映射,同時支持觀察者模式,允許在映射的變化上註冊事件監聽器。
以下是 evmap
的一些關鍵特性和概念:
-
併發讀寫:
evmap
允許多個線程併發地讀取和寫入映射,而不需要使用鎖。這是通過將映射分爲多個片段來實現的,每個片段都可以獨立地讀取和寫入。 -
事件觸發:
evmap
允許在映射的變化上註冊事件監聽器。當映射發生變化時,註冊的監聽器會被觸發,從而允許用戶執行自定義的邏輯。 -
鍵和值: 映射中的鍵和值可以是任意類型,只要它們實現了
Clone
和Eq
trait。這樣允許用戶使用自定義類型作爲鍵和值。 -
異步觸發事件: evmap 支持異步的事件觸發。這使得在事件發生時執行一些異步任務成爲可能。
以下是一個簡單的示例,演示瞭如何使用 evmap:
let (book_reviews_r, book_reviews_w) = evmap::new();
// start some writers.
// since evmap does not support concurrent writes, we need
// to protect the write handle by a mutex.
let w = Arc::new(Mutex::new(book_reviews_w));
let writers: Vec<_> = (0..4)
.map(|i| {
let w = w.clone();
std::thread::spawn(move || {
let mut w = w.lock().unwrap();
w.insert(i, true);
w.refresh();
})
})
.collect();
// eventually we should see all the writes
while book_reviews_r.len() < 4 {
std::thread::yield_now();
}
// all the threads should eventually finish writing
for w in writers.into_iter() {
assert!(w.join().is_ok());
}
arc-swap
arc-swap
是一個 Rust 庫,提供了基於 Arc
和 Atomic
的數據結構,用於在多線程之間原子地交換數據。它的設計目的是提供一種高效的方式來實現線程間的共享數據更新,避免鎖的開銷。 你可以把它看成Atomic<Arc<T>>
或者RwLock<Arc<T>>
。
在許多情況下, 可能需要一些數據結構, 這些數據結構經常被讀取而很少更新。一些例子可能是服務的配置, 路由表, 每幾分鐘更新一次的某些數據的快照等。
在所有這些情況下, 需要:
-
快速、頻繁並從多個線程併發讀取數據結構的當前值。
-
在更長的時間內使用數據結構的相同版本 —— 查詢應該由數據的一致版本回答, 數據包應該由舊版本或新版本的路由表路由, 而不是由組合路由。
-
在不中斷處理的情況下執行更新。
第一個想法是使用 RwLock<T>
並在整個處理時間內保持讀鎖。但是更新會暫停所有處理直到完成。 更好的選擇是使用 RwLock<Arc<T>>
。然後可以獲取鎖, 克隆Arc
並解鎖。這會受到 CPU 級別的爭用 (鎖和 Arc
的引用計數) 的影響, 從而相對較慢。根據實現的不同, 穩定的 reader 流入可能會阻塞更新任意長的時間。
可以使用 ArcSwap
替代, 它解決了上述問題, 在競爭和非競爭場景下, 性能特徵都優於 RwLock
。
下面是一個 arc-swap 的例子:
use arc_swap::ArcSwap;
fn main() {
// 創建 ArcSwap 包含整數
let data = ArcSwap::new(1);
// 打印當前值
println!("Initial Value: {}", data.load());
// 原子地交換值
data.store(Arc::new(2));
// 打印新值
println!("New Value: {}", data.load());
}
在這個例子中,ArcSwap
包含一個整數,並通過原子的 store
操作交換了新的 Arc
。這使得多個線程可以安全地共享和更新 Arc
。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rXHc1q9Zgj0p6nMHDBozzA