Go 語言從 0 到 1 實現最簡單的數據庫!
後臺開發對於數據庫操作是必不可少的事情,瞭解數據庫原理對於平常的工作的內功積累還是很有幫助的,這裏實現一個最簡單的數據庫加深自己對數據庫的理解。
一、go 實現數據庫目的
-
瞭解數據是如何在內存和磁盤存儲的
-
數據是怎麼移動到磁盤
-
主鍵是如何保持唯一性
-
索引是如何形成
-
如何進行全表遍歷
-
熟悉 Go 語言對內存以及文件操作
二、數據庫選擇 SQLite
選擇 SQLite(https://www.sqlite.org/arch.html)原因是數據庫完全開源,實現簡單,並且有 C 語言最簡單的實現版本,因此參考 go 語言實現一個數據庫加深對於關係型數據庫的理解。
三、SQLite 主要架構
其中:前端的輸入是一個 SQL 查詢。輸出是 sqlite 虛擬機字節碼(本質上是一個可以在數據庫上操作的編譯程序) 後端:VM 將前端生成的字節作爲指令,然後對一個表或者多個表或索引進行操作,每一個表或者索引都存儲在 B 樹中,VM 本質上時指令的分支選擇語句。B 樹組成了每一個節點,每個節點的最大長度時一頁。B 樹可以通過 pager 的命令,將數據保存到磁盤上。pager 收到數據讀寫的命令,負責數據偏移與讀寫,它還將最近訪問的頁面緩存在內存中,並確定何時需要將這些頁面寫回磁盤。
雷普勒
啓動 sqlite,會有一個讀寫命令循環:
main 函數將有一個無限循環來打印提示,獲取一行輸入,然後處理該行輸入:
// run main 主函數,這樣寫方便單元測試
func run() {
table, err := dbOpen("./db.txt")
if err != nil {
panic(err)
}
for {
printPrompt()
// 語句解析
inputBuffer, err := readInput()
if err != nil {
fmt.Println("read err", err)
}
// 特殊操作
if len(inputBuffer) != 0 && inputBuffer[0] == '.' {
switch doMetaCommand(inputBuffer, table) {
case metaCommandSuccess:
continue
case metaCommandUnRecongnizedCommand:
fmt.Println("Unrecognized command", inputBuffer)
continue
}
}
// 普通操作 code Generator
statement := Statement{}
switch prepareStatement(inputBuffer, &statement) {
case prepareSuccess:
break;
case prepareUnrecognizedStatement:
fmt.Println("Unrecognized keyword at start of ", inputBuffer)
continue
default:
fmt.Println("invalid unput ", inputBuffer)
continue
}
res := executeStatement(&statement, table)
if res == ExecuteSuccess {
fmt.Println("Exected")
continue
}
if res == ExecuteTableFull {
fmt.Printf("Error: Table full.\n");
break
}
if res == EXECUTE_DUPLICATE_KEY {
fmt.Printf("Error: Duplicate key.\n");
break;
}
}
}
處理特殊的元語句如下:
func doMetaCommand(input string, table *Table) metaCommandType {
if input == ".exit" {
dbClose(table)
os.Exit(0)
return metaCommandSuccess
}
if input == ".btree" {
fmt.Printf("Tree:\n");
print_leaf_node(getPage(table.pager, 0));
return metaCommandSuccess;
}
if input == ".constants" {
fmt.Printf("Constants:\n");
print_constants();
return metaCommandSuccess
}
return metaCommandUnRecongnizedCommand
}
效果如下:
四、最簡單的 “SQL 編譯器和 “VM”(虛擬機)
(一)prepareStatement 爲最簡單的解析器 “SQL 編譯器”
當前改解析器,最簡單到還沒有識別出 SQL 語句,只是寫死識別兩個單詞的 SQL 語句:
func prepareStatement(input string, statement *Statement)PrepareType {
if len(input) >= 6 && input[0:6] == "insert" {
statement.statementType = statementInsert
inputs := strings.Split(input, " ")
if len(inputs) <=1 {
return prepareUnrecognizedStatement
}
id, err := strconv.ParseInt(inputs[1], 10, 64)
if err != nil {
return prepareUnrecognizedSynaErr
}
statement.rowToInsert.ID = int32(id)
statement.rowToInsert.UserName = inputs[2]
statement.rowToInsert.Email = inputs[3]
return prepareSuccess
}
if len(input) >= 6 && input[0:6] == "select" {
statement.statementType = statementSelect
return prepareSuccess
}
return prepareUnrecognizedStatement
}
(二)最簡單的 “虛擬機”(VM)執行器
// executeStatement 實行sql語句 ,解析器解析程statement,將最終成爲我們的虛擬機
func executeStatement(statement *Statement, table *Table) executeResult{
switch statement.statementType {
case statementInsert:
return executeInsert(statement, table)
case statementSelect:
return executeSelect(statement, table)
default:
fmt.Println("unknown statement")
}
return ExecuteSuccess
}
(三)最簡單的插入的數據結構
需要插入序列化的數據格式如下:
將一列進行序列化代碼如下:
// 將row序列化到指針,爲標準寫入磁盤做準備
func serializeRow(row *Row, destionaton unsafe.Pointer) {
ids := Uint32ToBytes(row.ID)
q := (*[ROW_SIZE]byte)(destionaton)
copy(q[0:ID_SIZE], ids)
copy(q[ID_SIZE+1:ID_SIZE+USERNAME_SIZE], (row.UserName))
copy(q[ID_SIZE+USERNAME_SIZE+1: ROW_SIZE], (row.Email))
}
(四)從文件去取出反序列化
// deserializeRow 將文件內容序列化成數據庫元數據
func deserializeRow(source unsafe.Pointer, rowDestination *Row) {
ids := make([]byte, ID_SIZE, ID_SIZE)
sourceByte := (*[ROW_SIZE]byte)(source)
copy(ids[0:ID_SIZE], (*sourceByte)[0:ID_SIZE])
rowDestination.ID = BytesToInt32(ids)
userName := make([]byte, USERNAME_SIZE, USERNAME_SIZE)
copy(userName[0:], (*sourceByte)[ID_SIZE+1: ID_SIZE + USERNAME_SIZE])
realNameBytes := getUseFulByte(userName)
rowDestination.UserName = (string)(realNameBytes)
emailStoreByte := make([]byte, EMAIL_SIZE, EMAIL_SIZE)
copy(emailStoreByte[0:], (*sourceByte)[1+ ID_SIZE + USERNAME_SIZE: ROW_SIZE])
emailByte := getUseFulByte(emailStoreByte)
rowDestination.Email = (string)(emailByte)
}
(五)呼叫器
主要功能寫入到磁盤,數據結構:
// Pager 管理數據從磁盤到內存
type Pager struct {
osfile *os.File;
fileLength int64;
numPages uint32;
pages []unsafe.Pointer; // 存儲數據
}
整個數據庫的數據表:
// Table 數據庫表
type Table struct {
rootPageNum uint32;
pager *Pager;
}
page 寫入磁盤,由下面可以看到時一頁一頁寫入文件:
// pagerFlush 這一頁寫入文件系統
func pagerFlush(pager *Pager, pageNum , realNum uint32) error{
if pager.pages[pageNum] == nil {
return fmt.Errorf("pagerFlush null page")
}
offset, err := pager.osfile.Seek(int64(pageNum*PageSize), io.SeekStart)
if err != nil {
return fmt.Errorf("seek %v", err)
}
if offset == -1 {
return fmt.Errorf("offset %v", offset)
}
originByte := make([]byte, realNum)
q := (*[PageSize]byte)(pager.pages[pageNum])
copy(originByte[0:realNum], (*q)[0:realNum])
// 寫入到byte指針裏面
bytesWritten, err := pager.osfile.WriteAt(originByte, offset)
if err != nil {
return fmt.Errorf("write %v", err)
}
// 撈取byte數組到這一頁中
fmt.Println("already wittern", bytesWritten)
return nil
}
在關閉 db 的鏈接,寫入磁盤:
func dbClose(table *Table) {
for i:= uint32(0); i < table.pager.numPages; i++ {
if table.pager.pages[i] == nil {
continue
}
pagerFlush(table.pager, i, PageSize);
}
defer table.pager.osfile.Close()
// go語言自帶gc
}
數據從磁盤到內存的獲取:
func getPage(pager *Pager, pageNum uint32) unsafe.Pointer {
if pageNum > TABLE_MAX_PAGES {
fmt.Println("Tried to fetch page number out of bounds:", pageNum)
os.Exit(0)
}
if pager.pages[pageNum] == nil {
page := make([]byte, PageSize)
numPage := uint32(pager.fileLength/PageSize) // 第幾頁
if pager.fileLength%PageSize == 0 {
numPage += 1
}
if pageNum <= numPage {
curOffset := pageNum*PageSize
// 偏移到下次可以讀讀未知
curNum, err := pager.osfile.Seek(int64(curOffset), io.SeekStart)
if err != nil {
panic(err)
}
fmt.Println(curNum)
// 讀到偏移這一頁到下一頁,必須是真的有多少字符
if _,err = pager.osfile.ReadAt(page, curNum);err != nil && err != io.EOF{
panic(err)
}
}
pager.pages[pageNum] = unsafe.Pointer(&page[0])
if pageNum >= pager.numPages {
pager.numPages = pageNum +1
}
}
return pager.pages[pageNum]
}
上面可以看到,爲了儘量減少磁盤 IO,我們採用一頁一頁讀取磁盤(disk)信息,並且以 B + 樹點形似。
(六)B 樹
B 樹是對二叉查找樹的改進:設計思想是,將相關數據儘量集中在一起,以便一次讀取多個數據,減少硬盤操作次數。
(七)B + 樹:
非葉子節點不存儲 data,只存儲 key。如果每一個節點的大小固定(如 4k,正如在 sqlite 中那樣),那麼可以進一步提高內部節點的度,降低樹的深度。
(八)table 和索引(索引)
根據 sqlite 介紹表的存儲用的 B + 樹,索引用的 B 樹,我想大概是因爲索引不需要存數據,只需要看存在不存在。這裏的表比較小,索引暫時沒有實現,下面有數據儲存主鍵的查找。
樹的節點查找
在表裏面查找主鍵:
// 返回key的位置,如果key不存在,返回應該被插入的位置
func tableFind(table *Table, key uint32) *Cursor {
rootPageNum := table.rootPageNum
rootNode := getPage(table.pager, rootPageNum)
// 沒有找到匹配到
if getNodeType(rootNode) == leafNode {
return leafNodeFind(table, rootPageNum, key)
} else {
fmt.Printf("Need to implement searching an internal node\n");
os.Exit(0);
}
return nil
}
葉子節點查找:
func leafNodeFind(table *Table, pageNum uint32, key uint32) *Cursor {
node := getPage(table.pager, pageNum)
num_cells := *leaf_node_num_cells(node)
cur := &Cursor{
table: table,
page_num: pageNum,
}
// Binary search
var min_index uint32
var one_past_max_index = num_cells
for ;one_past_max_index != min_index; {
index := (min_index + one_past_max_index) /2
key_at_index := *leaf_node_key(node, index)
if key == key_at_index {
cur.cell_num = index
return cur
}
// 如果在小到一邊,就將最大值變成當前索引
if key < key_at_index {
one_past_max_index = index
} else {
min_index = index+1 // 選擇左側
}
}
cur.cell_num = min_index
return cur
}
並且爲了 B + 樹方便查找遍歷,增加了遊標抽象層次:
// Cursor 光標
type Cursor struct {
table *Table
pageNum uint32 // 第幾頁
cellNum uint32 // 多少個數據單元
endOfTable bool
}
func tableStart(table *Table) * Cursor{
rootNode := getPage(table.pager, table.rootPageNum)
numCells := *leaf_node_num_cells(rootNode)
return &Cursor{
table: table,
pageNum: table.rootPageNum,
cellNum: 0,
endOfTable: numCells ==0,
}
}
func cursorAdvance(cursor *Cursor) {
node := getPage(cursor.table.pager, cursor.pageNum)
cursor.cellNum += 1
if cursor.cellNum >=(*leaf_node_num_cells(node)) {
cursor.endOfTable = true
}
}
五、總結
本文以 Go 語言從 0 到 1 實現最簡單的數據庫爲例,選取 SQlite 數據庫,實現了 insert 和 select 數據操作,並進一步介紹了 page 對磁盤的讀寫操作,B 樹如何進行數據存儲操作等內容。只是當前實現的基於 B + 樹的數據庫僅僅支持一頁內的讀取,當一頁內容達到上限 4K 之後便會報錯,在後續開發中將進一步優化該功能,提升容量。
參考資料:
1.c 語言 0-1 實現一個數據庫
作者簡介
張滔滔
騰訊後臺開發工程師
騰訊後臺開發工程師,畢業於哈爾濱工業大學,目前負責手 Q 遊戲中心運營後臺開發,對後後臺系統有一定的理解,有一些後臺應對高併發和活動運營的開發經驗。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/pLYS_ep8R2f4k4MlB2yYQg