深入解讀 Raft 算法與 etcd 工程實現

作者:jettchen,騰訊 IEG 後臺開發工程師

本文不對 raft 算法從頭到尾細細講解,而是以 raft 算法論文爲起點,逐步解讀 raft 算法的理論,幫助讀者理解 raft 算法的正確性。然後,etcd 不僅是 raft 算法最爲熱門的工程實現,同時也是雲原生 kubernetes 的核心存儲,本文也對 etcd 的底層實現進行剖析,讓讀者在使用 etcd 組件的過程中能夠做到心中有數。對 raft 算法足夠熟悉的同學,也可以直接閱讀 etcd 工程實現那塊內容。

1. raft 算法的簡單介紹

在 raft 算法中,每個機器節點的狀態包含三種:leader、follower、candidate。系統在時間上被劃分爲一系列連續的任期 term,每個 term 的 leader 可以產生連續的 log,如下圖所示。每個任期 term 可以選舉出一個 leader,該 term 的 leader 選舉出來後可以產生日誌。異常情況下,一些任期 term 可能選舉 leader 會失敗而直接進入下一個 term,或者 leader 沒有產生任何日誌就超時從而進入下一個選舉週期。

leader 節點需要將其產生的 log 複製給其他節點,當多數派節點收到 log 則表明該 log 可提交。對於集羣機器更換或者擴縮容,leader 節點生成配置變更日誌並且複製給其他節點來達成一致。

從上面對 raft 算法的介紹中,可以得出 raft 需要解決以下三個問題。後續章節將圍繞這三個問題剖析 raft 算法的實現。

  1. raft 如何安全地選舉出一個 leader?

  2. leader 如何將 log 安全地複製到其他節點?

  3. 集羣如何安全地變更機器節點?

2. leader 選舉以及選舉安全性

2.1 leader 的選舉流程

下圖是 leader 選舉的流程圖。節點初始化的時候,首先進入到 follower 狀態,一段時間內沒有收到 leader 節點的心跳就會切換到 candidate 狀態去競選 leader。節點進入到 candidate 狀態後,首先將自身任期 term 加一,然後給自己投票,並且向其他節點廣播 RequestVote 投票請求。candidate 競選 leader 的結果有三種:

  1. 拿到多數派投票,切換爲 leader。

  2. 發現其他節點已經是 leader(其任期 term 不小於自身 term),則切換爲 follower。

  3. 選舉超時後重來一遍選舉流程(比如多個 candidate 同時參與競選 leader 導致每個 candidate 都拿不到多數派投票)。

candidate 每次選舉時都會設置隨機的選舉超時時間,避免一直有多個 candidate 同時參與競選。candidate 競選成爲 leader 後,就不停地向其他節點廣播心跳請求,以維持自己的 leader 狀態同時也爲了阻止其他節點切換爲 candidate 去競選 leader。

另外有一種異常情況,比如某個機器網絡故障導致它一直收不到 leader 的心跳消息,那它就會切換到 candidate 狀態,並且會一直選舉超時,那它就會一直增加自身的任期 term,當網絡恢復正常的時候,原有 leader 就會收到較高任期 term 的請求從而被迫切換到 follower 狀態,這樣就會影響到整個集羣的穩定性。因此在工程實現的時候,candidate 都會增加一個 preVote 預投票階段。在預投票階段,candidate 不增加自身 term 而只會廣播投票請求,只有拿到多數派投票後才進入正式投票階段,這樣就可以避免由於網絡分區導致集羣的 term 不斷增大進而影響集羣的穩定性。

最後,因爲日誌複製只會從 leader 複製到其他節點,所以在選舉的時候,必須確保新 leader 包含之前任期所有提交的日誌。接下來我們來看 raft 是如何保證新 leader 一定包含之前任期所有提交的日誌。

2.2 leader 選舉的安全性

下圖描述的是 leader 選舉過程中,候選者 candidate 發出的投票請求協議。投票請求會帶上候選者自身的任期 term、memberId、最新日誌的任期 term 和 index,其他節點收到請求後如果發現候選者的任期 >= 自身任期 並且 候選者的最新日誌 >= 自身的最新日誌,則回覆同意。

每條日誌的元數據包括任期 term 以及一個自增的日誌索引。日誌大小的比較規則是:先比較日誌的任期 term,term 相同則再比較日誌的 logIndex。

下面用個例子來證明 leader 選舉的安全性。比如有 5 臺機器,多數派機器擁有最新提交的日誌,如果此時有 1 臺機器拿到了多數派投票成爲 leader,因爲兩個多數派必然存在交集,所以被選出來的 leader,其日誌必然 >= 最新提交的日誌。因此可以得出 1 個結論:新 leader 節點一定包含最新提交的日誌。

3. raft 的日誌複製以及日誌安全性

3.1 raft 的日誌複製請求

