透過 rust 探索系統的本原:併發篇

rust 是一門非常優秀的語言,我雖然沒有特別正式介紹過 rust 本身,但其實已經寫了好多篇跟 rust 相關的文章:

我打算寫一個系列,講講如果透過 rust 來更好地探索系統的本原。我不知道我能寫多少,也許就這一篇,也許很多篇,不管怎樣,每篇都會介紹獨立的概念。這個系列並不會介紹大量的 rust 代碼,因此其內容對非 rust 程序員也有好處。

這一篇我們講併發。幾年前我曾經寫過一篇介紹併發概念的文章:concurrency,大家感興趣可以看看。這篇我們從更加務實的角度,以一個簡單的字典服務器程序的迭代爲引子,把併發中涉及的概念和解決方法串起來。

v1:循環處理

我們的字典服務器監聽 8888 端口,在服務器端維護一個 KV db(使用 hash map)。客戶端可以插入(更新)一個 key 和相關的 value,也可以查詢一個 key,獲得對應的 value。嗯,就像 redis 服務器一樣,只不過比 redis 簡單十萬八千倍。

這個需求很簡單,我們馬上可以想到:

  1. 監聽 8888 端口

  2. 寫一個死循環,不斷 accept socket,然後對 socket 裏收到的數據進行處理。

但這樣是串行服務,我們只有處理完上一個 socket 的數據,纔有機會處理下一個 socket,吞吐量非常有限。顯然,我們需要改進。

v2:多線程處理

接下來我們需要解決串行服務的瓶頸。一個方法是 accept 之後,將新的 socket 放入一個線程裏執行,於是主線程不會被阻塞住,可以繼續 accept 後續的 socket。這樣,每個 client 過來的請求都可以獨立地處理。

可是,這帶來了一個顯而易見的問題:我們的 KV db 成爲了一個共享狀態,它在多個線程之間共享數據。這是併發處理的第一種範式:共享狀態的併發(Shared-State Concurrency)。

既然引入了共享狀態,那麼我們需要在訪問它的時候做妥善的保護 —— 這個訪問和操作共享狀態的代碼區域叫臨界區(Critical Section)。如果你還記得操作系統課程的內容,你會知道,最基本的操作是使用互斥量(Mutex)來保護臨界區。

互斥量本質是一種二元鎖。當線程獲得鎖之後,便擁有了對共享狀態的獨佔訪問;反之,如果無法獲得鎖,那麼將會在訪問鎖的位置阻塞,直到能夠獲得鎖。在完成對共享狀態的訪問後(臨界區的出口),我們需要釋放鎖,這樣,其它訪問者纔有機會退出阻塞狀態。一旦忘記釋放鎖,或者使用多把鎖的過程中造成了死鎖,那麼程序就無法響應或者崩潰。rust 的內存安全模型能夠避免忘記釋放鎖,這讓開發變得非常輕鬆,並且最大程度上解決了(不同函數間)死鎖問題。

但任何語言的任何保護都無法避免邏輯上的死鎖,比如下面這個顯而易見的例子:

use std::sync::Mutex;
fn main() {
    let data = Mutex::new(0);
    let _d1 = data.lock();
    let _d2 = data.lock(); // deadlock now
}

互斥鎖往往鎖的粒度太大,在很多場景下效率太低。於是我們在此基礎上分離了讀寫的操作,產生了讀寫鎖(RwLock),它同一時刻允許任意數量的共享讀者或者一個寫者。讀寫鎖的一個優化是順序鎖(SeqLock),它提高了讀鎖和寫鎖的獨立性 —— 寫鎖不會被讀鎖阻塞,讀鎖也不會被寫鎖阻塞。,但寫鎖會被寫鎖阻塞。

讀寫鎖適用於讀者數量遠大於寫者,或者讀多寫少的場景。在我們這個場景下,讀寫的比例差別可能並不是特別明顯,從 Mutex 換到 RwLock 的收益需要在生產環境中具體測試一下才能有結論。

v3:鎖的優化

