etcd 存儲引擎之主幹框架

0 前言

年前和大家一起完成了有關 lsm tree 話題的探討,並在系列完結時立下一個了 flag——下個系列,劍指 b+ 樹專題~

於是乎,擇日不如撞日,今天咱就肝上一把,開啓新專題,推動填坑之旅!

本系列我們將以 etcd 存儲引擎 boltdb 作爲 b + 樹 工程實踐案例進行學習,該項目開源地址爲:https://github.com/etcd-io/bbolt,go 語言純度接近 100%. (本系列涉及走讀的 boltdb 源碼版本爲 v1.3.8

此外,在這裏補充插入一個致敬環節. 在關於本專題內容的學習過程中,我藉助了叉鴿 boltdb 系列博客以及滴滴出行魏猛老師的分享,大大降低了學習的阻力,特此致敬一下.

順帶附上叉鴿系列博客傳送門,講得不錯,需要者自取:

https://blog.mrcroxx.com/posts/code-reading/boltdb-made-simple/0-introduction/

下面是本專題的分享節奏,計劃分爲四篇:

1 核心概念

1.1 bolt 之於 etcd

etcd 是一個具有強一致性的分佈式協調服務,基於 golang 實現,底層基於 raft 算法保證數據的強一致和高可用,對應開源地址: https://github.com/etcd-io/etcd

言歸正傳,本系列我們聚焦 etcd 存儲層引擎 boltdb 的實現原理. boltdb 在 etcd 整體架構中所屬的層次定位示意如上圖,它是由 go 語言實現的單機 kv 數據磁盤存儲系統:

1.2 存儲設計

下面我們拓展聊聊 boltdb 存儲技術實現. 本文作爲系列開篇,整體內容偏宏觀,本小節內容講解力度偏小、點到即止,更多細節內容在未來的存儲設計篇和 b + 樹實現篇中進一步展開.

1.2.1 讀寫

boltdb 存儲依賴於磁盤,針對於存儲數據的交互,分爲讀、寫流程:

1.2.2 page

基於局部性原理,操作系統下,內存與磁盤間數據的交換以頁 page 爲單位. 與之類似,boltdb 也是通過 page 爲單位,完成數據庫文件內容的組織.

在 boltdb 中,page 可以分爲以下四類:

每個 db 在初始化時,會先完成 4 個 page 的初始化和持久化:

1.2.3 b + 樹

boltdb 中,基於 b+ 樹實現數據的存儲. 這也是我將其作爲學習案例的初衷.

有關於 b+ 樹的詳細設定,我將在 b+ 樹實現篇中詳細展開,這裏僅一筆帶過:

b+ 樹是 b 樹的升級版本,本質上是一顆扁平化的 n 叉樹,葉子節點存儲真實數據,非葉子節點僅存儲索引信息,其拓撲結構形如下圖:

b+ 樹本身是偏理論性的定義,在落地實踐時可能會出現一定的差異化改造. 以 boltdb 的實現爲例,其中存在但不僅限的兩個較大改造點包括:

1.2.4 bucket

在 boltdb 中引入了桶 bucket 的設定. bucket 的作用是實現業務數據的隔離, 可以簡單把 bucket 類比於數據庫中的表,只不過 bucket 的形式會更加靈活一些,還能支持嵌套式的拓撲關係,形如上圖,school 和 school-class 是兩個合法的 bucket,且彼此爲父子關係.

從每個 db 會有個默認的 root bucket,以此爲起點可以衍生出一個 bucket 多叉樹,本身也是通過 b+ 樹的模型實現.

在邏輯意義上,每個 bucket 會有一棵獨立的 b+ 樹,用於存放當前 bucket 範圍內的 kv 數據.

1.3 事務執行

最後是關於事務 transaction 的部分.

boltdb 中的事務分爲只讀事務 read-only tx 和讀寫事務 read-write tx 兩類:

有關 boltdb 事務的更多設定、acid 性質的保證機制及事務的實現細節等內容,我們放在事務實現篇中詳細講解.

2 使用示例

