200 行代碼實現基於 Paxos 的 KV 存儲

前言

寫完【paxos 的直觀解釋】之後,網友都說療效甚好,但是也會對這篇教程中一些環節提出疑問(有疑問說明真的看懂了 🤔),例如怎麼把只能確定一個值的 paxos 應用到實際場景中。

既然 Talk is cheap,那麼就 Show me the code,這次我們把教程中描述的內容直接用代碼實現出來,希望能覆蓋到教程中的涉及的每個細節。幫助大家理解 paxos 的運行機制。

這是一個基於 paxos,200 行代碼的 kv 存儲系統的簡單實現,作爲【paxos 的直觀解釋】這篇教程中的代碼示例部分。Paxos 的原理本文不再介紹了,本文提到的數據結構使用【protobuf】定義,網絡部分使用【grpc】定義。另外 200 行 go 代碼實現 paxos 存儲。

文中的代碼可能做了簡化, 完整代碼實現在【paxoskv】這個項目中(naive 分支)。

運行和使用

跑一下:

git clone https://github.com/openacid/paxoskv.git
cd paxoskv
go test -v ./...

這個項目中除了 paxos 實現,用 3 個 test case 描述了 3 個 paxos 運行的例子,

測試代碼描述了幾個 paxos 運行例子的行爲,運行測試可以確認 paxos 的實現符合預期。

本文中 protobuf 的數據結構定義如下:

service PaxosKV {
    rpc Prepare (Proposer) returns (Acceptor) {}
    rpc Accept (Proposer) returns (Acceptor) {}
}
message BallotNum {
    int64 N          = 1;
    int64 ProposerId = 2;
}
message Value {
    int64 Vi64 = 1;

}
message PaxosInstanceId {
    string Key = 1;
    int64  Ver = 2;
}
message Acceptor {
    BallotNum LastBal = 1;
    Value     Val     = 2;
    BallotNum VBal    = 3;
}
message Proposer {
    PaxosInstanceId Id  = 1;
    BallotNum       Bal = 2;
    Value           Val = 3;
}

以及主要的函數實現:

// struct KVServer
Storage : map[string]Versions
func Accept(c context.Context, r *Proposer) (*Acceptor, error)
func Prepare(c context.Context, r *Proposer) (*Acceptor, error)
func getLockedVersion(id *PaxosInstanceId) *Version

// struct Proposer
func Phase1(acceptorIds []int64, quorum int) (*Value, *BallotNum, error)
func Phase2(acceptorIds []int64, quorum int) (*BallotNum, error)
func RunPaxos(acceptorIds []int64, val *Value) *Value
func rpcToAll(acceptorIds []int64, action string) []*Acceptor

func ServeAcceptors(acceptorIds []int64) []*grpc.Server

從頭實現 Paxoskv

Paxos 相關的數據結構

在這個例子中我們的數據結構和服務框架使用【protobuf】和【grpc】實現,首先是最底層的 paxos 數據結構:

Proposer 和 Acceptor

在【slide-27】中我們介紹了 1 個 Acceptor 所需的字段:

在存儲端(Acceptor)也有幾個概念:

  • last_rnd 是 Acceptor 記住的最後一次進行寫前讀取的 Proposer(客戶端)是誰,以此來決定誰可以在後面真正把一個值寫到存儲中。

  • v 是最後被寫入的值。

  • vrnd 跟 v 是一對, 它記錄了在哪個 Round 中 v 被寫入了。

原文中這些名詞是參考了【paxos made simple】中的名稱,但在【Leslie Lamport】後面的幾篇 paper 中都換了名稱,爲了後續方便,在【paxoskv】的代碼實現中也做了相應的替換:

rnd      ==> Bal   // 每一輪paxos的編號, BallotNum
vrnd     ==> VBal  // 在哪個Ballot中v被Acceptor 接受(voted)
last_rnd ==> LastBal

Proposer 的字段也很簡單,它需要記錄:

於是在這個項目中用 protobuf 定義這兩個角色的數據結構,如代碼【paxoskv.proto】中的聲明,如下:

message Acceptor {
  BallotNum LastBal = 1;
  Value     Val = 2;
  BallotNum VBal = 3;
}

message Proposer {
  PaxosInstanceId Id = 1;

  BallotNum Bal = 2;
  Value     Val = 3;
}

其中 Proposer 還需要一個 PaxosInstanceId,來標識當前的 paxos 實例爲哪個 key 的哪個 version 在做決定,【paxos made simple】中只描述了一個 paxos 實例的算法(對應一個 key 的一次修改),要實現多次修改,就需要增加這個字段來區分不同的 paxos 實例:

message PaxosInstanceId {
  string Key = 1;
  int64  Ver = 2;
}

【paxoskv.proto】還定義了一個 BallotNum,因爲要保證全系統內的 BallotNum 都有序且不重複,一般的做法就是用一個本地單調遞增的整數,和一個全局唯一的 id 組合起來實現:

message BallotNum {
    int64 N = 1;
    int64 ProposerId = 2;
}

定義 RPC 消息結構

RPC 消息定義了 Proposer 和 Acceptor 之間的通訊。

在一個 paxos 系統中,至少要有 4 個消息:

如【slide-28】所描述的(原文中使用 rnd,這裏使用 Bal,都是同一個概念):

Phase- 1(Prepare):

request:
    Bal: int

reply:
    LastBal: int
    Val:     string
    VBal:    int

Phase- 2(Accept):

request:
    Bal: int
    Val:   string

reply:
    LastBal: int

在 Prepare-request 或 Accept-request 中,發送的是一部分或全部的 Proposer 的字段,因此我們在代碼中:

在使用的時候只使用其中幾個字段,對應我們的 RPC 服務【PaxosKV】定義如下:

service PaxosKV {
    rpc Prepare (Proposer) returns (Acceptor) {}
    rpc Accept (Proposer) returns (Acceptor) {}
}

使用 Protobuf 和 Grpc 生成服務框架

protobuf 可以將【paxoskv.proto】直接生成 go 代碼(代碼庫中已經包含了生成好的代碼:【paxoskv.pb.go】,只有修改【paxoskv.proto】之後才需要重新生成)

  protoc \
      --proto_path=proto \
      --go_out=plugins=grpc:paxoskv \
      paxoskv.proto

生成後的【paxoskv.pb.go】代碼中可以看到,其中主要的數據結構例如 Acceptor 的定義:

type Acceptor struct {
  LastBal *BallotNum ...
  Val     *Value ...
  VBal    *BallotNum ...
        ...
}

以及 KV 服務的 client 端和 server 端的代碼,client 端是實現好的,server 端只有一個 interface,後面我們需要來完成它的實現:

type paxosKVClient struct {
  cc *grpc.ClientConn
}
type PaxosKVClient interface {
  Prepare(
    ctx context.Context,
    in *Proposer,
    opts ...grpc.CallOption
  ) (*Acceptor, error)

  Accept(
    ctx context.Context,
    in *Proposer,
    opts ...grpc.CallOption
  ) (*Acceptor, error)
}

type PaxosKVServer interface {
  Prepare(context.Context,
          *Proposer) (*Acceptor, error)
  Accept(context.Context,
         *Proposer) (*Acceptor, error)
}

實現存儲的服務器端

【impl.go】是所有實現部分,我們定義一個 KVServer 結構體,用來實現 grpc 服務的 interface PaxosKVServer;其中使用一個內存裏的 map 結構模擬數據的存儲:

type Version struct {
  mu       sync.Mutex
  acceptor Acceptor
}
type Versions map[int64]*Version
type KVServer struct {
  mu      sync.Mutex
  Storage map[string]Versions
}

其中 Version 對應一個 key 的一次變化,也就是對應一個 paxos 實例,Versions 對應一個 key 的一系列變化,Storage 就是所有 key 的所有變化。

實現 Acceptor 的 grpc 服務 handler

Acceptor,是這個系統裏的 server 端,監聽一個端口,等待 Proposer 發來的請求並處理,然後給出應答。

根據 paxos 的定義,Acceptor 的邏輯很簡單:在【slide-28】中描述:

根據教程裏的描述,爲 KVServer 定義 handle Prepare-request 的代碼:

func (s *KVServer) Prepare(
    c context.Context,
    r *Proposer) (*Acceptor, error) {

  v := s.getLockedVersion(r.Id)
  defer v.mu.Unlock()

  reply := v.acceptor

  if r.Bal.GE(v.acceptor.LastBal) {
    v.acceptor.LastBal = r.Bal
  }

  return &reply, nil
}

這段代碼分 3 步:

其中 getLockedVersion() 從 KVServer.Storage 中根據 request 發來的 PaxosInstanceId 中的字段 key 和 ver 獲取一個指定 Acceptor 的實例:

func (s *KVServer) getLockedVersion(
    id *PaxosInstanceId) *Version {

  s.mu.Lock()
  defer s.mu.Unlock()

  key := id.Key
  ver := id.Ver
  rec, found := s.Storage[key]
  if !found {
    rec = Versions{}
    s.Storage[key] = rec
  }

  v, found := rec[ver]
  if !found {
    // initialize an empty paxos instance
    rec[ver] = &Version{
      acceptor: Acceptor{
        LastBal: &BallotNum{},
        VBal:    &BallotNum{},
      },
    }
    v = rec[ver]
  }

  v.mu.Lock()
  return v
}

handle Accept-request 的處理類似,在【slide-31】中描述:

Accept() 要記錄 3 個值,

func (s *KVServer) Accept(
    c context.Context,
    r *Proposer) (*Acceptor, error) {

  v := s.getLockedVersion(r.Id)
  defer v.mu.Unlock()

  reply := Acceptor{
    LastBal: &*v.acceptor.LastBal,
  }

  if r.Bal.GE(v.acceptor.LastBal) {
    v.acceptor.LastBal = r.Bal
    v.acceptor.Val = r.Val
    v.acceptor.VBal = r.Bal
  }

  return &reply, nil
}

Acceptor 的邏輯到此完整了,再看 Proposer:

實現 Proposer 邏輯

Proposer 的運行分 2 個階段,Phase1 和 Phase2,與 Prepare 和 Accept 對應。

Phase1

在【impl.go】的實現中,Proposer.Phase1() 函數負責 Phase1 的邏輯:

func (p *Proposer) Phase1(
    acceptorIds []int64,
    quorum int) (*Value, *BallotNum, error) {

  replies := p.rpcToAll(acceptorIds, "Prepare")

  ok := 0
  higherBal := *p.Bal
  maxVoted := &Acceptor{VBal: &BallotNum{}}

  for _, r := range replies {
    if !p.Bal.GE(r.LastBal) {
      higherBal = *r.LastBal
      continue
    }

    if r.VBal.GE(maxVoted.VBal) {
      maxVoted = r
    }

    ok += 1
    if ok == quorum {
      return maxVoted.Val, nil, nil
    }
  }

  return nil, &higherBal, NotEnoughQuorum
}

這段代碼首先通過 rpcToAll() 向所有 Acceptor 發送 Prepare-request 請求, 然後找出所有的成功的 reply:

最後,成功應答如果達到多數派(quorum),則認爲 Phase1 完成,返回最後一個被 voted 的值,也就是 VBal 最大的那個。讓上層調用者繼續 Phase2;

如果沒有達到 quorum,這時可能是有多個 Proposer 併發運行而造成衝突,有更大的 ballot number,這時則把見到的最大 ballot number 返回,由上層調用者提升 ballot number 再重試。

client 與 server 端的連接

上面用到的 rpcToAll 在這個項目中的實現 client 端(Proposer)到 server 端(Acceptor)的通訊,它是一個十分 簡潔美觀 簡陋的 grpc 客戶端實現:

func (p *Proposer) rpcToAll(
    acceptorIds []int64,
    action string) []*Acceptor {

  replies := []*Acceptor{}

  for _, aid := range acceptorIds {
    var err error
    address := fmt.Sprintf("127.0.0.1:%d",
        AcceptorBasePort+int64(aid))

    conn, err := grpc.Dial(
        address, grpc.WithInsecure())
    if err != nil {
      log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    c := NewPaxosKVClient(conn)

    ctx, cancel := context.WithTimeout(
        context.Background(), time.Second)
    defer cancel()

    var reply *Acceptor
    if action == "Prepare" {
      reply, err = c.Prepare(ctx, p)
    } else if action == "Accept" {
      reply, err = c.Accept(ctx, p)
    }
    if err != nil {
      continue
    }
    replies = append(replies, reply)
  }
  return replies
}

Phase2

Proposer 運行的 Phase2 在【slide-30】中描述,比 Phase1 更簡單:

在第 2 階段 phase-2,Proposer X 將它選定的值寫入到 Acceptor 中,這個值可能是它自己要寫入的值,或者是它從某個 Acceptor 上讀到的 v(修復)。

func (p *Proposer) Phase2(
    acceptorIds []int64,
    quorum int) (*BallotNum, error) {

  replies := p.rpcToAll(acceptorIds, "Accept")

  ok := 0
  higherBal := *p.Bal
  for _, r := range replies {
    if !p.Bal.GE(r.LastBal) {
      higherBal = *r.LastBal
      continue
    }
    ok += 1
    if ok == quorum {
      return nil, nil
    }
  }

  return &higherBal, NotEnoughQuorum
}

我們看到,它只需要確認成 Phase2 的功應答數量達到 quorum 就可以了。另外同樣它也有責任在 Phase2 失敗時返回看到的更大的 ballot number,因爲在 Phase1 和 Phase2 之間可能有其他 Proposer 使用更大的 ballot number 打斷了當前 Proposer 的執行,就像【slide-33】的衝突解決的例子中描述的那樣。

完整的 Paxos 邏輯

完整的 paxos 由 Proposer 負責,包括:如何選擇一個值,使得一致性得以保證。如【slide-29】中描述的:

Proposer X 收到多數(quorum)個應答,就認爲是可以繼續運行的。如果沒有聯繫到多於半數的 acceptor,整個系統就 hang 住了,這也是 paxos 聲稱的只能運行少於半數的節點失效。這時 Proposer 面臨 2 種情況:

所有應答中都沒有任何非空的 v,這表示系統之前是乾淨的,沒有任何值已經被其他 paxos 客戶端完成了寫入(因爲一個多數派讀一定會看到一個多數派寫的結果),這時 Proposer X 繼續將它要寫的值在 phase-2 中真正寫入到多於半數的 Acceptor 中。

如果收到了某個應答包含被寫入的 v 和 vrnd,這時,Proposer X 必須假設有其他客戶端(Proposer)正在運行,雖然 X 不知道對方是否已經成功結束,但任何已經寫入的值都不能被修改!所以 X 必須保持原有的值。於是 X 將看到的最大 vrnd 對應的 v 作爲 X 的 phase-2 將要寫入的值。

這時實際上可以認爲 X 執行了一次(不知是否已經中斷的)其他客戶端(Proposer)的修復。

基於 Acceptor 的服務端和 Proposer 2 個 Phase 的實現,最後把這些環節組合到一起組成一個完整的 paxos,在我們的代碼【RunPaxos】這個函數中完成這些事情:

func (p *Proposer) RunPaxos(
    acceptorIds []int64,
    val *Value) *Value {

  quorum := len(acceptorIds)/2 + 1

  for {
    p.Val = val

    maxVotedVal, higherBal, err := p.Phase1(
        acceptorIds, quorum)

    if err != nil {
      p.Bal.N = higherBal.N + 1
      continue
    }

    if maxVotedVal != nil {
      p.Val = maxVotedVal
    }

    // val == nil 是一個讀操作,
    // 沒有讀到voted值不需要Phase2
    if p.Val == nil {
      return nil
    }

    higherBal, err = p.Phase2(
        acceptorIds, quorum)

    if err != nil {
      p.Bal.N = higherBal.N + 1
      continue
    }

    return p.Val
  }
}

這段代碼完成了幾件事:運行 Phase1,有 voted 的值就選它,沒有就選自己要寫的值 val,然後運行 Phase2。

就像 Phase1 Phase2 中描述的一樣,任何一個階段,如果沒達到 quorum,就需要提升遇到的更大的 ballot number,重試去解決遇到的 ballot number 衝突。

這個函數接受 2 個參數:

其中,按照 paxos 的描述,這個值 val 不一定能提交:如果 paxos 在 Phase1 完成後看到了其他已經接受的值(voted value),那就要選擇已接收的值,放棄 val。遇到這種情況,在我們的系統中,例如要寫入  key=foo,ver=3 的值爲 bar,如果沒能選擇 bar,就要選擇下一個版本  key=foo,ver=4 再嘗試寫入。

這樣不斷的重試循環, 寫操作最終都能成功寫入一個值(voted value)。

實現讀操作

在我們這個 NB(naive and basic)的系統中,讀和寫一樣都要通過一次 paxos 算法來完成。因爲寫入過程就是一次 paxos 執行,而 paxos 只保證在一個 quorum 中寫入確定的值,不保證所有節點都有這個值。因此一次讀操作如果要讀到最後寫入的值,至少要進行一次多數派讀。

但多數派讀還不夠:它可能讀到一個未完成的 paxos 寫入,如【slide-11】中描述的髒讀問題,讀取到的最大 VBal 的值,可能不是確定的值(寫入到多數派)。

例如下面的狀態:

Val=foo    Val=bar    ?
VBal=3     VBal=2     ?
-------    -------    --
A0         A1         A2

如果 Proposer 試圖讀,在 Phase1 聯繫到 A0 A1 這 2 個 Acceptor,那麼 foo 和 bar 這 2 個值哪個是確定下來的,要取決於 A2 的狀態。所以這時要再把最大 VBal 的值跑完一次  Phase2,讓它被確定下來,然後才能把結果返回給上層(否則另一個 Proposer 可能聯繫到 A1 和 A2,然後認爲 Val=bar 是被確定的值)。

當然如果 Proposer 在讀取流程的 Phase1 成功後沒有看到任何已經 voted 的值(例如沒有看到 foo 或 bar), 就不用跑 Phase2 了。

所以在這個版本的實現中,讀操作也是一次【RunPaxos】函數的調用,除了它並不 propose 任何新的值,爲了支持讀操作,所以在上面的代碼中 Phase2 之前加入一個判斷,如果傳入的 val 和已 voted 的值都爲空,則直接返回:

if p.Val == nil {
  return nil
}

【Example_setAndGetByKeyVer】這個測試用例展示瞭如何使用 paxos 實現一個 kv 存儲,實現讀和寫的代碼大概這樣:

prop := Proposer{
  Id: &PaxosInstanceId{
    Key: "foo",
    Ver: 0,
  },
  Bal: &BallotNum{N: 0, ProposerId: 2},
}

// 寫:
v := prop.RunPaxos(acceptorIds, &Value{Vi64: 5})

// 讀:
v := prop.RunPaxos(acceptorIds, nil)

到現在爲止,本文中涉及到的功能都實現完了,完整實現在【impl.go】中。

接着我們用測試用例實現 1 下【paxos 的直觀解釋】中列出的 2 個例子, 從代碼看 poxos 的運行:

文中例子

第 1 個例子是 paxos 無衝突的運行【slide-32】:

把它寫成 test case,確認教程中每步操作之後的結果都如預期 【TestCase1SingleProposer】:

func TestCase1SingleProposer(t *testing.T) {
  ta := require.New(t)

  acceptorIds := []int64{0, 1, 2}
  quorum := 2

  // 啓動3個Acceptor的服務
  servers := ServeAcceptors(acceptorIds)
  defer func() {
    for _, s := range servers {
      s.Stop()
    }
  }()

  // 用要更新的key和version定義paxos 實例的id
  paxosId := &PaxosInstanceId{
    Key: "i",
    Ver: 0,
  }

  var val int64 = 10

  // 定義Proposer, 隨便選個Proposer id 10.
  var pidx int64 = 10
  px := Proposer{
    Id:  paxosId,
    Bal: &BallotNum{N: 0, ProposerId: pidx},
  }

  // 用左邊2個Acceptor運行Phase1,
  // 成功, 沒有看到其他的ballot number
  latestVal, higherBal, err := px.Phase1(
      []int64{0, 1}, quorum)

  ta.Nil(err, "constitued a quorum")
  ta.Nil(higherBal, "no other proposer is seen")
  ta.Nil(latestVal, "no voted value")

  // Phase1成功後, 因爲沒有看到其他voted的值,
  // Proposer選擇它自己的值進行後面的Phase2
  px.Val = &Value{Vi64: val}

  // Phase 2
  higherBal, err = px.Phase2(
      []int64{0, 1}, quorum)

  ta.Nil(err, "constitued a quorum")
  ta.Nil(higherBal, "no other proposer is seen")
}

第 2 個例子對應 2 個 Proposer 遇到衝突並解決衝突的例子,略長不貼在文中了,代碼可以在 【TestCase2DoubleProposer】看到。

工程

Paxos 的出色之處在於它將分佈式一致性問題簡化到最核心的部分,沒有任何多餘的設計。

工程實現上我們多數時候會用一個 paxos 的變體,它需要對 paxos 中的實例擴展爲一系列多值的操作日誌,支持完整的狀態機,以及對運維提供支持成員變更,所以 raft 在工程上更受歡迎:

https://github.com/datafuselabs/openraft

創建 openraft 這個項目的目的是:

openraft 正在解決的這些問題,使之不僅僅是一個爲了性能和安全用 rust 重寫的項目。

參考鏈接

本文用到的代碼在 paxoskv 項目的 naive 分支上:

【https://github.com/openacid/paxoskv/tree/naive】

關於 Databend

Databend 是一款開源、彈性、低成本,基於對象存儲也可以做實時分析的新式數倉。期待您的關注,一起探索雲原生數倉解決方案,打造新一代開源 Data Cloud。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YZfDgjzu49MZUQm927RDvw