B 站分佈式 KV 存儲實踐

1 背景   

在 B 站的業務場景中,存在很多種不同模型的數據,有些數據關係比較複雜像:賬號、稿件信息。有些數據關係比較簡單,只需要簡單的 kv 模型即可滿足。此外,又存在某些讀寫吞吐比較高的業務場景,該場景早期的解決方案是通過 MySQL 來進行數據的持久化存儲,同時通過 redis 來提升訪問的速度與吞吐。但是這種模式帶來了兩個問題,其一是存儲與緩存一致性的問題,該問題在 B 站通過 canal 異步更新緩存的方式得以解決,其二則是開發的複雜度,對於這樣一套存儲系統,每個業務都需要額外維護一個任務腳本來消費 canal 數據進行緩存數據的更新。基於這種場景,業務需要的其實是一個介於 Redis 與 MySQL 之間的提供持久化高性能的 kv 存儲。此外對象存儲的元數據,對數據的一致性、可靠性與擴展性有着很高的要求。

基於此背景,我們對自研 KV 的定位從一開始就是構建一個高可靠、高可用、高性能、高拓展的系統。對於存儲系統,核心是保證數據的可靠性,當數據不可靠時提供再高的可用性也是沒用的。可靠性的一個核心因素就是數據的多副本容災,通過 raft 一致性協議保證多副本數據的一致性。

分佈式系統,如何對數據進行分片放置,業界通常有兩種做法,一是基於 hash 進行分區,二是基於 range 進行分區,兩種方式各有優缺點。hash 分區,可以有效防止熱點問題,但是由於 key 是 hash 以後放置的,無法保證 key 的全局有序。range 分區,由於相鄰的數據都放在一起,因此可以保證數據的有序,但是同時也可能帶來寫入熱點的問題。基於 B 站的業務場景,我們同時支持了 range 分區和 hash 分區,業務接入的時候可以根據業務特性進行選擇。大部分場景,並不需要全局有序,所以默認推薦 hash 分區的接入方式,比如觀看記錄、用戶動態這些場景,只需要保證同一個用戶維度的數據有序即可,同一個用戶維度的數據可以通過 hashtag 的方式保證局部有序。

2 架構設計   

2.1 總體架構

整個系統核心分爲三個組件:

Metaserver 用戶集羣元信息的管理,包括對 kv 節點的健康監測、故障轉移以及負載均衡。

Node 爲 kv 數據存儲節點,用於實際存儲 kv 數據,每個 Node 上保存數據的一個副本,不同 Node 之間的分片副本通過 raft 保證數據的一致性,並選出主節點對外提供讀寫,業務也可以根據對數據一致性的需求指定是否允許讀從節點,在對數據一致性要求不高的場景時,通過設置允許讀從節點可以提高可用性以及降低長尾。

Client 模塊爲用戶訪問入口,對外提供了兩種接入方式,一種是通過 proxy 模式的方式進行接入,另一種是通過原生的 SDK 直接訪問,proxy 本身也是封裝自 c++ 的原生 SDK。SDK 從 Metaserver 獲取表的元數據分佈信息,根據元數據信息決定將用戶請求具體發送到哪個對應的 Node 節點。同時爲了保證高可用,SDK 還實現了重試機制以及 backoff 請求。

2.2 集羣拓撲

集羣的拓撲結構包含了幾個概念,分別是 Pool、Zone、Node、Table、Shard 與 Replica。

3 核心特徵   

3.1 分區分裂

基於不同的業務場景,我們同時支持了 range 分區和 hash 分區。對於 range 場景,隨着用戶數據的增長,需要對分區數據進行分裂遷移。對於 hash 分區的場景,使用上通常會根據業務的數據量做幾倍的冗餘預估,然後創建合適的分片數。但是即便是幾倍的冗餘預估,由於業務發展速度的不可預測,也很容易出現實際使用遠超預估的場景,從而導致單個數據分片過大。

之所以不在一開始就創建足夠的分片數有兩個原因:其一,由於每一個 replica 都包含一個獨立的 engine,過多的分片會導致數據文件過多,同時對於批量寫入場景存在一定的寫扇出放大。其二,每一個 shard 都是一組 raftgroup,過多的 raft 心跳會對服務造成額外的開銷,這一點後續我們會考慮基於節點做心跳合併優化減少集羣心跳數。