下圖描述的是 leader 處理寫請求過程中,向其他節點發出的日誌複製請求協議。請求會帶上 leader 自己的任期 term、memberId、本次待複製的日誌列表、上一條日誌的 prevLogIndex 和 prevLogTerm、已達到多數派一致而提交的最大日誌索引 commitIndex。其他節點收到請求後,如果發現 leader 的任期 >= 自身任期 並且 日誌一致性檢查通過,則用請求中待複製的日誌列表直接覆蓋本地的日誌,並且更新本地的 commitIndex。日誌一致性檢查的邏輯是:自身節點已存在的日誌列表中如果包含請求中指定 prevLogIndex、prevLogTerm 的日誌,則檢查通過。

接下來用下圖這個例子來講解日誌複製的過程。機器節點 d 作爲 term 7 的 leader 節點,產生兩條日誌後發生異常,之後其中一臺機器在 term 8 成功競選成爲 leader 並生成了一條新日誌,這條新日誌的 logTerm 爲 8,logIndex 爲 11。這個新任 leader 在將這條新日誌複製給其他節點的時候,會帶上前一條日誌的元數據,也就是 prevLogTerm 爲 6,prevLogIndex 爲 10。剛開始由於只有節點 c 和 d 包含這個前一條日誌而複製成功,其他節點則會拒絕複製。leader 節點收到複製失敗的回包後,需要往前移動待複製的日誌列表然後重新發送日誌複製請求。例如 leader 節點能夠成功向節點 b 複製日誌的請求,該請求體的內容爲:前一條日誌的 prevLogTerm 爲 4,prevLogIndex 爲 4,而待複製的日誌列表則包含從 logIndex 爲 5 開始的所有日誌。

3.2 raft 的日誌匹配性質

日誌複製到其他節點後,不同節點上的日誌會滿足一個匹配性質。不同節點上的兩個日誌條目,如果 logTerm 、logIndex 都相同,則有:

  1. 由於 leader 節點對於給定的任期 term、給定的 logIndex 至多創建 1 個日誌條目,那麼這兩條日誌必然包含相同的狀態機輸入。

  2. 因爲存在日誌複製請求的一致性檢查,所以這兩個節點上,位於這條相同日誌之前的所有日誌條目必然也會相同。

通過這個日誌匹配性質,就可以總結出:所有節點都會擁有一致的狀態機輸入序列。這樣,各個節點就可以通過一致的初始狀態 + 一致的狀態機輸入序列 從而 得到一致的最終狀態

3.3 raft 日誌的提交安全性

日誌成功複製給多數派節點,就可以提交,進而 apply 到業務狀態機。但日誌提交的時候存在一個限制:不能直接提交之前任期 term 的日誌,只能提交當前任期下的日誌。

以下面這個圖爲例子,在集羣處於狀態 c 的時候,節點 S1 在 term 4 成爲 leader,並且已經將 term 2 的日誌複製給多數派,此時節點 S1 將 term 2 的日誌 commit 後宕機。之後集羣進入到狀態 d,此時節點 S5 成爲 leader 並且將 term 3 的日誌複製給其他節點,這樣就會導致之前已 commit 的 term 2 日誌被回滾覆蓋。

因此爲了避免這個問題,之前節點 S1 在任期 term 4 的時候,不能直接 commit 之前任期 term 的日誌,只能通過將自己任期 term 4 的日誌複製給多數派從而 commit 自己任期內的日誌,如圖中狀態 e 所示。而一旦自己任期 term 內的日誌得到 commit,那麼由於日誌一致性檢查的存在,那麼之前任期 term 下的日誌必然也達到了多數派一致,因此之前任期 term 的日誌此時也可以安全地 commit。

4. raft 的集羣成員變更

4.1 集羣成員變更的問題

集羣在擴縮容或者機器節點發生故障的時候,我們需要對集羣的成員進行變更。以下圖爲例,如果我們直接將集羣的節點配置切換到新配置,由於無法將所有節點的配置同時切換到新配置,因此存在某一個時刻,server 1 和 server 2 可以形成老配置的多數派,server 3、server 4 和 server 5 可以形成新配置的多數派,這樣在同一個任期 term 內就可以選舉出兩個 leader,使得集羣產生腦裂。

那麼如何解決這種成員變更的問題呢?有兩種方式:1. 聯合共識。2. 單成員變更。

4.2 聯合共識 - 解決集羣成員變更問題

如下圖所示的聯合共識中,集羣分爲三個階段。

  1. 集羣還在採用 Cold 配置,此時 Cold 配置中的多數派達成一致就可以做出決議。

  2. 向集羣寫入一個 Cold,new 的配置後,集羣進入聯合共識狀態,此時需要 Cold 配置中的多數派與 Cnew 配置中的多數派同時達成一致纔可以做出決議。

  3. 向集羣寫入一個 Cnew 的配置,集羣進入最終狀態,此時 Cnew 配置中的多數派達成一致就可以做出決議。

在上面三個階段中,下面兩個條件必然成立。因此在聯合共識的成員變更過程中,任何時刻都不會選舉出兩個 leader。

  1. 階段一的多數派和階段二的多數派一定存在交集。

  2. 階段二的多數派和階段三的多數派一定存在交集。

4.3 單成員變更 - 解決集羣成員變更問題

