分佈式系統 Consul 一致性實現即 Raft 日誌複製原理

彭榮新,茄子科技老司機一枚,長期專注基礎架構領域,對中間件的的研發和治理以及穩定性保障有豐富的實踐經驗。

**背景
**

Consul 是一個非常強大的服務發現和配置管理工具,可以幫助您簡化服務管理流程,提高系統的可用性和可擴展性,是目前非常流行的服務發現和配置管理系統,支持高可用,可擴展,多數據中心的分佈式系統,是很多公司的基礎實施組件,這些架構的優點的背後是基於分佈式協議 raft 的實現,raft 協議的理論有很多,以前需要根據 paxos 來實現,像 zk 自己實現了一套 zap 的協議來實現數據的複製和一致性,hdfs 的 namenode 日誌高可用也是基於 paxos 實現,技術發展就是快,現在要實現一致性,高可靠,多副本基本上都是採用 raft 協議來實現,真正講 raft 日誌複製實現的比較少,比如 k8s 用的 etcd,nacos cp 模型也有 raft 的實現,rocketmq 也有 raft 實現 slave 選舉,這篇文章主要分享 consul raft 協議日誌複製的實現原理,嘗試講明白寫日誌複製,日誌順序,過半提交,一致性檢查等相關的知識點和實現原理。

Consul Raft

raft 算法主要包含兩個部分,分別是 leader 選舉和日誌複製,leader 選舉我們不分析,我們主要分析日誌複製的實現原理,下面我們以 consul 的 key value 存儲的寫場景入手,一步步分析寫請求的實現邏輯,是怎麼實現 raft 日誌複製保證一致性的, 內容會比較長,會涉及到如下知識點:

Consul Agent 請求

客戶端發起一個 put key value 的 http 請求,由 kvs_endpoint.go 的 KVSEndpoint func 處理,put 的方法會路由給 KVSPut 處理,除了一些校驗外和請求標識,比如是否有獲取鎖 acquire 或者 release,這裏提下一個檢查,就是 value 的大小檢查,和 web 容器一樣檢查防止請求數據太大,可以通過參數 kv_max_value_size 控制,如果超過返回狀態碼 413,標準的 http 狀態碼。

檢查都 OK 後,consul agent 就開始請求 consul server 了,當然還是 rpc 操作

// Copy the value
buf := bytes.NewBuffer(nil)
// 這裏纔開始讀請求的數據。
if _, err := io.Copy(buf, req.Body); err != nil {
   return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
// 開始請求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
   return nil, err
}
// Only use the out value if this was a CAS
// 沒有出錯的話,這裏就成功返回了
if applyReq.Op == api.KVSet {
    return true, nil
}

agent 請求 server 時,會默認請求 server list 的第一個節點,只有在失敗的請求下,回滾動節點,把失敗的添加到最後,原來的第二個節點做爲第一個節點,請求的是 consul 下面的 kvs_endpoint.go 下面的 Apply 方法,所以我們的重點要來了

Server Apply

consul server 的 apply 方法,代碼還是 show 下,這裏還有兩個邏輯說明下。

// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
   // 檢查機房dc是否匹配,不是就轉發到對應到dc的server。
   if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
      return err
   }
   // 中間不重要的去了,省得太多...
   // 對權限token 應用ACL policy
   ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
   if err != nil {
      return err
   }
   if !ok {
      *reply = false
      return nil
   }
   // Apply the update.
   // 這裏是開啓raft 算法的之旅的入口。
   resp, err := k.srv.raftApply(structs.KVSRequestType, args)
   if err != nil {
      k.logger.Error("Raft apply failed", "error", err)
      return err
   }
   if respErr, ok := resp.(error); ok {
      return respErr
   }
   // Check if the return type is a bool.
   if respBool, ok := resp.(bool); ok {
      *reply = respBool
   }
   return nil
}

在真正開始執行 raft 算法前,主要做了如下兩件事:

先檢查了 dc 是否是當前 dc,如果不是會路由到正確的 dc,這也是 consul 支持多機房部署的一個很好的特性,路由很方便,這也是多機房部署 consul 是很好的選擇。