但即使我們無法通過使用不同實現的鎖來優化對共享狀態訪問的效率,我們還是有很多方法來優化鎖。無論何種方法,其核心思想是:儘可能減少鎖的粒度。比如,對數據庫而言,我們可以對整個數據庫管理系統加鎖,也可以對單個數據庫的訪問加鎖,還可以對數據表的訪問加鎖,甚至對數據表中的一行或者一列加鎖。對於我們的 KV db 而言,我們可以創建 N 個 hashmap(模擬多個數據庫),然後把 Key 分散到這 N 個 hashmap 中,這樣,不管使用什麼鎖,其粒度都變成之前的 1/N 了。

新的 KV db 的定義,以及添加 / 訪問數據的代碼:

use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, RwLock};
struct KvDb(Arc<Vec<RwLock<HashMap<String, Vec<u8>>>>>);
impl KvDb {
    pub fn new(len: usize) -> Self {
        let mut dbs: Vec<RwLock<HashMap<String, Vec<u8>>>> = Vec::with_capacity(len);
        for _i in 0..len {
            dbs.push(RwLock::new(HashMap::new()))
        }
        Self(Arc::new(dbs))
    }
    pub fn insert(&self, k: &str, v: Vec<u8>) {
        let dbs = self.0.clone();
        let mut writer = dbs[(self.hash(k) % dbs.len()) as usize].write().unwrap();
        writer.insert(k.into(), v);
    }
    pub fn get(&self, k: &str) -> Vec<u8> {
        let dbs = self.0.clone();
        let reader = dbs[(self.hash(k) % dbs.len()) as usize].read().unwrap();
        reader.get(k).unwrap().to_owned()
    }
    fn hash(&self, k: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        k.to_owned().hash(&mut hasher);
        hasher.finish() as usize
    }
}

rust 裏面的 dashmap 提供了一個類似思路的高併發訪問的 hashmap。

v4:share memory by communicating

前面的迭代不管怎麼優化都跳脫不出同一種思路:Shared-state concurrency,或者說:communicate by share memory。這種方法限制很少,非常靈活,適用於任何併發場景,因而它是所有併發方案的基石。然而,靈活度帶來的問題就是容易出錯,需要額外的約定和限制來避免一些問題的產生。

我們知道,計算機軟件系統不斷髮展的過程就是一個爲特定的需求不斷添加約定和限制的過程。就像 這句計算機史上影響了無數設計的名言所說的:

那麼,有沒有辦法把併發的需求抽象出來,設計一些更高級的數據結構和使用方法,把鎖的使用隱藏起來?

當然有。

其中最有效最優雅的方法是消息傳遞(message passing)。我們把問題的兩端分別定義成生產者和消費者。KvDb 的客戶端是生產者,它們提交請求(update / get),而 KvDb 的服務器是消費者,它接受請求,返回處理的結果。連接兩端的是一個消息通道(channel)。我們可以根據消息通道的兩端的使用情況,將其進一步細分成幾種訪問模型:

使用消息通道的思路,我們可以進一步迭代我們的 KvDb —— 在處理 socket 的線程和處理 state 的線程之間建立一個 mpsc channel:

這種方式是否更高效?不見得。但從併發處理的角度來看,它結構上更清晰,不容易出錯。

使用消息傳遞來處理併發的思路是如此重要,以至於兩門非常有影響力的語言將其內置在語言的運行時裏,成爲語言的一部分:

golang 內建了 channel,使用 goroutine 和 channel 來處理併發。其語言的核心思想是:

Do not communicate by sharing memory; instead, share memory by communicating.

而 erlang 內建了 actor model,讓 sendreceive 成爲其最基本的六個函數之一。兩個 actor(process)之間唯一的交流方式就是找到對方的 pid,然後發送消息。

v5:協程(async/await or 異步處理)

我們在使用多線程做併發處理時,使用的是操作系統的調度能力。這樣的好處是,我們無需自己再做一個調度器,進行復雜的調度處理;壞處是,操作系統處理線程的調度需要複雜的上下文切換,其中包括用戶態和內核態的切換,所以它的效率不夠高,尤其是如果我們需要大量的隨用隨拋的「線程」時。