單成員變更的意思就是集羣每次只變更一個節點。如下圖所示,在單成員變更的方式中,變更前後的兩個多數派一定存在交集,也就是變更過程中不可能產生兩個 leader,因此採用單成員變更的方式,集羣不需要引入聯合共識的過渡狀態。

5. raft 的開源項目

etcd 作爲雲原生 kubernetes 中的核心存儲,也是 raft 算法實現中最火熱的開源項目,接下來向大家介紹下 etcd 的工程實現。

6. etcd 的架構

etcd 的架構如下圖所示。api 接口支持 http 協議和 grpc 協議,node 主要負責 raft 算法的實現,storage 主要負責 raft 日誌以及 snap 快照文件的存儲,transport 主要負責集羣節點間的通信。kvstore 分爲 v2 和 v3 兩個版本數據庫,主要負責業務數據的存儲,其中 v3 版本數據庫的實現採用 lboltdb 和 keyIndex,支持 mvcc 機制。

7. etcd 的 raft 日誌模塊

7.1 etcd 的 raft 日誌存儲

從前面 raft 算法理論的學習中,可以得出有兩類數據需要刷盤:

  1. raft 日誌:協議交互流程中的一種承諾,一個節點一旦告訴其他節點自己已接收某條日誌,則這條日誌就不能丟失。

  2. 節點的狀態:包括當前的任期 term、當前的投票目標 vote、可提交的最大日誌索引 commit 三個字段。前兩個字段是 leader 選舉流程中的承諾,第三個字段是節點在重啓恢復時用來控制日誌回放到哪一條日誌。

etcd 採用 wal 文件來保存上面兩種數據的,保存的格式如下圖所示。第一個 wal 文件在文件開頭首先寫入 0 值的 crc32 記錄,之後每一個 raft 日誌或者節點狀態的記錄,其 crc32 值 = calc(pre_crc32, 本記錄的二進制值)。而對於第二個及之後的 wal 文件,文件開頭的初始 crc32 值 = 上一個 wal 文件最後一條記錄的 crc32 值。可以看到:所有 wal 文件,其所有記錄的 crc32 值可以形成一個可進行強校驗的鏈表。這樣在重啓恢復的時候,etcd 就可以對 wal 文件的內容進行精細化的校驗。

7.2 etcd 的 raft 日誌壓縮

如果不對 raft 日誌進行壓縮的,那 wal 文件佔用的磁盤空間就會越來越大,所以需要一個日誌壓縮機制,接下來通過下面這個圖來講述日誌壓縮機制。在 raft 日誌中,首先定義幾個概念:

  1. log_index:最新的日誌位置索引。

  2. commit_index:已達成多數派一致,可提交的最大日誌位置索引。

  3. apply_index:已應用到業務狀態機的最大日誌位置索引。

  4. compact_index:raft 日誌清理的臨界位置索引,在該索引之前的所有 raft 日誌,都可以清掉。

  5. last_snap_index:上一個 snap 快照文件的日誌索引,代表 snap 快照文件生成時刻的 apply_index。

當 apply_index - last_snap_index > Cfg.SnapshotCount(默認值爲 10w)時,會觸發 snap 快照文件的生成,etcd 將數據庫當前的快照數據寫入 snap 文件的同時,也會在 wal 文件中追加一個 snapshotType 的記錄(該記錄包含此時的任期 term 和 apply_index)。

理論上講此時 apply_index 之前的所有 raft 日誌都可以清掉了,但是在生產環境中其他 follower 節點的日誌複製進度可能比較落後,還在學習 apply_index 之前的日誌,如果此時 apply_index 之前的日誌都被清掉了,那麼 leader 節點必須發送 snap 文件 + apply_index 之後的 raft 日誌,而發送 snap 文件是一個非常耗性能的操作,因此爲了避免頻繁發生這種 snap 文件的發送操作,在清理 raft 日誌的時候,一般在 apply_index 前面保留 Cfg.SnapshotCatchUpEntries(默認值爲 5000)個 raft 日誌。

7.3 etcd 重啓時如何根據 raft 日誌恢復數據

從前面可以知道,etcd 在運行過程中會不斷生成 wal 文件和 sanp 文件,那 etcd 在重啓時是如何恢復數據的呢?

以下圖爲例來講解恢復流程。首先,介紹下 wal 文件和 snap 文件的命名規則。wal 文件的命名包含一個遞增的 seq、本 wal 文件中的起始日誌索引 log_index。snap 文件的命名包含生成快照時刻的任期 term、應用到狀態機的 apply_index。恢復流程的具體步驟爲:

  1. 讀取所有 wal 文件,從 wal 文件中拿到所有的 snap 文件名,前面講過每次日誌壓縮生成 snap 文件時都會往 wal 文件中寫一條 snapshotType 記錄(根據該記錄可以拿到 snap 文件名)。

  2. 從所有 snap 文件中選擇一個最新且未被損壞的 snap 文件來恢復存儲數據,在該例子中則會選中 0000000000000001-0000000000024000.snap 文件。

  3. 根據 0000000000000001-0000000000024000.snap 文件篩選出所需要的 wal 文件列表,此時只需要讀取 24000 之後的 raft 日誌,所以只需要篩選出 0000000000000002-0000000000020000.wal 和 0000000000000003-0000000000030000.wal 兩個文件,然後讀取這兩個 wal 文件恢復 HardState 狀態,並且在 snap 文件的基礎上做日誌回放。