檢查是否啓用了 acl 策略,如果有,需要檢查,沒有對應的 token 是不能操作的。

leader 的寫請求是從 kvs 的 apply 方法開始處理請求的,下面我們看下 apply 方法的實現邏輯,在真正執行 raft 前,consul 還做了一些加工,不能蠻搞,是非常嚴謹的,上面通過 raftApply,經過幾跳後,會執行到 raftApplyWithEncoder 方法,這裏做的工作是很重要的,所以還是拿出來說下,是漲知識的地方,代碼如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
   if encoder == nil {
      return nil, fmt.Errorf("Failed to encode request: nil encoder")
   }
   // 對請求編碼。
   buf, err := encoder(t, msg)
   if err != nil {
      return nil, fmt.Errorf("Failed to encode request: %v", err)
   }
   // Warn if the command is very large
   if n := len(buf); n > raftWarnSize {
      s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
   }
   var chunked bool
   var future raft.ApplyFuture
   switch {
   case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
      //請求的數據大小如果小於512 * 1024 即512k,則做一次log執行。
      future = s.raft.Apply(buf, enqueueLimit)
   default:
      //超過了512k,則需要分chunk,每個chunk做爲一個log來應用。
      chunked = true
      //這裏就是每個log一次future。
      future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
   }
   //阻塞,等待raft協議完成。
   if err := future.Error(); err != nil {
      return nil, err
   }
   resp := future.Response()
   //...
   return resp, nil
}

這裏通過註釋,你也可以看出,主要關心 4 件事情:

  1. 把請求編碼,這個不是我們的重點,後面有時間可以單獨分析。

  2. 檢查是否要拆包,是否要拆成多個 raft command 來執行,這裏有個參數控制,SuggestedMaxDataSize consul 默認設置是 512k,如果超過這個則拆,否則可以一次 raft 協議搞定。

  3. 有一個超時時間,默認是 30 秒,後面會用到。

  4. 最後事阻塞等待完成,是 logfuture。

爲什麼要拆包

這些事 raft 算法不會提的,這個事工程實踐纔會有的一些優化,此時你也和我一樣,爲啥要做這個優化呢,有什麼好處,解決什麼問題,這是我們做一個架構師必須要有的思考。

consul 的官方就給出瞭解釋,所以閱讀優秀的代碼就是一種享受,看註釋就能知道爲啥這樣做,下面是他們對 SuggestedMaxDataSize 的註釋:

// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024

理解就是單次 log 數據不能太大,太大理解有下面幾個問題:

上面說影響到心跳,那我們總要知道 heartbeat 是怎麼實現的,看看到底有什麼影響,所以我們把 consul 的心跳機制實現原理說明下

定時心跳

- raft 心跳理論

把日誌提交的兩階段優化爲了一個階段,省去了 commit 階段,減少了一個 rt,提升了吞吐量,爲什麼能這樣優化,是藉助下次請求和心跳請求來告訴 followe 當前 leader 的 commit index,所以 raft 算法認爲心跳包也是會帶上當前 commit index 給 follower,讓 follower 可以儘快提交,保持和 leader 一致,理論是這樣的,但是 consul 並沒有這樣實現,請繼續看下面

但是 consul 在實踐的時候並沒有這麼做,也可能是優化了實現,實現邏輯是每個 follower 有一個獨立的 goroutine 來負責發送 heartbeat,consul leader 給 follower 發心跳時,只帶了一個當前 leader 的任期和 leader 自身的 id 和地址兩個信息,不帶 log 相關的信息,所以 follower 處理心跳請求就很簡單,只要更新下心跳時間即可,當然也會檢查任期,這樣就沒有了 io 請求,就能快速響應,也叫 fast path,就時直接在 io 線程處理了,因爲 follower 處理正常的 rpc 請求和心跳請求經過 decode 後,都會統一有 follower 的 main goroutine 來處理,如果有一個 log append 的 rpc 請求很大,即 io 操作會大,需要持久化 log,很定影響後面的請求,即會影響到心跳請求,follower 認爲在心跳超時時間內沒有收到心跳,則認爲 leader 出了問題,會觸發選舉,就影響了 leader 穩定。

