如何從 0 到 1 實現一個基於 Bitcask 的 kv 存儲引擎

願景

今年大部分業餘時間都在 nutsdb 的開源貢獻上,nutsdb 是基於 bitcask 模型實現的持久化存儲引擎,提供了諸如 list,set 等多種豐富的數據結構。近來很多小夥伴,其中也有一些我的好朋友陸陸續續加入到這個項目上來。爲了幫助小夥伴們快速熟悉整個項目,我會把之前寫的一些文章分享給他們,但是我覺得這樣可能還是不太夠,因爲之前做的是關於性能優化的事情,文章是偏向於性能優化方面的思考。對於 bitcask 整體架構的解釋,實現,感覺寫着墨不多,意識到這一點之後,我就想寫一篇文章來解釋一下 bitcask 的核心概念,並且花了幾個小時的時間寫下了一個簡單版本的 demo(寫 unit test 測試之後,幾百行代碼只有一個小 bug,小竊喜)。在幫助大家瞭解了核心概念之後,就算是有了一個基本的入門了,後面可以更快的參與到項目中來。這是我寫這篇文章的初衷,也同時歡迎更多的小夥伴參與到這個項目中來。

這篇文章會結合 bitcask 論文以及我的代碼實現進行分析,主要是講的是 “怎麼做”,對於“爲什麼” 可能側重並不多。另外該 bitcask 實現我已經開源到我的 github 上,可以在 github 上看到更多實現細節,鏈接:https://github.com/elliotchenzichang/tiny-bitcask ,歡迎 star。

整體架構

該怎麼開始這個故事呢,我思慮良久,決定從 db 的整體架構開始闡述,然後在自底向上的講述每一個部分的實現細節。爲什麼是自底向上呢?因爲從下往上講可以從點到線,從線到面,慢慢的撥開雲霧,看見一整個 db 的實現,會有一種世界在面前緩緩打開的感覺。另外,如果是從上到下來看結合代碼講的時候,可能有一些代碼會變得不好解釋。

言歸正傳,如上面所說我們要實現一個基於 Bitcask 模型的 kv 存儲引擎,那麼對於持久化存儲引擎而言,數據的最終歸宿是磁盤。而我們知道,程序是運行在內存中的,所以存儲引擎提供需要做的就是以某種方式把用戶給的數據存進磁盤,也以某種方式將用戶的數據從磁盤裏面拿出來使用。至於這些方式的設計的是否高效,就是存儲引擎設計的藝術所在。其實粗略來看,存儲引擎的整體架構整體如下圖,內存中放置索引,可以直接 (比如 bitcask) 或者間接 (比如 leveldb 的 SST 的索引形式) 的找到數據,磁盤中存儲用戶的數據,可以是同構的數據文件(比如 bitcask 全是 data_file),也可以是異構的數據文件(比如 leveldb 的 WAL log 和 SST)。

bitcask 採用的是一種比較簡單的形式,如下圖所示,內存中會記錄每條數據在磁盤中的位置,以及 key 和 value 的長度,這樣就可以直接通過一次系統調用在數據存放的位置把它拿出來了。

大概講述了整體的架構之後我們來看看代碼實現中主要的對象有哪些,自底向上的看。

  1. Entry:代表 db 中一條數據的信息。

  2. Storage:與文件系統打交道的對象,包括了寫入,讀取數據。

  3. Index,索引,記錄一條數據的具體信息,主要是數據在磁盤中的位置。

  4. db,db 的實體。包含了 db 的各種操作,包括讀取,寫入數據。

接下來我們看看具體的實現,下面解析會結合代碼分析,在代碼的關鍵部分我已經寫好了註釋。繫好安全帶,發車了!

  1. 數據的編碼與解碼