7.4 etcd 如何優化 raft 日誌的讀寫

外部的每一個寫請求都會生成一條 raft 日誌,而 raft 日誌是需要刷盤的。如果每生成一條 raft 日誌就刷盤一次,那 etcd 的寫入性能必然很低,因此 etcd 採用異步批量刷盤的方式來優化寫入性能,如下圖所示。

  1. 外部的寫入請求先由 go routine 1 寫入到 propc 通道。

  2. go routine 2 消費 propc 通道中的請求,將其轉化爲 unstable_log(保存在內存中,表示尚未達成多數派一致的 raft 日誌),也會在待發送消息的緩衝區中生成日誌複製請求。

  3. go routine 3 會將 unstable_log、待發送的日誌複製請求打包成一個 ready 結構,寫入道 readyc 通道。

  4. go routine 4 消費 readyc 中的數據,將 raft 日誌刷盤到 wal 文件以及追加到 stable_log(保存在內存中,可理解爲 wal 文件中的 raft 日誌在內存中的副本),同時將日誌複製請求發送給 follower 節點。

  5. 對於已達成多數派一致的那些日誌,unstable_log 緩衝區就可以清理掉了。

etcd 通過這種日誌處理方式,不僅將多次寫請求合併爲一次刷盤優化了寫入性能,而且通過在 stable_log 內存緩衝區中額外維護一份 wal 文件中 raft 日誌的副本,從而優化了 raft 日誌的讀取性能。

8. etcd 的 mvcc 機制

8.1 etcd 爲什麼要引入 mvcc?

etcd 之前的 v2 版本數據庫是一個樹型的內存數據庫,整個數據庫擁有一個大的 RWLock,寫鎖和讀鎖相互阻塞,影響讀寫性能。另外,etcd v2 數據庫不會保存歷史版本數據,在 v2 版本的 watch 機制中,v2 版本基於滑動窗口最多保留最近的 1000 條歷史事件,當 etcd server 遇到寫請求較多、網絡波動等場景時,很容易出現事件丟失問題,進而觸發 client 全量拉取數據,產生大量 expensive request,甚至導致 etcd 雪崩。

因此 etcd 併發性能問題導致 Kubernetes 集羣規模受限,而 watch 機制可靠性問題又直接影響到 Kubernetes controller 的正常運行。解決併發問題的方法有很多,而 mvcc 在解決併發問題的同時,還能通過存儲多版本數據來解決 watch 機制可靠性問題。所以,etcd v3 版本果斷選擇了基於 mvcc 來實現多版本併發控制。mvcc 能最大化地實現高效地讀寫併發,尤其是高效地讀,非常適合讀多寫少的場景。

8.2 etcd 如何存儲數據以支持 mvcc 機制

如下圖所示,etcd 採用 B 樹存儲 key 的歷史版本號數據,並且通過 keyIndex 結構來構建整個 B 樹,這個 keyIndex 存儲了某一個 key 的歷史版本號數據。

keyIndex 的結構如下所示,其中的每個 generation 代表 key 的一次創建到刪除的版本號變更流水。

// generation contains multiple revisions of a key.
type generation struct {
    ver     int64 // 修改次數
    created revision // when the generation is created (put in first revision).
    revs    []revision  // 修改版本號列表
}

type keyIndex struct {
    key         []byte  // key的值
    modified    revision // the main rev of the last modification
    generations []generation  // 生命週期數組,每個generation代表key的一次創建到刪除的版本號變更流水
}

8.3 etcd 如何維護 key 的版本號數據

接下來以下圖爲例講解 keyIndex 的變更過程,進而說明 etcd 是如何維護版本號數據的。版本號由 main version 和 sub version 構成,每開啓一個寫事務則 main version 加 1,而在同一個寫事務中,每進行一次寫操作則 sub version 加 1。在該例子中,key 爲 12345,第一次寫入該 key 時,版本號爲 101:0,此時會創建第 0 代 generation。第二次寫入時,keyIndex 的修改版本號變成 102:0,並且會往第 0 代 generation 中 append 一個 102:0 的版本號。之後刪除該 key,會往第 0 代 generation 中 append 一個 103:0 的版本號,並且新創建一個空的第 1 代 generation。最後在 main version 爲 106 時再次寫入該 key,則會往第 1 代 generation 中 append 一個 106:0 的版本號。

這種 keyIndex 結構設計的優點是:

  1. 通過引入 generation 結構,存儲的時候不用區分修改版本號與刪除版本號,這兩類版本號在數據結構上是同構的。在判斷 key 是否存在時,我們只需要判斷 keyIndex 是否存在或者其最後一代 generation 是否爲空。

  2. 在查找 key 的指定版本號數據時,可以先查找 generation,然後再在 generation 中查找具體的 version,相當於將一個大數組的查找劃分爲兩個小數組的查找,加快了查找速度。

8.4 etcd 如何存儲每個版本號的數據