爲了滿足業務的需求場景,我們同時支持了 range 和 hash 兩種模式下的分裂。兩種模式分裂流程類似,下面以 hash 爲例進行說明。

hash 模式下的分裂爲直接根據當前分片數進行倍增。分裂的流程主要涉及三個模塊的交互。

metaserver

分裂時,metaserver 會根據當前分片數計算出目標分片數,並且下發創建 replica 指令到對應的 Node 節點,同時更新 shard 分佈信息,唯一不同的是,處於分裂中的 shard 狀態爲 splitting。該狀態用於 client 流量請求路由識別。當 Node 完成數據分裂以後上報 metaserver,metaserver 更新 shard 狀態爲 normal 從而完成分裂。

Node

node 收到分裂請求以後,會根據需要分裂的分片 id 在原地拉起創建一個新的分片。然後對舊分片的數據進行 checkpoint,同時記錄舊分片 checkpoint 對應的 logid。新分片創建完成後,會直接從舊分片的 checkpoint 進行 open,然後在異步複製 logid 之後的數據保證數據的一致性。新分片加載完 checkpoint 後,原來的舊分片會向 raftgroup 提交一條分裂完成日誌,該日誌處理流程與普通 raft 日誌一致。分裂完成後上報分裂狀態到 metaserver,同時舊分片開始拒絕不再屬於自己分片的數據寫入,client 收到分片錯誤以後會請求 metaserver 更新 shard 分佈。

完成分裂以後的兩個分片擁有的兩倍冗餘數據,這些數據會在 engine compaction 的時候根據 compaction_filter 過濾進行刪除。

Client

用戶請求時,根據 hash(key) % shard_cnt 獲取目標分片。表分裂期間,該 shard_cnt 表示分裂完成後的最終分片數。以上圖 3 分片的分裂爲例:

hash(key) = 4, 分裂前 shard_cnt 爲 3,因此該請求會被髮送到 shard1. 分裂期間,由於 shard_cnt 變爲 6,因此目標分片應該是 shard4, 但是由於 shard4 爲 splitting,因此 client 會重新計算分片從而將請求繼續發送給 shard1. 等到最終分裂完成後,shard4 狀態變更爲 Normal,請求才會被髮送到 shard4.

分裂期間,如果 Node 返回分片信息錯誤,那麼 client 會請求 metaserver 更新分片分佈信息。

3.2 binlog 支持

類似於 MySQL 的 binlog,我們基於 raftlog 日誌實現了 kv 的 binlog. 業務可以根據 binlog 進行實時的事件流訂閱,同時爲了滿足事件流回溯的需求,我們還對 binlog 數據進行冷備。通過將 binlog 冷備到對象存儲,滿足了部分場景需要回溯較長事件記錄的需求。

直接複用 raftlog 作爲用戶行爲的 binlog,可以減少 binlog 產生的額外寫放大,唯一需要處理的是過濾 raft 本身的配置變更信息。learner 通過實時監聽不斷拉取分片產生的 binlog 到本地並解析。根據 learner 配置信息決定將數據同步到對應的下游。同時 binlog 數據還會被異步備份到對象存儲,當業務需要回溯較長時間的事件流的時候,可以直接指定位置從 S3 拉取歷史 binlog 進行解析。

3.3 多活

基於上述提到的 binlog 能力,我們還基於此實現了 kv 的多活。learner 模塊會實時將用戶寫入的數據同步到跨數據中心的其他 kv 集羣。對於跨數據中心部署的業務,業務可以選擇就近的 kv 集羣進行讀取訪問,降低訪問延時。

kv 的多活分爲讀多活和寫多活。對於讀多活,機房 A 的寫入會被異步複製到機房 B,機房 B 的服務可以直接讀取本機房的數據,該模式下只有機房 A 的 kv 可以寫入。對於寫多活,kv 在機房 A B 都能同時提供寫入並且進行雙向同步,但是爲了保證數據的一致性,需要業務上做數據的單元化寫入,保證兩個機房不會同時修改同一條記錄。通過將用戶劃分單元,提供了寫多活的能力。通過對 binlog 數據打標,解決了雙向同步時候的數據迴環問題。

