手擼源碼系列 - cache2go
你好,我是小四,你情商高也可以叫我四哥。
碰到很多同學問我,平時疲於寫各種業務代碼,如何才能提高編程能力?我的辦法是多閱讀優秀的代碼,只有見過更好的,我們才能知道如何編寫好的代碼、提高編程能力。就好比如果學武功,你肯定要找一個武林高手作師傅。
進入今天的正題。
前一篇文章,我們分析了 go-cache 庫,今天再來看一個緩存庫 -- cache2go。
網上也有很多文章分析了這個庫,閱讀過源碼的同學都是這麼覺得的:
所以,如果想要提高 Go 語言編程水平的同學,不妨多閱讀源碼。如果不知道從哪裏開始,不如就從 cache2go 開始。
什麼是 cache2go
cache2go 是一個併發安全、帶有自動過期機制的緩存庫。
通過閱讀源碼我們可以掌握:
-
作者是如何維護緩存數據的;
-
作者是如何使用 sync.RWMutex 保證併發安全,有哪些值得借鑑的技巧;
-
自動過期機制是如何實現的(這塊代碼思路值得學習);
這個庫還提供瞭如下功能:
-
可以配置添加、刪除、訪問數據時的回調函數;
-
爲每個 item 單獨設置存活時間;
-
記錄每個 item 的最近訪問時間、創建時間、訪問次數等;
-
按照訪問次數排序;
項目地址:https://github.com/muesli/cache2go
如何使用
先來練練手,熟悉下如何使用。
type myStruct struct {
text string
moreData []byte
}
var (
k = "testkey"
v = "testvalue"
)
func main() {
cache := cache2go.Cache("myCache") // 創建CacheTable
val := myStruct{"This is a test!", []byte{}}
cache.Add("someKey", 5*time.Second, &val) // 5s 是 item 的存活時間,超過 5s 不被訪問就會被刪除
res, err := cache.Value("someKey") // 獲取item
if err == nil {
fmt.Println("Found value in cache:", res.Data().(*myStruct).text)
} else {
fmt.Println("Error retrieving value from cache:", err)
}
time.Sleep(6 * time.Second) // 休眠6s 5s過後,上面添加的 item 會過期,自動被刪除
res, err = cache.Value("someKey") // 再次獲取 item,不存在會報錯
if err != nil {
fmt.Println("Item is not cached (anymore).")
}
cache.Add("someKey", 0, &val) // 無過期時間,表示不會過期
cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) { // 設置刪除回調函數,刪除 item 時會自動觸發
fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
})
cache.Delete("someKey") // 手動刪除item,會觸發刪除回調函數
cache.Flush() // 清空table
// 完整例子請關注公衆號【Golang來啦】,後臺發送關鍵字 cache2go 獲取
//TestCache()
//TestCacheExpire()
//TestExists()
//TestNotFoundAdd()
//TestCacheKeepAlive()
//TestDelete()
//TestFlush()
//TestCount()
//TestAccessCount()
//TestCallbacks()
//TestDataLoader()
// 完整例子請關注公衆號【Golang來啦】,後臺發送關鍵字 cache2go 獲取
}
下面我們來看下 cache2go 內部是如何實現前面說的那些功能的。
主要數據結構
我們看下項目目錄,如下圖:
可以看到代碼主要集中在 cache.go、cachetable.go、cacheitem.go 三個文件中,主要的數據結構有三個:
-
cache:用於緩存多個 table;
-
CacheTable:緩存一個 table;
-
CacheItem:存放鍵值對 (item);
存放鍵值對 (K-V) 的結構稱爲 item。
這三個數據結構的關係如下圖:
從圖中可以看出是三個層級的結構,最底層是 CacheItem,再上一層是 CacheTable,最上一層是 cache。主要的邏輯代碼集中在 CacheTable,也是我們學習的重點。從下往上,我們一層層看。
CacheItem
CacheItem 是真正存儲數據的結構,詳細的字段如下,代碼中有註釋。與 CacheItem 有關的操作,後面會給大家詳解。
type CacheItem struct {
sync.RWMutex // 讀寫鎖
key interface{} // 鍵
data interface{} // key對應的數據
lifeSpan time.Duration // 不被訪問後的存活時間 這個時間的含義是:距離上一次被訪問的時間如果超過 lifeSpan 則會被自動刪除
createdOn time.Time // item 添加的時間
accessedOn time.Time // 最近一次被訪問的時間
accessCount int64 // 被訪問的次數
aboutToExpire []func(key interface{}) // item 被刪除時會觸發該回調函數
}
CacheTable
CacheTable 結構體如下:
type CacheTable struct {
sync.RWMutex // 讀寫鎖
name string // table name
items map[interface{}]*CacheItem // table 下存放的 item
cleanupTimer *time.Timer // 清理定時器,觸發下次清理過期 item 的事件
cleanupInterval time.Duration // 清理操作觸發的時間間隔
logger *log.Logger
loadData func(key interface{}, args ...interface{}) *CacheItem // 當提取一個不存在的 key 時觸發的回調函數
addedItem []func(item *CacheItem) // 添加 item 會觸發的回調函數,可以設置多個
aboutToDeleteItem []func(item *CacheItem) // 刪除 item 會觸發的回調函數,可以設置多個
}
items 字段是一個 map,key 可以任意類型,value 存放 CacheItem 的指針,指向真正的數據。
cache
cache 的結構比較簡單,是一個 map,用於集中存放 table。
var (
cache = make(map[string]*CacheTable)
mutex sync.RWMutex // 讀寫鎖
)
與 cache 相關的操作比較簡單,先在這裏說下:
// 返回一個table,如果已經存在就直接返回,否則執行添加操作,再返回
func Cache(table string) *CacheTable {
mutex.RLock()
t, ok := cache[table] // 判斷是否存在
mutex.RUnlock()
if !ok {
mutex.Lock()
t, ok = cache[table] // double-check 再次判斷是否存在
if !ok { // 不存在則添加
t = &CacheTable{
name: table,
items: make(map[interface{}]*CacheItem),
}
cache[table] = t
}
mutex.Unlock()
}
return t
}
與 cache 有關的操作只有這一個函數 Cache(),在 cache.go 文件中,用來創建 table。上面聲明全局變量的那把鎖是用來確保操作 cache 併發安全的。代碼中也使用了 double-check 機制,進一步確保併發安全, 經常閱讀源碼的同學應該知道,這是一種常用的安全機制。我覺得自己又學到了?!哈哈
主要操作流程
cache 有關的操作上面已經講過了,接下來我們主要講下與 CacheItem、CacheTable 有關的操作。
CacheItem
與 CacheItem 有關的操作主要有初始化、設置過期刪除回調函數、更新訪問次數 / 訪問時間等。
初始化
// CacheItem 初始化
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
t := time.Now()
return &CacheItem{
key: key,
lifeSpan: lifeSpan,
createdOn: t,
accessedOn: t,
accessCount: 0,
aboutToExpire: nil,
data: data,
}
}
初始化操作乾的就是初始化一個 CacheItem 並返回變量地址,需要特別注意的是 lifeSpan 變量,表示 item 的存活時間:距離上一次被訪問的時間如果超過 lifeSpan,則 item 會被自動刪除。
設置回調函數
item 被刪除時會被自動調用回調函數,由下面三個方法設置:
// 設置回調函數,如果原來已經有回調函數則先清空再添加
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
if len(item.aboutToExpire) > 0 {
item.RemoveAboutToExpireCallback()
}
item.Lock()
defer item.Unlock()
item.aboutToExpire = append(item.aboutToExpire, f)
}
// 添加回調函數
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
item.Lock()
defer item.Unlock()
item.aboutToExpire = append(item.aboutToExpire, f)
}
// 刪除回調函數
func (item *CacheItem) RemoveAboutToExpireCallback() {
item.Lock()
defer item.Unlock()
item.aboutToExpire = nil
}
與回調函數有關的完整示例請關注公衆號【Golang 來啦】,後臺發送關鍵字 cache2go 獲取。
更新訪問次數 / 時間
// 更新 item 的訪問次數、時間
func (item *CacheItem) KeepAlive() {
item.Lock()
defer item.Unlock()
item.accessedOn = time.Now()
item.accessCount++
}
每次獲取數據時,如果 item 存在都會調用這個方法更新 item 的數據。item 的訪問時間 (accessedOn) 被更新之後,item 的 “壽命” 又延長了,哈哈。
CacheTable
caceh2go 的主要邏輯集中在對 table 的操作,主要包括添加、刪除、過期檢查、數據獲取 (當數據不存在時,可以自定義回調函數,實現從數據庫、文件等其他地方獲取數據並添加到緩存)。如何實現這些功能是我們閱讀源碼的重點。
添加 item
在 table 中添加 item,通過 Add() 方法實現,參數 lifeSpan 是 item 的存活時間。先看流程圖,方便理解代碼。
代碼如下:
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
item := NewCacheItem(key, lifeSpan, data) // 初始化item
table.Lock() // 加鎖
table.addInternal(item) // 添加item
return item
}
// 內部添加方法實現,調用這個方法之前必須保證 table-mutex 已經上鎖
func (table *CacheTable) addInternal(item *CacheItem) {
table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
table.items[item.key] = item // 添加操作
// 將需要的數據先拿出來,及時釋放鎖,保證鎖粒度最小化,後面如果有回調函數需要執行可能會比較耗時
expDur := table.cleanupInterval // 當前的清理時間間隔
addedItem := table.addedItem // 添加時待執行的回調函數
table.Unlock() // 釋放鎖
if addedItem != nil { // 如果回調函數存在,則執行
for _, callback := range addedItem {
callback(item)
}
}
// 保證鎖最小粒度,注意不要在這裏釋放鎖
// 什麼時候需要觸發過期檢查? 答:item設置了存活時間且清理時間間隔等於0 或 設置了存活時間且存活時間小於時間間隔
// 注:expDur == 0 表示還沒有設置過定時清理器(在過期清理方法裏設置,接下去我們會講到)
// 這裏爲什麼要觸發過期檢查功能?
// 答:有可能清理定時器下一次觸發的時間比新加的這個 item 存活時間要長,此時就必須及時更新清理定時器,
// 保證到存活時間點能及時調用過期清理函數,將過期的 item 清理掉
if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
table.expirationCheck()
}
}
主要的代碼邏輯看註釋,有不清楚或者有誤的地方,可以私我交流下。
有幾點需要注意的地方:
鎖的應用:1、上鎖 / 釋放鎖的時機:調用 addInternal() 方法之前,必須保證 table-mutex 已經上鎖,因爲方法裏有解鎖操作;2、保證鎖最小粒度,釋放鎖不應該放在調用回調函數之後,因爲有可能回調函數執行時間較長,其他 goroutine 獲取不到鎖造成不必要的等待,影響性能;
過期檢查函數調用的條件是什麼?添加 item 時爲什麼要做過期檢查?解析可以看註釋。
過期檢查
過期檢查的流程圖如下:
代碼如下:
func (table *CacheTable) expirationCheck() {
table.Lock()
if table.cleanupTimer != nil { // 如果存在清理定時器,則先停止
table.cleanupTimer.Stop()
}
if table.cleanupInterval > 0 {
table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
} else {
table.log("Expiration check installed for table", table.name)
}
now := time.Now()
smallestDuration := 0 * time.Second // smallestDuration,所有 item 中最小的過期時長,初值爲 0,後面會更新
for key, item := range table.items {
// 獲取 item 的數據,使用讀寫鎖
item.RLock()
lifeSpan := item.lifeSpan
accessedOn := item.accessedOn
item.RUnlock()
if lifeSpan == 0 { // 存活時間等於 0 表示 item 一直存活,不會過期
continue
}
if now.Sub(accessedOn) >= lifeSpan { // 距離 item 上一次被訪問時間已經超過存活時間,則刪除 item
table.deleteInternal(key) // 調用內部刪除方法
} else {
// 通過對比找到所有 item 中最小的過期時長
if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
smallestDuration = lifeSpan - now.Sub(accessedOn)
}
}
}
// 根據 smallestDuration 設置清理週期,並設置一個定時器,觸發下一次過期清理動作
table.cleanupInterval = smallestDuration
if smallestDuration > 0 {
table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
go table.expirationCheck() // 調用過期檢查
// 這裏並不是循環調用,啓動一個新的 goroutine 後當前 goroutine 會退出,並不會引起 goroutine 泄漏
})
}
table.Unlock()
}
可以看到 expirationCheck() 方法實現了一個動態的清理器,每次執行的時會找出最快過期的 item 的存活時長 smallestDuration,根據這個值定義一個定時器,到點執行清理函數,這樣往復循環。
需要注意的是,方法開始判斷了是否設置了定時器,如果已經設置就先停止。
這在什麼情況下會發生呢?
比如之前已經設置了清理定時器,將在 10s 後執行,此時新添加了一個 item,存活時長是 5s,此時就會觸發執行 expirationCheck() 方法,更新定時清理器。
expirationCheck() 方法調用了刪除方法,接着一起看下。
刪除
除了 item 過期時會自動調用刪除方法之外,cache2go 對外還提供了 Delete() 方法手動刪除 item。
流程圖:
代碼如下:
// 刪除 item,存在的話會返回 item,否則返回錯誤
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
table.Lock()
defer table.Unlock()
return table.deleteInternal(key)
}
// 內部刪除方法
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
r, ok := table.items[key]
if !ok { // 不存在
return nil, ErrKeyNotFound
}
// 將需要的數據先拿出來,及時釋放鎖,保證鎖粒度最小化,後面如果有回調函數需要執行可能會比較耗時
aboutToDeleteItem := table.aboutToDeleteItem
table.Unlock()
// Trigger callbacks before deleting an item from cache.
if aboutToDeleteItem != nil { // table 有刪除回調函數,則執行
for _, callback := range aboutToDeleteItem {
callback(r)
}
}
r.RLock()
defer r.RUnlock()
if r.aboutToExpire != nil { // item 有過期回調函數,則執行
for _, callback := range r.aboutToExpire {
callback(key)
}
}
table.Lock()
table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
delete(table.items, key) // 刪除
return r, nil
}
刪除方法邏輯比較簡單,這裏涉及到調用兩處刪除回調函數,一處是在 table 定義的刪除回調函數,如果 item 也定義了刪除回調函數,也是需要執行的。
另外需要注意的是保證鎖的最小粒度。
數據獲取
cache2go 對外提供了 Value() 方法用於獲取 item,拿到 item 之後就可以調用 item 的方法,獲取 item 有關的信息,比如訪問時間 / 次數、存活時長、與 key 關聯的數據等。
// 提取 key 對應的 item 數據,如果緩存中不存在,如果設置了 data-loader 回調函數,則會嘗試去別的地方,比如數據庫、文件等,獲取數據並添加到緩存中
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
table.RLock()
r, ok := table.items[key]
loadData := table.loadData
table.RUnlock()
if ok {
r.KeepAlive() // 更新訪問次數(accessCount) 和訪問時間(accessedOn),重新設置 accessedOn,表示該 item 還可以存活 lifeSpan 時長
return r, nil
}
// 如果 item 不存在,則嘗試使用 data-loader 生成一個 item 並添加到 cache 裏
if loadData != nil {
item := loadData(key, args...) // 返回一個 item
if item != nil {
table.Add(key, item.lifeSpan, item.data) // 執行添加操作,將獲取到的數據添加到緩存中
return item, nil
}
return nil, ErrKeyNotFoundOrLoadable
}
return nil, ErrKeyNotFound
}
可以看到當 item 存在時,會調用 KeepAlive() 方法更新 item 的訪問時間 (accessedOn),表示該 item 還可以存活 lifeSpan 時長,延長 item 的 “壽命”。
另外,Value() 函數里有個比較有意思的地方,如果設置了 data-loader 回調函數,並且 item 不存在時,會調用回調函數, 在回調函數里面可以寫自己想要實現的邏輯代碼,比如從數據庫、文件等獲取數據並添加到緩存裏。
data-loader 回調函數通過 SetDataLoader() 方法設置。
與 data-loader 有關的完整示例請關注公衆號【Golang 來啦】,後臺發送關鍵字 cache2go 獲取。
cache2go 的數據獲取邏輯與 go-cache 不同之處在於,go-cache 獲取數據時會判斷 item 的過期時間,如果已過期則認爲數據已經不存在。go-cache 爲什麼會判斷過期時間呢?因爲它的過期清理週期是固定的,有可能還沒到清理時間 item 已經過期了。而 cache2go 的清理週期是自動動態調整,能保證在過期時間點將已過期的 item 及時清理掉。
其他
上面提到的幾個功能是我們學習的重點,其他功能,比如對 table 設置添加回調函數、刪除回調函數等,比較簡單,大家可以拉源碼來看下,就不在這分析。
另外 table 還提供了一個方法 MostAccessed(),功能:將 item 按訪問次數倒序並返回前 count 個 item,裏面用到了 sort.Sort() 排序,感興趣的同學可以看下,有不清楚的歡迎私我交流。
總結
cache2go 的代碼裏不算多,結構分層清晰,函數功能職責明確。通過閱讀源碼,可以學習:
-
加深對鎖的靈活使用,在一個方法中加鎖,到另一個方法中解鎖,比如在 Delete() 方法解鎖,在 deleteInternal() 方法中解鎖,再加鎖,最後回到 Delete() 中解鎖,這麼做的目的是保證鎖的最小粒度,儘可能不阻塞其他 goroutine 執行。在源碼其他方法中也有相同的使用方式;
-
如何實現一個動態的過期清理機制;
-
如何設置回調方法,方便對數據進行維護;
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Q7YnXppGw-CLo6wA2oonyg