本章通過一個單測示例向大家展示 boltdb 的基本用法.

2.1 啓動

啓動 boltdb 時,需要指定數據庫文件的路徑,並將文件權限設置爲可讀寫:

import (
    "testing"


    "go.etcd.io/bbolt"
)


func Test_boltDB(t *testing.T) {
    // 1 啓動數據庫
    db, err := bbolt.Open("./test_b.db", 0600, nil)
    if err != nil {
        t.Error(err)
        return
    }
    defer db.Close()
    // ...
}

2.2 建 bucket(表)

import (
    "testing"


    "go.etcd.io/bbolt"
)


func Test_boltDB(t *testing.T) {
    // 1 啓動數據庫
    // ...
    
    // 2 建表
    if err = db.Update(func(tx *bbolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists([]byte("test"))
        return err
    }); err != nil {
        t.Error(err)
        return
    }


    // ...
}

2.3 增改

import (
    "testing"


    "go.etcd.io/bbolt"
)


func Test_boltDB(t *testing.T) {
    // 1 啓動數據庫
    // ...
    
    // 2 建表
    // ...


    // 3 增、改
    if err = db.Update(func(tx *bbolt.Tx) error {
        table := tx.Bucket([]byte("test"))
        if err := table.Put([]byte("a")[]byte("b")); err != nil {
            return err
        }
        return table.Put([]byte("c")[]byte("d"))
    }); err != nil {
        t.Error(err)
        return
    }


    // ...
}

2.4 刪

通過 bucket.Delete 方法,完成 key 的刪除:

import (
    "testing"


    "go.etcd.io/bbolt"
)


func Test_boltDB(t *testing.T) {
    // 1 啓動數據庫
    // ...
    
    // 2 建表
    // ...


    // 3 增、改
    // ...
    
    // 4 刪
    if err = db.Update(func(tx *bbolt.Tx) error {
        table := tx.Bucket([]byte("test"))
        return table.Delete([]byte("a"))
    }); err != nil {
        t.Error(err)
        return
    }


    // ...
}

2.5 查

import (
    "testing"


    "go.etcd.io/bbolt"
)


func Test_boltDB(t *testing.T) {
    // 1 啓動數據庫
    // ...
    
    // 2 建表
    // ...


    // 3 增、改
    // ...
    
    // 4 刪
    // ...


    // 5 查
    if err = db.View(func(tx *bbolt.Tx) error {
        table := tx.Bucket([]byte("test"))
        v1 := table.Get([]byte("c"))
        t.Logf("v of key c: %s", v1)
        v2 := table.Get([]byte("a"))
        t.Logf("v of key a: %s", v2)
        return nil
    }); err != nil {
        t.Error(err)
        return
    }
}

3 主流程走讀

在本章中,我們將以第 2 章示例代碼爲入口,進行幾個核心操作流程的源碼流程走讀,但涉足的源碼深度相對較淺:

3.1 db 定義

首先介紹 boltdb 中的一個核心類——DB. 對應爲一個數據庫實例的代碼抽象,其中包含的核心成員屬性,已通過圖解和源碼註釋的方式給出:

// boltdb 抽象的數據庫
type DB struct {
    // ...
    // 數據庫文件名稱
    path     string
    // 打開文件方法
    openFile func(string, int, os.FileMode) (*os.File, error)
    // 數據庫文件,所有數據存儲於此
    file     *os.File
    // 基於 mmap 技術映射的數據庫文件內容
    data     *[maxMapSize]byte
    // ...
    // 兩個輪換使用的 meta page
    meta0    *meta
    meta1    *meta
    // 數據庫單個 page 的大小,單位 byte
    pageSize int
    // 數據庫是否已啓動
    opened   bool
    // 全局唯一的讀寫事務
    rwtx     *Tx
    // 一系列只讀事務
    txs      []*Tx
    // freelist,管理空閒的 page
    freelist     *freelist
    freelistLoad sync.Once
    // 提高 page 字節數組複用率的對象池
    pagePool sync.Pool
    // ...
    
    // 互斥鎖,保證讀寫事務全局唯一
    rwlock   sync.Mutex   
    // 保護 meta page 的互斥鎖
    metalock sync.Mutex   
    // 保護 mmap 的讀寫鎖
    mmaplock sync.RWMutex 
    // 數據落盤持久化時使用的操作方法,對應爲 pwrite 操作
    ops struct {
        writeAt func([]byte, off int64) (n int, err error)
    }


    // 是否已只讀模式啓動數據庫
    readOnly bool
}

