etcd-raft 實現之源碼解讀

【導讀】raft 是一種應用廣泛的高效分佈式一致性算法,etcd 項目的 raft 算法是如何實現的?本文做了詳細介紹。

這篇文章打算介紹使用 etcd 的 raft 模塊,本來我是想一行實現的代碼都不貼,就單獨介紹 etcd-raft 使用(即如何基於它來實現 raft 語義的)。但是讀了一下 raftexample 發現不太現實,感覺 etcd-raft 的外部邏輯和實現是嚴重耦合的,所以或許我們不得不從實現開始,瞭解一下 etcd-raft 大概提供了什麼樣的抽象, 然後再介紹一下官方的 raftexample 的內容,便於我們實現簡單的 raft 服務器。

etcd 的 raft 實現都在etcd/raft目錄下,但是大部分的實現都在下面幾個比較核心的文件:

type raft struct {
  // raft 節點本身在集羣中對應的 id
 id uint64

 Term uint64
 Vote uint64
 // readIndex 相關的結構,用來存儲 readIndex 相關的狀態
 readStates []ReadState

 // the log
 raftLog *raftLog

 maxMsgSize         uint64
 maxUncommittedSize uint64
 // TODO(tbg): rename to trk.
 prs tracker.ProgressTracker

 state StateType

 // isLearner is true if the local raft node is a learner.
 isLearner bool

 msgs []pb.Message

 // the leader id
 lead uint64
 // leadTransferee is id of the leader transfer target when its value is not zero.
 // Follow the procedure defined in raft thesis 3.10.
 leadTransferee uint64
 // Only one conf change may be pending (in the log, but not yet
 // applied) at a time. This is enforced via pendingConfIndex, which
 // is set to a value >= the log index of the latest pending
 // configuration change (if any). Config changes are only allowed to
 // be proposed if the leader's applied index is greater than this
 // value.
 pendingConfIndex uint64
 // an estimate of the size of the uncommitted tail of the Raft log. Used to
 // prevent unbounded log growth. Only maintained by the leader. Reset on
 // term changes.
 uncommittedSize uint64

 readOnly *readOnly

 // number of ticks since it reached last electionTimeout when it is leader
 // or candidate.
 // number of ticks since it reached last electionTimeout or received a
 // valid message from current leader when it is a follower.
 electionElapsed int

 // number of ticks since it reached last heartbeatTimeout.
 // only leader keeps heartbeatElapsed.
 heartbeatElapsed int

 checkQuorum bool
 preVote     bool

 heartbeatTimeout int
 electionTimeout  int
 // randomizedElectionTimeout is a random number between
 // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
 // when raft changes its state to follower or candidate.
 randomizedElectionTimeout int
 disableProposalForwarding bool

 tick func()
 step stepFunc

 logger Logger
}

raftLog && Storage

比較重要的結構是 raftLog:

type raftLog struct {
 // storage contains all stable entries since the last snapshot.
 storage Storage

 // unstable contains all unstable entries and snapshot.
 // they will be saved into storage.
 unstable unstable

 // committed is the highest log position that is known to be in
 // stable storage on a quorum of nodes.
 committed uint64
 // applied is the highest log position that the application has
 // been instructed to apply to its state machine.
 // Invariant: applied <= committed
 applied uint64

 logger Logger

 // maxNextEntsSize is the maximum number aggregate byte size of the messages
 // returned from calls to nextEnts.
 maxNextEntsSize uint64
}
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
 // the incoming unstable snapshot, if any.
 snapshot *pb.Snapshot
 // all entries that have not yet been written to storage.
 entries []pb.Entry
 offset  uint64

 logger Logger
}

以上是 unstable 的結構,下面重點看看 Storage:

// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
 // TODO(tbg): split this into two interfaces, LogStorage and StateStorage.

 // InitialState returns the saved HardState and ConfState information.
 InitialState() (pb.HardState, pb.ConfState, error)
 // Entries returns a slice of log entries in the range [lo,hi).
 // MaxSize limits the total size of the log entries returned, but
 // Entries returns at least one entry if any.
 Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
 // Term returns the term of entry i, which must be in the range
 // [FirstIndex()-1, LastIndex()]. The term of the entry before
 // FirstIndex is retained for matching purposes even though the
 // rest of that entry may not be available.
 Term(i uint64) (uint64, error)
 // LastIndex returns the index of the last entry in the log.
 LastIndex() (uint64, error)
 // FirstIndex returns the index of the first log entry that is
 // possibly available via Entries (older entries have been incorporated
 // into the latest Snapshot; if storage only contains the dummy entry the
 // first log entry is not available).
 FirstIndex() (uint64, error)
 // Snapshot returns the most recent snapshot.
 // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
 // so raft state machine could know that Storage needs some time to prepare
 // snapshot and call Snapshot later.
 Snapshot() (pb.Snapshot, error)
}

HardState 是原論文中介紹需要 persistent 的 state 部分,ConfState 則是集羣的配置。

message HardState {
 optional uint64 term   = 1 [(gogoproto.nullable) = false];
 optional uint64 vote   = 2 [(gogoproto.nullable) = false];
 optional uint64 commit = 3 [(gogoproto.nullable) = false];
}

