etcd-raft 模塊如何實現分佈式一致性?
你好,我是 aoho,大家週末快樂。今天我和你分享的主題是:etcd-raft 模塊如何實現分佈式一致性?
我們在上一篇介紹了 etcd 讀寫操作的底層實現,但至於 etcd 集羣如何實現分佈式數據一致性並沒有詳細介紹。在分佈式環境中,常用數據複製來避免單點故障,實現多副本,提高服務的高可用性以及系統的吞吐量。etcd 集羣中的多個節點不可避免地會出現相互之間數據不一致的情況。但不管是同步複製、異步複製還是半同步複製,會存在可用性或者一致性的問題。解決多個節點數據一致性的方案其實就是共識算法,常見的共識算法有 Paxos 和 Raft。Zookeeper 使用的 Zab 協議,etcd 使用的共識算法就是 Raft。
本課時將會首先介紹如何使用 raftexample,接着介紹 etcd-raft 模塊的實現。etcd-raft 模塊是 etcd 中解決分佈式一致性的模塊,我們結合源碼分析下 raft 在 etcd 中的實現。
使用 raftexample
etcd 項目中包含了 Raft 庫使用的示例。raftexample 基於 etcd-raft 庫實現了鍵值對存儲服務器。
raftexample 的入口方法實現代碼如下所示:
func main() {
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()
// 構建 propose
proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
// raft 爲來自http api的提案提供 commit 流
var kvs *kvstore
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// 鍵值對的處理器將會向 raft 發起提案來更新
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
在入口函數中創建了兩個 channel:proposeC 用於提交寫入的數據;confChangeC 用於提交配置改動數據。
然後分別啓動如下核心的 goroutine:
-
啓動 HTTP 服務器,用於接收用戶的請求數據,最終會將用戶請求的數據寫入前面的 proposeC/confChangeC channel 中。
-
啓動 raftNode 結構體,該結構體中有上面提到的 raft/node.go 中的 node 結構體,也就是通過該結構體實現的 Node 接口與 raft 庫進行交互。同時,raftNode 還會啓動協程監聽前面的兩個 channel,收到數據之後通過 Node 接口函數調用 raft 庫對應的接口。
-
HTTP 服務負責接收用戶數據,再寫入到兩個核心 channel 中,而 raftNode 負責監聽這兩個 channel:如果收到 proposeC channel 的消息,說明有數據提交,則調用 Node.Propose 函數進行數據的提交;如果收到 confChangeC channel 的消息,說明有配置變更,則調用 Node.ProposeConfChange 函數進行配置變更。
-
設置定時器 tick,到時間後調用 Node.Tick 函數。
-
監聽 Node.Ready 函數返回的 Ready 結構體 channel,有數據變更時,根據 Ready 結構體的不同數據類型進行相應的操作,之後需要調用 Node.Advance 函數進行收尾。
到了這裏,已經對 raft 的使用有一個基本的概念了,即通過 node 結構體實現的 Node 接口與 raft 庫進行交互,涉及數據變更的核心數據結構就是 Ready 結構體,接下來可以進一步來分析該庫的實現了。
etcd raft 實現
raft 庫對外提供一個 Node 的 interface,由 raft/node.go 中的 node 結構體實現,這也是應用層唯一需要與這個 raft 庫直接打交道的結構體, Node 接口需要實現的函數包括:Tick、Propose、Ready、Step 等。
我們重點需要了解 Ready,這是一個核心函數,將返回 Ready 對應的 channel,該通道表示當前時間點的 channel。應用層需要關注該 channel,當發生變更時,其中的數據也將會進行相應的操作。其他的函數對應的功能如下:
-
Tick:滴答時鐘,最終會觸發發起選舉或者心跳;
-
Campaign:向 raft StateMachine 提交本地選舉 MsgHup;
-
Propose:通過 channel 向 raft StateMachine 提交一個 Op,提交的是本地 MsgProp Msg;
-
ProposeConfChange:通過 propc channel 向 raft StateMachine 提交一個配置變更的請求,提交的也是本地 MsgProp Msg;
-
Step:節點收到 Peer 節點發送的 Msg 的時候會通過這個接口提交給 raft - StateMachine,Step 接口通過 recvc channel 向 raft StateMachine 傳遞這個 Msg;
-
TransferLeadership:提交 Transfer Leader 的 Msg;
-
ReadIndex:提交 read only Msg。
接着是 raft 算法的實現,node 結構體實現了 Node 接口,其定義如下:
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
rn *RawNode
}
這個結構體會在後面經常用到。在 raft/raft.go 中還有兩個核心數據結構:
-
Config:與 raft 算法相關的配置參數都包裝在該結構體中。該結構體的命名是大寫字母開頭,用於提供給外部調用。
-
raft:具體實現 raft 算法的結構體。
節點狀態
我們來看看 raft StateMachine 的狀態機轉換,實際上就是 raft 算法中各種角色的轉換。每個 raft 節點,可能具有以下三種狀態中的一種:
-
Candidate:候選人狀態,節點切換到這個狀態時,意味着將進行一次新的選舉。
-
Follower:跟隨者狀態,節點切換到這個狀態時,意味着選舉結束。
-
Leader:領導者狀態,所有數據提交都必須先提交到 Leader 上。
每一個狀態都有其對應的狀態機,每次收到一條提交的數據時,都會根據其不同的狀態將消息輸入到不同狀態的狀態機中。同時,在進行 tick 操作時,每種狀態對應的處理函數也是不一樣的。
因此 raft 結構體中將不同的狀態及其不同的處理函數,獨立出來幾個成員變量:
-
state 保存當前節點狀態;
-
tick 函數,每個狀態對應的 tick 函數不同;
-
step,狀態機函數,同樣每個狀態對應的狀態機也不相同。
狀態轉換
etcd-raft StateMachine 封裝在 raft 機構體中,其狀態轉換如下圖:
raft-StateMachine.png
raft state 轉換的調用接口都在 raft.go 中,定義如下:
func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomePreCandidate()
func (r *raft) becomeCandidate()
func (r *raft) becomeLeader()
raft 在各種狀態下,如何驅動 raft StateMachine 狀態機運轉?etcd 將 raft 相關的所有處理都抽象爲了 Msg,通過 Step 接口處理:
func (r *raft) Step(m pb.Message) error {
r.step(r, m)
}
其中 step 是一個回調函數,在不同的 state 會設置不同的回調函數來驅動 raft,這個回調函數 stepFunc 就是在 becomeXX()
函數完成的設置
type raft struct {
...
step stepFunc
}
step 回調函數有如下幾個值,其中 stepCandidate 會處理 PreCandidate 和 Candidate 兩種狀態:
func stepFollower(r *raft, m pb.Message) error
func stepCandidate(r *raft, m pb.Message) error
func stepLeader(r *raft, m pb.Message) error
這裏以 stepCandidate 爲例說明:
func stepCandidate(r *raft, m pb.Message) error {
...
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no Leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
...
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
即對各種 Msg 進行處理,這裏就不展開詳細展開。我們來看下 raft 消息的類型及其定義。
raft 消息
raft 算法本質上是一個大的狀態機,任何的操作例如選舉、提交數據等,最後封裝成一個消息結構體,輸入到 raft 算法庫的狀態機中。
在 raft/raftpb/raft.proto 文件中,定義了 raft 算法中傳輸消息的結構體。熟悉 raft 論文的都知道,raft 算法其實由好幾個協議組成,但是在這裏,統一定義在了 Message 這個結構體之中,以下總結了該結構體的成員用途。
// 位於 raft/raftpb/raft.pb.go:295
type Message struct {
Type MessageType `protobuf:"varint,1,opt,` // 消息類型
To uint64 `protobuf:"varint,2,opt,` // 消息接收者的節點ID
From uint64 `protobuf:"varint,3,opt,` // 消息發送者的節點ID
Term uint64 `protobuf:"varint,4,opt,` // 任期ID
LogTerm uint64 `protobuf:"varint,5,opt,` // 日誌所處的任期ID
Index uint64 `protobuf:"varint,6,opt,` // 日誌索引ID,用於節點向Leader彙報自己已經commit的日誌數據ID
Entries []Entry `protobuf:"bytes,7,rep,` // 日誌條目數組
Commit uint64 `protobuf:"varint,8,opt,` // 提交日誌索引
Snapshot Snapshot `protobuf:"bytes,9,opt,` // 快照數據
Reject bool `protobuf:"varint,10,opt,` // 是否拒絕
RejectHint uint64 `protobuf:"varint,11,opt,` // 拒絕同步日誌請求時返回的當前節點日誌ID,用於被拒絕方快速定位到下一次合適的同步日誌位置
Context []byte `protobuf:"bytes,12,opt,` // 上下文數據
XXX_unrecognized []byte `json:"-"`
}
Message 結構體相關的數據類型爲 MessageType,MessageType 有十九種。當然,並不是所有的消息類型都會用到上面定義的 Message 結構體中的所有字段,因此其中有些字段是 optinal 的,我其中常用的協議(即不同的消息類型)的用途總結成如下的表格:
上表列出了消息的類型對應的功能、消息接收者的節點 ID 和 消息發送者的節點 ID。在收到消息之後,根據消息類型檢索本表,幫助我們理解 raft 算法的操作。
選舉流程
raft 一致性算法實現的關鍵有 Leader 選舉、日誌複製和安全性限制。Leader 故障後集羣能快速選出新 Leader;日誌複製, 集羣只有 Leader 能寫入日誌, Leader 負責複製日誌到 Follower 節點,並強制 Follower 節點與自己保持相同;安全性
raft 算法的第一步是首先選舉出 Leader 出來,在 Leader 出現故障後也需要快速選出新 Leader,所以我們來關注下選舉的流程。
發起選舉的節點
只有在 Candidate 或者 Follower 狀態下的節點,纔有可能發起一個選舉流程,而這兩種狀態的節點,其對應的 tick 函數爲 raft.tickElection 函數,用來發起選舉和選舉超時控制。選舉流程如下所示:
-
節點啓動時都以 Follower 啓動,同時隨機生成自己的選舉超時時間。之所以每個節點隨機選擇自己的超時時間,是爲了避免同時有兩個節點同時進行選舉,這種情況下會出現沒有任何一個節點贏得半數以上的投票從而這一輪選舉失敗,繼續再進行下一輪選舉。
-
在 Follower 的 tick 函數 tickElection 函數中,當選舉超時,節點向自己發送 MsgHup 消息。
-
在狀態機函數 raft.Step 函數中,在收到 MsgHup 消息之後,節點首先判斷當前有沒有沒有 apply 的配置變更消息,如果有就忽略該消息。其原因在於,當有配置更新的情況下不能進行選舉操作,即要保證每一次集羣成員變化時只能同時變化一個,不能同時有多個集羣成員的狀態發生變化。
-
否則進入 campaign 函數中進行選舉:首先將任期號 +1,然後廣播給其他節點選舉消息,帶上的其它字段包括:節點當前的最後一條日誌索引(Index 字段),最後一條日誌對應的任期號(LogTerm 字段),選舉任期號(Term 字段,即前面已經進行 +1 之後的任期號),Context 字段(目的是爲了告知這一次是否是 Leader 轉讓類需要強制進行選舉的消息)。
-
如果在一個選舉超時之內,該發起新的選舉流程的節點,得到了超過半數的節點投票,那麼狀態就切換到 Leader 狀態,成爲 Leader 的同時,Leader 將發送一條 dummy 的 append 消息,目的是爲了提交該節點上在此任期之前的值。
收到選舉消息的節點
當收到任期號大於當前節點任期號的消息,同時該消息類型如果是選舉類的消息(類型爲 prevote 或者 vote)時,會做以下判斷:
-
首先判斷該消息是否爲強制要求進行選舉的類型(context 爲 campaignTransfer,context 爲這種類型時表示在進行 Leader 轉讓,流程見下面的 Leader 轉讓流程)
-
判斷當前是否在租約期以內,判斷的條件包括:checkQuorum 爲 true,當前節點保存的 Leader 不爲空,沒有到選舉超時,前面這三個條件同時滿足。
如果不是強制要求選舉,同時又在租約期以內,那麼就忽略該選舉消息返回不進行處理,這麼做是爲了避免出現那些離開集羣的節點,頻繁發起新的選舉請求。
-
如果不是前面的忽略選舉消息的情況,那麼除非是 prevote 類的選舉消息,在收到其他消息的情況下,該節點都切換爲 Follower 狀態。
-
此時需要針對投票類型中帶來的其他字段進行處理了,需要同時滿足以下兩個條件:
-
只有在沒有給其他節點進行過投票,或者消息的 term 任期號大於當前節點的任期號,或者之前的投票給的就是這個發出消息的節點
-
進行選舉的節點,它的日誌是更新的,條件爲:logterm 比本節點最新日誌的任期號大,在兩者相同的情況下,消息的 index 大於等於當前節點最新日誌的 index,即總要保證該選舉節點的日誌比自己的大。
只有在同時滿足以上兩個條件的情況下,才能同意該節點的選舉,否則都會被拒絕。這麼做的原因是:保證最後能勝出來當新的 Leader 的節點,它上面的日誌都是最新的。
日誌複製
選舉好 Leader 之後,Leader 在收到 put 提案時,如何將提案複製給其他 Follower 呢?
我們回顧下前面課時所講的 etcd 讀寫請求的處理流程。以下面的圖示來說明日誌複製的流程。
-
收到客戶端請求之後,etcd Server 的 KVServer 模塊會向 Raft 模塊提交一個類型爲 MsgProp 的提案消息。
-
Leader 節點在本地添加一條日誌,其對應的命令爲
put foo bar
。這裏涉及到兩個索引值,committedIndex 存儲的最後一條提交(commit)日誌的索引,appliedIndex 存儲的是最後一條應用到狀態機中的日誌索引值,一條日誌只有被提交了才能應用到狀態機中,因此總有 committedIndex >= appliedIndex 不等式成立。在這裏只是添加一條日誌還並沒有提交,兩個索引值還指向上一條日誌。 -
Leader 節點向集羣中其他節點廣播 AppendEntries 消息,帶上 put 命令。
接着看看 Leader 怎麼將日誌數據複製到 Follower 節點。
-
收到 AppendEntries 請求的 Follower 節點,同樣在本地添加了一條新的日誌,也還沒有提交。
-
Follower 節點向 Leader 節點應答 AppendEntries 消息。
-
當 Leader 節點收到集羣半數以上節點的 AppendEntries 請求的應答消息時,認爲
put foo bar
命令成功複製,可以進行提交,於是修改了本地 committed 日誌的索引指向最新的存儲put foo bar
的日誌,而 appliedIndex 還是保持着上一次的值,因爲還沒有應用該命令到狀態機中。
當這個命令提交完成了之後,命令就可以提交給應用層了。
-
提交命令完成,給應用層說明這條命令已經提交。此時修改 appliedIndex 與 committedIndex 一樣了。
-
Leader 節點在下一次給 Follower 的 AppendEntries 請求中,會帶上當前最新的 committedIndex 索引值,Follower 收到之後同樣會修改本地日誌的 committedIndex 索引。
小結
本文主要介紹了 etcd-raft 模塊實現分佈式一致性的原理,通過 raftexample 瞭解 raft 模塊的使用方式和過程。接着重點介紹了選舉流程和日誌複製的過程。除此之外,etcd 還有安全性限制保證日誌選舉和日誌複製的正確性,比如 raft 算法中,並不是所有節點都能成爲 Leader。一個節點要成爲 Leader,需要得到集羣中半數以上節點的投票,而一個節點會投票給一個節點,其中一個充分條件是:這個進行選舉的節點的日誌,比本節點的日誌更新。其他還有判斷日誌的新舊以及提交前面任期的日誌條目等措施。
最後,留一個問題,什麼情況會選舉超時到來時沒有任何一個節點成爲 Leader,後續會怎麼處理呢?歡迎你在留言區提出。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/eAHY6KCPngUuIAKcDpq7wQ