etcd-raft 模塊如何實現分佈式一致性?

你好,我是 aoho,大家週末快樂。今天我和你分享的主題是:etcd-raft 模塊如何實現分佈式一致性?

我們在上一篇介紹了 etcd 讀寫操作的底層實現,但至於 etcd 集羣如何實現分佈式數據一致性並沒有詳細介紹。在分佈式環境中,常用數據複製來避免單點故障,實現多副本,提高服務的高可用性以及系統的吞吐量。etcd 集羣中的多個節點不可避免地會出現相互之間數據不一致的情況。但不管是同步複製、異步複製還是半同步複製,會存在可用性或者一致性的問題。解決多個節點數據一致性的方案其實就是共識算法,常見的共識算法有 Paxos 和 Raft。Zookeeper 使用的 Zab 協議,etcd 使用的共識算法就是 Raft。

本課時將會首先介紹如何使用 raftexample,接着介紹 etcd-raft 模塊的實現。etcd-raft 模塊是 etcd 中解決分佈式一致性的模塊,我們結合源碼分析下 raft 在 etcd 中的實現。

使用 raftexample

etcd 項目中包含了 Raft 庫使用的示例。raftexample 基於 etcd-raft 庫實現了鍵值對存儲服務器。

raftexample 的入口方法實現代碼如下所示:

func main() {
 cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
 id := flag.Int("id", 1, "node ID")
 kvport := flag.Int("port", 9121, "key-value server port")
 join := flag.Bool("join", false, "join an existing cluster")
 flag.Parse()
  // 構建 propose
 proposeC := make(chan string)
 defer close(proposeC)
 confChangeC := make(chan raftpb.ConfChange)
 defer close(confChangeC)

 // raft 爲來自http api的提案提供 commit 流
 var kvs *kvstore
 getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
 commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

 kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

  // 鍵值對的處理器將會向 raft 發起提案來更新
  serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

在入口函數中創建了兩個 channel:proposeC 用於提交寫入的數據;confChangeC 用於提交配置改動數據。

然後分別啓動如下核心的 goroutine:

到了這裏,已經對 raft 的使用有一個基本的概念了,即通過 node 結構體實現的 Node 接口與 raft 庫進行交互,涉及數據變更的核心數據結構就是 Ready 結構體,接下來可以進一步來分析該庫的實現了。

etcd raft 實現

raft 庫對外提供一個 Node 的 interface,由 raft/node.go 中的 node 結構體實現,這也是應用層唯一需要與這個 raft 庫直接打交道的結構體, Node 接口需要實現的函數包括:Tick、Propose、Ready、Step 等。

我們重點需要了解 Ready,這是一個核心函數,將返回 Ready 對應的 channel,該通道表示當前時間點的 channel。應用層需要關注該 channel,當發生變更時,其中的數據也將會進行相應的操作。其他的函數對應的功能如下:

接着是 raft 算法的實現,node 結構體實現了 Node 接口,其定義如下:

type node struct {
 propc      chan msgWithResult
 recvc      chan pb.Message
 confc      chan pb.ConfChangeV2
 confstatec chan pb.ConfState
 readyc     chan Ready
 advancec   chan struct{}
 tickc      chan struct{}
 done       chan struct{}
 stop       chan struct{}
 status     chan chan Status

 rn *RawNode
}

這個結構體會在後面經常用到。在 raft/raft.go 中還有兩個核心數據結構:

節點狀態

我們來看看 raft StateMachine 的狀態機轉換,實際上就是 raft 算法中各種角色的轉換。每個 raft 節點,可能具有以下三種狀態中的一種:

每一個狀態都有其對應的狀態機,每次收到一條提交的數據時,都會根據其不同的狀態將消息輸入到不同狀態的狀態機中。同時,在進行 tick 操作時,每種狀態對應的處理函數也是不一樣的。

因此 raft 結構體中將不同的狀態及其不同的處理函數,獨立出來幾個成員變量:

狀態轉換

etcd-raft StateMachine 封裝在 raft 機構體中,其狀態轉換如下圖:

raft-StateMachine.png

raft state 轉換的調用接口都在 raft.go 中,定義如下:

func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomePreCandidate()
func (r *raft) becomeCandidate()
func (r *raft) becomeLeader()

raft 在各種狀態下,如何驅動 raft StateMachine 狀態機運轉?etcd 將 raft 相關的所有處理都抽象爲了 Msg,通過 Step 接口處理:

func (r *raft) Step(m pb.Message) error {
    r.step(r, m)
}

其中 step 是一個回調函數,在不同的 state 會設置不同的回調函數來驅動 raft,這個回調函數 stepFunc 就是在 becomeXX() 函數完成的設置

type raft struct {
    ...
    step stepFunc
}

step 回調函數有如下幾個值,其中 stepCandidate 會處理 PreCandidate 和 Candidate 兩種狀態:

func stepFollower(r *raft, m pb.Message) error
func stepCandidate(r *raft, m pb.Message) error
func stepLeader(r *raft, m pb.Message) error

這裏以 stepCandidate 爲例說明:

func stepCandidate(r *raft, m pb.Message) error {
    ...
    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no Leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    case pb.MsgApp:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:
        ...
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}

即對各種 Msg 進行處理,這裏就不展開詳細展開。我們來看下 raft 消息的類型及其定義。

raft 消息

raft 算法本質上是一個大的狀態機,任何的操作例如選舉、提交數據等,最後封裝成一個消息結構體,輸入到 raft 算法庫的狀態機中。

在 raft/raftpb/raft.proto 文件中,定義了 raft 算法中傳輸消息的結構體。熟悉 raft 論文的都知道,raft 算法其實由好幾個協議組成,但是在這裏,統一定義在了 Message 這個結構體之中,以下總結了該結構體的成員用途。

// 位於 raft/raftpb/raft.pb.go:295
type Message struct {
 Type             MessageType `protobuf:"varint,1,opt,` // 消息類型
 To               uint64      `protobuf:"varint,2,opt,` // 消息接收者的節點ID
 From             uint64      `protobuf:"varint,3,opt,` // 消息發送者的節點ID
 Term             uint64      `protobuf:"varint,4,opt,` // 任期ID
 LogTerm          uint64      `protobuf:"varint,5,opt,` // 日誌所處的任期ID
 Index            uint64      `protobuf:"varint,6,opt,` // 日誌索引ID,用於節點向Leader彙報自己已經commit的日誌數據ID
 Entries          []Entry     `protobuf:"bytes,7,rep,` // 日誌條目數組
 Commit           uint64      `protobuf:"varint,8,opt,` // 提交日誌索引
 Snapshot         Snapshot    `protobuf:"bytes,9,opt,` // 快照數據
 Reject           bool        `protobuf:"varint,10,opt,` // 是否拒絕
 RejectHint       uint64      `protobuf:"varint,11,opt,` // 拒絕同步日誌請求時返回的當前節點日誌ID,用於被拒絕方快速定位到下一次合適的同步日誌位置
 Context          []byte      `protobuf:"bytes,12,opt,` // 上下文數據
 XXX_unrecognized []byte      `json:"-"`
}

Message 結構體相關的數據類型爲 MessageType,MessageType 有十九種。當然,並不是所有的消息類型都會用到上面定義的 Message 結構體中的所有字段,因此其中有些字段是 optinal 的,我其中常用的協議(即不同的消息類型)的用途總結成如下的表格:

eqDVF7

上表列出了消息的類型對應的功能、消息接收者的節點 ID 和 消息發送者的節點 ID。在收到消息之後,根據消息類型檢索本表,幫助我們理解 raft 算法的操作。

選舉流程

raft 一致性算法實現的關鍵有 Leader 選舉、日誌複製和安全性限制。Leader 故障後集羣能快速選出新 Leader;日誌複製, 集羣只有 Leader 能寫入日誌, Leader 負責複製日誌到 Follower 節點,並強制 Follower 節點與自己保持相同;安全性

raft 算法的第一步是首先選舉出 Leader 出來,在 Leader 出現故障後也需要快速選出新 Leader,所以我們來關注下選舉的流程。

發起選舉的節點

只有在 Candidate 或者 Follower 狀態下的節點,纔有可能發起一個選舉流程,而這兩種狀態的節點,其對應的 tick 函數爲 raft.tickElection 函數,用來發起選舉和選舉超時控制。選舉流程如下所示:

收到選舉消息的節點

當收到任期號大於當前節點任期號的消息,同時該消息類型如果是選舉類的消息(類型爲 prevote 或者 vote)時,會做以下判斷:

如果不是強制要求選舉,同時又在租約期以內,那麼就忽略該選舉消息返回不進行處理,這麼做是爲了避免出現那些離開集羣的節點,頻繁發起新的選舉請求。

只有在同時滿足以上兩個條件的情況下,才能同意該節點的選舉,否則都會被拒絕。這麼做的原因是:保證最後能勝出來當新的 Leader 的節點,它上面的日誌都是最新的。

日誌複製

選舉好 Leader 之後,Leader 在收到 put 提案時,如何將提案複製給其他 Follower 呢?

我們回顧下前面課時所講的 etcd 讀寫請求的處理流程。以下面的圖示來說明日誌複製的流程。

接着看看 Leader 怎麼將日誌數據複製到 Follower 節點。

當這個命令提交完成了之後,命令就可以提交給應用層了。

小結

本文主要介紹了 etcd-raft 模塊實現分佈式一致性的原理,通過 raftexample 瞭解 raft 模塊的使用方式和過程。接着重點介紹了選舉流程和日誌複製的過程。除此之外,etcd 還有安全性限制保證日誌選舉和日誌複製的正確性,比如 raft 算法中,並不是所有節點都能成爲 Leader。一個節點要成爲 Leader,需要得到集羣中半數以上節點的投票,而一個節點會投票給一個節點,其中一個充分條件是:這個進行選舉的節點的日誌,比本節點的日誌更新。其他還有判斷日誌的新舊以及提交前面任期的日誌條目等措施。

最後,留一個問題,什麼情況會選舉超時到來時沒有任何一個節點成爲 Leader,後續會怎麼處理呢?歡迎你在留言區提出。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eAHY6KCPngUuIAKcDpq7wQ