3.4 bulk load

對於用戶畫像和特徵引擎等場景,需要將離線生成的大量數據快速導入 KV 存儲系統提供用戶讀取訪問。傳統的寫入方式是根據生成的數據記錄一條條寫入 kv 存儲,這樣帶來兩個問題。其一,大批量寫入會對 kv 造成額外的負載與寫入帶寬放大造成浪費。其次,由於寫入量巨大,每次導入需要花費較長的時間。爲了減少寫入放大以及導入提速,我們支持了 bulk load 的能力。離線平臺只需要根據 kv 的存儲格式離線生成對應的 SST 文件,然後上傳到對象存儲服務。kv 直接從對象存儲拉取 SST 文件到本地,然後直接加載 SST 文件即可對外提供讀服務。bulk load 的另外一個好處是可以直接在生成 SST 後離線進行 compaction,將 compaction 的負載 offload 到離線的同時也降低了空間的放大。

3.5 kv 存儲分離

由於 LSM tree 的寫入特性,數據需要被不斷的 compaction 到更底層的 level。在 compaction 時,如果該 key 還有效,那麼會被寫入到更底層的 level 裏,如果該 key 已經被刪除,那麼會判斷當前 level 是否是最底層的,一條被刪除的 key,會被標記爲刪除,直到被 compaction 到最底層 level 的時候纔會被真正刪除。compaction 的時候會帶來額外的寫放大,尤其當 value 比較大的時候,會造成巨大的帶寬浪費。爲了降低寫放大,我們參考了 Bitcask 實現了 kv 分離的存儲引擎 sparrowdb.

sparrowdb 介紹

用戶寫入的時候,value 通過 append only 的方式寫入 data 文件,然後更新索引信息,索引的 value 包含實際數據所在的 data 文件 id,value 大小以及 position 信息,同時 data 文件也會包含索引信息。與原始的 bitcask 實現不一樣的是,我們將索引信息保存在 rocksdb。

更新寫入的時候,只需要更新對應的索引即可。compaction 的時候,只需將索引寫入底層的 level,而無需進行 data 的拷貝寫入。對於已經失效的 data,通過後臺線程進行檢查,當發現 data 文件裏的索引與 rocksdb 保存的索引不一致的時候,說明該 data 已經被刪除或更新,數據可以被回收淘汰。

使用 kv 存儲分離降低了寫放大的問題,但是由於 kv 分離存儲,會導致讀的時候多了一次 io,讀請求需要先根據 key 讀到索引信息,再根據索引信息去對應的文件讀取 data 數據。爲了降低讀訪問的開銷,我們針對 value 比較小的數據進行了 inline,只有當 value 超過一定閾值的時候纔會被分離存儲到 data 文件。通過 inline 以及 kv 分離獲取讀性能與寫放大之間的平衡。

3.6 負載均衡

在分佈式系統中,負載均衡是繞不過去的問題。一個好的負載均衡策略可以防止機器資源的空閒浪費。同時通過負載均衡,可以防止流量傾斜導致部分節點負載過高從而影響請求質量。對於存儲系統,負載均衡不僅涉及到磁盤的空間,也涉及到機器的內存、cpu、磁盤 io 等。同時由於使用 raft 進行主從選主,保證主節點儘可能的打散也是均衡需要考慮的問題。

副本均衡

由於設計上我們會盡量保證每個副本的大小盡量相等,因此對於空間的負載其實可以等價爲每塊磁盤的副本數。創建副本時,會從可用的 zone 中尋找包含副本數最少的節點進行創建。同時考慮到不同業務類型的副本讀寫吞吐可能不一樣導致 CPU 負載不一致,在挑選副本的時候會進一步檢查當前節點的負載情況,如果當前節點負載超過閾值,則跳過該節點繼續選擇其他合適的節點。目前基於最少副本數以及負載校驗基本可以做到集羣內部的節點負載均衡。