etcd 底層默認採用 boltdb 來存儲每個版本號對應的 value,boltdb 是採用 B + 樹來存儲數據的,如下圖所示。boltdb 實現的幾個關鍵點有:

  1. 元信息頁 metaPage 中存儲了整個 db 根節點的 page_id。

  2. 每個 key-value 鍵值對必須存儲在 bucket 桶中,每一個 bucket 桶的數據都由一個獨立的 B + 樹來維護。bucket 桶類似於命名空間的概念。

  3. boltdb 支持創建多個 bucket 桶,並且 bucket 桶支持嵌套創建子 bucket 桶。

boltdb 支持多 bucket 桶的設計,使業務數據可以按功能分類,不同類別的數據互不影響,佔較小存儲空間的 bucket 數據,其讀寫性能不會因爲 db 存在佔較大存儲空間的 bucket 而降低。

8.5 etcd 的歷史版本號數據壓縮清理

由於 etcd v3 版本數據庫會保存 key 的所有版本號數據,如果不進行定期壓縮清理的話,那數據庫佔用的空間將越來越大,因此 etcd 實現了兩種數據壓縮方式。

  1. 第一種方式:保存過去一段時間內的版本號數據,如下圖所示。etcd 支持配置一個壓縮週期,而採樣週期 = 壓縮週期 / 10,然後每個採樣週期都獲取下當前的 db 版本號,並將這個版本號 push 到一個隊列中,當隊列滿時 pop 出一個 version 然後開啓第一次壓縮,壓縮時該 version 之前的歷史版本號數據就可以清理掉了。之後每間隔一個壓縮週期,都執行一次壓縮邏輯。

  2. 第二種方式:保存指定數量的歷史版本號數據,如下圖所示。etcd 每間隔一個壓縮週期就執行一次壓縮,壓縮時的壓縮 version = 當前 db 的 version - 配置文件指定的保留版本號數量。

9. etcd 的 watch 機制

etcd 允許客戶端監聽某個 key 或者某段 key 範圍區間的變化。如果監聽範圍內的某個 key,其數據發生變化,則 watch 的客戶端會立刻收到通知。

9.1 etcd 的 watch 架構

etcd 的 watch 架構如下圖所示。watch 客戶端發出 watch 請求後,server 端首先創建一個 serverWatchStream,該 serverWatchStream 會創建一個 recv loop 協程和 send loop 協程。recv loop 協程負責監聽客戶端對具體 key 的 watch 請求,一旦收到 watch 請求就創建一個 watcher 並存儲在 watchableStore 的 synced watchers 中,如果客戶端指定從過去的歷史版本號開始監聽變化,則 watcher 會被存儲在 unsynced watchers 中(表示該 watcher 需要追趕監聽進度)。synced watchers 和 unsynced watchers 通過訪問擁有 mvcc 機制的數據層將相關事件通過 serverWatchStream 的 send loop 協程發送給客戶端。另外,synced watchers 和 unsynced watchers 有兩類數據結構存儲 watcher,map 結構負責存儲只 watch 一個 key 的 watcher,interval tree 負責存儲 watch 一段 key 範圍區間的 watcher。

9.2 etcd 中落後的 watcher 如何追趕進度

前面提到 unsynced watchers 中存儲的 watcher,其監聽版本號是小於當前數據庫版本號的,這些 watcher 需要一個機制去追趕數據庫版本進度。etcd 在 watchableStore 初始化的時候,會創建一個 syncWatchersLoop 協程,這個協程的工作邏輯如下圖所示:它會獲取 unsynced watchers 中的最小監聽版本號,然後根據這個版本號獲取相關的 event 事件併發送給客戶端。如果發送成功且監聽版本號已達到數據庫當前版本號,則將這些 watcher 移動到 synced watchers 中;如果發送失敗,則將這些 event 事件放入 victims 緩衝區,有一個 syncVictimsLoop 協程專門對 victims 中之前發送失敗的 event 事件進行重試。

9.3 etcd 中新的變更如何通知給 watcher

synced watchers 中存儲的 watcher,其監聽版本號雖然沒有落後於當前數據庫版本號,但數據庫的最新 event 事件依然需要通知給這些 watcher,具體的工作邏輯如下圖所示。其中部分邏輯與 syncWatchersLoop 協程的邏輯一致,這裏不再詳細講解。

10. etcd 的租約機制

10.1 etcd 租約的實現架構

etcd 租約實現的架構如下圖所示。lease api 層提供租約接口,其中 grant / revoke 負責處理創建、銷燬租約的請求,renew 負責續期租約的請求,attatch / detach 負責處理將 key 關聯或者取消關聯租約的請求。在 lessor 模塊中:leaseExpiredNotifierHeap 採用最小堆實現,用來檢測租約是否過期,checkpointHeap 採用最小堆實現,用來定時刷新每一個租約的剩餘 TTL,itemMap 負責維護每一個 key 關聯的租約信息,leaseMap 負責維護每一個租約 ID 的具體信息。

