基於 go 實現 redis 之主幹框架
0 前言
本着學習和實踐的目的,從本期開始,我將和大家一起走進一個新的專題—— 【基於 go 實現 redis】 .
該專題圍繞着我的一個開源項目——goredis 展開. 由於個人水平有限,如有不到位之處,歡迎批評指正:https://github.com/xiaoxuxiansheng/goredis
本系列計劃分爲四篇內容:
-
• 基於 go 實現 redis 之主幹框架(本篇): 在宏觀視角下縱覽 goredis 整體架構,梳理各模塊間的關聯性
-
• 基於 go 實現 redis 之指令分發(待填坑): 聚焦介紹 goredis 服務端如何啓動和運行,並在接收客戶端請求後實現指令協議的解析和分發
-
• 基於 go 實現 redis 之存儲引擎(待填坑): 聚焦介紹數據存儲層中單協程無鎖化執行框架,各類基本數據類型的底層實現細節,以及過期數據的惰性和定期回收機制
-
• 基於 go 實現 redis 之數據持久化(待填坑): 介紹 goredis 關於 aof 持久化機制的實現以及有關於 aof 重寫策略的執行細節
(此外,這裏需要特別提及一下,在學習過程中,很大程度上需藉助了 hdt3213 系列博客和項目的幫助,在此致敬一下:https://www.cnblogs.com/Finley/category/1598973.html)
1 實現目標
redis(https://redis.io/)是一款高性能的 key-value 內存存儲組件,廣泛應用於數據緩存、分佈式協調、消息隊列等使用場景中.
我此前也做過一些有關 redis 的技術分享,主要圍繞着分佈式鎖和消息隊列的主題展開:
有關 redis 的技術細節博大精妙,受限於個人水平,本系列中我只能說通過 go 語言仿製出一個 “乞丐版”redis,嘗試窺探其中的冰山一角. 項目中涉及實現到的功能點包括:
-
• tcp 服務端搭建
-
• 基於 go 自帶 netpoller 實現 io 多路複用
-
• 還原 redis 數據解析協議
-
• 常規數據類型與操作指令支持
-
• string——get/mget/set/mset
-
• list——lpush/lpop/rpush/rpop/lrange
-
• set——sadd/sismember/srem
-
• hashmap——hset/hget/hdel
-
• sortedset——zadd/zrem/zrangebyscore
-
• 數據持久化機制
-
• appendonlyfile 落盤與重寫
除此之外,爲了效仿 redis. 項目在與底層數據模型交互時,會基於單協程模型實現. 數據存儲模型則是由一個無鎖化的 map 結構實現.
2 架構總覽
如上圖所示,goredis 的運行架構在縱向上可以拆分爲三個模塊:
-
• 服務運行層: 支持 goredis 作爲 tcp 服務端,接收和處理來自客戶端的連接與請求
-
• 應用程序 application
-
• 服務端 server
-
• 指令分發層: 在服務端接收到請求後,遵循 Redis Serialization Protocol 轉換爲操作指令,並分發到存儲引擎層
-
• 指令分發器 handler
-
• 協議解析器 parser
-
• 存儲引擎層: 與存儲介質交互,完成數據存儲、過期回收以及持久化處理
-
• 數據持久化 persister
-
• 存儲介質 kvstore
-
• 數據庫執行層 executor
-
• 數據庫觸發層 trigger
在宏觀上,goredis 接收並處理一筆請求的途徑鏈路爲:
server->handler(parser)->dbtrigger->dbexecutor->kvstore(persister)
goredis 項目遵循依賴注入的實現風格,基於 dig(https://github.com/uber-go/dig) 實現各模塊和組件的注入與管理,具體使用方式可以參見我之前的文章——低配 Spring—Golang IOC 框架 dig 原理解析
在文件 app/factory.go 中作了統一收口,可以基於全局視角一覽各部分參與的模塊組件:
var container = dig.New()
func init() {
/**
其它
**/
// 配置加載 conf
_ = container.Provide(SetUpConfig)
_ = container.Provide(PersistThinker)
// 日誌打印 logger
_ = container.Provide(log.GetDefaultLogger)
/**
存儲引擎層
**/
// 數據持久化
_ = container.Provide(persist.NewPersister)
// 存儲介質
_ = container.Provide(datastore.NewKVStore)
// 執行器
_ = container.Provide(database.NewDBExecutor)
// 觸發器
_ = container.Provide(database.NewDBTrigger)
/**
指令分發層
**/
// 協議解析器
_ = container.Provide(protocol.NewParser)
// 指令分發器
_ = container.Provide(handler.NewHandler)
/**
服務端運行層
**/
_ = container.Provide(server.NewServer)
}
3 服務運行層
服務運行層支撐了整個 goredis 應用的運行,包含兩個核心子模塊:
-
• Application: 對應爲 goredis 應用程序的抽象
-
• Server: 對應爲 goredis 接收和響應客戶端請求的運行服務端
3.1 應用程序
goredis 應用程序啓動的入口代碼位於 main.go,核心步驟包括:
-
• 構造 server 與 application 實例
-
• 一鍵運行 application
import "github.com/xiaoxuxiansheng/goredis/app"
func main() {
// 1 構造 server 實例
server, err := app.ConstructServer()
if err != nil {
panic(err)
}
// 2 構造 application 實例
app := app.NewApplication(server, app.SetUpConfig())
defer app.Stop()
// 3 運行 application
if err := app.Run(); err != nil {
panic(err)
}
}
有關 application 類定義代碼位於:app/app.go,包含兩個核心成員屬性:
-
• server: 持有的服務端實例
-
• conf: 聚合了使用方關於 goredis 的定製化配置
type Application struct {
server *server.Server
conf *Config
}
func (a *Application) Run() error {
return a.server.Serve(a.conf.Address())
}
func (a *Application) Stop() {
a.server.Stop()
}
3.2 服務端
服務端類定義代碼位於 server/server.go,其中持有了指令分發模塊 handler 的引用:
type Server struct {
// ...
handler Handler
// ...
}
server 運行時,會啓動 handler 實例,並且啓動 tcp 服務並監聽 tcp 端口,將每個到來的 tcp 連接移交給 handler 層進行處理:
func (s *Server) Serve(address string) error {
// 1 啓動 handler
s.handler.Start()
// ...
// 2 運行 tcp 服務
s.listenAndServe(listener, closec)
// ...
}
func (s *Server) listenAndServe(listener net.Listener, closec chan struct{}) {
// ...
// io 多路複用模型,goroutine for per conn
for {
// 接收到來的 tcp 連接
conn, err := listener.Accept()
// ...
// 爲每個到來的 conn 分配一個 goroutine 處理
pool.Submit(func() {
// ...
s.handler.Handle(ctx, conn)
})
}
// ...
}
func (s *Server) Stop() {
s.stopOnce.Do(func() {
close(s.stopc)
})
}
其中,pool.Submit 方法類似於 go func 一鍵啓動協程,只不過本項目中採用的是協程池模式:
import (
"runtime/debug"
"strings"
"github.com/panjf2000/ants"
)
var pool *ants.Pool
func init() {
_pool, err := ants.NewPool(50000, ants.WithPanicHandler(func(i interface{}) {
// ...
}))
if err != nil {
panic(err)
}
pool = _pool
}
func Submit(task func()) {
pool.Submit(task)
}
協程池使用到的是 ants(https://github.com/panjf2000/ants),之前我也發表了一篇對應的技術文章:Golang 協程池 Ants 實現原理
4 指令分發層
指令分發層的核心作用是,接收到請求後,遵循 Redis 協議將其解析爲操作指令,然後將指令分發到存儲引擎層進行執行.
4.1 指令分發器
指令分發器的接口定義代碼位於 server/server.go,包含兩個核心方法:
-
• Start:啓動 handler
-
• Handle:持續處理到來的一筆連接
// 邏輯處理器
type Handler interface {
Start() error // 啓動 handler
// 處理到來的每一筆 tcp 連接
Handle(ctx context.Context, conn net.Conn)
// 關閉處理器
Close()
}
關於 Handler 的實現類位於 handler/handler.go,其中包含三個核心成員屬性:
-
• db: 對下游存儲引擎層的抽象
-
• parser: 遵循 redis 協議,將到來的請求參數解析成對應的 redis 指令
-
• persister: handler 初啓動時,通過 persister 讀取此前的持久化內容,將其分發到存儲引擎層還原出對應的數據
type Handler struct {
// ...
db DB
parser Parser
persister Persister
// ...
}
啓動 Handler 的 Start 方法:
func (h *Handler) Start() error {
// 加載持久化文件,還原內容
reloader, err := h.persister.Reloader()
// ...
// 讀取持久化文件內容,還原內存數據庫
h.handle(SetLoadingPattern(context.Background()), newFakeReaderWriter(reloader))
return nil
}
處理到來連接的 Handle 方法——通過 parser 將請求參數轉化爲指令,然後分發給 db:
// 處理到來的 tcp 連接
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
// ...
h.handle(ctx, conn)
}
func (h *Handler) handle(ctx context.Context, conn io.ReadWriter) {
// 藉助 protocol parser 將到來的指令轉而通過 stream channel 輸出
stream := h.parser.ParseStream(conn)
for {
select {
// ...
// 讀取 stream channel 中到來的指令
case droplet := <-stream:
if err := h.handleDroplet(ctx, conn, droplet); err != nil {
// ...
}
}
}
}
// 處理每一筆指令
func (h *Handler) handleDroplet(ctx context.Context, conn io.ReadWriter, droplet *Droplet) error {
// ...
// 請求指令類型轉換
multiReply, ok := droplet.Reply.(MultiReply)
// ...
// 通過 db 引擎處理請求指令
if reply := h.db.Do(ctx, multiReply.Args()); reply != nil {
// 返回響應結果
_, _ = conn.Write(reply.ToBytes())
// ...
}
// ...
}
func (h *Handler) Close() {
// ...
h.db.Close()
h.persister.Close()
}
4.2 協議解析器
協議解析器的接口定義代碼位於 handler/struct.go,通過 ParseStream 方法將連接轉換成 channel 的形式,把每一筆請求參數轉爲 redis 操作指令,並通過 channel 傳遞給 handler 進行處理:
// 協議解析器
type Parser interface {
ParseStream(reader io.Reader) <-chan *Droplet
}
Parser 實現類代碼位於 protocol/parser.go:
type Parser struct {
lineParsers map[byte]lineParser
// ...
}
// 將連接轉爲 stream channel 形式,並通過異步協程執行 parse 方法持續往 channel 中傳送到來的指令
func (p *Parser) ParseStream(reader io.Reader) <-chan *handler.Droplet {
ch := make(chan *handler.Droplet)
pool.Submit(
func() {
p.parse(reader, ch)
})
return ch
}
func (p *Parser) parse(rawReader io.Reader, ch chan<- *handler.Droplet) {
reader := bufio.NewReader(rawReader)
for {
// 逐行讀取數據
firstLine, err := reader.ReadBytes('\n')
// 解析請求參數,分發給對應的指令解析函數
firstLine = bytes.TrimSuffix(firstLine, []byte{'\r', '\n'})
lineParseFunc, ok := p.lineParsers[firstLine[0]]
// ...
// 將解析後的指令傳送到 stream channel
ch <- lineParseFunc(firstLine, reader)
}
}
5 存儲引擎層
存儲引擎層模塊封裝了有關數據存儲交互的流程,其中分爲四個核心子模塊:
-
• 觸發層 trigger: 負責接收來自 handler 的操作指令,將其通過 channel 移交給 executor
-
• 執行層 executor: 採用單協程模式運行,負責完成存儲數據的 crud 操作
-
• 存儲介質 datastore: 存儲數據所在之處,包含各類數據類型的實現模型
-
• 持久化模塊 persister: 支持內存數據的持久化和重加載功能
5.1 數據庫觸發層
有關整個數據庫存儲引擎的接口定義位於 handler/struct.go,通過 Do 方法完成指令的執行,並接收其響應結果:
type DB interface {
Do(ctx context.Context, cmdLine [][]byte) Reply
Close()
}
對 DB 的實現類是數據庫觸發層 DBTrigger,代碼位於 database/trigger.go,其中依賴了 executor 實例:
// 數據庫觸發器
type DBTrigger struct {
// ...
executor Executor
}
DBTrigger 接收到指令,會對其進行包裝,然後通過 channel(executor.Entrance)將其發送給全局單例的 executor 中:
// 處理指令
func (d *DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
// ...
// 組裝指令
cmd := Command{
ctx: ctx,
cmd: cmdType,
args: cmdLine[1:],
receiver: make(CmdReceiver),
}
// 將指令投遞到 executor
d.executor.Entrance() <- &cmd
// 接收來自 executor 返回的 reply
return <-cmd.Receiver()
}
5.2 數據庫執行層
有關數據執行器接口的定義代碼位於 database/struct.go,其會通過 Entrance 入口完成指令的接收:
type Executor interface {
// 執行器入口,用於接收指令
Entrance() chan<- *Command
// 校驗指令是否合法
ValidCommand(cmd CmdType) bool
Close()
}
DBExecutor 是數據庫執行器的實現類,代碼位於 database/executor.go,核心成員屬性包括:
-
• ch:通過該 channel 接收到來自上游 trigger 發送的指令
-
• cmdHandlers:基於指令類型映射到對應的執行方法
-
• dataStore:真正存儲數據的地方
-
• gcTicker:用於驅動定期回收過期數據的定時器
type DBExecutor struct {
// ...
// 接收來自 trigger 指令的 channel
ch chan *Command
// 指令分發
cmdHandlers map[CmdType]CmdHandler
// 數據存儲介質
dataStore DataStore
// 回收過期數據的定時器
gcTicker *time.Ticker
}
func (e *DBExecutor) Entrance() chan<- *Command {
return e.ch
}
DBExecutor 會以全局單例的形式存在,並在其初始化時啓動 run 方法,持續接收來自 trigger 和 gcTicker 的指令,對 dataStore 中的數據進行操作. 正因爲單協程運行的 executor 是 dataStore 的唯一操作入口,因此存儲數據時可以放心地採用無鎖化模型
// 數據庫執行層單協程運行模式
func (e *DBExecutor) run() {
for {
// ...
// 定期批量回收一次過期數據
case <-e.gcTicker.C:
e.dataStore.GC()
// 接收並處理到來的指令
case cmd := <-e.ch:
// 獲取指令對應的處理函數
cmdFunc, ok := e.cmdHandlers[cmd.cmd]
// ...
// 基於懶加載機制,對 key 進行回收
e.dataStore.ExpirePreprocess(string(cmd.args[0]))
// 執行指令處理函數,並將執行結果通過 receiver 發送給 trigger
cmd.receiver <- cmdFunc(cmd)
}
}
}
5.3 數據存儲介質
存儲介質 DataStore 的接口定義代碼位於 database/struct.go,其中包含了各類數據操作指令犯法:
type DataStore interface {
// ...
// 過期相關指令
GC()
Expire(*Command) handler.Reply
ExpireAt(*Command) handler.Reply
// string 數據類型
Get(*Command) handler.Reply
Set(*Command) handler.Reply
// ...
// list 數據類型
LPush(*Command) handler.Reply
// ...
// set 數據類型
SAdd(*Command) handler.Reply
// ...
// hashmap 數據類型
HSet(*Command) handler.Reply
// ...
// sortedset 數據類型
ZAdd(*Command) handler.Reply
// ...
}
dataStore 的實現類爲 KVStore,對應代碼位於 database/kv_store.go:
-
• data:實際存儲數據的 map 結構,因爲不存在併發,所以無需加鎖保護
-
• expiredAt:記錄了每個 key 對應的過期時間
type KVStore struct {
// 實際存儲數據的 map
data map[string]interface{}
// 記錄 key 過期時間的 map
expiredAt map[string]time.Time
// 執行 key 過期回收任務的時間輪
expireTimeWheel SortedSet
// 數據持久化模塊
persister handler.Persister
}
5.4 數據持久化模塊
數據持久化模塊的接口定義代碼位於 handler/persist.go,核心方法包括:
-
• Reloader:獲取用於重加載數據的 reloader
-
• PersistCmd:完成一筆指令的持久化
type Persister interface {
// 加載持久化數據的 loader
Reloader() (io.ReadCloser, error)
// 對指令進行持久化
PersistCmd(ctx context.Context, cmd [][]byte)
Close()
}
goredis 中,針對持久化模塊僅僅實現了 aof 模式,對應代碼位於 persist/aof.go,相關內容將在本系列第 4 篇中展開.
type aofPersister struct {
// ...
// 接收持久化指令的通道
buffer chan [][]byte
// aof 文件
aofFile *os.File
// aof 文件名
aofFileName string
// aof 持久化策略
appendFsync appendSyncStrategy
// aof 重寫策略
autoAofRewriteAfterCmd int64
aofCounter atomic.Int64
// ...
}
6 展望
至此,本篇正文結束. 本期向大家介紹的是 goredis 系列的開篇之作,在此對未來擬展開部分內容作個展望:
-
• 基於 go 實現 redis 之主幹框架(已完成): 在宏觀視角下縱覽 goredis 整體架構,梳理各模塊間的關聯性
-
• 基於 go 實現 redis 之指令分發(待填坑): 聚焦介紹 goredis 服務端如何啓動和運行,並在接收客戶端請求後實現指令協議的解析和分發
-
• 基於 go 實現 redis 之存儲引擎(待填坑): 聚焦介紹數據存儲層中單協程無鎖化執行框架,各類基本數據類型的底層實現細節,以及過期數據的惰性和定期回收機制
-
• 基於 go 實現 redis 之數據持久化(待填坑): 介紹 goredis 關於 aof 持久化機制的實現以及有關於 aof 重寫策略的執行細節
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tKtmhCNtc696a87NBbo1Dw