snapshot 本體是被序列化了的一堆 bytes, 帶上 snap 的狀態。

Conf

// Config contains the parameters to start a raft.
type Config struct {
 // ID is the identity of the local raft. ID cannot be 0.
 ID uint64

 // peers contains the IDs of all nodes (including self) in the raft cluster. It
 // should only be set when starting a new raft cluster. Restarting raft from
 // previous configuration will panic if peers is set. peer is private and only
 // used for testing right now.
 peers []uint64

 // learners contains the IDs of all learner nodes (including self if the
 // local node is a learner) in the raft cluster. learners only receives
 // entries from the leader node. It does not vote or promote itself.
 learners []uint64

 ElectionTick int
 HeartbeatTick int

 // Storage is the storage for raft. raft generates entries and states to be
 // stored in storage. raft reads the persisted entries and states out of
 // Storage when it needs. raft reads out the previous state and configuration
 // out of storage when restarting.
 Storage Storage
 Applied uint64

 MaxSizePerMsg uint64
 MaxCommittedSizePerReady uint64
 MaxUncommittedEntriesSize uint64
 MaxInflightMsgs int

 // CheckQuorum specifies if the leader should check quorum activity. Leader
 // steps down when quorum is not active for an electionTimeout.
 CheckQuorum bool
 PreVote bool
 ReadOnlyOption ReadOnlyOption

 // Logger is the logger used for raft log. For multinode which can host
 // multiple raft group, each raft group can have its own logger
 Logger Logger
 DisableProposalForwarding bool
}

這屬於 raft 的一些配置,大部分要麼在論文裏有,要麼是 raft 的參數,還是很好理解的。

Node

Node 是單個節點的抽象,raft 裏面有個 Node interface,同時有一個 node 的實現,同時我們從 node 中找到 ready channel,從這個 channel 取東西,進行處理

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
 // The current volatile state of a Node.
 // SoftState will be nil if there is no update.
 // It is not required to consume or store SoftState.
 *SoftState

 // The current state of a Node to be saved to stable storage BEFORE
 // Messages are sent.
 // HardState will be equal to empty state if there is no update.
 pb.HardState

 // ReadStates can be used for node to serve linearizable read requests locally
 // when its applied index is greater than the index in ReadState.
 // Note that the readState will be returned when raft receives msgReadIndex.
 // The returned is only valid for the request that requested to read.
 ReadStates []ReadState

 // Entries specifies entries to be saved to stable storage BEFORE
 // Messages are sent.
 Entries []pb.Entry

 // Snapshot specifies the snapshot to be saved to stable storage.
 Snapshot pb.Snapshot

 // CommittedEntries specifies entries to be committed to a
 // store/state-machine. These have previously been committed to stable
 // store.
 CommittedEntries []pb.Entry

 // Messages specifies outbound messages to be sent AFTER Entries are
 // committed to stable storage.
 // If it contains a MsgSnap message, the application MUST report back to raft
 // when the snapshot has been received or has failed by calling ReportSnapshot.
 Messages []pb.Message

 // MustSync indicates whether the HardState and Entries must be synchronously
 // written to disk or if an asynchronous write is permissible.
 MustSync bool
}

這個時候處理 batch 消息

raftexample

這個服務器大致會構建一個 channel pipe, 然後 http 接到消息之後發給 node, node 處理完畢同步的邏輯之後會把消息 send 給 kvstore, 使其 apply 到狀態機。

httpKVApi

// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
 store       *kvstore
 confChangeC chan<- raftpb.ConfChange
}

啓動的邏輯如下:

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
 srv := http.Server{
  Addr: ":" + strconv.Itoa(port),
  Handler: &httpKVAPI{
   store:       kv,
   confChangeC: confChangeC,
  },
 }
 go func() {
  if err := srv.ListenAndServe(); err != nil {
   log.Fatal(err)
  }
 }()

 // exit when raft goes down
 if err, ok := <-errorC; ok {
  log.Fatal(err)
 }
}

raft node

commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

raftNode 不是 raft 庫定義的,是程序定義的,有關代碼如下:

// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current)then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
 confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

 commitC := make(chan *string)
 errorC := make(chan error)

 rc := &raftNode{
  proposeC:    proposeC,
  confChangeC: confChangeC,
  commitC:     commitC,
  errorC:      errorC,
  id:          id,
  peers:       peers,
  join:        join,
  waldir:      fmt.Sprintf("raftexample-%d", id),
  snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
  getSnapshot: getSnapshot,
  snapCount:   defaultSnapshotCount,
  stopc:       make(chan struct{}),
  httpstopc:   make(chan struct{}),
  httpdonec:   make(chan struct{}),

  snapshotterReady: make(chan *snap.Snapshotter, 1),
  // rest of structure populated after WAL replay
 }
 go rc.startRaft()
 return commitC, errorC, rc.snapshotterReady
}

這個 raftNode 結構如下:

// A key-value stream backed by raft
type raftNode struct {
 proposeC    <-chan string            // proposed messages (k,v)
 confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
 commitC     chan<- *string           // entries committed to log (k,v)
 errorC      chan<- error             // errors from raft session

 id          int      // client ID for raft session
 peers       []string // raft peer URLs
 join        bool     // node is joining an existing cluster
 waldir      string   // path to WAL directory
 snapdir     string   // path to snapshot directory
 getSnapshot func() ([]byte, error)
 lastIndex   uint64 // index of log at start

 confState     raftpb.ConfState
 snapshotIndex uint64
 appliedIndex  uint64

 // raft backing for the commit/error channel
 node        raft.Node
 raftStorage *raft.MemoryStorage
 wal         *wal.WAL

 snapshotter      *snap.Snapshotter
 snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

 snapCount uint64
 transport *rafthttp.Transport
 stopc     chan struct{} // signals proposal channel closed
 httpstopc chan struct{} // signals http server to shutdown
 httpdonec chan struct{} // signals http server shutdown complete
}

比較重要的是 serveChannels:

func (rc *raftNode) serveChannels() {
 snap, err := rc.raftStorage.Snapshot()
 if err != nil {
  panic(err)
 }
 rc.confState = snap.Metadata.ConfState
 rc.snapshotIndex = snap.Metadata.Index
 rc.appliedIndex = snap.Metadata.Index

 defer rc.wal.Close()

 ticker := time.NewTicker(100 * time.Millisecond)
 defer ticker.Stop()

 // send proposals over raft
 go func() {
  confChangeCount := uint64(0)

  for rc.proposeC != nil && rc.confChangeC != nil {
   select {
   case prop, ok := <-rc.proposeC:
    if !ok {
     rc.proposeC = nil
    } else {
     // blocks until accepted by raft state machine
     rc.node.Propose(context.TODO()[]byte(prop))
    }

   case cc, ok := <-rc.confChangeC:
    if !ok {
     rc.confChangeC = nil
    } else {
     confChangeCount++
     cc.ID = confChangeCount
     rc.node.ProposeConfChange(context.TODO(), cc)
    }
   }
  }
  // client closed channel; shutdown raft if not already
  close(rc.stopc)
 }()

 // event loop on raft state machine updates
 for {
  select {
  case <-ticker.C:
   rc.node.Tick()

  // store raft entries to wal, then publish over commit channel
  case rd := <-rc.node.Ready():
   rc.wal.Save(rd.HardState, rd.Entries)
   if !raft.IsEmptySnap(rd.Snapshot) {
    rc.saveSnap(rd.Snapshot)
    rc.raftStorage.ApplySnapshot(rd.Snapshot)
    rc.publishSnapshot(rd.Snapshot)
   }
   rc.raftStorage.Append(rd.Entries)
   rc.transport.Send(rd.Messages)
   if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
    rc.stop()
    return
   }
   rc.maybeTriggerSnapshot()
   rc.node.Advance()

  case err := <-rc.transport.ErrorC:
   rc.writeError(err)
   return

  case <-rc.stopc:
   rc.stop()
   return
  }
 }
}

這裏把 proposeC 收到的請求交給 node 處理。然後主要線程處理 Ready, Ready 收到消息的時候,會依次:

ready 對應的邏輯是:

應用需要對 Ready 的處理包括:

  1. 將 HardState, Entries, Snapshot 持久化到 storage。

  2. 將 Messages(上文提到的 msgs) 非阻塞的廣播給其他 peers

  3. 將 CommittedEntries(已經 commit 還沒有 apply) 應用到狀態機。

  4. 如果發現 CommittedEntries 中有成員變更類型的 entry,調用 node 的 ApplyConfChange() 方法讓 node 知道 (這裏和 raft 論文不一樣,論文中只要節點收到了成員變更日誌就應用)

  5. 調用 Node.Advance() 告訴 raft node,這批狀態更新處理完了,狀態已經演進了,可以給我下一批 Ready 讓我處理。

實際上這裏就是對這個狀態機處理,但是感覺耦合的很嚴重,所以需要理解清楚這個代碼的層次。

kvstore

kvstore 是具體 apply 到的地方,最初要的方法如下:

func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
 for data := range commitC {
  if data == nil {
   // done replaying log; new data incoming
   // OR signaled to load snapshot
   snapshot, err := s.snapshotter.Load()
   if err == snap.ErrNoSnapshot {
    return
   }
   if err != nil {
    log.Panic(err)
   }
   log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
   if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
    log.Panic(err)
   }
   continue
  }

  var dataKv kv
  dec := gob.NewDecoder(bytes.NewBufferString(*data))
  if err := dec.Decode(&dataKv); err != nil {
   log.Fatalf("raftexample: could not decode message (%v)", err)
  }
  s.mu.Lock()
  s.kvStore[dataKv.Key] = dataKv.Val
  s.mu.Unlock()
 }
 if err, ok := <-errorC; ok {
  log.Fatal(err)
 }
}

這相當於一個 apply 線程

轉自:mwish

zhuanlan.zhihu.com/p/138563359

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7OoGaZqix-xgGAjKJxHAug