首先我們要講的是,一條數據是以怎麼樣的形式存進磁盤的。磁盤纔不理你是放進來的是什麼東西,他只知道在他身上存放的是一堆二進制,至於那些二進制是什麼,由放進來的應用程序來定義。大概邏輯如下圖所示,應用程序需要自己實現對磁盤數據寫入和讀出時候的編碼和解碼。

在 kv 存儲引擎中一條數據在磁盤中的是如下圖所示。整體上來看我們會有一個 meta,key,value,key 和 value 就不必多說了,就是真實的數據部分。那麼 meta 是什麼呢?meta 是這條數據的元數據,也就是起到描述作用的數據,比如 key 有多長,value 有多長,在什麼位置,以及寫入數據的時間,其實這個時間戳可以理解爲數據的版本,可以是物理時間,也可以是邏輯時間(邏輯時間需要自己實現)。meta 的 crc 部分是做數據校驗用的,因爲磁盤有時候會出現一些意外,比如一個比特位上的數據發生了變化,從存儲 1 變成 0 或者從 0 到 1,在或者磁盤數據丟失,也是可能的,所以要加上 crc 在讀出數據的時候再讀計算出 crc 的值和存在磁盤裏的 crc 做比較,如果不一致說明數據出現了問題。

下面是代碼實現的節選,其中包含了 Entry 和 Meta 這兩個主要數據結構的定義,以及數據編碼的實現。在寫入數據的時候,我們會將一個內存中的 Entry 對象 Encode 編碼成字節數組然後將字節數據存進磁盤中,在讀取數據的時候再將拿到的字節數組解碼,這樣就組成了我們的數據編解碼過程。

// Entry代表數據。
type Entry struct {
  key   []byte
  value []byte
  meta  *Meta
}
// Meta是元數據
type Meta struct {
  crc       uint32
  position  uint64
  timeStamp uint64
  keySize   uint32
  valueSize uint32
}
//這個方法的功能是將一個Entry對象編碼成byte數組
func (e *Entry) Encode() []byte {
  // size是meta+key+value的長度。
  size := e.Size()
  buf := make([]byte, size)
  //以小端字節序將數字寫入到字節數組中
  binary.LittleEndian.PutUint64(buf[4:12], e.meta.position)
  binary.LittleEndian.PutUint64(buf[12:20], e.meta.timeStamp)
  binary.LittleEndian.PutUint32(buf[20:24], e.meta.keySize)
  binary.LittleEndian.PutUint32(buf[24:28], e.meta.valueSize)
  // 操作完meta,下面是操作key和value
  copy(buf[MetaSize:MetaSize+len(e.key)], e.key)
  copy(buf[MetaSize+len(e.key):MetaSize+len(e.key)+len(e.value)], e.value)
  // 計算crc32校驗值
  c32 := crc32.ChecksumIEEE(buf[4:])
  binary.LittleEndian.PutUint32(buf[0:4], c32)
  return buf
}
  1. 數據的存儲與索引

首先我們知道在 bitcask 模型中有兩種文件,一種叫 active data file,這個文件可讀可寫,寫入操作(包括刪除)只會寫入到 active data file 中,Older data file 是 active data file 數據達到閾值之後轉化來的,只可以讀取數據,不可以寫入數據。

在實現的時候,我們把文件名定義成數字,並且在文件存儲的數據到達閾值的時候讓當前 active data file 的文件名 + 1 變成下一個 active data file 的文件名,這麼做有什麼好處呢?

這意味着數據文件中文件名所代表的數字最大的文件,就是 active data file,另外,依照這個寫入的邏輯,我們可以得到這樣一個結論。假設 fid 是文件的名字,off 是數據在文件中的位置,那麼兩條數據可以表示爲 fid1.off1,fid2.off2, 並且如果這兩條數據寫入的是同一個 key,那麼那條數據是最新的呢?毫無疑問,如果 fid 大的數據是最新的,如果 fid 相等,off 大的數據是最新的。這個也很好理解嘛,越往後寫入的數據就越新。有了這條結論我們後續的很多操作都會變簡單一些。

