etcd 存儲:如何實現鍵值對的讀寫操作?

你好,我是 aoho,今天我和你分享的主題是 etcd 存儲:如何實現鍵值對的讀寫操作?

我們在前面課時介紹了 etcd 的整體架構以及 etcd 常用的通信接口。在介紹 etcd 整體架構時,我們梳理了 etcd 的分層架構以及交互概覽。本課時將會聚焦於 etcd 存儲是如何實現鍵值對的讀寫操作。

本課時圍繞 etcd 底層讀寫的實現,首先會簡要介紹客戶端訪問 etcd 服務端讀寫的整個過程,然後是重點介紹讀寫的實現細節。

讀操作

在 etcd 中讀請求佔了大部分,是高頻的操作。我們使用 etcdctl 命令行工具進行讀操作:

$ etcdctl --endpoints http://localhost:2379 get foo

foo
bar

將整個讀操作劃分成如下幾個步驟:

接着就進入了讀請求的核心步驟,會經過線性讀 ReadIndex 模塊、MVCC (包含 treeIndex 和 BlotDB)模塊。

這裏提一下線性讀,線性讀是相對串行讀來說。集羣模式下會有多個 etcd 節點,不同節點之間可能存在一致性問題,串行讀直接返回狀態數據,不需要與集羣中其他節點交互。這種方式速度快,開銷小,但是會存在數據不一致的情況。而線性讀則需要集羣成員之間達成共識,存在開銷,響應速度相對慢。但是能夠保證數據的一致性,etcd 默認讀模式是線性讀。我們將在後面的課時重點介紹分佈式一致性的實現。

繼續往下,看看如何讀取 etcd 中的數據。etcd 中查詢請求,查詢單個鍵或者一組鍵,亦或是查詢數量,到了底層實際都會調用 rangeKeys 方法,我們來分析下這個方式的實現。range 請求的結構圖如下所示:

從上至下,查詢鍵值對的流程包括:

圖中的 ReadTx 和 BatchTx 是兩個接口,用於負責讀寫請求。我們在創建 backend 結構時,都會創建 readTx 和 batchTx 結構體,這兩個結構體分別實現了 ReadTx 和 BatchTx 接口,一個負責處理只讀請求,另一個負責處理讀寫請求。。

rangeKeys 方法的實現如下所示:

// 位於 mvcc/kvstore_txn.go:117
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
 rev := ro.Rev
 if rev > curRev {
  return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
 }
 if rev <= 0 {
  rev = curRev
 }
 if rev < tr.s.compactMainRev {
  return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
 }
  // 獲取索引項 keyIndex,索引項中包含 Revision
 revpairs := tr.s.kvindex.Revisions(key, end, rev)
 tr.trace.Step("range keys from in-memory index tree")
  // 結果爲空,直接返回
 if len(revpairs) == 0 {
  return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
 }
 if ro.Count {
  return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
 }

 limit := int(ro.Limit)
 if limit <= 0 || limit > len(revpairs) {
  limit = len(revpairs)
 }

 kvs := make([]mvccpb.KeyValue, limit)
 revBytes := newRevBytes()
 for i, revpair := range revpairs[:len(kvs)] {
  revToBytes(revpair, revBytes)
    // UnsafeRange 實現了 ReadTx,查詢對應的鍵值對
  _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
  if len(vs) != 1 {
   tr.s.lg.Fatal(
    "range failed to find revision pair",
    zap.Int64("revision-main", revpair.main),
    zap.Int64("revision-sub", revpair.sub),
   )
  }
  if err := kvs[i].Unmarshal(vs[0]); err != nil {
   tr.s.lg.Fatal(
    "failed to unmarshal mvccpb.KeyValue",
    zap.Error(err),
   )
  }
 }
 tr.trace.Step("range keys from bolt db")
 return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}

在上述代碼的實現中,我們需要通過 Revisions 方法從 Btree 中獲取範圍內所有的 keyIndex,以此才能獲取一個範圍內的所有鍵值對。Revisions 方法實現如下:

// 位於 mvcc/index.go:106
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
 if end == nil {
  rev, _, _, err := ti.Get(key, atRev)
  if err != nil {
   return nil
  }
  return []revision{rev}
 }
 ti.visit(key, end, func(ki *keyIndex) {
    // 使用 keyIndex.get 來遍歷整顆樹
  if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
   revs = append(revs, rev)
  }
 })
 return revs
}