consul 沒有通過心跳機制來讓 follower 儘快和 leader 保持一致來 commit log 到 fsm 狀態機,如果寫不連續,那最近一次寫 follower 就會一直不提交,本來是發心跳給 follower 時會讓 follower 提交,但現在心跳不幹這個活了,所以 consul 需要一個新的機制來保證即使沒有新的寫請求的情況下,讓 follower 也儘快和 leader 保持一致的 commit log,這個機制就是 commit timeout 隨機時間,每到了一個時間,consul leader 就發一個

日誌複製的請求給 follower,該 rpc 請求除了帶上 leader 任期 term 和標識信息外,還會告訴 follower leader 的 commitindex,follower 就能比較自己的 commitindex,如果小於,則進行提交的流程,把沒有應用到狀態機的 log commit 掉。

批量發送

說完了拆包優化邏輯後,我們看下 ApplyLog 的邏輯,代碼如下:

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
   metrics.IncrCounter([]string{"raft", "apply"}, 1)
   var timer <-chan time.Time
   if timeout > 0 {
      timer = time.After(timeout)
   }
   // Create a log future, no index or term yet
   logFuture := &logFuture{
      log: Log{
         Type:       LogCommand,
         Data:       log.Data,
         Extensions: log.Extensions,
      },
   }
   logFuture.init()
   select {
   case <-timer:
      return errorFuture{ErrEnqueueTimeout}
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}
   case r.applyCh <- logFuture:
      return logFuture
   }
}

這裏主要關心這個 applyCh channel,consul 在初始化 leader 的時候給創建的一個無緩衝區的通道,所以如果 leader 的協程在幹其他的事情,那這個提交 log 就阻塞了,時間最長 30s,寫入成功,就返回了 logFuture,也就事前面我們看到 future 的阻塞。

到這裏整個 consul leader server 的插入請求從接受到阻塞等待的邏輯就完成了,consul leader server 有個核心的 go routine 在 watch 這個 applyCh,從定義可以看出,是應用 raft log 的 channel,阻塞在 applych 的 go routine 代碼如下:

case newLog := <-r.applyCh://這個是前面我們提交log future的
   if r.getLeadershipTransferInProgress() {
      r.logger.Debug(ErrLeadershipTransferInProgress.Error())
      newLog.respond(ErrLeadershipTransferInProgress)
      continue
   }
   // Group commit, gather all the ready commits
   ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
   for i := 0; i < r.conf.MaxAppendEntries; i++ {
      select {
      case newLog := <-r.applyCh:
         ready = append(ready, newLog)
      default:
         break GROUP_COMMIT_LOOP
      }
   }
   // Dispatch the logs
   if stepDown {
      // we're in the process of stepping down as leader, don't process anything new
     //如果發現我們不是leader了,直接響應失敗 
     for i := range ready {
         ready[i].respond(ErrNotLeader)
      }
   } else {
      r.dispatchLogs(ready)
   }

這裏的一個重要的點就是組發送請求,就是讀 applyCh 的 log,這個裏做了組提交的優化,最多一次發送 MaxAppendEntries 個,默認位 64 個,如果併發高的情況下,這裏是能讀到一個 batch 的,在網絡傳輸和 io 操作,分組提交是一個通用的優化技巧,比如 dubbo 在 rpc 網絡發送,rocketmq,mysql innodb log 提交都用了分組提交技術來充分利用網絡 io 帶寬,減少網絡來回或者 io 次數的開銷,因爲一次大概率是用不完網絡或者 io 帶寬的,就像高速 4 車道的,我們可以一次發四個,而不是一個一個的發。

分組好了後,下面就開始 dispatch log 了,代碼如下:

// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
   now := time.Now()
   defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
   //獲取當前leader的任期編號,這個不會重複是遞增的,如果有心的leaer了,會比這個大。
   term := r.getCurrentTerm()
   //log 編號,寫一個加1
   lastIndex := r.getLastIndex()
   n := len(applyLogs)
   logs := make([]*Log, n)
   metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
   //設置每個log的編號和任期
   for idx, applyLog := range applyLogs {
      applyLog.dispatch = now
      lastIndex++
      applyLog.log.Index = lastIndex
      applyLog.log.Term = term
      logs[idx] = &applyLog.log
      r.leaderState.inflight.PushBack(applyLog)
   }
   // Write the log entry locally
   // log先寫入本地持久化,consul大部分的版本底層用的是boltdb,boltdb
   // 是一個支持事物的數據庫,非常方便,這裏會涉及io操作。
   if err := r.logs.StoreLogs(logs); err != nil {
      r.logger.Error("failed to commit logs", "error", err)
      //如果寫失敗,則直接響應,前面的future阻塞就會喚醒。
      for _, applyLog := range applyLogs {
         applyLog.respond(err)
      }
      //更新自己爲follower
      r.setState(Follower)
      return
   }
   //這裏很重要,好就纔看明白,這個是log 複製成功後,最終應用到狀態機的一個機制
   //這裏是記錄下leader自己的結果,因爲過半leader也算一份。
   r.leaderState.commitment.match(r.localID, lastIndex)
   // Update the last log since it's on disk now
   // 更新最新log entry的編號,寫到這裏了。
   r.setLastLog(lastIndex, term)
   // Notify the replicators of the new log
   // 開始異步發送給所有的follower,這個leader主go routine的活就幹完了。
   for _, f := range r.leaderState.replState {
      asyncNotifyCh(f.triggerCh)
   }
}

這個 dispatchlog 的邏輯註釋裏基本寫清楚了,核心的 go routine 經過一頓操作後,最主要就是兩點:

consul log 持久化是通過 boltdb 來存儲的,boltdb 可以看做一個簡單版的 innodb 實現,是一個支撐事務和 mvcc 的存儲引擎, consul 新版本自己實現了 log 持久化通過 wal 的方式,可以配置。

又異步交給了 replicate go routine 來處理,他就去繼續去分組提交了,大概率如此循環往復,不知疲倦的給 replication routine 派活。

複製 GoRoutine

replication routine 會監聽 triggerCh channel,接受領導的任務,這個比較簡單,就開始真正發給各自的 follower 了,代碼如下:

case <-s.triggerCh:
   lastLogIdx, _ := r.getLastLog()
   //這個後面沒有異步了,就是這個rpc調用,判斷
   shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是 rpc 調用 follower,真正遠程 rpc 給 follower,等待響應,這裏 consul 爲保障 follower 完全和 leader 的日誌一致,需要做有序檢查,所以 consul leader 在 replicate log 給 follower 時,有一個細節要注意下,就是 leader 除發當前操作的 log entry,還需要帶上上一條 log entry,每條 log entry 的有兩個關鍵變量:

這裏要發送的日誌是就是獲取當前最大的 log index 即 lastIndex 做爲最大值,然後每個 follower 維護一個 nextIndex, 即從那裏開始讀 log,replication goroutine 會從存儲裏獲取 nextIndex-->lastIndex 的 log,這裏可能涉及到 io 操作,就是把前面的持久化的 log,再批量讀出來,nextIndex 是在複製給 follower 成功後,會吧 lastIndex+1 來更新 nextIndex,下次就從新的地方開始讀了。

除了 log 外,還會帶上 leader 當前的 CommitIndex,即 leader 已經應用到狀態機 FSM 的 log 索引,follower 通過這個來比較,判斷自己是否要提交 log。

follower 節點通過這兩個變量來匹配 log 是否一致,下面 log 一致性檢查會說明具體怎麼用,也會說明爲啥要發前面一天 log。

過半提交

raft 協議要求寫操作,只有超過一半才能算成功,才能應用到狀態機 FSM, 客戶端才能讀到這個數據,這個過半是 leader 自己也算在裏面的,也就是前面一篇文章我們提到的,leader 在持久化 log 後,就標記自己寫成功了,我們沒有分析,現在我們來分析下這個邏輯,因爲 follower 處理完日誌複製後,也是有這個邏輯處理的。

