徒手用 Go 寫個 Redis 服務器

作者:HDT3213

今天給大家帶來的開源項目是 Godis:一個用 Go 語言實現的 Redis 服務器。支持:

你或許不需要自己實現 Redis 服務,但你是否厭煩了每天都是寫增刪改查的業務代碼,想提高編程水平試圖從零寫個項目打開 IDE 卻發現無從下手?

動手造輪子一定是提高編程能力的好辦法,下面就帶大家用 Go 從零開始寫一個 Redis 服務器(Godis),從中你將學到:

千萬不要擔心內容太難!!雖然示例代碼是 Go,但就算你不會 Go 語言也不會影響你理解 Redis 的原理和底層協議以及高性能的祕密。而且作者爲了照顧到廣大讀者,對技術的講解做了優化。示例代碼在原項目基礎上做了簡化,並逐行地加了註釋。如果是高級玩家,請直接訪問項目閱讀源碼:

https://github.com/HDT3213/godis

下面讓我們一起撥開 Redis 的迷霧。

一、寫個 TCP 服務器

衆所周知 Redis 是 C/S 模型,使用 TCP 協議進行通信。接下來就從實現 TCP 服務端開始。作爲廣泛用於服務端的編程語言 Golang 提供了非常簡潔的 TCP 接口,所以實現起來十分方便。示例代碼:

func ListenAndServe(address string) {
    // 綁定監聽地址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的連接建立或者listen中斷纔會返回
        conn, err := listener.Accept()
        if err != nil {
            // 通常是由於listener被關閉無法繼續監聽導致的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啓新的 goroutine 處理該連接
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 '\n'
        // 遇到分隔符後 ReadString 會返回上次遇到分隔符到現在收到的所有數據
        // 若在遇到分隔符之前發生異常, ReadString 會返回已收到的數據和錯誤信息
        msg, err := reader.ReadString('\n')
        if err != nil {
            // 通常遇到的錯誤是連接中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 將收到的信息發送給客戶端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}

👌 至此只用了 40 行代碼就搞定服務端啦!啓動上面的 TCP 服務後,在終端中輸入 telnet 127.0.0.1 8000 就可以連接到剛寫好的服務器,它會將你發送的消息原樣返回給你(所以請不要罵它):

這個 TCP 服務器的非常簡單,主協程調用 accept 函數來監聽端口,接受新連接後開啓一個 Goroutine 來處理它。這種簡單的阻塞 IO 模型有些類似於早期的 Tomcat/Apache 服務器。

阻塞 IO 模型是使用一個線程處理一個連接,在沒有收到新數據時監聽線程處於阻塞狀態,直到數據就緒後線程被喚醒進行處理。因爲阻塞 IO 模型需要開啓大量線程並且頻繁地進行上下文切換,所以它的效率很低。而 Redis 使用的 epoll 技術(IO 多路複用)用一個線程處理大量連接,極大地提高了吞吐量。那麼我們的 TCP 服務器會比 Redis 慢很多嗎?

當然不會,Golang 利用 Goroutine 調度開銷遠遠小於線程調度開銷的優勢封裝出 goroutine-per-connection 風格的極簡接口,而且 net/tcp 庫將 epoll 封裝成了阻塞 IO 的樣子,在享受 epoll 高性能的同時避免了原生 epoll 接口所需的複雜異步代碼。

在作者的電腦上 Redis 每秒可以響應 10.6k 個 PING 命令,而 Godis(完整代碼) 的吞吐量爲 9.2 kqps 相差並不大。想了解更多 Golang 高性能的㊙️密,可以搜索 go netpoller 或者 go 語言 網絡輪詢器 關鍵字

另外,合格的 TCP 的服務器在關閉的時候不應該一停了之,而需要完成響應已接收的請求、釋放 TCP 連接等必要的清理工作。這個功能我們一般稱爲 優雅關閉 或者 graceful shutdown,優雅關閉步驟:

優雅關閉的代碼比較多,這裏就不完整貼出了。

二、透視 Redis 協議

在解決完通信後,下一步就是搞清楚 Redis 的協議,其實就是一套序列化協議類似 JSON、Protocol Buffers,你看底層其實也就是一些基礎的知識。

自 Redis 2.0 以後的通信統一爲 RESP 協議(REdis Serialization Protocol),該協議易於實現不僅可以高效的被程序解析,還能夠被人類讀懂容易調試。

RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。RESP 以行作爲單位,客戶端和服務器發送的命令或數據一律以 \r\n(CRLF)作爲換行符。

二進制安全是指允許協議中出現任意字符而不會導致故障。比如 C 語言的字符串以 \0 作爲結尾不允許字符串中間出現 \0,而 Go 語言的 string 則允許出現 \0,我們說 Go 語言的 string 是二進制安全的,而 C 語言字符串不是二進制安全的。

RESP 的二進制安全性允許我們在 key 或者 value 中包含 \r 或者 \n 這樣的特殊字符。在使用 Redis 存儲 protobuf、msgpack 等二進制數據時,二進制安全性尤爲重要。

RESP 定義了 5 種格式:

RESP 通過第一個字符來表示格式:

下面讓我們通過一些實際例子來理解協議。

2.1 字符串

字符串(Bulk String)有兩行,第一行爲 $+ 正文長度,第二行爲實際內容。如:

$3\r\nSET\r\n

字符串(Bulk String)是二進制安全的,就是說可以在 Bulk String 內部包含 "\r\n" 字符(行尾的 CRLF 被隱藏):

$4
a\r\nb

2.2 空

$-1 表示 nil,比如使用 get 命令查詢一個不存在的 key 時,響應即爲 $-1

2.3 數組

數組(Array)格式第一行爲 "*"+ 數組長度,其後是相應數量的 字符串(Bulk String)。比如 ["foo", "bar"] 的報文(傳輸時的內容):

*2
$3
foo
$3
bar

客戶端也使用 數組(Array)格式向服務端發送指令。命令本身將作爲第一個參數,比如 SET key value 指令的 RESP 報文:

*3
$3
SET
$3
key
$5
value

將換行符打印出來:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

2.4 解析預備

知道常用的 RESP 報文內容後,就可以開始着手解析了。但需要注意的是 RESP 是 二進制安全 的協議,它允許在正文中使用 \r\n 字符。舉例來說 Redis 可以正確接收並執行 SET "a\r\nb" hellogithub 指令,這條指令的正確報文是這樣的:

*3  
$3
SET
$4
a\r\n$11
hellogithub

ReadBytes 讀取到第五行 "a\r\nb\r\n" 時會將其誤認爲兩行:

*3  
$3
SET
$4
a  // 錯誤的分行
b // 錯誤的分行
$11
hellogithub

因此當讀取到第四行 $4 後,不應該繼續使用 ReadBytes('\n') 讀取下一行,應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)

2.5 編寫 RESP 協議解析器

解決完上面內容包含 "\r\n" 的問題,我們就可以開始放手編寫 Redis 協議解析器啦!

type Payload struct {
 Data redis.Reply
 Err  error
}

// ParseStream 通過 io.Reader 讀取數據並將結果通過 channel 將結果返回給調用者
// 流式處理的接口適合供客戶端/服務端使用
func ParseStream(reader io.Reader) <-chan *Payload {
 ch := make(chan *Payload)
 go parse0(reader, ch)
 return ch
}

由於解析器的代碼比較多,這裏只簡單地介紹一下核心流程。

func parse0(reader io.Reader, ch chan<- *Payload) {
    // 初始化讀取狀態
    readingMultiLine := false
    expectedArgsCount := 0
    var args [][]byte
    var bulkLen int64
    for {
        // 上文中我們提到 RESP 是以行爲單位的
        // 因爲行分爲簡單字符串和二進制安全的 BulkString,我們需要封裝一個 readLine 函數來兼容
        line, err = readLine(reader, bulkLen)
        if err != nil { 
            // 處理錯誤
            return
        }
        // 接下來我們對剛剛讀取的行進行解析
        // 我們簡單的將 Reply 分爲兩類:
        // 單行: StatusReply, IntReply, ErrorReply
        // 多行: BulkReply, MultiBulkReply

        if !readingMultiLine {
            if isMulitBulkHeader(line) {
                // 我們收到了 MulitBulkReply 的第一行
                // 獲得 MulitBulkReply 中 BulkString 的個數
                expectedArgsCount = parseMulitBulkHeader(line)
                // 等待 MulitBulkReply 後續行
                readingMultiLine = true
            } else if isBulkHeader(line) {
                // 我們收到了 BulkReply 的第一行
                // 獲得 BulkReply 第二行的長度, 通過 bulkLen 告訴 readLine 函數下一行 BulkString 的長度
                bulkLen = parseBulkHeader()
                // 這個 Reply 中一共有 1 個 BulkString
                expectedArgsCount = 1 
                // 等待 BulkReply 後續行
                readingMultiLine = true
            } else {
                // 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
                reply := parseSingleLineReply(line)
                // 通過 ch 返回結果
                emitReply(ch)
            }
        } else {
            // 進入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的後續行
            // MulitBulkReply 的後續行有兩種,BulkHeader 或者 BulkString
            if isBulkHeader(line) {
                bulkLen = parseBulkHeader()
            } else {
                // 我們正在讀取一個 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                args = append(args, line)
            }
            if len(args) == expectedArgsCount { // 我們已經讀取了所有後續行
                // 通過 ch 返回結果
                emitReply(ch)
                // 重置狀態, 準備解析下一條 Reply
                readingMultiLine = false
                expectedArgsCount = 0
                args = nil
                bulkLen = 0
            }
        }
    }
}

三、實現內存數據庫

至此我們已經搞定數據接收和解析的部分了,剩下就是我們應該把數據存在哪裏了?

拋開持久化部分,作爲基於內存的 KV 數據庫 Redis 的所有數據需要都存儲在內存中的哈希表,而這個哈希表就是我們今天需要編寫的最後一個組件。

與單線程的 Redis 不同我們實現的 Redis(godis)是並行工作的,所以我們必須考慮各種併發安全問題。常見的併發安全哈希表設計有幾種:

但漸進式 rehash 的實現非常複雜,所以 godis 採用 Golang 社區廣泛使用的分段鎖策略(非上面的三種),就是將 key 分散到固定數量的 shard 中避免進行整體 rehash 操作。shard 是有鎖保護的 map,當 shard 進行 rehash 時會阻塞 shard 內的讀寫,但不會對其他 shard 造成影響。

代碼如下:

type ConcurrentDict struct {
    table []*Shard
    count int32
}

type Shard struct {
    m     map[string]interface{}
    mutex sync.RWMutex
}

func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
 tableSize := uint32(len(dict.table))
 return (tableSize - 1) & uint32(hashCode)
}

func (dict *ConcurrentDict) getShard(index uint32) *Shard {
 return dict.table[index]
}

func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
 hashCode := fnv32(key)
 index := dict.spread(hashCode)
 shard := dict.getShard(index)
 shard.mutex.RLock()
 defer shard.mutex.RUnlock()
 val, exists = shard.m[key]
 return
}