數據寫入到磁盤之後要把當前最新的數據更新到索引中。索引結構如下,hashmap 的 key 是存入數據的 key,Index 記錄了找到這條數據的關鍵信息,主要是 fid 和 off,這兩個字段連起來含義就是:數據存在那個文件 (fid) 的什麼位置(off),

type keyDir struct {
  index map[string]*Index
}
type Index struct {
  fid       int
  off       int64
  timestamp uint64
}

存儲數據的主要流程如下:

// Storage這個結構封裝了和磁盤操作的相關邏輯
type Storage struct {
  // 代表db所在的目錄
  dir      string
  // 代表可寫入文件的閾值
  fileSize int64
  // 當前active data file的相關信息
  af       *ActiveFile
  // db中所有數據文件的文件描述符(fd)緩存,免得重複打開文件描述符導致性能消耗。
  fds      map[int]*os.File
}
// activeFile封裝了當前可寫入文件的信息
type ActiveFile struct {
  // 代表ative data file的名字
  fid int
  // 文件描述符
  f   *os.File
  // 當前寫入數據的最新位置,可以理解爲文件的尾部
  off int64
}
func (s *Storage) writeAt(bytes []byte) (i *Index, err error) {
  err = s.af.writeAt(bytes)
  if err != nil {
    return nil, err
  }
  i = &Index{
    fid: s.af.fid,
    off: s.af.off,
  }
  s.af.off += int64(len(bytes))
  // 如果當前的off大於設置的閾值,進行actice file的切換
  // 具體操作是新建一個名爲fid + 1 的文件,然後將af切換成代表最新可寫入文件的對象。
  if s.af.off >= s.fileSize {
    err := s.rotate()
    if err != nil {
      return nil, err
    }
  }
  return i, nil
}
// 代表在可寫入文件寫入數據
func (af *ActiveFile) writeAt(bytes []byte) error {
  n, err := af.f.WriteAt(bytes, af.off)
  if n < len(bytes) {
    return writeMissDataErr
  }
  return err
}

講完了寫入流程,我還是想多費一些筆墨講講怎麼讀取數據出來,這裏 Index 的實現其實和 bitcask 論文中的不太一樣,論文中還會記錄 keySize 和 valueSize,在讀取的時候其實一次性讀取就可以了,用 metaSize+keySize+valueSize 就是整條數據的長度。那麼我這裏爲什麼不在 Index 中存儲 KeySize 和 ValueSize 了呢?主要是爲了和奔潰恢復做一個複用,在奔潰回覆的場景中,內存裏是沒有索引信息的,所以需要兩次讀取,一次是讀出 meta,根據 meta 上面記錄的 keySize 和 valueSize,再讀取 key 和 value。

  1. 併發處理

由於論文中沒有提到 bitcask 是怎麼實現併發控制的,這也是優化 nutsdb 併發性能的重大議題,目前我也有在看一些論文和做相關的實踐。在這裏,我們採用的是比較原始的方式。在整個 db 下面加上一把讀寫鎖,有數據寫入的時候就加上寫鎖,讀取數據的時候就加上讀鎖。其實類似於 mysql innodb 的讀提交。

  1. 數據 Merge

之前我們提到了,db 在運行過程中寫入操作一直是追加寫入,考慮下面的操作:

set key_1 value_1
set key_1 value_2

在寫入值爲 value_2 的數據之後,原本的數據就會變成無效數據,爲了節省磁盤空間,就需要一個操作來清理掉無效的數據。這個操作就是 Merge 操作。那麼怎麼做呢?之前提到的通過 fid 和 off 比較數據的新舊的結論在這個時候就派上用場了。因爲 fid 越大,off 越大,數據也就越新,merge 操作要求我們的也是保留最新的數據。所以 merge 操作分爲以下幾步:

  1. 將文件按照名字 (除了 active data file) 從小往大讀取,文件從頭讀到尾。

  2. 遇到最新的數據就寫入到 ative data file,並更新該數據的索引到最新的寫入位置。當一個文件中的所有最新數據都寫入到別的地方,那麼意味着這個文件中所有的數據都是舊數據,就可以把這個文件刪除掉啦。

