Rust 併發編程 - 併發集合

集合類型是我們編程中常用的數據類型,Rust 中提供了一些集合類型,比如Vec<T>HashMap<K, V>HashSet<T>VecDeque<T>LinkedList<T>BTreeMap<K, V>BTreeSet<T>等,它們的特點如下:

不幸的是這些類型都不是線程安全的,沒有辦法在線程中共享使用,有幸的是,我們可以使用前面介紹的併發原語,對這些類型進行包裝,使之成爲線程安全的。

線程安全的 Vec

要實現線程安全的 Vec,可以使用 Arc(原子引用計數)和 Mutex(互斥鎖)的組合。Arc 允許多個線程共享擁有相同數據的所有權,而 Mutex 用於在訪問數據時進行同步,確保只有一個線程能夠修改數據。

以下是一個簡單的例子,演示如何創建線程安全的 Vec:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // 使用 Arc 和 Mutex 包裝 Vec
    let shared_vec = Arc::new(Mutex::new(Vec::new()));

    // 創建一些線程,共同向 Vec 中添加元素
    let mut handles = vec![];
    for i in 0..5 {
        let shared_vec = Arc::clone(&shared_vec);
        let handle = thread::spawn(move || {
            // 獲取鎖
            let mut vec = shared_vec.lock().unwrap();

            // 修改 Vec
            vec.push(i);
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 獲取 Vec,並輸出結果
    let final_vec = shared_vec.lock().unwrap();
    println!("Final Vec: {:?}", *final_vec);
}

在這個例子中,shared_vec 是一個 Mutex 包裝的 Arc,使得多個線程能夠共享對 Vec 的所有權。每個線程在修改 Vec 之前需要先獲取鎖,確保同一時刻只有一個線程能夠修改數據。

線程安全的 HashMap

要實現線程安全的 HashMap,可以使用 Arc(原子引用計數)和 Mutex(互斥鎖)的組合,或者使用 RwLock(讀寫鎖)來提供更細粒度的併發控制。以下是使用 Arc 和 Mutex 的簡單示例:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // 使用 Arc 和 Mutex 包裝 HashMap
    let shared_map = Arc::new(Mutex::new(HashMap::new()));

    // 創建一些線程,共同向 HashMap 中添加鍵值對
    let mut handles = vec![];
    for i in 0..5 {
        let shared_map = Arc::clone(&shared_map);
        let handle = thread::spawn(move || {
            // 獲取鎖
            let mut map = shared_map.lock().unwrap();

            // 修改 HashMap
            map.insert(i, i * i);
        });
        handles.push(handle);
    }

    // 等待所有線程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 獲取 HashMap,並輸出結果
    let final_map = shared_map.lock().unwrap();
    println!("Final HashMap: {:?}", *final_map);
}

你發覺處理的套路都是一樣的,就是使用Arc<Mutex<T>>實現。使用 Arc<Mutex<T>> 組合是一種常見的方式來實現線程安全的集合類型,但不是唯一的選擇。這種組合的基本思想是使用 Arc(原子引用計數)來實現多線程間的所有權共享,而 Mutex 則提供了互斥鎖,確保在任何時刻只有一個線程能夠修改數據。

後面的幾種集合類型都可以這麼去實現。

有些場景下你可能使用Arc<RwLock<T>>更合適,允許多個線程同時讀取數據,但只有一個線程能夠寫入數據。適用於讀操作頻繁、寫操作較少的場景。

dashmap

dashmap 是極快的 Rust 併發 map 實現。

DashMap是一個 Rust 中併發關聯 array/hashmap 的實現。

DashMap試圖實現一個類似於std::collections::HashMap的簡單易用的 API, 並做了一些細微的改變來處理併發。

DashMap的目標是變得非常簡單易用, 並可直接替代 RwLock<HashMap<K, V>>。爲實現這些目標, 所有方法採用&self而不是修改方法採用&mut self。這允許您將一個 DashMap 放入一個 Arc 中, 並在線程之間共享它, 同時仍然能夠修改它。

DashMap非常注重性能, 並旨在儘可能快。

    let map = Arc::new(DashMap::new());
    let mut handles = vec![];

    for i in 0..10 {
        let map = Arc::clone(&map);
        handles.push(std::thread::spawn(move || {
            map.insert(i, i);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("DashMap: {:?}", map);

基於DashMap, 它還實現了DashSet

lockfree

還有一個lockfree庫,也提供了豐富的線程安全的集合類,因爲已經五年沒有維護了,我就不想介紹它了。

cuckoofilter

Cuckoo Filter 是一種基於哈希的數據結構,用於實現高效的近似集合成員檢查。它的設計靈感來自於 Cuckoo Hashing,但在性能和內存佔用方面有更好的表現。Cuckoo Filter 主要用於解決布隆過濾器的一些問題,如高內存佔用和不支持刪除操作。

下面是一個使用 cuckoofilter 庫的例子:

    let value: &str = "hello world";

    // Create cuckoo filter with default max capacity of 1000000 items
    let mut cf = CuckooFilter::new();

    // Add data to the filter
    cf.add(value).unwrap();

    // Lookup if data is in the filter
    let success = cf.contains(value);
    assert!(success);

    // Test and add to the filter (if data does not exists then add)
    let success = cf.test_and_add(value).unwrap();
    assert!(!success);

    // Remove data from the filter.
    let success = cf.delete(value);
    assert!(success);

evmap

evmap(eventual map)是一個 Rust 庫,提供了一個併發的、基於事件的映射(Map)實現。它允許多個線程併發地讀取和寫入映射,同時支持觀察者模式,允許在映射的變化上註冊事件監聽器。

以下是 evmap 的一些關鍵特性和概念:

以下是一個簡單的示例,演示瞭如何使用 evmap:

    let (book_reviews_r, book_reviews_w) = evmap::new();

    // start some writers.
    // since evmap does not support concurrent writes, we need
    // to protect the write handle by a mutex.
    let w = Arc::new(Mutex::new(book_reviews_w));
    let writers: Vec<_> = (0..4)
        .map(|i| {
            let w = w.clone();
            std::thread::spawn(move || {
                let mut w = w.lock().unwrap();
                w.insert(i, true);
                w.refresh();
            })
        })
        .collect();

    // eventually we should see all the writes
    while book_reviews_r.len() < 4 {
       std::thread::yield_now();
    }

    // all the threads should eventually finish writing
    for w in writers.into_iter() {
        assert!(w.join().is_ok());
    }

arc-swap

arc-swap 是一個 Rust 庫,提供了基於 ArcAtomic 的數據結構,用於在多線程之間原子地交換數據。它的設計目的是提供一種高效的方式來實現線程間的共享數據更新,避免鎖的開銷。 你可以把它看成Atomic<Arc<T>>或者RwLock<Arc<T>>

在許多情況下, 可能需要一些數據結構, 這些數據結構經常被讀取而很少更新。一些例子可能是服務的配置, 路由表, 每幾分鐘更新一次的某些數據的快照等。

在所有這些情況下, 需要:

第一個想法是使用 RwLock<T> 並在整個處理時間內保持讀鎖。但是更新會暫停所有處理直到完成。 更好的選擇是使用 RwLock<Arc<T>>。然後可以獲取鎖, 克隆Arc 並解鎖。這會受到 CPU 級別的爭用 (鎖和 Arc 的引用計數) 的影響, 從而相對較慢。根據實現的不同, 穩定的 reader 流入可能會阻塞更新任意長的時間。

可以使用 ArcSwap 替代, 它解決了上述問題, 在競爭和非競爭場景下, 性能特徵都優於 RwLock

下面是一個 arc-swap 的例子:

use arc_swap::ArcSwap;

fn main() {
    // 創建 ArcSwap 包含整數
    let data = ArcSwap::new(1);

    // 打印當前值
    println!("Initial Value: {}", data.load());

    // 原子地交換值
    data.store(Arc::new(2));

    // 打印新值
    println!("New Value: {}", data.load());
}

在這個例子中,ArcSwap 包含一個整數,並通過原子的 store 操作交換了新的 Arc。這使得多個線程可以安全地共享和更新 Arc

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rXHc1q9Zgj0p6nMHDBozzA