func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
 if dict == nil {
  panic("dict is nil")
 }
 hashCode := fnv32(key)
 index := dict.spread(hashCode)
 shard := dict.getShard(index)
 shard.mutex.Lock()
 defer shard.mutex.Unlock()

 if _, ok := shard.m[key]; ok {
  shard.m[key] = val
  return 0
 } else {
  shard.m[key] = val
  dict.addCount()
  return 1
 }
}

ConcurrentDict 可以保證對單個 key 操作的併發安全性,但是仍然無法滿足併發安全的需求,舉例來說:

  1. Incr 命令需要完成:讀取 -> 做加法 -> 寫入 三步操作,讀取和寫入兩步操作不是原子性的

  2. MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設置值,我們需要保證「檢查多個 key 是否存在」以及「寫入多個 key」這兩個操作的原子性

因此我們需要實現 db.Locker 用於鎖定一個或一組 key 直到我們完成所有操作後再釋放。

實現 db.Locker 最直接的想法是使用一個 map[string]*sync.RWMutex

那麼存在一個無法解決的併發問題:

Lp651B

由於 t3 時協程 B 釋放了鎖,t4 時協程 A 試圖加鎖會失敗。若協程 B 在解鎖時不執行 delete(locker["a"]) 就可以避免該異常的發生,但是這樣會造成嚴重的內存泄露。

我們注意到哈希槽的數量遠少於 key 的數量,反過來說多個鍵可以共用一個哈希槽。所以我們不再直接對 key 進行加鎖而是鎖定 key 所在的哈希槽也可以保證安全,另一方面哈希槽數量較少即使不釋放也不會消耗太多內存。

type Locks struct {
    table []*sync.RWMutex
}

func Make(tableSize int) *Locks {
    table := make([]*sync.RWMutex, tableSize)
    for i := 0; i < tableSize; i++ {
        table[i] = &sync.RWMutex{}
    }
    return &Locks{
        table: table,
    }
}

func (locks *Locks)Lock(key string) {
    index := locks.spread(fnv32(key))
    mu := locks.table[index]
    mu.Lock()
}

func (locks *Locks)UnLock(key string) {
    index := locks.spread(fnv32(key))
    mu := locks.table[index]
    mu.Unlock()
}

在鎖定多個 key 時需要注意,若 協程 A 持有 鍵 a 的鎖試圖獲得 鍵 b 的鎖,此時 協程 B 持有 鍵 b 的鎖試圖獲得 鍵 a 的鎖則會形成死鎖。

解決方法是所有協程都按照相同順序加鎖,若兩個協程都想獲得 鍵 a 和 鍵 b 的鎖,那麼必須先獲取 鍵 a 的鎖後獲取 鍵 b 的鎖,這樣就可以避免循環等待。

到目前爲止構建 Redis 服務器所需的基本組件已經備齊,只需要將 TCP 服務器、協議解析器與哈希表組裝起來我們的 Redis 服務器就可以開始工作啦。

最後,以上代碼均簡化自我寫的 Godis:一個開源僅用 Go 語言實現的 Redis 服務器。期待您的關注和 Star:

項目地址:https://github.com/HDT3213/godis

四、結束

很多朋友的日常工作主要是編寫業務代碼,對於框架、數據庫、中間件這些 “架構”、“底層代碼” 有一些恐懼感。

但本文我們只寫了 3 個組件,共計幾百行代碼就實現了一個基本的 Redis 服務器。所以底層的技術並不難,只要你對技術感興趣由淺入深、從簡到繁,“底層代碼” 也並不神祕。

興趣是最好的老師,HelloGitHub 發現編程的樂趣

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zK_As1RklzwSEV0hlf2fZw