//這裏很重要,好就纔看明白,這個是log 複製成功後,最終應用到狀態機的一個機制
//這裏是記錄下leader自己的結果,因爲過半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)

我們上篇文章只是在這裏做了一個註釋,並沒有分析裏面怎麼實現的,我們就是要搞懂到底怎麼實現的,下面是 match 的代碼:

// Match is called once a server completes writing entries to disk: either the
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server ServerID, matchIndex uint64) {
   c.Lock()
   defer c.Unlock()
   if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
      c.matchIndexes[server] = matchIndex
      c.recalculate()
   }
}

註釋也基本說明了這個方法的作用,就是我們上面說的,我們就不再重複了,要理解這個邏輯,先了解下這個數據結構 matchIndexes,matchIndexes 是一個 map,key 就是 server id,就是 consul 集羣每個節點有一個 id,value 就是上次應用 log 到狀態機的編號 commitIndex,recalculate 的代碼如下:

// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
   if len(c.matchIndexes) == 0 {
      return
   }
   matched := make([]uint64, 0, len(c.matchIndexes))
   for _, idx := range c.matchIndexes {
      matched = append(matched, idx)
   }
   //這個排序是降序,才能保證下面取中間索引位置的值來判斷是否過半已經複製成功。
   sort.Sort(uint64Slice(matched))
   quorumMatchIndex := matched[(len(matched)-1)/2]
   //如果超過一半的follower成功了,則開始commit,即應用到狀態機
   if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
      c.commitIndex = quorumMatchIndex
      //符合條件,觸發commit,通知leader執行apply log
      asyncNotifyCh(c.commitCh)
   }
}

這個 recalculate 的邏輯單獨看有點晦澀,先不急於理解,先舉一個例子來說明下 recalculate 的邏輯:

假如集羣三個節點,server id 分別爲 1,2,3,上次寫 log 的編號是 3,就是 leader 和 follower 都成功了,這個 matchIndexes 的數據如下:

1(leader) --> 3

2(follower) --> 3

3(follower) --> 3

假如這個時候新來一個 put 請求,leader 本地持久化成功,就要更新這個數據結構了 matchIndexes 了, 因爲 leader 是先更新,再併發請求 follower 的,所以這個時候 matchIndexes 數據如下,因爲一個 log,所以 logIndex 是加 1。

1(leader) --> 4

2(follower) --> 3

3(follower) --> 3

因爲 leader 本地完成和 follower 遠程完成一樣,都要通過這個邏輯來判斷是否 commit 該 log 請求,即是否應用到 FSM,所以就是要判斷是否過半完成了,邏輯是這樣的:

  1. 先創建一個數組 matched,長度爲集羣節點數,我們的例子是 3,

  2. 然後把 matchIndexes 的 commitIndex 起出來,放到 matched 中,matched 的數據就是 [4,3,3]

  3. 排序,爲啥要排序,因爲 map 是無序的,下面要通過中間索引的值來判斷是否變化。

  4. 然後計算 quorumMatchIndex := matched[(len(matched)-1)/2],這個就是取中間索引下標的值,也是因爲這點,需要第三步排序.

  5. 比較 quorumMatchIndex 是否大於當前的 commitIndex, 如果大於,說明滿足過半的條件,則更新,然後應用到狀態機。

通過上面 5 步,來實現了一個過半的邏輯,我們再以兩個場景來理下,

假如一個 follower 失敗了,一個成功,成功的 follower 會更新 matched 的數據是 [4,3,4], 或者是 [4,4,3], 排序後爲都是 [4,4,3], 第 4 步計算的結果是 4 大於 3,就可以提交了,經過上面的詳細,再回看上面的代碼就好理解了。

一致性檢查:

raft 協議日誌複製是需要嚴格保證順序的,所以在日誌複製的時候 follower 需要對日誌做檢查,主要有兩種情況:

每次 log append 給 follower 時,follower 會把自己當前的 logindex 編號和當前 leader 的任期 term 返回給 leader,leader 獲取到對應的編號時,會更新發送 logNext,也就是從這裏開始發生日誌給 follower,就進入重試的的流程,重新發日誌。

這裏 consul 根據 raft 協議做了一個優化,raft 協議描述的是每次遞減一個 logindex 編號,來回確認,直到找到 follower 匹配的編號,再開始發日誌,這樣性能就很差,所有基本上沒有那個分佈式系統是那樣實現落地的。

Commit Log

只要超過一半的日誌 複製成功,consul 就進入日誌 commit 階段,也就是將修改應用到狀態機,通過 recalculate 方法給 leader 監聽的 commitCh 發一個消息,通知 leader 開始執行 apply log 到 FSM, leader 的代碼如下:

case <-r.leaderState.commitCh:
   // Process the newly committed entries
   //上次執行commit log index
   oldCommitIndex := r.getCommitIndex()
   //新的log需要commit的log index,在判斷是過半時,會更新commitindex
   commitIndex := r.leaderState.commitment.getCommitIndex()
   r.setCommitIndex(commitIndex)
   ....
   start := time.Now()
   var groupReady []*list.Element
   var groupFutures = make(map[uint64]*logFuture)
   var lastIdxInGroup uint64
   // Pull all inflight logs that are committed off the queue.
   for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
      commitLog := e.Value.(*logFuture)
      idx := commitLog.log.Index
      //idx 大於commitIndex,說明是後面新寫入的,還沒有同步到follower的日誌。
      if idx > commitIndex {
         // Don't go past the committed index
         break
      }
      // Measure the commit time
      metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
      groupReady = append(groupReady, e)
      groupFutures[idx] = commitLog
      lastIdxInGroup = idx
   }
   // Process the group
   if len(groupReady) != 0 {
      //應用的邏輯在這裏。groupFutures 就是寫入go routine wait的future
      r.processLogs(lastIdxInGroup, groupFutures)
      //清理inflight集合中已經commit過的log,防止重複commit
      for _, e := range groupReady {
         r.leaderState.inflight.Remove(e)
      }
   }

這裏比較簡單,就是從 leaderState.inflight 中取出 log,就是我們之前寫入的,循環判斷,如果 log 的編號大於 commitIndex, 說明是後面新寫入的 log,還沒有同步到 follower 的 log,不能提交。這裏應該是有序的,lastIdxInGroup 應該就是需要 commit 的 log 的最大的一個編號。

processLogs 的邏輯就是支持分批提交支持,發給 consul 的 runFSM 的 go routine,consul raft 專門有一個 go routine 來負責 commit log 到狀態機,支持批量和一個一個 commit,我們看下單個 commit 的情況,代碼如下:

commitSingle := func(req *commitTuple) {
   // Apply the log if a command or config change
   var resp interface{}
   // Make sure we send a response
   defer func() {
      // Invoke the future if given
      if req.future != nil {
         req.future.response = resp
         req.future.respond(nil)
      }
   }()
   switch req.log.Type {
   case LogCommand:
      start := time.Now()
      //將日誌應用到FSM的關鍵在這裏。
      resp = r.fsm.Apply(req.log)
      metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
    ....
   }
   // Update the indexes
   lastIndex = req.log.Index
   lastTerm = req.log.Term
}

主要就是三點,應用 log 到 fsm,然後跟新下 fsm 的 logindex 和任期,最後就是要通知還在 wait 的 go routine。

整個日誌複製的流程很長,最後再上一張圖總結下整個過程:

總結

這篇文章主要基於 consul 目前版本的實現和基於個人的理解,以 consul 一個寫請求的整個過程爲線索,介紹了 consul 基於 raft 協議的實現日誌複製的基本過程,重點介紹了日誌順序保證措施,日誌一致性檢查,過半提交 log,心跳機制的實現原理,以及相關的幾個優化措施,比如大請求分拆,分組批量發送等一些實踐優化措施,如有不正確的地方,歡迎交流和指正,個人微博 @絕塵駒

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/eHDscu9G4xB8Y-5kfldeCQ