Golang 實現 Paxos 分佈式共識算法
定義相關結構體
1type proposer struct {
2 // server id
3 id int
4 // the largest round number the server has seen
5 round int
6 // proposal number = (round number, serverID)
7 number int
8 // proposal value
9 value string
10 acceptors map[int]bool
11 net network
12}
13
14
1type acceptor struct {
2 // server id
3 id int
4 // the number of the proposal this server will accept, or 0 if it has never received a Prepare request
5 promiseNumber int
6 // the number of the last proposal the server has accepted, or 0 if it never accepted any.
7 acceptedNumber int
8 // the value from the most recent proposal the server has accepted, or null if it has never accepted a proposal
9 acceptedValue string
10
11 learners []int
12 net network
13}
14
15
-
promiseNumber
:承諾的提案編號 -
acceptedNumber
:接受的提案編號 -
acceptedValue
:接受的提案值
定義消息結構體
-
Phase 1 請求:提案編號
-
Phase 1 響應:如果有被 Accepted 的提案,返回提案編號和提案值
-
Phase 2 請求:提案編號和提案值
-
Phase 2 響應:Accepted 的提案編號和提案值
這樣看,我們的消息結構體只需要提案編號和提案值,加上一個消息類型,用來區分是哪個階段的消息。消息結構體定義在 message.go 文件,具體如下:
1// MsgType represents the type of a paxos phase.
2type MsgType uint8
3
4const (
5 Prepare MsgType = iota
6 Promise
7 Propose
8 Accept
9)
10
11type message struct {
12 tp MsgType
13 from int
14 to int
15 number int // proposal number
16 value string // proposal value
17}
18
19
實現網絡
網絡上可以做的選擇和優化很多,但這裏爲了保持簡單的原則,我們將網絡定義成 interface
。後面完全可以改成 RPC 或 API 等其它通信方式來實現(沒錯,我已經實現了一個 Go RPC 的版本了)。
1type network interface {
2 send(m message)
3 recv(timeout time.Duration) (message, bool)
4}
5
6
接下里我們去實現 network 接口:
1type Network struct {
2 queue map[int]chan message
3}
4
5func newNetwork(nodes ...int) *Network {
6 pn := &Network{
7 queue: make(map[int]chan message, 0),
8 }
9
10 for _, a := range nodes {
11 pn.queue[a] = make(chan message, 1024)
12 }
13 return pn
14}
15
16func (net *Network) send(m message) {
17 log.Printf("net: send %+v", m)
18 net.queue[m.to] <- m
19}
20
21func (net *Network) recvFrom(from int, timeout time.Duration) (message, bool) {
22 select {
23 case m := <-net.queue[from]:
24 log.Printf("net: recv %+v", m)
25 return m, true
26 case <-time.After(timeout):
27 return message{}, false
28 }
29}
30
31
就是用 queue
來記錄每個節點的 chan
,key 則是節點的 server id。
發送消息則將 Message
發送到目標節點的 chan
中,接受消息直接從 chan
中讀取數據,並等待對應的超時時間。
不需要做其它網絡地址、包相關的東西,所以非常簡單。具體在 network.go
文件。
實現單元測試
這個項目主要使用 go 單元測試來檢驗正確性,我們主要測試兩種場景:
-
TestSingleProposer(單個 Proposer)
-
TestTwoProposers(多個 Proposer)
測試代碼通過運行 Paxos 後檢查 Chosen 返回的提案值是否符合預期。
實現算法流程
按照角色將文件分爲 proposer.go, acceptor.go 和 learner.go,每個文件都有一個 run()
函數來運行程序,run()
函數執行條件判斷,並在對應的階段執行對應的函數。
按照僞代碼描述,我們很容易實現 Phase 1 和 Phase 2,把每個階段的請求響應都作爲一個函數,我們一步步來看。
第一輪 Prepare RPCs 請求階段:
1// Phase 1. (a) A proposer selects a proposal number n
2
3// and sends a prepare request with number n to
4// a majority of acceptors.
5func (p *proposer) prepare() []message {
6 p.round++
7 p.number = p.proposalNumber()
8 msg := make([]message, p.majority())
9 i := 0
10
11 for to := range p.acceptors {
12 msg[i] = message{
13 tp: Prepare,
14 from: p.id,
15 to: to,
16 number: p.number,
17 }
18 i++
19 if i == p.majority() {
20 break
21 }
22 }
23 return msg
24}
25
26// proposal number = (round number, serverID)
27func (p *proposer) proposalNumber() int {
28 return p.round<< 16 | p.id
29}
30
31
Prepare 請求階段我們將 round+1 然後發送給多數派 Acceptors。
注:這裏很多博客和教程都會將 Prepare RPC 發給所有的 Acceptors,6.824 的 paxos 實驗就將 RPC 發送給所有 Acceptors。這裏保持和論文一致,只發送給 a majority of acceptors。
第一輪 Prepare RPCs 響應階段:
接下來在 acceptor.go
文件中處理請求:
1func (a *acceptor) handlePrepare(args message) (message, bool) {
2 if a.promiseNumber >= args.number {
3 return message{}, false
4 }
5 a.promiseNumber = args.number
6 msg := message{
7 tp: Promise,
8 from: a.id,
9 to: args.from,
10 number: a.acceptedNumber,
11 value: a.acceptedValue,
12 }
13 return msg, true
14}
15
16
-
如果
args.number
大於acceptor.promiseNumber
,則承諾將不會接收編號小於args.number
的提案(即a.promiseNumber = args.number
)。如果之前有提案被 Accepted 的話,響應還應包含 a.acceptedNumber 和 a.acceptedValue。 -
否則忽略,返回
false
。
第二輪 Accept RPCs 請求階段:
1func (p *proposer) accept() []message {
2 msg := make([]message, p.majority())
3 i := 0
4 for to, ok := range p.acceptors {
5 if ok {
6 msg[i] = message{
7 tp: Propose,
8 from: p.id,
9 to: to,
10 number: p.number,
11 value: p.value,
12 }
13 i++
14 }
15
16 if i == p.majority() {
17 break
18 }
19 }
20 return msg
21}
22
23
當 Proposer 收到超過半數 Acceptor 的響應後,Proposer 向多數派的 Acceptor 發起請求並帶上提案編號和提案值。
第二輪 Accept RPCs 響應階段:
1func (a *acceptor) handleAccept(args message) bool {
2 number := args.number
3 if number >= a.promiseNumber {
4 a.acceptedNumber = number
5 a.acceptedValue = args.value
6 a.promiseNumber = number
7 return true
8 }
9
10 return false
11}
12
13
Acceptor 收到 Accept()
請求,在這期間如果 Acceptor 沒有對比 a.promiseNumber 更大的編號另行 Promise,則接受該提案。
別忘了:Learning a Chosen Value
在 Paxos 中有一個十分容易混淆的概念:Chosen Value 和 Accepted Value,但如果你看過論文,其實已經說得非常直接了。論文的 2.3 節 Learning a Chosen Value 開頭就說:
To learn that a value has been chosen, a learner must find out that a proposal has been accepted by a majority of acceptors.
所以 Acceptor 接受提案後,會將接受的提案廣播 Leaners,一旦 Leaners 收到超過半數的 Acceptors 的 Accepted 提案,我們就知道這個提案被 Chosen 了。
1func (l *learner) chosen() (message, bool) {
2 acceptCounts := make(map[int]int)
3 acceptMsg := make(map[int]message)
4
5 for _, accepted := range l.acceptors {
6 if accepted.number != 0 {
7 acceptCounts[accepted.number]++
8 acceptMsg[accepted.number] = accepted
9 }
10 }
11
12 for n, count := range acceptCounts {
13 if count >= l.majority() {
14 return acceptMsg[n], true
15 }
16 }
17 return message{}, false
18}
19
20
運行和測試
代碼拉下來後,直接運行:
1go test
2
3
寫在後面
爲什麼不用 mit 6.824 的課程代碼?
之前我曾把 mit 6.824 的 Raft 答案推到自己的 Github,直到 2020 開課的時候 mit 的助教發郵件讓我將我的代碼轉爲 private,因爲這樣會導致學習課程的人直接搜到代碼,而無法保證作業獨立完成。
確實,實驗是計算機最不可或缺的環節,用 mit 6.824 2015 的 paxos 代碼會導致很多學習者不去自己解決困難,直接上網搜代碼,從而導致學習效果不好,違背了 mit 的初衷。
當然,你也可以說現在網上以及很容易搜到 6.824 的各種代碼了,但出於之前 mit 助教的郵件,我不會將作業代碼直接發出來。
感興趣的同學可以到 2015 版本學習:http://nil.csail.mit.edu/6.824/2015/
未來計劃
-
實現一個完整的(包含網絡和存儲的) Paxos
-
基於 Paxos 實現一個 Paxos KV 存儲
-
實現其它 Paxos 變種
歡迎各位朋友催更……
結語
本文代碼在 Github 上,如本文有什麼遺漏或者不對之處,或者各位朋友有什麼新的想法,歡迎提 issue 討論。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/4q-63pZodAGjsSpOsOL2Fg?scene=25#wechat_redirect