如果只是獲取一個鍵對應的版本,使用 treeIndex 方法即可,但是一般會從 Btree 索引中獲取多個 Revision 值時,需要調用 keyIndex.get 方法來遍歷整顆樹並選取合適的版本。這是因爲 BoltDB 保存一個 key 的多個歷史版本。每一個 Key 的 keyIndex 中其實都存儲着多個歷史版本,我們需要根據傳入的參數返回正確的版本。

對於上層的鍵值存儲來說,它會利用這裏返回的 Revision 從真正存儲數據的 BoltDB 中查詢當前 Key 對應 Revision 的結果。BoltDB 內部使用的也是類似 bucket 的方式存儲,其實就是對應 MySQL 中的表結構,用戶的 key 數據存放的 bucket 名字的是 key,etcd MVCC 元數據存放的 bucket 是 meta。

寫操作

介紹完讀請求,我們回憶一下寫操作的實現。使用 etcdctl 命令行工具進行寫操作:

$ etcdctl --endpoints http://localhost:2379 put foo bar

將整個寫操作劃分成如下幾個步驟:

我們重點關注最後一步,學習如何更新和插入鍵值對。與上面一張圖相對應,我們來看下 put 接口的執行過程:

調用 put 向 etcd 寫入數據時,首先會使用傳入的鍵構建 keyIndex 結構體,基於 currentRevision 自增生成新的 revision 如 {1,0},並從 treeIndex 中獲取相關版本 revision 等信息;寫事務提交之後,將本次寫操作的緩存 buffer 合併到(merge)到讀緩存上(圖中 ReadTx 中的緩存)。代碼實現如下所示:

//位於 mvcc/index.go:53
func (ti *treeIndex) Put(key []byte, rev revision) {
 keyi := &keyIndex{key: key}
  // 加鎖,互斥
 ti.Lock()
 defer ti.Unlock()
  // 獲取版本信息
 item := ti.tree.Get(keyi)
 if item == nil {
  keyi.put(ti.lg, rev.main, rev.sub)
  ti.tree.ReplaceOrInsert(keyi)
  return
 }
 okeyi := item.(*keyIndex)
 okeyi.put(ti.lg, rev.main, rev.sub)
}

treeIndex.Put 在獲取 Btree 中的 keyIndex 結構之後,會通過 keyIndex.put 在其中加入新的 revision,方法實現如下:

// 位於 mvcc/key_index.go:77
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
 rev := revision{main: main, sub: sub}
  // 校驗版本號
 if !rev.GreaterThan(ki.modified) {
  lg.Panic(
   "'put' with an unexpected smaller revision",
   zap.Int64("given-revision-main", rev.main),
   zap.Int64("given-revision-sub", rev.sub),
   zap.Int64("modified-revision-main", ki.modified.main),
   zap.Int64("modified-revision-sub", ki.modified.sub),
  )
 }
 if len(ki.generations) == 0 {
  ki.generations = append(ki.generations, generation{})
 }
 g := &ki.generations[len(ki.generations)-1]
 if len(g.revs) == 0 { // 創建一個新的鍵
  keysGauge.Inc()
  g.created = rev
 }
 g.revs = append(g.revs, rev)
 g.ver++
 ki.modified = rev
}

新的 revision 結構體寫入 keyIndex 鍵值索引時,都會改變 generation 結構體中的創建版本 、修改次數等參數,因此,基於 put 方法,我們就可以知道 generation 結構體中的各個成員如何定義和賦值。

revision{1,0} 是生成的全局版本號,作爲 BoltDB 的 key,經過序列化包括 key 名稱、key 創建時的版本號(create_revision)、value 值和租約等信息爲二進制數據之後,將填充到 BoltDB 的 value 中,同時將該鍵和 revision 等信息存儲到 Btree。

小結

本文主要介紹了 etcd 的底層如何實現讀寫操作。我們首先簡單介紹了客戶端與服務端讀寫操作的流程,之後重點分析了在 etcd 中如何讀寫數據。

讀寫操作依賴 MVCC 模塊的 treeIndex 和 BoltDB,treeIndex 是一個 內存索引模塊,用來保存鍵的歷史版本號信息;BoltDB 是一個基於 Btree 實現的數據庫,可以用來保存 etcd 的鍵值對數據。通過這兩個模塊之間的協作,實現了 etcd 數據的讀取和存儲。

學習完本課時,給大家留個小問題,etcd 寫事務的提交會涉及 B+ 重新平衡,這部分開銷昂貴該如何權衡呢?歡迎你在留言區提出。

當然,本課時僅是介紹了底層的存儲,對於如何實現分佈式數據一致性並沒有展開講解。我們將在下一講介紹 etcd-raft 如何實現分佈式一致性。

閱讀最新文章,關注公衆號:aoho 求索

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/U0MXNY36lnn9S492hAai7g