基於 go 實現 redis 之主幹框架

0 前言

本着學習和實踐的目的,從本期開始,我將和大家一起走進一個新的專題—— 【基於 go 實現 redis】 .

該專題圍繞着我的一個開源項目——goredis 展開. 由於個人水平有限,如有不到位之處,歡迎批評指正:https://github.com/xiaoxuxiansheng/goredis

本系列計劃分爲四篇內容:

(此外,這裏需要特別提及一下,在學習過程中,很大程度上需藉助了 hdt3213 系列博客和項目的幫助,在此致敬一下:https://www.cnblogs.com/Finley/category/1598973.html)

1 實現目標

redis(https://redis.io/)是一款高性能的 key-value 內存存儲組件,廣泛應用於數據緩存、分佈式協調、消息隊列等使用場景中.

我此前也做過一些有關 redis 的技術分享,主要圍繞着分佈式鎖和消息隊列的主題展開:

有關 redis 的技術細節博大精妙,受限於個人水平,本系列中我只能說通過 go 語言仿製出一個 “乞丐版”redis,嘗試窺探其中的冰山一角. 項目中涉及實現到的功能點包括:

除此之外,爲了效仿 redis. 項目在與底層數據模型交互時,會基於單協程模型實現. 數據存儲模型則是由一個無鎖化的 map 結構實現.

2 架構總覽

如上圖所示,goredis 的運行架構在縱向上可以拆分爲三個模塊:

在宏觀上,goredis 接收並處理一筆請求的途徑鏈路爲:

server->handler(parser)->dbtrigger->dbexecutor->kvstore(persister)

goredis 項目遵循依賴注入的實現風格,基於 dig(https://github.com/uber-go/dig) 實現各模塊和組件的注入與管理,具體使用方式可以參見我之前的文章——低配 Spring—Golang IOC 框架 dig 原理解析

在文件 app/factory.go 中作了統一收口,可以基於全局視角一覽各部分參與的模塊組件:

var container = dig.New()


func init() {
    /**
       其它
    **/
    // 配置加載 conf
    _ = container.Provide(SetUpConfig)
    _ = container.Provide(PersistThinker)
    // 日誌打印 logger
    _ = container.Provide(log.GetDefaultLogger)


    /**
       存儲引擎層
    **/
    // 數據持久化
    _ = container.Provide(persist.NewPersister)
    // 存儲介質
    _ = container.Provide(datastore.NewKVStore)
    // 執行器
    _ = container.Provide(database.NewDBExecutor)
    // 觸發器
    _ = container.Provide(database.NewDBTrigger)


    /**
       指令分發層
    **/
    // 協議解析器
    _ = container.Provide(protocol.NewParser)
    // 指令分發器
    _ = container.Provide(handler.NewHandler)


    /**
       服務端運行層
    **/
    _ = container.Provide(server.NewServer)
}

3 服務運行層

服務運行層支撐了整個 goredis 應用的運行,包含兩個核心子模塊:

3.1 應用程序

goredis 應用程序啓動的入口代碼位於 main.go,核心步驟包括:

import "github.com/xiaoxuxiansheng/goredis/app"


func main() {
    // 1 構造 server 實例
    server, err := app.ConstructServer()
    if err != nil {
        panic(err)
    }


    // 2 構造 application 實例
    app := app.NewApplication(server, app.SetUpConfig())
    defer app.Stop()


    // 3 運行 application
    if err := app.Run(); err != nil {
        panic(err)
    }
}

有關 application 類定義代碼位於:app/app.go,包含兩個核心成員屬性:

type Application struct {
    server *server.Server
    conf   *Config
}
func (a *Application) Run() error {
    return a.server.Serve(a.conf.Address())
}


func (a *Application) Stop() {
    a.server.Stop()
}

3.2 服務端

服務端類定義代碼位於 server/server.go,其中持有了指令分發模塊 handler 的引用:

type Server struct {
    // ...
    handler  Handler
    // ...
}

server 運行時,會啓動 handler 實例,並且啓動 tcp 服務並監聽 tcp 端口,將每個到來的 tcp 連接移交給 handler 層進行處理:

func (s *Server) Serve(address string) error {
    // 1 啓動 handler
    s.handler.Start()
    // ...


    // 2 運行 tcp 服務
    s.listenAndServe(listener, closec)
    // ...
}


func (s *Server) listenAndServe(listener net.Listener, closec chan struct{}) {
    // ...
    // io 多路複用模型,goroutine for per conn
    for {
        // 接收到來的 tcp 連接
        conn, err := listener.Accept()
        // ...
        // 爲每個到來的 conn 分配一個 goroutine 處理
        pool.Submit(func() {
            // ...
            s.handler.Handle(ctx, conn)
        })
    }


    // ...
}


func (s *Server) Stop() {
    s.stopOnce.Do(func() {
        close(s.stopc)
    })
}

其中,pool.Submit 方法類似於 go func 一鍵啓動協程,只不過本項目中採用的是協程池模式:

import (
    "runtime/debug"
    "strings"


    "github.com/panjf2000/ants"
)


var pool *ants.Pool


func init() {
    _pool, err := ants.NewPool(50000, ants.WithPanicHandler(func(i interface{}) {
        // ...
    }))
    if err != nil {
        panic(err)
    }
    pool = _pool
}


func Submit(task func()) {
    pool.Submit(task)
}

協程池使用到的是 ants(https://github.com/panjf2000/ants),之前我也發表了一篇對應的技術文章:Golang 協程池 Ants 實現原理

4 指令分發層

指令分發層的核心作用是,接收到請求後,遵循 Redis 協議將其解析爲操作指令,然後將指令分發到存儲引擎層進行執行.

4.1 指令分發器

指令分發器的接口定義代碼位於 server/server.go,包含兩個核心方法:

// 邏輯處理器
type Handler interface {
    Start() error // 啓動 handler
    // 處理到來的每一筆 tcp 連接
    Handle(ctx context.Context, conn net.Conn)
    // 關閉處理器
    Close()
}

關於 Handler 的實現類位於 handler/handler.go,其中包含三個核心成員屬性:

type Handler struct {
    // ...
    db        DB
    parser    Parser
    persister Persister
    // ...
}

啓動 Handler 的 Start 方法:

func (h *Handler) Start() error {
    // 加載持久化文件,還原內容
    reloader, err := h.persister.Reloader()
    // ...
    // 讀取持久化文件內容,還原內存數據庫
    h.handle(SetLoadingPattern(context.Background()), newFakeReaderWriter(reloader))
    return nil
}

處理到來連接的 Handle 方法——通過 parser 將請求參數轉化爲指令,然後分發給 db:

// 處理到來的 tcp 連接
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
    // ...
    h.handle(ctx, conn)
}


func (h *Handler) handle(ctx context.Context, conn io.ReadWriter) {
    // 藉助 protocol parser 將到來的指令轉而通過 stream channel 輸出
    stream := h.parser.ParseStream(conn)
    for {
        select {
        // ...
        // 讀取 stream channel 中到來的指令
        case droplet := <-stream:
            if err := h.handleDroplet(ctx, conn, droplet); err != nil {
                // ...
            }
        }
    }
}


// 處理每一筆指令
func (h *Handler) handleDroplet(ctx context.Context, conn io.ReadWriter, droplet *Droplet) error {
    // ...
    // 請求指令類型轉換
    multiReply, ok := droplet.Reply.(MultiReply)
    // ...
    // 通過 db 引擎處理請求指令
    if reply := h.db.Do(ctx, multiReply.Args()); reply != nil {
        // 返回響應結果
        _, _ = conn.Write(reply.ToBytes())
        // ...
    }


    // ...
}
func (h *Handler) Close() {  
    // ...
    h.db.Close()
    h.persister.Close()
}

4.2 協議解析器

協議解析器的接口定義代碼位於 handler/struct.go,通過 ParseStream 方法將連接轉換成 channel 的形式,把每一筆請求參數轉爲 redis 操作指令,並通過 channel 傳遞給 handler 進行處理:

// 協議解析器
type Parser interface {
    ParseStream(reader io.Reader) <-chan *Droplet
}

Parser 實現類代碼位於 protocol/parser.go:

type Parser struct {
    lineParsers map[byte]lineParser
    // ...
}
// 將連接轉爲 stream channel 形式,並通過異步協程執行 parse 方法持續往 channel 中傳送到來的指令
func (p *Parser) ParseStream(reader io.Reader) <-chan *handler.Droplet {
    ch := make(chan *handler.Droplet)
    pool.Submit(
        func() {
            p.parse(reader, ch)
        })
    return ch
}
func (p *Parser) parse(rawReader io.Reader, ch chan<- *handler.Droplet) {
    reader := bufio.NewReader(rawReader)
    for {
        // 逐行讀取數據
        firstLine, err := reader.ReadBytes('\n')
        
        // 解析請求參數,分發給對應的指令解析函數
        firstLine = bytes.TrimSuffix(firstLine, []byte{'\r''\n'})
        lineParseFunc, ok := p.lineParsers[firstLine[0]]
        // ...
        // 將解析後的指令傳送到 stream channel
        ch <- lineParseFunc(firstLine, reader)
    }
}

5 存儲引擎層

存儲引擎層模塊封裝了有關數據存儲交互的流程,其中分爲四個核心子模塊:

5.1 數據庫觸發層

有關整個數據庫存儲引擎的接口定義位於 handler/struct.go,通過 Do 方法完成指令的執行,並接收其響應結果:

type DB interface {
    Do(ctx context.Context, cmdLine [][]byte) Reply
    Close()
}

對 DB 的實現類是數據庫觸發層 DBTrigger,代碼位於 database/trigger.go,其中依賴了 executor 實例:

// 數據庫觸發器
type DBTrigger struct {
    // ...
    executor Executor
}

DBTrigger 接收到指令,會對其進行包裝,然後通過 channel(executor.Entrance)將其發送給全局單例的 executor 中:

// 處理指令
func (d *DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
    // ...


    // 組裝指令
    cmd := Command{
        ctx:      ctx,
        cmd:      cmdType,
        args:     cmdLine[1:],
        receiver: make(CmdReceiver),
    }


    // 將指令投遞到 executor
    d.executor.Entrance() <- &cmd


    // 接收來自 executor 返回的 reply
    return <-cmd.Receiver()
}

5.2 數據庫執行層

有關數據執行器接口的定義代碼位於 database/struct.go,其會通過 Entrance 入口完成指令的接收:

type Executor interface {
    // 執行器入口,用於接收指令
    Entrance() chan<- *Command
    // 校驗指令是否合法
    ValidCommand(cmd CmdType) bool
    Close()
}

DBExecutor 是數據庫執行器的實現類,代碼位於 database/executor.go,核心成員屬性包括:

type DBExecutor struct {
    // ...
    // 接收來自 trigger 指令的 channel
    ch     chan *Command
    // 指令分發
    cmdHandlers map[CmdType]CmdHandler
    // 數據存儲介質
    dataStore   DataStore
    // 回收過期數據的定時器
    gcTicker *time.Ticker
}
func (e *DBExecutor) Entrance() chan<- *Command {
    return e.ch
}

DBExecutor 會以全局單例的形式存在,並在其初始化時啓動 run 方法,持續接收來自 trigger 和 gcTicker 的指令,對 dataStore 中的數據進行操作. 正因爲單協程運行的 executor 是 dataStore 的唯一操作入口,因此存儲數據時可以放心地採用無鎖化模型

// 數據庫執行層單協程運行模式
func (e *DBExecutor) run() {
    for {
        // ...        
        // 定期批量回收一次過期數據
        case <-e.gcTicker.C:
            e.dataStore.GC()
        // 接收並處理到來的指令
        case cmd := <-e.ch:
            // 獲取指令對應的處理函數
            cmdFunc, ok := e.cmdHandlers[cmd.cmd]
            // ...
            // 基於懶加載機制,對 key 進行回收
            e.dataStore.ExpirePreprocess(string(cmd.args[0]))
            // 執行指令處理函數,並將執行結果通過 receiver 發送給 trigger
            cmd.receiver <- cmdFunc(cmd)
        }
    }
}

5.3 數據存儲介質

存儲介質 DataStore 的接口定義代碼位於 database/struct.go,其中包含了各類數據操作指令犯法:

type DataStore interface {
    // ...
    // 過期相關指令
    GC()
    Expire(*Command) handler.Reply
    ExpireAt(*Command) handler.Reply


    // string 數據類型
    Get(*Command) handler.Reply
    Set(*Command) handler.Reply
    // ...


    // list 數據類型
    LPush(*Command) handler.Reply
    // ...


    // set 數據類型
    SAdd(*Command) handler.Reply
    // ...


    // hashmap 數據類型
    HSet(*Command) handler.Reply
    // ...


    // sortedset 數據類型
    ZAdd(*Command) handler.Reply
    // ...
}

dataStore 的實現類爲 KVStore,對應代碼位於 database/kv_store.go:

type KVStore struct {
    // 實際存儲數據的 map
    data      map[string]interface{}
    // 記錄 key 過期時間的 map
    expiredAt map[string]time.Time


    // 執行 key 過期回收任務的時間輪
    expireTimeWheel SortedSet


    // 數據持久化模塊
    persister handler.Persister
}

5.4 數據持久化模塊

數據持久化模塊的接口定義代碼位於 handler/persist.go,核心方法包括:

type Persister interface {
    // 加載持久化數據的 loader
    Reloader() (io.ReadCloser, error)
    // 對指令進行持久化
    PersistCmd(ctx context.Context, cmd [][]byte)
    Close()
}

goredis 中,針對持久化模塊僅僅實現了 aof 模式,對應代碼位於 persist/aof.go,相關內容將在本系列第 4 篇中展開.

type aofPersister struct {
    // ...
    // 接收持久化指令的通道
    buffer                 chan [][]byte
    // aof 文件
    aofFile                *os.File
    // aof 文件名
    aofFileName            string
    // aof 持久化策略
    appendFsync            appendSyncStrategy
    // aof 重寫策略
    autoAofRewriteAfterCmd int64
    aofCounter             atomic.Int64


    // ...
}

6 展望

至此,本篇正文結束. 本期向大家介紹的是 goredis 系列的開篇之作,在此對未來擬展開部分內容作個展望:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tKtmhCNtc696a87NBbo1Dw