etcd 存儲:如何實現鍵值對的讀寫操作?
你好,我是 aoho,今天我和你分享的主題是 etcd 存儲:如何實現鍵值對的讀寫操作?
我們在前面課時介紹了 etcd 的整體架構以及 etcd 常用的通信接口。在介紹 etcd 整體架構時,我們梳理了 etcd 的分層架構以及交互概覽。本課時將會聚焦於 etcd 存儲是如何實現鍵值對的讀寫操作。
本課時圍繞 etcd 底層讀寫的實現,首先會簡要介紹客戶端訪問 etcd 服務端讀寫的整個過程,然後是重點介紹讀寫的實現細節。
讀操作
在 etcd 中讀請求佔了大部分,是高頻的操作。我們使用 etcdctl 命令行工具進行讀操作:
$ etcdctl --endpoints http://localhost:2379 get foo
foo
bar
將整個讀操作劃分成如下幾個步驟:
-
etcdctl 會創建一個 clientv3 庫對象,選取一個合適的 etcd 節點;
-
調用 KVServer 模塊的 Range RPC 方法(上一課時有講解),發送請求;
-
攔截器攔截,主要做一些校驗和監控;
-
調用 KVServer 模塊的 Range 接口獲取數據;
接着就進入了讀請求的核心步驟,會經過線性讀 ReadIndex 模塊、MVCC (包含 treeIndex 和 BlotDB)模塊。
這裏提一下線性讀,線性讀是相對串行讀來說。集羣模式下會有多個 etcd 節點,不同節點之間可能存在一致性問題,串行讀直接返回狀態數據,不需要與集羣中其他節點交互。這種方式速度快,開銷小,但是會存在數據不一致的情況。而線性讀則需要集羣成員之間達成共識,存在開銷,響應速度相對慢。但是能夠保證數據的一致性,etcd 默認讀模式是線性讀。我們將在後面的課時重點介紹分佈式一致性的實現。
繼續往下,看看如何讀取 etcd 中的數據。etcd 中查詢請求,查詢單個鍵或者一組鍵,亦或是查詢數量,到了底層實際都會調用 rangeKeys 方法,我們來分析下這個方式的實現。range 請求的結構圖如下所示:
從上至下,查詢鍵值對的流程包括:
-
在 treeIndex 中根據鍵利用 BTree 快速查詢該鍵對應的索引項 keyIndex,索引項中包含 Revision;
-
根據查詢到的版本號信息 Revision,在 Backend 的緩存 buffer 中利用二分法查找,如果命中則直接返回;
-
若緩存中不符合條件,在 BlotDB 中查找(基於 BlotDB 的索引),查詢之後返回鍵值對信息。
圖中的 ReadTx 和 BatchTx 是兩個接口,用於負責讀寫請求。我們在創建 backend 結構時,都會創建 readTx 和 batchTx 結構體,這兩個結構體分別實現了 ReadTx 和 BatchTx 接口,一個負責處理只讀請求,另一個負責處理讀寫請求。。
rangeKeys
方法的實現如下所示:
// 位於 mvcc/kvstore_txn.go:117
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
}
if rev <= 0 {
rev = curRev
}
if rev < tr.s.compactMainRev {
return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
// 獲取索引項 keyIndex,索引項中包含 Revision
revpairs := tr.s.kvindex.Revisions(key, end, rev)
tr.trace.Step("range keys from in-memory index tree")
// 結果爲空,直接返回
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
if ro.Count {
return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
}
limit := int(ro.Limit)
if limit <= 0 || limit > len(revpairs) {
limit = len(revpairs)
}
kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
revToBytes(revpair, revBytes)
// UnsafeRange 實現了 ReadTx,查詢對應的鍵值對
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
tr.s.lg.Fatal(
"range failed to find revision pair",
zap.Int64("revision-main", revpair.main),
zap.Int64("revision-sub", revpair.sub),
)
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
zap.Error(err),
)
}
}
tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
在上述代碼的實現中,我們需要通過 Revisions
方法從 Btree 中獲取範圍內所有的 keyIndex,以此才能獲取一個範圍內的所有鍵值對。Revisions
方法實現如下:
// 位於 mvcc/index.go:106
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
if end == nil {
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil
}
return []revision{rev}
}
ti.visit(key, end, func(ki *keyIndex) {
// 使用 keyIndex.get 來遍歷整顆樹
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
}
})
return revs
}
如果只是獲取一個鍵對應的版本,使用 treeIndex 方法即可,但是一般會從 Btree 索引中獲取多個 Revision 值時,需要調用 keyIndex.get 方法來遍歷整顆樹並選取合適的版本。這是因爲 BoltDB 保存一個 key 的多個歷史版本。每一個 Key 的 keyIndex 中其實都存儲着多個歷史版本,我們需要根據傳入的參數返回正確的版本。
對於上層的鍵值存儲來說,它會利用這裏返回的 Revision 從真正存儲數據的 BoltDB 中查詢當前 Key 對應 Revision 的結果。BoltDB 內部使用的也是類似 bucket 的方式存儲,其實就是對應 MySQL 中的表結構,用戶的 key 數據存放的 bucket 名字的是 key,etcd MVCC 元數據存放的 bucket 是 meta。
寫操作
介紹完讀請求,我們回憶一下寫操作的實現。使用 etcdctl 命令行工具進行寫操作:
$ etcdctl --endpoints http://localhost:2379 put foo bar
將整個寫操作劃分成如下幾個步驟:
-
客戶端通過負載均衡算法選擇一個 etcd 節點,發起 gRPC 調用;
-
etcd server 收到客戶端請求;
-
經過 gRPC 攔截、Quota 校驗,Quota 模塊用於校驗 etcd db 文件大小是否超過了配額;
-
接着 KVServer 模塊將請求發送給本模塊中的 raft,這裏負責與 etcd raft 模塊進行通信。發起一個提案,命令爲
put foo bar
,即使用 put 方法將 foo 更新爲 bar; -
提案經過轉發之後,半數節點成功持久化;
-
MVCC 模塊更新狀態機。
我們重點關注最後一步,學習如何更新和插入鍵值對。與上面一張圖相對應,我們來看下 put 接口的執行過程:
調用 put 向 etcd 寫入數據時,首先會使用傳入的鍵構建 keyIndex 結構體,基於 currentRevision 自增生成新的 revision 如 {1,0},並從 treeIndex 中獲取相關版本 revision 等信息;寫事務提交之後,將本次寫操作的緩存 buffer 合併到(merge)到讀緩存上(圖中 ReadTx 中的緩存)。代碼實現如下所示:
//位於 mvcc/index.go:53
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
// 加鎖,互斥
ti.Lock()
defer ti.Unlock()
// 獲取版本信息
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(ti.lg, rev.main, rev.sub)
}
treeIndex.Put 在獲取 Btree 中的 keyIndex 結構之後,會通過 keyIndex.put 在其中加入新的 revision,方法實現如下:
// 位於 mvcc/key_index.go:77
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}
// 校驗版本號
if !rev.GreaterThan(ki.modified) {
lg.Panic(
"'put' with an unexpected smaller revision",
zap.Int64("given-revision-main", rev.main),
zap.Int64("given-revision-sub", rev.sub),
zap.Int64("modified-revision-main", ki.modified.main),
zap.Int64("modified-revision-sub", ki.modified.sub),
)
}
if len(ki.generations) == 0 {
ki.generations = append(ki.generations, generation{})
}
g := &ki.generations[len(ki.generations)-1]
if len(g.revs) == 0 { // 創建一個新的鍵
keysGauge.Inc()
g.created = rev
}
g.revs = append(g.revs, rev)
g.ver++
ki.modified = rev
}
新的 revision 結構體寫入 keyIndex 鍵值索引時,都會改變 generation 結構體中的創建版本 、修改次數等參數,因此,基於 put
方法,我們就可以知道 generation 結構體中的各個成員如何定義和賦值。
revision{1,0} 是生成的全局版本號,作爲 BoltDB 的 key,經過序列化包括 key 名稱、key 創建時的版本號(create_revision)、value 值和租約等信息爲二進制數據之後,將填充到 BoltDB 的 value 中,同時將該鍵和 revision 等信息存儲到 Btree。
小結
本文主要介紹了 etcd 的底層如何實現讀寫操作。我們首先簡單介紹了客戶端與服務端讀寫操作的流程,之後重點分析了在 etcd 中如何讀寫數據。
讀寫操作依賴 MVCC 模塊的 treeIndex 和 BoltDB,treeIndex 是一個 內存索引模塊,用來保存鍵的歷史版本號信息;BoltDB 是一個基於 Btree 實現的數據庫,可以用來保存 etcd 的鍵值對數據。通過這兩個模塊之間的協作,實現了 etcd 數據的讀取和存儲。
學習完本課時,給大家留個小問題,etcd 寫事務的提交會涉及 B+ 重新平衡,這部分開銷昂貴該如何權衡呢?歡迎你在留言區提出。
當然,本課時僅是介紹了底層的存儲,對於如何實現分佈式數據一致性並沒有展開講解。我們將在下一講介紹 etcd-raft 如何實現分佈式一致性。
閱讀最新文章,關注公衆號:aoho 求索
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/U0MXNY36lnn9S492hAai7g