另外,這些涉及租約數據變化的 api 接口,都會走一輪 raft 算法,這樣各個節點的租約數據纔會一致。租約的續期和過期處理稍微特殊一點,這兩類操作統一由 leader 節點觸發。對於續期操作,續期成功後,follower 節點上租約的剩餘 TTL 依賴租約的 checkpoint 機制刷新;對於過期處理,leader 節點判斷租約過期後,會在 raft 算法層提議一個 revoke 撤銷租約的請求。etcd v3 版本中,如果不同 key 的 TTL 相同,則可以複用同一個租約 ID,這樣就顯著減少了租約數量。最後,租約數據會持久化到 boltdb。

10.2 如何理解租約的 checkpoint 機制

此 checkpoint 機制並不是生成租約數據的快照,而是 leader 節點爲每個租約,每間隔 cfg.CheckpointInterval(默認值爲 5min)時間就設置一個 checkpoint 時間點,然後 leader 節點每 500ms 爲那些已到 checkpoint 時間點的所有租約生成一個 pb.LeaseCheckpointRequest 請求,該請求會通過 raft 算法提議,提議通過後在 apply 到狀態機的時候刷新租約的剩餘 TTL。

這個 checkpoint 機制是爲了解決一個問題:leader 發生切換的時候,follower 節點升爲 leader 後會根據租約過期時間重建 leaseExpiredNotifierHeap 最小堆,如果 follower 節點之前一直不刷新剩餘 TTL 的話,那重建的時候就會採用創建租約時的總 TTL 時間(相當於自動進行了一次續期)。如果 leader 頻繁發生切換,切換時間小於租約的 TTL,這就會導致租約永遠無法刪除,導致 etcd 大量 key 堆積從而引發 db 大小超過配額等異常。

10.3 etcd 重啓時的租約恢復

grant api 接口創建租約的時候,會將租約數據(包括租約 ID、租約的總 TTL、租約的剩餘 TTL)持久化到 boltdb 的 leaseBucket 桶中,如下所示。

leaseBucketName = []byte("lease")
func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
    key := int64ToBytes(int64(l.ID))
    lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
    val, err := lpb.Marshal()

    b.BatchTx().Lock()
 // 租約數據寫入boltdb
    b.BatchTx().UnsafePut(leaseBucketName, key, val)
    b.BatchTx().Unlock()
}

另外,key-value 鍵值對寫入 boltdb 的 keyBucket 桶時,value 存儲的結構如下所示,包含了 key 關聯的租約 ID。

keyBucketName  = []byte("key")
kv := mvccpb.KeyValue{
        Key:            key,
        Value:          value,
        CreateRevision: c,
        ModRevision:    rev,
        Version:        ver,
        Lease:          int64(leaseID),  // key關聯的租約ID
    }

這樣 etcd 重啓的時候,通過遍歷 leaseBucket 桶來恢復所有的租約數據,然後再遍歷 keyBucket 桶來恢復數據 key 與租約 ID 的映射關係。

11. etcd 實現的一些關鍵總結

  1. 爲什麼 etcd v3 版本的 KeyIndex 使用 B-tree 而不使用哈希表、平衡二叉樹?答:從功能特性上分析, 因 etcd 需要支持範圍查詢,因此保存索引的數據結構也必須支持範圍查詢纔行。所以哈希表不適合,而 B-tree 支持範圍查詢。從性能上分析,平橫二叉樹每個節點只能容納一個數據、導致樹的高度較高,而 B-tree 每個節點可以容納多個數據,樹的高度更低,更扁平,涉及的查找次數更少,具有優越的增、刪、改、查性能。

  2. etcd v3 版本數據是採用 boltdb 存儲的,boltdb 對於每一個寫事務都會進行一次刷盤,那 etcd 爲了優化寫入性能,做了什麼樣的處理?答:採用批量提交的,也就是用底層 boltdb 的單個寫事務來處理上層業務 api 接口的多次寫入請求。etcd 批量提交的代碼實現如下:

// 寫入key-value對的時候,並不會開啓一個新的寫事務,還是沿用之前的t.tx寫入數據,然後將t.pending加1.
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
    bucket := t.tx.Bucket(bucketName)
 // 省略若干代碼
    bucket.Put(key, value)
    t.pending++
}

// 在業務釋放batchTx的鎖時,如果t.pending達到一定值時,提交事務
func (t *batchTx) Unlock() {
    if t.pending >= t.backend.batchLimit {
        t.commit(false)
    }
    t.Mutex.Unlock()
}

// 提交上一個boltdb事務,然後再開一個新事務供後續數據寫入
func (t *batchTx) commit(stop bool) {
    // commit the last tx
    if t.tx != nil {
        if t.pending == 0 && !stop {
            return
        }
  t.tx.Commit()
        t.pending = 0
    }
    if !stop {
  // 進程沒有收到stop信號,則立即開啓一個新的boltdb寫事務用於接下來的寫入請求
        t.tx = t.backend.begin(true)
    }
}