3.2 啓動

3.2.1 主流程

通過 Open 方法可以啓動 db,核心流程包括:

func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
    // 構造 db 實例
    db := &DB{
        opened: true,
    }
    // 啓用默認配置
    if options == nil {
        options = DefaultOptions
    }
    
    // ...
    // 默認不啓用只讀模式
    if options.ReadOnly {
        flag = os.O_RDONLY
        db.readOnly = true
    } else {
        // always load free pages in write mode
        db.PreLoadFreelist = true
    }


    // 打開數據庫文件的操作方法
    db.openFile = options.OpenFile
    if db.openFile == nil {
        db.openFile = os.OpenFile
    }


    // 打開數據庫文件
    var err error
    if db.file, err = db.openFile(path, flag|os.O_CREATE, mode); err != nil {
        _ = db.close()
        return nil, err
    }
    // 數據庫文件名稱賦值
    db.path = db.file.Name()


    // ...
    // 數據落盤操作
    db.ops.writeAt = db.file.WriteAt


    // 數據 page 大小
    if db.pageSize = options.PageSize; db.pageSize == 0 {
        // 默認等於操作系統 page 大小
        db.pageSize = defaultPageSize
    }


    // 倘若從零到一創建一個新的 db 文件,則需要進行初始化
    if info, err := db.file.Stat(); err != nil {
        _ = db.close()
        return nil, err
    } else if info.Size() == 0 {
        // 初始化 db
        if err := db.init(); err != nil {
            // ...
            _ = db.close()
            return nil, err
        }
    } 
    // ...


    // 對象池,用於複用 page 的字節數組
    db.pagePool = sync.Pool{
        New: func() interface{} {
            return make([]byte, db.pageSize)
        },
    }


    // 基於 mmap 建立數據庫文件和內存空間的映射
    if err := db.mmap(options.InitialMmapSize); err != nil {
        _ = db.close()
        return nil, err
    }


    // 預加載 freelist
    if db.PreLoadFreelist {
        db.loadFreelist()
    }


    // ...
    return db, nil
}

3.2.2 初始化

下面是啓用一個全新數據庫文件時,需要執行的初始化方法:

// 初始化一個全新的數據庫文件
func (db *DB) init() error {
    // 初始化數據庫的 4 個 page:meta page * 2 + freelist page + leaf page
    buf := make([]byte, db.pageSize*4)
    // 初始化 mata page
    for i := 0; i < 2; i++ {
        p := db.pageInBuffer(buf, pgid(i))
        p.id = pgid(i)
        p.flags = metaPageFlag


        // Initialize the meta page.
        m := p.meta()
        m.magic = magic
        m.version = version
        m.pageSize = uint32(db.pageSize)
        m.freelist = 2
        m.root = bucket{root: 3}
        m.pgid = 4
        m.txid = txid(i)
        m.checksum = m.sum64()
    }


    // 初始化 freelist page
    p := db.pageInBuffer(buf, pgid(2))
    p.id = pgid(2)
    p.flags = freelistPageFlag
    p.count = 0


    // 初始化空的 leaf page
    p = db.pageInBuffer(buf, pgid(3))
    p.id = pgid(3)
    p.flags = leafPageFlag
    p.count = 0


    // 將初始化的 4 個 page 落盤,基於 pwrite + fdatasync 操作
    if _, err := db.ops.writeAt(buf, 0); err != nil {
        return err
    }
    if err := fdatasync(db); err != nil {
        return err
    }
    db.filesz = len(buf)


    return nil
}

