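A Source-Code Walkthrough of the etcd-raft Implementation
[Overview] Raft is a widely used and efficient distributed consensus algorithm. How does the etcd project implement it? This article walks through the details.
This article sets out to explain how to use etcd's raft module. My original plan was to paste no implementation code at all and cover only how to use etcd-raft (that is, how to build raft semantics on top of it). After reading raftexample, though, that turned out to be unrealistic: etcd-raft's external logic and its implementation are tightly coupled. So we have to start from the implementation, get a rough idea of the abstractions etcd-raft provides, and then go through the official raftexample, which makes it easier to implement a simple raft server of our own.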
- The raft struct
- raft's Config
- The Storage interface and the memory storage implementation
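The etcd raft implementation lives under the etcd/raft directory, and most of it sits in a handful of core files:
- raft.go: as the name suggests, this is the core, covering things like leader election and the handling of raft messages.
- node.go: can be thought of as one node of a raft cluster. Clients mostly interact with this type; heartbeats, propose, the state machine, and membership changes are all handled here.
- log.go: code for the raft log, such as storing log entries.
- raft.proto: defines the core raft RPC data structures. Because protobuf is language-neutral, anyone rewriting etcd raft in another language can at least reuse this part.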
type raft struct {
// id of this raft node within the cluster
id uint64
Term uint64
Vote uint64
// readIndex-related structure, stores the state of readIndex requests
readStates []ReadState
// the log
raftLog *raftLog
maxMsgSize uint64
maxUncommittedSize uint64
// TODO(tbg): rename to trk.
prs tracker.ProgressTracker
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
msgs []pb.Message
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
// an estimate of the size of the uncommitted tail of the Raft log. Used to
// prevent unbounded log growth. Only maintained by the leader. Reset on
// term changes.
uncommittedSize uint64
readOnly *readOnly
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
checkQuorum bool
preVote bool
heartbeatTimeout int
electionTimeout int
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
disableProposalForwarding bool
tick func()
step stepFunc
logger Logger
}
- (Many of these fields look like configuration; why not embed a conf struct instead?)
raftLog && Storage
One of the more important structures is raftLog:
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
logger Logger
// maxNextEntsSize is the maximum number aggregate byte size of the messages
// returned from calls to nextEnts.
maxNextEntsSize uint64
}
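- committed and applied both come straight from the paper.
- storage is a very important interface: it abstracts the storage layer that raft depends on. You can (and should) implement it yourself; etcd also ships a memory storage. In effect, Storage is the interface for reading data that has already been persisted.
- unstable is an in-memory write buffer for the log, which makes log replication easier. It keeps all of its entries in an in-memory slice. On the leader it holds the entries created for client requests; on a follower it holds the entries replicated from the leader.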
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
logger Logger
}
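The comment "unstable.entries[i] has raft log position i+unstable.offset" is the key to reading this struct. The following is a minimal sketch of that index mapping, assuming a pared-down entry type. unstableLog, termAt, and this one-argument stableTo are hypothetical names for illustration and are simpler than the real methods.

```go
package main

import "fmt"

// entry is a pared-down stand-in for pb.Entry.
type entry struct {
	Index uint64
	Term  uint64
}

// unstableLog is a hypothetical miniature of the unstable struct:
// entries[i] holds the entry at log position i+offset.
type unstableLog struct {
	entries []entry
	offset  uint64
}

// termAt returns the term of the entry at log position i, if it is buffered.
func (u *unstableLog) termAt(i uint64) (uint64, bool) {
	if i < u.offset || i >= u.offset+uint64(len(u.entries)) {
		return 0, false
	}
	return u.entries[i-u.offset].Term, true
}

// stableTo drops everything up to and including position i once the
// application has persisted it, and moves offset forward to match.
func (u *unstableLog) stableTo(i uint64) {
	if i < u.offset || i >= u.offset+uint64(len(u.entries)) {
		return
	}
	u.entries = u.entries[i+1-u.offset:]
	u.offset = i + 1
}

func main() {
	u := &unstableLog{
		offset:  5,
		entries: []entry{{Index: 5, Term: 1}, {Index: 6, Term: 1}, {Index: 7, Term: 2}},
	}
	term, ok := u.termAt(7)
	fmt.Println(term, ok) // 2 true

	u.stableTo(6)
	fmt.Println(u.offset, len(u.entries)) // 7 1
}
```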
That is the structure of unstable. Next, a closer look at Storage:
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// TODO(tbg): split this into two interfaces, LogStorage and StateStorage.
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
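- Storage is meant to define the storage layer: it holds a contiguous log and can logically produce a snapshot.
HardState is the part of the state that the original paper says must be persistent; ConfState is the cluster configuration.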
message HardState {
optional uint64 term = 1 [(gogoproto.nullable) = false];
optional uint64 vote = 2 [(gogoproto.nullable) = false];
optional uint64 commit = 3 [(gogoproto.nullable) = false];
}
The snapshot itself is a blob of serialized bytes, accompanied by the state (metadata) of the snapshot.
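The index rules in the Storage comments ([lo,hi) ranges, the term kept for FirstIndex()-1, the "dummy entry") are easier to follow with a toy implementation. The sketch below keeps a dummy entry at position 0 of a slice to remember the last compacted index and term. memLog, its error values, and its trimmed-down method signatures (no maxSize, no error on FirstIndex/LastIndex) are assumptions made for illustration, not the etcd API.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a pared-down stand-in for pb.Entry.
type entry struct {
	Index uint64
	Term  uint64
}

var (
	errCompacted   = errors.New("requested index was compacted away")
	errUnavailable = errors.New("requested index is not in the log yet")
)

// memLog is a hypothetical log store. ents[0] is a dummy entry that only
// remembers the index and term of the last compacted position.
type memLog struct {
	ents []entry
}

func newMemLog() *memLog { return &memLog{ents: make([]entry, 1)} }

func (m *memLog) FirstIndex() uint64 { return m.ents[0].Index + 1 }

func (m *memLog) LastIndex() uint64 { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// Entries returns the entries in the half-open range [lo, hi).
func (m *memLog) Entries(lo, hi uint64) ([]entry, error) {
	if lo < m.FirstIndex() {
		return nil, errCompacted
	}
	if lo > hi || hi > m.LastIndex()+1 {
		return nil, errUnavailable
	}
	off := m.ents[0].Index
	return m.ents[lo-off : hi-off], nil
}

// Term works for any i in [FirstIndex()-1, LastIndex()] thanks to the dummy.
func (m *memLog) Term(i uint64) (uint64, error) {
	off := m.ents[0].Index
	if i < off {
		return 0, errCompacted
	}
	if i > m.LastIndex() {
		return 0, errUnavailable
	}
	return m.ents[i-off].Term, nil
}

// Append adds entries that directly continue the log (no truncation handling).
func (m *memLog) Append(es ...entry) { m.ents = append(m.ents, es...) }

// Compact discards entries up to and including i; i becomes the new dummy.
func (m *memLog) Compact(i uint64) error {
	off := m.ents[0].Index
	if i <= off {
		return errCompacted
	}
	if i > m.LastIndex() {
		return errUnavailable
	}
	kept := []entry{{Index: i, Term: m.ents[i-off].Term}}
	m.ents = append(kept, m.ents[i+1-off:]...)
	return nil
}

func main() {
	m := newMemLog()
	m.Append(entry{Index: 1, Term: 1}, entry{Index: 2, Term: 1}, entry{Index: 3, Term: 2})
	fmt.Println(m.FirstIndex(), m.LastIndex()) // 1 3

	if err := m.Compact(2); err != nil {
		panic(err)
	}
	fmt.Println(m.FirstIndex(), m.LastIndex()) // 3 3

	term, err := m.Term(2) // still answerable: index 2 is now the dummy
	fmt.Println(term, err) // 1 <nil>
}
```

Keeping the term of the compacted position is what lets a leader still run the log-matching check against the entry just before FirstIndex.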
Conf
// Config contains the parameters to start a raft.
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64
// peers contains the IDs of all nodes (including self) in the raft cluster. It
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
peers []uint64
// learners contains the IDs of all learner nodes (including self if the
// local node is a learner) in the raft cluster. learners only receives
// entries from the leader node. It does not vote or promote itself.
learners []uint64
ElectionTick int
HeartbeatTick int
// Storage is the storage for raft. raft generates entries and states to be
// stored in storage. raft reads the persisted entries and states out of
// Storage when it needs. raft reads out the previous state and configuration
// out of storage when restarting.
Storage Storage
Applied uint64
MaxSizePerMsg uint64
MaxCommittedSizePerReady uint64
MaxUncommittedEntriesSize uint64
MaxInflightMsgs int
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
CheckQuorum bool
PreVote bool
ReadOnlyOption ReadOnlyOption
// Logger is the logger used for raft log. For multinode which can host
// multiple raft group, each raft group can have its own logger
Logger Logger
DisableProposalForwarding bool
}
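These are raft's configuration options. Most of them either appear in the paper or are tuning parameters of the implementation, so they are easy to follow.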
Node
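Node is the abstraction of a single node. The raft package defines a Node interface together with a node type that implements it. node exposes a ready channel; the application receives Ready values from that channel and processes them.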
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
Each Ready therefore carries a batch of updates, which the application processes together.
raftexample
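This server essentially builds a pipeline of channels: the HTTP layer receives a request and passes it to the node; once the node has finished the replication logic, it sends the message on to kvstore, which applies it to the state machine.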
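The shape of that pipeline can be sketched with plain channels. In the sketch below fakeConsensus is a placeholder for the raft node and commits every proposal immediately, whereas a real node would first replicate it to a quorum. The names store, fakeConsensus, and runPipeline are hypothetical; only proposeC, commitC, and readCommits echo raftexample.

```go
package main

import (
	"fmt"
	"sync"
)

type kv struct{ Key, Val string }

// store plays the role of kvstore: it only applies what arrives on commitC.
type store struct {
	mu   sync.RWMutex
	data map[string]string
}

// fakeConsensus stands in for the raft node. It commits each proposal at
// once; real raft would replicate it to a quorum before committing.
func fakeConsensus(proposeC <-chan kv, commitC chan<- kv) {
	for p := range proposeC {
		commitC <- p
	}
	close(commitC)
}

// readCommits applies committed entries to the in-memory map.
func (s *store) readCommits(commitC <-chan kv, done chan<- struct{}) {
	for c := range commitC {
		s.mu.Lock()
		s.data[c.Key] = c.Val
		s.mu.Unlock()
	}
	close(done)
}

func (s *store) lookup(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// runPipeline wires proposeC -> consensus -> commitC -> store, feeds the
// proposals in, and returns once all of them have been applied.
func runPipeline(props []kv) *store {
	proposeC := make(chan kv)
	commitC := make(chan kv)
	done := make(chan struct{})
	s := &store{data: make(map[string]string)}

	go fakeConsensus(proposeC, commitC)
	go s.readCommits(commitC, done)

	for _, p := range props {
		proposeC <- p // roughly what the HTTP handler does on a PUT
	}
	close(proposeC)
	<-done
	return s
}

func main() {
	s := runPipeline([]kv{{"color", "red"}, {"color", "blue"}, {"size", "L"}})
	v, ok := s.lookup("color")
	fmt.Println(v, ok) // blue true
}
```

The store never writes to its map directly on a request; every change takes the detour through the commit channel, which is the property the real example relies on.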
httpKVAPI
// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
store *kvstore
confChangeC chan<- raftpb.ConfChange
}
The startup logic is as follows:
// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv,
			confChangeC: confChangeC,
		},
	}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()
	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}
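- When a conf change arrives, the request is parsed and sent to confChangeC.
- When a request modifies data, the handler calls h.store.Propose(key, string(v)). Internally this function does s.proposeC <- buf.String(), that is, it sends a message to proposeC.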
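The proposal that travels through proposeC is a gob-encoded key/value pair carried as a string, which readCommits later decodes on the other side. Here is a minimal round-trip sketch; encodeProposal and decodeProposal are hypothetical helper names, and the kv struct is assumed to have just the Key and Val fields that the decoding code uses.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// kv mirrors the key/value pair that is proposed through raft.
type kv struct {
	Key string
	Val string
}

// encodeProposal gob-encodes a pair into the string sent over proposeC.
func encodeProposal(key, val string) (string, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{Key: key, Val: val}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// decodeProposal reverses encodeProposal, as the apply side has to do.
func decodeProposal(data string) (kv, error) {
	var out kv
	err := gob.NewDecoder(bytes.NewBufferString(data)).Decode(&out)
	return out, err
}

func main() {
	payload, err := encodeProposal("color", "blue")
	if err != nil {
		panic(err)
	}
	pair, err := decodeProposal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(pair.Key, pair.Val) // color blue
}
```

Raft itself treats the payload as opaque bytes, so the encoding is purely a contract between the proposing side and the applying side.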
raft node
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
raftNode is not defined by the raft library; it is defined by the example program. The relevant code is:
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
	commitC := make(chan *string)
	errorC := make(chan error)
	rc := &raftNode{
		proposeC:         proposeC,
		confChangeC:      confChangeC,
		commitC:          commitC,
		errorC:           errorC,
		id:               id,
		peers:            peers,
		join:             join,
		waldir:           fmt.Sprintf("raftexample-%d", id),
		snapdir:          fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot:      getSnapshot,
		snapCount:        defaultSnapshotCount,
		stopc:            make(chan struct{}),
		httpstopc:        make(chan struct{}),
		httpdonec:        make(chan struct{}),
		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}
The raftNode struct looks like this:
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *string // entries committed to log (k,v)
errorC chan<- error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
snapdir string // path to snapshot directory
getSnapshot func() ([]byte, error)
lastIndex uint64 // index of log at start
confState raftpb.ConfState
snapshotIndex uint64
appliedIndex uint64
// raft backing for the commit/error channel
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
stopc chan struct{} // signals proposal channel closed
httpstopc chan struct{} // signals http server to shutdown
httpdonec chan struct{} // signals http server shutdown complete
}
The important part is serveChannels:
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index
	defer rc.wal.Close()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)
		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}
			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()
	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()
		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages)
			if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot()
			rc.node.Advance()
		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return
		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}
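Here the requests received on proposeC are handed to node. The main goroutine then handles Ready. When a Ready arrives, it does the following in order:
- Write the WAL first.
- Then save the snapshot, if there is one.
- Append rd.Entries to raftStorage.
- Send the messages through the transport's network layer.
- Apply the committed entries.
- Call Advance.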
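The fields of Ready mean the following:
- pb.HardState: the largest term this node has seen, whom it voted for in that term, and the commit index this node knows of.
- Messages: outbound messages that need to be sent to the other peers.
- CommittedEntries: log entries that are already committed but not yet applied to the state machine.
- Snapshot: a snapshot that needs to be persisted.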
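What the application has to do with a Ready:
- Persist HardState, Entries, and Snapshot to storage.
- Send Messages (the msgs field mentioned earlier) to the other peers without blocking.
- Apply CommittedEntries (committed but not yet applied) to the state machine.
- If CommittedEntries contains an entry of the membership-change type, call the node's ApplyConfChange() method so the node learns about it. (This differs from the raft paper, where a node applies a membership change as soon as it receives the log entry.)
- Call Node.Advance() to tell the raft node that this batch of state updates has been processed, the state has moved forward, and the next Ready can be handed over.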
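In effect this is just the application driving the state machine, but the coupling is tight, so it is important to understand the layering of this code clearly.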
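The ordering of these steps can be sketched without the library. The model below uses stripped-down stand-ins for Ready, HardState, Entry, and Message, and an app type that records what it did in a trace; all of these types and the handleReady and run names are hypothetical. It leaves out snapshots and conf changes and only shows the persist, send, apply, advance sequence.

```go
package main

import "fmt"

type hardState struct{ Term, Vote, Commit uint64 }

type entry struct {
	Index uint64
	Data  string
}

type message struct {
	To      uint64
	Payload string
}

// ready is a stripped-down stand-in for raft.Ready.
type ready struct {
	HardState        hardState
	Entries          []entry
	CommittedEntries []entry
	Messages         []message
}

// app collects the side effects of processing Ready batches.
type app struct {
	savedState hardState
	savedLog   []entry
	outbox     []message
	applied    []string
	trace      []string
}

// handleReady processes one batch in the order described above.
func (a *app) handleReady(rd ready, advance func()) {
	// 1. Persist HardState and Entries before anything is sent.
	if rd.HardState != (hardState{}) {
		a.savedState = rd.HardState
	}
	a.savedLog = append(a.savedLog, rd.Entries...)
	a.trace = append(a.trace, "persist")

	// 2. Hand outbound messages to the transport.
	a.outbox = append(a.outbox, rd.Messages...)
	a.trace = append(a.trace, "send")

	// 3. Apply committed entries to the state machine.
	for _, e := range rd.CommittedEntries {
		a.applied = append(a.applied, e.Data)
	}
	a.trace = append(a.trace, "apply")

	// 4. Tell the node this batch is done so it can produce the next one.
	advance()
	a.trace = append(a.trace, "advance")
}

// run drains readyC, handling one batch at a time.
func (a *app) run(readyC <-chan ready, advance func()) {
	for rd := range readyC {
		a.handleReady(rd, advance)
	}
}

func main() {
	readyC := make(chan ready, 2)
	readyC <- ready{
		HardState: hardState{Term: 1, Vote: 1},
		Entries:   []entry{{Index: 1, Data: "x=1"}},
		Messages:  []message{{To: 2, Payload: "append index 1"}},
	}
	readyC <- ready{
		HardState:        hardState{Term: 1, Vote: 1, Commit: 1},
		CommittedEntries: []entry{{Index: 1, Data: "x=1"}},
	}
	close(readyC)

	advanced := 0
	a := &app{}
	a.run(readyC, func() { advanced++ })
	fmt.Println(a.trace)
	fmt.Println(a.applied, a.savedState.Commit, advanced)
}
```

Note how the entry is persisted in the first batch and only applied in the second, once its commit index has caught up.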
kvstore
kvstore is where entries actually get applied. Its most important method is:
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	for data := range commitC {
		if data == nil {
			// done replaying log; new data incoming
			// OR signaled to load snapshot
			snapshot, err := s.snapshotter.Load()
			if err == snap.ErrNoSnapshot {
				return
			}
			if err != nil {
				log.Panic(err)
			}
			log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
			if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
				log.Panic(err)
			}
			continue
		}
		var dataKv kv
		dec := gob.NewDecoder(bytes.NewBufferString(*data))
		if err := dec.Decode(&dataKv); err != nil {
			log.Fatalf("raftexample: could not decode message (%v)", err)
		}
		s.mu.Lock()
		s.kvStore[dataKv.Key] = dataKv.Val
		s.mu.Unlock()
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}
This is effectively the apply thread.
Reposted from: mwish
zhuanlan.zhihu.com/p/138563359
Source: https://mp.weixin.qq.com/s/7OoGaZqix-xgGAjKJxHAug