// 每隔b.batchInterval時間,就檢查是否有待提交的寫入數據,如果有則提交下。
func (b *backend) run() {
    defer close(b.donec)
    t := time.NewTimer(b.batchInterval)
    defer t.Stop()
    for {
        select {
        case <-t.C:
        case <-b.stopc:
            b.batchTx.CommitAndStop()
            return
        }
        if b.batchTx.safePending() != 0 {
            b.batchTx.Commit()
        }
        t.Reset(b.batchInterval)
    }
}
  1. 採用批量提交,在尚未達到提交條件而系統宕機,會不會導致 v3 版本的部分數據丟失呢?答:不會,因爲宕機後重啓恢復的時候,可以通過回放 raft 日誌來恢復數據,而 v3 版本的存儲數據是支持 raft 日誌可重入回放的,在將 raft 日誌應用到 v3 版本數據的時候,會更新 consistentIndex,而這個 consistentIndex 在批量提交的時候也會 commit 到 boltdb 中。在系統宕機時,consistentIndex 的值也沒有刷盤,boltdb 底層保存的還是舊的 consistentIndex,這樣宕機後就可以通過重啓後的日誌回放來恢復數據。
// 保證日誌回放冪等性的consistentIndex也保存到底層boltdb
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
    bs := ci.bytesBuf8
    binary.BigEndian.PutUint64(bs, ci.consistentIndex)
    // put the index into the underlying backend
    // tx has been locked in TxnBegin, so there is no need to lock it again
    tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

// 將raft日誌應用到v3版本數據的,只有日誌index大於consistentIndex(這個值代表已應用到v3版本數據的raft日誌索引)時,纔會apply到v3存儲,保證冪等性。
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    shouldApplyV3 := false
    index := s.consistIndex.ConsistentIndex()
    if e.Index > index {
        // set the consistent index of current executing entry
        s.consistIndex.SetConsistentIndex(e.Index)
        shouldApplyV3 = true
    }
 // 省略若干代碼
}
  1. 採用批量提交,在尚未達到提交條件時,boltdb 底層的讀事務是讀不到這些數據的,那 etcd 是怎麼處理的?答:etcd 業務 api 接口開啓寫事務寫數據時,除寫一份到 boltdb 外,還寫一份數據到 txWriteBuffer,然後 api 接口在結束事務時將 txWriteBuffer 內存合併到 txReadBuffer。etcd 業務 api 讀接口會優先讀取 txReadBuffer 中的內容,然後再讀底層 boltdb 的數據。整個過程的代碼邏輯如下:
// 寫入數據前,通過這個函數開啓一個寫事務
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    s.mu.RLock()
    tx := s.b.BatchTx()
    tx.Lock()
    tw := &storeTxnWrite{
        storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
        tx:           tx,
        beginRev:     s.currentRev,
        changes:      make([]mvccpb.KeyValue, 0, 4),
    }
    return newMetricsTxnWrite(tw)
}

// 數據寫入boltdb(此時不一定會進行事務提交)後,同時寫入txWriteBuffer
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
    t.batchTx.UnsafeSeqPut(bucketName, key, value)
    t.buf.putSeq(bucketName, key, value)
}

func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
    b, ok := txw.buckets[string(bucket)]
    if !ok {
        b = newBucketBuffer()
        txw.buckets[string(bucket)] = b
    }
    b.add(k, v)
}

// 上層業務api寫請求的接口,寫入key-value鍵值對後,會調用func (tw *metricsTxnWrite) End()函數
func (tw *metricsTxnWrite) End() {
    defer tw.TxnWrite.End()   // 調用func (tw *storeTxnWrite) End()
 // 省略若干代碼
}

func (tw *storeTxnWrite) End() {
    // only update index if the txn modifies the mvcc state.
    if len(tw.changes) != 0 {
        tw.s.saveIndex(tw.tx)
        // hold revMu lock to prevent new read txns from opening until writeback.
        tw.s.revMu.Lock()
        tw.s.currentRev++
    }
    tw.tx.Unlock()  // 調用func (t *batchTxBuffered) Unlock()
    if len(tw.changes) != 0 {
        tw.s.revMu.Unlock()
    }
    tw.s.mu.RUnlock()
}

func (t *batchTxBuffered) Unlock() {
    if t.pending != 0 {
        t.backend.readTx.Lock() // blocks txReadBuffer for writing.
        t.buf.writeback(&t.backend.readTx.buf)  //調用func (txw *txWriteBuffer) writeback(txr *txReadBuffer)函數
        t.backend.readTx.Unlock()
        if t.pending >= t.backend.batchLimit {
            t.commit(false)  // 寫請求達到批量提交條件時提交boltdb寫事務,這個函數也會等待所有業務api讀請求結束後然後將txReadBuffer清掉,最後再開啓一個新的boltdb讀事務以供api讀請求使用。此時,下一個業務api讀請求必須要等待該boltdb寫事務刷盤結束後才能開始
        }
    }
    t.batchTx.Unlock()
}

// 將本次寫入的數據合併到txReadBuffer中,以提供讀請求讀取
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
    for k, wb := range txw.buckets {
        rb, ok := txr.buckets[k]
        if !ok {
            delete(txw.buckets, k)
            txr.buckets[k] = wb
            continue
        }
        if !txw.seq && wb.used > 1 {
            // assume no duplicate keys
            sort.Sort(wb)
        }
        rb.merge(wb)
    }
    txw.reset()
}