3.2.3 mmap

下面是通過 mmap 實現數據文件與內存映射的源碼,核心步驟包括:

func (db *DB) mmap(minsz int) (err error) {
    // 互斥鎖,保護 mmap 併發安全
    db.mmaplock.Lock()
    defer db.mmaplock.Unlock()


    info, err := db.file.Stat()
    // ...


    // 調整合適的 mmap 容量
    fileSize := int(info.Size())
    var size = fileSize
    if size < minsz {
        size = minsz
    }
    size, err = db.mmapSize(size)
    if err != nil {
        return err
    }


    // ...


    // 倘若此前已經有讀寫事務在運行,此時因爲要執行 mmap 操作,則需要對 bucket 內容進行重塑
    if db.rwtx != nil {
        db.rwtx.root.dereference()
    }


    // 解除之前建立的 mmap 映射
    if err = db.munmap(); err != nil {
        return err
    }


    // 建立新的 mmap 映射
    if err = mmap(db, size); err != nil {
        return err
    }


    // ...


    return nil
}

mmap 底層通過系統調用實現,不同的操作系統會有不同的實現細節. 以我當前所用的 mac 爲例,對應的 unix 系統版本實現源碼如下:

// mmap memory maps a DB's data file.
func mmap(db *DB, sz int) error {
    // Map the data file to memory.
    b, err := unix.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
    if err != nil {
        return err
    }


    // Advise the kernel that the mmap is accessed randomly.
    err = unix.Madvise(b, syscall.MADV_RANDOM)
    if err != nil && err != syscall.ENOSYS {
        // Ignore not implemented error in kernel because it still works.
        return fmt.Errorf("madvise: %s", err)
    }


    // Save the original byte slice and convert to a byte array pointer.
    db.dataref = b
    db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
    db.datasz = sz
    return nil
}

3.3 建 bucket(表)

一個 bucket 本質上是從屬於其父 bucket b+ 樹中的一筆特殊的 kv 對數據. 因此創建 bucket 的過程會和寫入 kv 數據的流程相類似:

func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
    // ...
    // 獲取遊標
    c := b.Cursor()
    
    // 藉助遊標找到桶名 key 對應的位置
    k, _, flags := c.seek(key)


    // 桶已存在
    if bytes.Equal(key, k) {
        if (flags & bucketLeafFlag) != 0 {
            return nil, ErrBucketExists
        }
        return nil, ErrIncompatibleValue
    }


    // 創建新的桶實例
    var bucket = Bucket{
        bucket:      &bucket{},
        rootNode:    &node{isLeaf: true},
        FillPercent: DefaultFillPercent,
    }
    
    // 取得桶的序列化結果
    var value = bucket.write()


    // 將這個新桶對應的 kv 對數據寫入到 b+ 樹中
    key = cloneBytes(key)
    c.node().put(key, key, value, 0, bucketLeafFlag)


    // ...
    // 返回創建好的新桶
    return b.Bucket(key), nil
}

3.4 查 bucket(表)

通過名稱檢索 bucket 的流程,一定程度上和數據的查詢流程相類似:

func (b *Bucket) Bucket(name []byte) *Bucket {
    // 如果 map 中已經緩存了對應的桶,直接返回
    if b.buckets != nil {
        if child := b.buckets[string(name)]; child != nil {
            return child
        }
    }


    // 藉助遊標在 b+ 樹中檢索 kv 對
    c := b.Cursor()
    k, v, flags := c.seek(name)


    // ...
    // 找到桶後,對其反序列化
    var child = b.openBucket(v)
    // 緩存到 map 中
    if b.buckets != nil {
        b.buckets[string(name)] = child
    }
    // 返回桶
    return child
}

3.5 數據 crud

數據的 crud 過程同樣是藉助在 b+ 樹上游走的遊標 cursor 加以完成,下面分別示意增改、刪、查操作的源碼主流程:

func (b *Bucket) Put(key []byte, value []byte) error {
    // 前置校驗 
    // ...


    // 藉助遊標檢索到 k v 對所在的位置
    c := b.Cursor()
    k, _, flags := c.seek(key)


    // ...
    // 在對應位置中插入 kv 對內容
    key = cloneBytes(key)
    c.node().put(key, key, value, 0, 0)


    return nil
}
// 在表中刪除 key
func (b *Bucket) Delete(key []byte) error {
    // ...
    // 藉助遊標移動到 key 對應位置
    c := b.Cursor()
    k, _, flags := c.seek(key)


    // 倘若 key 不存在
    if !bytes.Equal(key, k) {
        return nil
    }


    // ...


    // 在 b+ 樹節點中刪除對應的 key
    c.node().del(key)


    return nil
}
func (b *Bucket) Get(key []byte) []byte {
    // 藉助遊標檢索到 kv 對所在位置
    k, v, flags := b.Cursor().seek(key)


    // ...
    // key 不存在,則返回空
    if !bytes.Equal(key, k) {
        return nil
    }
    
    // 返回對應的 value
    return v
}

3.6 數據落盤

在 boltdb 提交讀寫事務時,會一次性將更新的髒數據溢寫落盤:

更多細節參見下方的源碼註釋:

func (tx *Tx) Commit() error {
    // ...
    
    // 數據溢寫磁盤前,需要調整一輪 b+ 樹,保證其平衡性
    // rebalance 是爲了避免因爲 delete 操作,導致某些節點 kv 對數量太少,不滿足 b+ 樹平衡性要求
    tx.root.rebalance()
    // ...
 
    // spill 是爲了避免因爲 put 操作,導致某些節點 kv 對數量太多,不滿足 b+ 樹平衡性要求
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    
    
    // 事務更新到的髒數據溢寫落盤
    if err := tx.write(); err != nil {
        tx.rollback()
        return err
    }


    // ...


    // meta page 溢寫落盤
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    // ...


    // 關閉事務
    tx.close()
    // ...


    return nil
}
// 事務髒頁溢寫落盤
func (tx *Tx) write() error {
    // 事務緩存的髒頁
    pages := make(pages, 0, len(tx.pages))
    for _, p := range tx.pages {
        pages = append(pages, p)
    }
    // 清空緩存
    tx.pages = make(map[pgid]*page)
    // 對髒頁進行排序
    sort.Sort(pages)


    // 按照順序,將髒頁溢寫落盤
    for _, p := range pages {
        // page 總大小,包含 overflow 不分
        rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
        // page 的 offset,可以根據 page id 推算得到
        offset := int64(p.id) * int64(tx.db.pageSize)
        var written uintptr


        // Write out page in "max allocation" sized chunks.
        for {
            sz := rem
            if sz > maxAllocSize-1 {
                sz = maxAllocSize - 1
            }
            buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
            // 將 page 溢寫到文件對應 offset 的位置
            if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
                return err
            }


            rem -= sz
            // 一次性寫完了
            if rem == 0 {
                break
            }


            // 如果沒有一次性寫完,下一輪接着寫
            offset += int64(sz)
            written += uintptr(sz)
        }
    }


    // fdatasync 操作,確保數據溢寫落盤完成
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }


    // 釋放這部分已落盤 page,倘若其不存在 overflow,說明是標準規格的字節數組,則清空內容,然後添加到對象池中進行復用
    for _, p := range pages {
        // Ignore page sizes over 1 page.
        // These are allocated using make() instead of the page pool.
        if int(p.overflow) != 0 {
            continue
        }


        buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)


        // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
        for i := range buf {
            buf[i] = 0
        }
        tx.db.pagePool.Put(buf) //nolint:staticcheck
    }


    return nil
}

 至此,本篇結束.

4 展望

本文作爲 etcd 存儲引擎系列的開篇,帶着大家一起以一個相關宏觀的視角總覽了 boltdb 的架構設計與核心概念. 本文內容相對停滯於淺層,針對幾個核心方向的挖深力度有所不足,主要通過後續幾個篇章持續發力展開,在此做個展望:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/oL_G8H_ROSF3TjtzBOGCow