Go 語言從 0 到 1 實現最簡單的數據庫!

 後臺開發對於數據庫操作是必不可少的事情,瞭解數據庫原理對於平常的工作的內功積累還是很有幫助的,這裏實現一個最簡單的數據庫加深自己對數據庫的理解。

一、go 實現數據庫目的

二、數據庫選擇 SQLite

選擇 SQLite(https://www.sqlite.org/arch.html)原因是數據庫完全開源,實現簡單,並且有 C 語言最簡單的實現版本,因此參考 go 語言實現一個數據庫加深對於關係型數據庫的理解。

三、SQLite 主要架構

其中:前端的輸入是一個 SQL 查詢。輸出是 sqlite 虛擬機字節碼(本質上是一個可以在數據庫上操作的編譯程序) 後端:VM 將前端生成的字節作爲指令,然後對一個表或者多個表或索引進行操作,每一個表或者索引都存儲在 B 樹中,VM 本質上時指令的分支選擇語句。B 樹組成了每一個節點,每個節點的最大長度時一頁。B 樹可以通過 pager 的命令,將數據保存到磁盤上。pager 收到數據讀寫的命令,負責數據偏移與讀寫,它還將最近訪問的頁面緩存在內存中,並確定何時需要將這些頁面寫回磁盤。

雷普勒

啓動 sqlite,會有一個讀寫命令循環:

main 函數將有一個無限循環來打印提示,獲取一行輸入,然後處理該行輸入:

// run main 主函數,這樣寫方便單元測試
func run()  {
   table, err := dbOpen("./db.txt")
   if err != nil {
      panic(err)
   }
   for  {
      printPrompt()
      // 語句解析
      inputBuffer, err := readInput()
      if err != nil {
         fmt.Println("read err", err)
      }
      // 特殊操作
      if len(inputBuffer) != 0 && inputBuffer[0] == '.' {
         switch doMetaCommand(inputBuffer, table) {
         case metaCommandSuccess:
            continue
         case metaCommandUnRecongnizedCommand:
            fmt.Println("Unrecognized command", inputBuffer)
            continue
         }
      }
      // 普通操作 code Generator
      statement := Statement{}
      switch prepareStatement(inputBuffer, &statement) {
      case prepareSuccess:
         break;
      case prepareUnrecognizedStatement:
         fmt.Println("Unrecognized keyword at start of ", inputBuffer)
         continue
      default:
         fmt.Println("invalid unput ", inputBuffer)
         continue
      }
      res := executeStatement(&statement, table)
      if res == ExecuteSuccess {
         fmt.Println("Exected")
         continue
      }
      if res == ExecuteTableFull {
         fmt.Printf("Error: Table full.\n");
         break
      }
      if res == EXECUTE_DUPLICATE_KEY {
         fmt.Printf("Error: Duplicate key.\n");
         break;
      }
   }
}

處理特殊的元語句如下:

func doMetaCommand(input string, table *Table) metaCommandType {
   if input == ".exit" {
      dbClose(table)
      os.Exit(0)
      return metaCommandSuccess
   }
   if input == ".btree" {
      fmt.Printf("Tree:\n");
      print_leaf_node(getPage(table.pager, 0));
      return metaCommandSuccess;
   }
   if input == ".constants" {
      fmt.Printf("Constants:\n");
      print_constants();
      return metaCommandSuccess
   }
   return metaCommandUnRecongnizedCommand
}

效果如下:

四、最簡單的 “SQL 編譯器和 “VM”(虛擬機)

(一)prepareStatement 爲最簡單的解析器 “SQL 編譯器”

當前改解析器,最簡單到還沒有識別出 SQL 語句,只是寫死識別兩個單詞的 SQL 語句:

func prepareStatement(input string, statement *Statement)PrepareType {
   if  len(input) >= 6 && input[0:6] == "insert" {
      statement.statementType = statementInsert
      inputs := strings.Split(input, " ")
      if len(inputs) <=1 {
         return prepareUnrecognizedStatement
      }
      id, err := strconv.ParseInt(inputs[1], 10, 64)
      if err != nil {
         return prepareUnrecognizedSynaErr
      }
      statement.rowToInsert.ID =  int32(id)
      statement.rowToInsert.UserName = inputs[2]
      statement.rowToInsert.Email = inputs[3]
      return prepareSuccess
   }
   if len(input) >= 6 && input[0:6] == "select" {
      statement.statementType = statementSelect
      return prepareSuccess
   }
   return prepareUnrecognizedStatement
}

(二)最簡單的 “虛擬機”(VM)執行器

// executeStatement 實行sql語句 ,解析器解析程statement,將最終成爲我們的虛擬機
func executeStatement(statement *Statement, table *Table)  executeResult{
   switch statement.statementType {
      case statementInsert:
         return executeInsert(statement, table)
   case statementSelect:
      return executeSelect(statement, table)
   default:
      fmt.Println("unknown statement")
   }
   return ExecuteSuccess
}

(三)最簡單的插入的數據結構

需要插入序列化的數據格式如下:

將一列進行序列化代碼如下:

// 將row序列化到指針,爲標準寫入磁盤做準備
func serializeRow(row *Row, destionaton unsafe.Pointer) {
   ids := Uint32ToBytes(row.ID)
   q := (*[ROW_SIZE]byte)(destionaton)
   copy(q[0:ID_SIZE], ids)
   copy(q[ID_SIZE+1:ID_SIZE+USERNAME_SIZE], (row.UserName))
   copy(q[ID_SIZE+USERNAME_SIZE+1: ROW_SIZE], (row.Email))
}

(四)從文件去取出反序列化

// deserializeRow 將文件內容序列化成數據庫元數據
func deserializeRow(source unsafe.Pointer, rowDestination *Row) {
   ids := make([]byte, ID_SIZE, ID_SIZE)
   sourceByte := (*[ROW_SIZE]byte)(source)
   copy(ids[0:ID_SIZE], (*sourceByte)[0:ID_SIZE])
   rowDestination.ID = BytesToInt32(ids)
   userName := make([]byte, USERNAME_SIZE, USERNAME_SIZE)
   copy(userName[0:], (*sourceByte)[ID_SIZE+1: ID_SIZE + USERNAME_SIZE])
   realNameBytes := getUseFulByte(userName)
   rowDestination.UserName = (string)(realNameBytes)
   emailStoreByte :=  make([]byte, EMAIL_SIZE, EMAIL_SIZE)
   copy(emailStoreByte[0:], (*sourceByte)[1+ ID_SIZE + USERNAME_SIZE: ROW_SIZE])
   emailByte := getUseFulByte(emailStoreByte)
   rowDestination.Email = (string)(emailByte)
}

(五)呼叫器

主要功能寫入到磁盤,數據結構:

// Pager 管理數據從磁盤到內存
type Pager struct {
   osfile     *os.File;
   fileLength int64;
   numPages   uint32;
   pages      []unsafe.Pointer;  // 存儲數據
}

整個數據庫的數據表:

// Table 數據庫表
type Table struct {
   rootPageNum uint32;     
   pager       *Pager;
}

page 寫入磁盤,由下面可以看到時一頁一頁寫入文件:

// pagerFlush 這一頁寫入文件系統
func pagerFlush(pager *Pager, pageNum , realNum uint32) error{
   if pager.pages[pageNum] == nil {
      return fmt.Errorf("pagerFlush null page")
   }
   offset, err := pager.osfile.Seek(int64(pageNum*PageSize), io.SeekStart)
   if err != nil {
      return fmt.Errorf("seek %v", err)
   }
   if offset == -1 {
      return fmt.Errorf("offset %v", offset)
   }
   originByte := make([]byte, realNum)
   q := (*[PageSize]byte)(pager.pages[pageNum])
   copy(originByte[0:realNum], (*q)[0:realNum])
   // 寫入到byte指針裏面
   bytesWritten, err := pager.osfile.WriteAt(originByte, offset)
   if err != nil {
      return fmt.Errorf("write %v", err)
   }
   // 撈取byte數組到這一頁中
   fmt.Println("already wittern", bytesWritten)
   return nil
}

在關閉 db 的鏈接,寫入磁盤:

func dbClose(table *Table) {
   for i:= uint32(0); i < table.pager.numPages; i++ {
      if table.pager.pages[i] == nil {
         continue
      }
      pagerFlush(table.pager, i, PageSize);
   }
   defer table.pager.osfile.Close()
   // go語言自帶gc
}

數據從磁盤到內存的獲取:

func getPage(pager *Pager, pageNum uint32)  unsafe.Pointer   {
   if pageNum > TABLE_MAX_PAGES {
      fmt.Println("Tried to fetch page number out of bounds:", pageNum)
      os.Exit(0)
   }
   if pager.pages[pageNum] == nil {
      page := make([]byte, PageSize)
      numPage := uint32(pager.fileLength/PageSize)  // 第幾頁
      if pager.fileLength%PageSize == 0 {
         numPage += 1
      }
      if pageNum <= numPage {
         curOffset := pageNum*PageSize
         // 偏移到下次可以讀讀未知
         curNum, err := pager.osfile.Seek(int64(curOffset), io.SeekStart)
         if err != nil {
            panic(err)
         }
         fmt.Println(curNum)
         // 讀到偏移這一頁到下一頁,必須是真的有多少字符
         if _,err = pager.osfile.ReadAt(page, curNum);err != nil && err != io.EOF{
            panic(err)
         }
      }
      pager.pages[pageNum] = unsafe.Pointer(&page[0])
      if pageNum >= pager.numPages {
         pager.numPages = pageNum +1
      }
   }
   return pager.pages[pageNum]
}

上面可以看到,爲了儘量減少磁盤 IO,我們採用一頁一頁讀取磁盤(disk)信息,並且以 B + 樹點形似。

(六)B 樹

B 樹是對二叉查找樹的改進:設計思想是,將相關數據儘量集中在一起,以便一次讀取多個數據,減少硬盤操作次數。

(七)B + 樹:

非葉子節點不存儲 data,只存儲 key。如果每一個節點的大小固定(如 4k,正如在 sqlite 中那樣),那麼可以進一步提高內部節點的度,降低樹的深度。

(八)table 和索引(索引)

根據 sqlite 介紹表的存儲用的 B + 樹,索引用的 B 樹,我想大概是因爲索引不需要存數據,只需要看存在不存在。這裏的表比較小,索引暫時沒有實現,下面有數據儲存主鍵的查找。

樹的節點查找

在表裏面查找主鍵:

// 返回key的位置,如果key不存在,返回應該被插入的位置
func tableFind(table *Table, key uint32) *Cursor {
   rootPageNum := table.rootPageNum
   rootNode := getPage(table.pager, rootPageNum)
   // 沒有找到匹配到
   if getNodeType(rootNode) == leafNode {
      return leafNodeFind(table, rootPageNum, key)
   } else {
      fmt.Printf("Need to implement searching an internal node\n");
      os.Exit(0);
   }
   return nil
}

葉子節點查找:

func leafNodeFind(table *Table, pageNum uint32, key uint32) *Cursor {
   node := getPage(table.pager, pageNum)
   num_cells := *leaf_node_num_cells(node)
   cur := &Cursor{
      table: table,
      page_num: pageNum,
   }
   // Binary search
   var min_index uint32
   var one_past_max_index  = num_cells
   for ;one_past_max_index != min_index; {
      index := (min_index + one_past_max_index) /2
      key_at_index := *leaf_node_key(node, index)
      if key == key_at_index {
         cur.cell_num = index
         return cur
      }
      // 如果在小到一邊,就將最大值變成當前索引
      if key < key_at_index {
         one_past_max_index = index
      } else {
         min_index = index+1    // 選擇左側
      }
   }
   cur.cell_num = min_index
   return cur
}

並且爲了 B + 樹方便查找遍歷,增加了遊標抽象層次:

// Cursor 光標
type Cursor struct {
   table        *Table
   pageNum      uint32 // 第幾頁
   cellNum      uint32 // 多少個數據單元
   endOfTable bool
}
func tableStart(table *Table)  * Cursor{
   rootNode := getPage(table.pager, table.rootPageNum)
   numCells := *leaf_node_num_cells(rootNode)
   return &Cursor{
      table:      table,
      pageNum:    table.rootPageNum,
      cellNum:    0,
      endOfTable: numCells ==0,
   }
}
func cursorAdvance(cursor *Cursor) {
   node := getPage(cursor.table.pager, cursor.pageNum)
   cursor.cellNum += 1
   if cursor.cellNum >=(*leaf_node_num_cells(node)) {
      cursor.endOfTable = true
   }
}

五、總結

本文以 Go 語言從 0 到 1 實現最簡單的數據庫爲例,選取 SQlite 數據庫,實現了 insert 和 select 數據操作,並進一步介紹了 page 對磁盤的讀寫操作,B 樹如何進行數據存儲操作等內容。只是當前實現的基於 B + 樹的數據庫僅僅支持一頁內的讀取,當一頁內容達到上限 4K 之後便會報錯,在後續開發中將進一步優化該功能,提升容量。

參考資料:

1.c 語言 0-1 實現一個數據庫

作者簡介

張滔滔

騰訊後臺開發工程師

騰訊後臺開發工程師,畢業於哈爾濱工業大學,目前負責手 Q 遊戲中心運營後臺開發,對後後臺系統有一定的理解,有一些後臺應對高併發和活動運營的開發經驗。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/pLYS_ep8R2f4k4MlB2yYQg