// 結合txReadBuffer和底層boltdb,讀取數據
func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte)
  1. keyIndex 結構中的 generation 結構爲什麼要存儲創建版本號?答:對於指定 key,創建其第 N 代生命週期的 generation 時,該 generation 的創建版本號與第一個修改版本號必然一樣,這樣看似乎創建版本號是冗餘的。但其實不然,因爲 keyIndex 有壓縮邏輯(清理太久之前的版本號數據),壓縮後該 generation 修改版本號列表中的第一個版本號就不是創建版本號了。因此,如果要獲取 key 的創建版本號,那 generation 就必須存儲一個創建版本號。
// generation contains multiple revisions of a key.
type generation struct {
    ver     int64 // 修改次數
    created revision // when the generation is created (put in first revision).
    revs    []revision  // 修改版本號列表
}

type keyIndex struct {
    key         []byte  // key的值
    modified    revision // the main rev of the last modification
    generations []generation  // 生命週期數組,每個generation代表key的一次創建到刪除的版本號變更流水
}
  1. etcd 爲了壓縮 raft 日誌,需要定時生成 snapshot 文件,然後清掉過期的 raft 日誌,那麼生成 snapshot 文件會不會很耗性能?答:對於 v3 版本數據,底層 boltdb 保存了當前 apply 到數據庫的最大 raft 日誌 index,因此一般情況下 v3 版本的數據不需要生成 snapshot 文件,boltdb 自身的數據本身就存儲在磁盤文件上(進程通過 mmap 機制進行讀寫),在重啓恢復的時候,v3 版本的數據可以直接加載 boltdb 的數據庫文件,然後通過回放 raft 日誌(v3 數據的日誌回放有冪等性保證)來恢復數據。

    但有一種場景 leader 需要將 v3 版本的 boltdb 文件發送給 follower,比如:如果 follower 節點進度太落後,其所需的 raft 日誌已被 leader 節點壓縮清理掉。這種場景,leader 將 v2 版本的 snapshot 數據 + 當前 boltdb 的文件 合併成一個 MergedSnapshot 發送給 follower。follower 節點收到後依次恢復 v2 和 v3 版本的數據,此時 v2 和 v3 版本數據的進度存在不一致(v3 版本的數據比較新),隨後 v2 版本的數據通過日誌回放追趕上,而 v3 版本的數據通過 boltdb 中的 consistentIndex 確保日誌回放的冪等性。在發送boltdb文件的時候,首先開啓一個boltdb的讀事務,然後創建一個pipe管道,讀事務每次最多從boltdb文件讀取32K寫入到pipe的write端,另一個協程則與讀事務協程交替着讀取、寫入pipe,讀取pipe內容後,通過http的流式傳輸發送給follower

// 用boltdb保存當前apply到數據庫的最大raft日誌index
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
    bs := ci.bytesBuf8
    binary.BigEndian.PutUint64(bs, ci.consistentIndex)
    // put the index into the underlying backend
    // tx has been locked in TxnBegin, so there is no need to lock it again
    tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
    shouldApplyV3 := false
    index := s.consistIndex.ConsistentIndex()
    if e.Index > index {
        // 保證冪等性,v3版本數據庫支持raft日誌的重複回放,對於重複的raft日誌會忽略掉
        // set the consistent index of current executing entry
        s.consistIndex.SetConsistentIndex(e.Index)
        shouldApplyV3 = true
    }
 // 省略若干代碼
}
  1. 集羣成員節點變更流程種,從代碼上看 Cnew 配置生效的時間節點與 raft 論文不一樣,怎麼理解,不會影響到正確性嗎?答:下面這個圖是 raft 論文中的,論文中 Cnew 日誌產生開始就可以 make dicision alone。看 etcd 的代碼,第一階段是隻需要 Cold 多數派達成一致就進入 Cold 和 Cnew 的聯合一致狀態 join consensus,第二階段是需要 Cold 和 Cnew 兩個多數派達成一致才實現最終 Cnew 配置(因爲 etcd 的代碼實現,集羣節點配置都是日誌 apply 的時候才生效)。

    這樣來看 Cnew 配置生效的時間節點與 raft 論文不一樣,但不會影響算法的正確性,理由是:raft 論文中描述的是 Cold 可以單獨做多數派決定的最晚時間點以及 Cnew 可以單獨做多數派決定的的最早時間點,也就是 join consensus 的最小生命週期。而在工程實現中,join consensus 生命週期只要包含那個最小生命週期就可以了,實際上只要保證 Cold 中的多數派在用 Cold 配置 以及 Cnew 中的多數派在用 Cnew 配置 這兩種情況不會同時發生就可以。

  1. etcd 是如何實現線性一致性讀的?答:因爲在網上查到講得比較好的文章,這裏就不再細講。感興趣的同學可以參考下面兩篇文章。etcd-raft 的線性一致讀方法一:ReadIndexetcd-raft 的線性一致讀方法二:LeaseRead

參考文章

Raft 成員變更的工程實踐

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/x-AdmN0UN5KT58XWO1BCOA