當出現負載傾斜時,則從負載較高的節點選擇副本進行遷出,從集羣中尋找負載最低的節點作爲待遷入節點。當出現節點故障下線以及新機器資源加入的時候,也是基於均值計算待遷出以及遷入節點進行均衡。

主從均衡

雖然通過最少副本數策略保證了節點副本數的均衡,但是由於 raft 選主的性質,可能出現主節點都集中在部分少數節點的情況。由於只有主節點對外提供寫入,主節點的傾斜也會導致負載的不均衡。爲了保證主節點的均衡,Node 節點會定期向 metaserver 上報當前節點上副本的主從信息。

主從均衡基於表維度進行操作。metaserver 會根據表在 Node 的分佈信息進行副本數的計算。主副本的數量基於最樸素簡單的數學期望進行計算: 主副本期望值 = 節點副本數 / 分片副本數。下面爲一個簡單的例子:

假設表 a 包含 10 個 shard,每個 shard 3 個 replica。在節點 A、B、C、D 的分佈爲 10、5、6、9. 那麼 A、B、C、D 的主副本數期望值應該爲 3、1、2、3. 如果節點數實際的主副本數少於期望值,那麼被放入待遷入區,如果大於期望值,那麼被放入待遷出區。同時通過添加誤差值來避免頻繁的遷入遷出。只要節點的實際主副本數處於 [x-δx,x+δx] 則表示主副本數處於穩定期間,x、δx 分別表示期望值和誤差值。

需要注意的是,當對 raft 進行主從切換的時候,從節點需要追上所有已提交的日誌以後才能成功選爲主,如果有節點落後的時候進行主從切換,那麼可能導致由於追數據產生的一段時間無主的情況。因此在做主從切換的時候必須要檢查主從的日誌複製狀態,當存在慢節點的時候禁止進行切換。

3.7 故障檢測 & 修復

一個小概率的事件,隨着規模的變大,也會變成大概率的事件。分佈式系統下,隨着集羣規模的變大,機器的故障將變得愈發頻繁。因此如何對故障進行自動檢測容災修復也是分佈式系統的核心問題。故障的容災主要通過多副本 raft 來保證,那麼如何進行故障的自動發現與修復呢。

健康監測

metaserver 會定期向 node 節點發送心跳檢查 node 的健康狀態,如果 node 出現故障不可達,那麼 metaserver 會將 node 標記爲故障狀態並剔除,同時將 node 上原來的 replica 遷移到其他健康的節點。

爲了防止部分 node 和 metaserver 之間部分網絡隔離的情況下 node 節點被誤剔除,我們添加了心跳轉發的功能。上圖中三個 node 節點對於客戶端都是正常的,但是 node3 由於網絡隔離與 metaserver 不可達了,如果 metaserver 此時直接剔除 node3 會造成節點無必要的剔除操作。通過 node2 轉發心跳探測 node3 的狀態避免了誤剔除操作。

除了對節點的狀態進行檢測外,node 節點本身還會檢查磁盤信息並進行上報,當出現磁盤異常時上報異常磁盤信息並進行踢盤。磁盤的異常主要通過 dmesg 日誌進行採集分析。

故障修復

當出現磁盤節點故障時,需要將原有故障設備的 replica 遷移到其他健康節點,metaserver 根據負載均衡策略選擇合適的 node 並創建新 replica, 新創建的 replica 會被加入原有 shard 的 raft group 並從 leader 複製快照數據,複製完快照以後成功加入 raft group 完成故障 replica 的修復。

故障的修復主要涉及快照的複製。每一個 replica 會定期創建快照刪除舊的 raftlog,快照信息爲完整的 rocksdb checkpoint。通過快照進行修復時,只需要拷貝 checkpoint 下的所有文件即可。通過直接拷貝文件可以大幅減少快照修復的時間。需要注意的是快照拷貝也需要進行 io 限速,防止文件拷貝影響在線 io.

4 實踐經驗   

4.1 rocksdb

4.2 Raft

5 未來探討 

參考文獻

[1] Bitcask A Log-Structured Hash Table for Fast Key/Value Data

[2] Lethe: A Tunable Delete-Aware LSM Engine

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/KAkOXM2Yy-b-MOn0krPwvg