代碼如下:

func (db *DB) Merge() error {
  // 給整個db加入寫鎖防止在寫入數據的時候有別的程序併發寫入。當然這裏實則鎖住了整個db,效率很低。
  db.rw.Lock()
  defer db.rw.Unlock()
  fids, err := getFids(db.s.dir)
  if err != nil {
    return err
  }
  if len(fids) < 2 {
    return NoNeedToMerge
  }
  // fid排序
  sort.Ints(fids)
  // 這裏將active data file排除在外
  // 因爲最新的數據重新寫入邏輯是寫在active data file中
  // 如果還讀取active data file的最新數據再寫入到自己身上,就相當於蛇咬自己的尾巴,愛的魔力轉圈圈圈了。
  for _, fid := range fids[:len(fids)-1] {
    var off int64 = 0
    for {
      entry, err := db.s.readEntry(fid, off)
      if err == nil {
        off += int64(entry.Size())
        oldIndex := db.kd.index[string(entry.key)]
        // 判斷讀出來的數據是不是最新的,用索引中存儲的位置比較,如果fid和off都對上了,就是最新數據。
        if oldIndex.fid == fid && oldIndex.off == off {
          // 重新寫入該數據,拿到最新寫入位置的索引,並更新。
          newIndex, err := db.s.writeAt(entry.Encode())
          if err != nil {
            return err
          }
          db.kd.index[string(entry.key)] = newIndex
        }
      } else {
        if err == io.EOF {
          break
        }
        return err
      }
    }
    // 文件中的最新數據都重新寫入完畢之後,這個文件的數據都是老數據了,就可以刪除了。
    err = os.Remove(fmt.Sprintf("%s/%d%s", db.s.dir, fid, fileSuffix))
    if err != nil {
      return err
    }
  }
  return nil
}
  1. 崩潰恢復

當 db crash 之後需要重新打開的時候,如何恢復現場呢?其實這個問題在理解 merge 的操作之後就會變的很簡單。邏輯是一樣的,按照文件名大小依次讀取數據,將數據的位置信息構建成索引插入到內存索引中,不需要理會這個 key 在索引中是否存在,因爲當前讀到的數據,必定比索引中的存的索引所代表的數據要新。因爲當前讀到的數據 fid 或 off 必定是大於索引中存儲的 index 信息的。這裏就不貼代碼了 hh,感覺理解 merge 之後這裏應該很快就理解啦。

總結

本篇文章我理解爲是一篇基於 bitcask kv 存儲引擎的保姆級教程了。不過這篇文章的目的還是讓大家對 bitcask 和 nutsdb 有整體的瞭解,並且很多地方實現的個人感覺並不是很好,比如 merge 可以做到和主流程併發的,異步的進行,然後併發性能也是可以提升的,另外刪除的 api 也沒有提供 hhh。不過讀者朋友們如果能通過看我這篇文章就理解了 bitcask,或者可以自己動手也搞一個簡單的,我認爲我的目的就達到了。相關的代碼已經開源,對一些細節之處需要整體瞭解的可以直接去我的 github 倉庫看哦,鏈接:https://github.com/elliotchenzichang/tiny-bitcask(歡迎 star),另外我也不知道現在的代碼中會不會有什麼 bug,如果 github 中的代碼和文章中的有出入,請以 github 爲準。

參考資料

  1. nutsdb:https://github.com/nutsdb/nutsdb

  2. tiny-bitcask:https://github.com/elliotchenzichang/tiny-bitcask

  3. bitcask paper:https://riak.com/assets/bitcask-intro.pdf

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/pJ_0rQ6B0dFkMai3gVZ_UQ