然而,「現代」的應用程序因爲複雜程度越來越高,所以其併發程度也越來越高,大量的操作都涉及隨用隨拋的「線程」。如果我們用操作系統線程來實現這些「線程」,會大大拖累系統的整體效率,甚至會觸及操作系統的限制(/proc/sys/kernel/threads-max)。

因而,「現代」的編程語言都有協程的支持 —— 在 golang 裏是 goroutine,在 erlang 裏是 process,在 python 裏是 coroutine,在 rust 裏是 future。它們可以以一個更小的粒度在用戶態進行併發處理,代價是用戶態需要一個調度器。golang / erlang 在語言層面的運行時提供了這個調度器,而 rust 需要引入相關的庫。這些語言的用戶態調度器的實現都大同小異:

無論從 v3 還是 v4 版本,我們都很容易把一個多線程的實現變成多協程的實現。對於 rust 而言,就是引入 async / await:

對於我們的 kv server,因爲協程處理的流程圖和線程處理類似(內部機制大不一樣),所以這裏我就不附圖了。

One more thing:線程和協程間的同步

在一個複雜的系統裏,線程和協程可能會同時出現。我們用線程做計算密集的事情,而用協程做 IO 密集的事情,這樣系統可以達到最好的吞吐能力。遺憾的是,很多以協程爲賣點的語言,如 erlang 和 golang,你所面臨的環境是受控的(某種意義上說,這也是優勢 - don't make me think),只能創建協程,而不能創建線程。所以無法做這樣的優化。而另一些語言,如 Python,Scala,雖然同時支持線程和協程,兩者混合使用要麼效率不高,要麼沒有很好的庫,用起來很彆扭(我並沒有 scala 經驗,關於 akka 和 thread 混用的彆扭只是道聽途說)。

而 Rust 處理得很優雅 — tokio::sync 提供了在同步和異步線程之間使用 channel 同步的工具。你甚至感覺不到你的數據在不同的 runtime 間穿梭。其實站在操作系統的角度想想也能釋然:管它是線程和協程,在操作系統層面都是線程,只不過協程是運行在某些線程上的受那些線程獨立調度的數據結構而已。所以,線程和協程間的同步,歸根結底,還是線程之間的同步問題。而線程間同步的手段,我們都可以使用,只不過在這種場景下,channel 是最好(最舒服)的選擇。

所以,我們可以在系統啓動時(或者服務器啓動時),在普通的線程和 tokio 管理的線程(Runtime)間創建好一個 channel,然後在各自的上下文中處理流入流出 channel 的數據,如下圖所示:

本文中我們提到的這個 KV store 的例子太簡單,並不涉及同步線程和異步線程之間的同步,我舉個其它例子。上篇文章《從微秒到納秒》講了如何使用多線程來處理不同 repo 下的事件的寫入。下圖是之前文章裏的主流程:

在這個流程的基礎上,我們需要添加一個新的功能:當日志文件 rotate 時,我們發一個消息出去,由一組 uploader 線程負責把剛剛關閉封存的日誌文件傳輸到 S3。

Rust 下和 S3 打交道的庫是 Rusoto,Rusoto 是全異步的處理,因而我們需要一個 Tokio runtime 來處理異步的任務。我們可以在 Server.start 接口來處理 Runtime 的創建,然後創建 channel,把 rx 交給 Tokio runtime 下運行的一個死循環的異步任務,這個任務從 rx 裏取數據,然後 spawn 新的異步任務將 file 上傳到 S3 對應 bucket 的 key 下。而 channel 的 tx 端則傳給每個 repo 的 LoggerWriter,這樣,LoggerWriter 在做 rotation 的時候,就可以通過 tx 發送要上傳給 S3 的本地文件名 file,以及上傳到 S3 的對象的 key。如下圖所示:

整個流程同樣看上去不容易實現,但最終添加的也就是二十行代碼而已(不計入 S3 具體上傳的代碼)。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/9g0wVT-5PpmXRoKJZo-skA