手擼源碼系列 - cache2go

你好,我是小四,你情商高也可以叫我四哥。

碰到很多同學問我,平時疲於寫各種業務代碼,如何才能提高編程能力?我的辦法是多閱讀優秀的代碼,只有見過更好的,我們才能知道如何編寫好的代碼、提高編程能力。就好比如果學武功,你肯定要找一個武林高手作師傅。

進入今天的正題。

前一篇文章,我們分析了 go-cache 庫,今天再來看一個緩存庫 -- cache2go。

網上也有很多文章分析了這個庫,閱讀過源碼的同學都是這麼覺得的:

所以,如果想要提高 Go 語言編程水平的同學,不妨多閱讀源碼。如果不知道從哪裏開始,不如就從 cache2go 開始。

什麼是 cache2go

cache2go 是一個併發安全、帶有自動過期機制的緩存庫。

通過閱讀源碼我們可以掌握:

這個庫還提供瞭如下功能:

項目地址:https://github.com/muesli/cache2go

如何使用

先來練練手,熟悉下如何使用。

type myStruct struct {
 text     string
 moreData []byte
}

var (
 k = "testkey"
 v = "testvalue"
)

func main() {

 cache := cache2go.Cache("myCache") // 創建CacheTable

 val := myStruct{"This is a test!"[]byte{}}
 cache.Add("someKey", 5*time.Second, &val) // 5s 是 item 的存活時間,超過 5s 不被訪問就會被刪除

 res, err := cache.Value("someKey") // 獲取item
 if err == nil {
  fmt.Println("Found value in cache:", res.Data().(*myStruct).text)
 } else {
  fmt.Println("Error retrieving value from cache:", err)
 }

 time.Sleep(6 * time.Second)       // 休眠6s  5s過後,上面添加的 item 會過期,自動被刪除
 res, err = cache.Value("someKey") // 再次獲取 item,不存在會報錯
 if err != nil {
  fmt.Println("Item is not cached (anymore).")
 }

 cache.Add("someKey", 0, &val)                                    // 無過期時間,表示不會過期
 cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) { // 設置刪除回調函數,刪除 item 時會自動觸發
  fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
 })

 cache.Delete("someKey") // 手動刪除item,會觸發刪除回調函數

 cache.Flush() // 清空table

 // 完整例子請關注公衆號【Golang來啦】,後臺發送關鍵字 cache2go 獲取
 
 //TestCache()
 //TestCacheExpire()
 //TestExists()
 //TestNotFoundAdd()
 //TestCacheKeepAlive()
 //TestDelete()
 //TestFlush()
 //TestCount()
 //TestAccessCount()
 //TestCallbacks()
 //TestDataLoader()

 // 完整例子請關注公衆號【Golang來啦】,後臺發送關鍵字 cache2go 獲取
}

下面我們來看下 cache2go 內部是如何實現前面說的那些功能的。

主要數據結構

我們看下項目目錄,如下圖:

可以看到代碼主要集中在 cache.go、cachetable.go、cacheitem.go 三個文件中,主要的數據結構有三個:

存放鍵值對 (K-V) 的結構稱爲 item。

這三個數據結構的關係如下圖:

從圖中可以看出是三個層級的結構,最底層是 CacheItem,再上一層是 CacheTable,最上一層是 cache。主要的邏輯代碼集中在 CacheTable,也是我們學習的重點。從下往上,我們一層層看。

CacheItem

CacheItem 是真正存儲數據的結構,詳細的字段如下,代碼中有註釋。與 CacheItem 有關的操作,後面會給大家詳解。

type CacheItem struct {
 sync.RWMutex   // 讀寫鎖

 key interface{}  // 鍵

 data interface{}  // key對應的數據

 lifeSpan time.Duration  // 不被訪問後的存活時間   這個時間的含義是:距離上一次被訪問的時間如果超過 lifeSpan 則會被自動刪除

 createdOn time.Time   // item 添加的時間

 accessedOn time.Time  // 最近一次被訪問的時間

 accessCount int64 // 被訪問的次數

 aboutToExpire []func(key interface{}) // item 被刪除時會觸發該回調函數
}

CacheTable

CacheTable 結構體如下:

type CacheTable struct {
 sync.RWMutex   // 讀寫鎖
 
 name string   // table name

 items map[interface{}]*CacheItem   // table 下存放的 item

 cleanupTimer *time.Timer    // 清理定時器,觸發下次清理過期 item 的事件

 cleanupInterval time.Duration  // 清理操作觸發的時間間隔

 logger *log.Logger

 loadData func(key interface{}, args ...interface{}) *CacheItem  // 當提取一個不存在的 key 時觸發的回調函數

 addedItem []func(item *CacheItem)   // 添加 item 會觸發的回調函數,可以設置多個

 aboutToDeleteItem []func(item *CacheItem)   // 刪除 item 會觸發的回調函數,可以設置多個
}

items 字段是一個 map,key 可以任意類型,value 存放 CacheItem 的指針,指向真正的數據。

cache

cache 的結構比較簡單,是一個 map,用於集中存放 table。

var (
 cache = make(map[string]*CacheTable)
 mutex sync.RWMutex    // 讀寫鎖
)

與 cache 相關的操作比較簡單,先在這裏說下:

// 返回一個table,如果已經存在就直接返回,否則執行添加操作,再返回
func Cache(table string) *CacheTable {
 mutex.RLock()
 t, ok := cache[table]  // 判斷是否存在
 mutex.RUnlock()

 if !ok {
  mutex.Lock()
  t, ok = cache[table]    // double-check 再次判斷是否存在
  if !ok {       // 不存在則添加
    t = &CacheTable{
    name:  table,
    items: make(map[interface{}]*CacheItem),
   }
   cache[table] = t
  }
  mutex.Unlock()
 }

 return t
}

與 cache 有關的操作只有這一個函數 Cache(),在 cache.go 文件中,用來創建 table。上面聲明全局變量的那把鎖是用來確保操作 cache 併發安全的。代碼中也使用了 double-check 機制,進一步確保併發安全, 經常閱讀源碼的同學應該知道,這是一種常用的安全機制。我覺得自己又學到了?!哈哈

主要操作流程

cache 有關的操作上面已經講過了,接下來我們主要講下與 CacheItem、CacheTable 有關的操作。

CacheItem

與 CacheItem 有關的操作主要有初始化、設置過期刪除回調函數、更新訪問次數 / 訪問時間等。

初始化

// CacheItem 初始化
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
 t := time.Now()
 return &CacheItem{
  key:           key,
  lifeSpan:      lifeSpan,
  createdOn:     t,
  accessedOn:    t,
  accessCount:   0,
  aboutToExpire: nil,
  data:          data,
 }
}

初始化操作乾的就是初始化一個 CacheItem 並返回變量地址,需要特別注意的是 lifeSpan 變量,表示 item 的存活時間:距離上一次被訪問的時間如果超過 lifeSpan,則 item 會被自動刪除。

設置回調函數

item 被刪除時會被自動調用回調函數,由下面三個方法設置:

// 設置回調函數,如果原來已經有回調函數則先清空再添加
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
 if len(item.aboutToExpire) > 0 {
  item.RemoveAboutToExpireCallback()
 }
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = append(item.aboutToExpire, f)
}

// 添加回調函數
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = append(item.aboutToExpire, f)
}

// 刪除回調函數
func (item *CacheItem) RemoveAboutToExpireCallback() {
 item.Lock()
 defer item.Unlock()
 item.aboutToExpire = nil
}

與回調函數有關的完整示例請關注公衆號【Golang 來啦】,後臺發送關鍵字 cache2go 獲取。

更新訪問次數 / 時間

// 更新 item 的訪問次數、時間
func (item *CacheItem) KeepAlive() {
 item.Lock()
 defer item.Unlock()
 item.accessedOn = time.Now()
 item.accessCount++
}

每次獲取數據時,如果 item 存在都會調用這個方法更新 item 的數據。item 的訪問時間 (accessedOn) 被更新之後,item 的 “壽命” 又延長了,哈哈。

CacheTable

caceh2go 的主要邏輯集中在對 table 的操作,主要包括添加、刪除、過期檢查、數據獲取 (當數據不存在時,可以自定義回調函數,實現從數據庫、文件等其他地方獲取數據並添加到緩存)。如何實現這些功能是我們閱讀源碼的重點。

添加 item

在 table 中添加 item,通過 Add() 方法實現,參數 lifeSpan 是 item 的存活時間。先看流程圖,方便理解代碼。

代碼如下:

func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
 item := NewCacheItem(key, lifeSpan, data)   // 初始化item

 table.Lock()   // 加鎖
 table.addInternal(item)   // 添加item

 return item
}

// 內部添加方法實現,調用這個方法之前必須保證 table-mutex 已經上鎖
func (table *CacheTable) addInternal(item *CacheItem) {

 table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
 table.items[item.key] = item    // 添加操作

 // 將需要的數據先拿出來,及時釋放鎖,保證鎖粒度最小化,後面如果有回調函數需要執行可能會比較耗時
 expDur := table.cleanupInterval // 當前的清理時間間隔
 addedItem := table.addedItem    // 添加時待執行的回調函數
 table.Unlock()    // 釋放鎖

 if addedItem != nil {   // 如果回調函數存在,則執行
  for _, callback := range addedItem {
   callback(item)
  }
 }
 // 保證鎖最小粒度,注意不要在這裏釋放鎖

 // 什麼時候需要觸發過期檢查?  答:item設置了存活時間且清理時間間隔等於0 或 設置了存活時間且存活時間小於時間間隔
 // 注:expDur == 0 表示還沒有設置過定時清理器(在過期清理方法裏設置,接下去我們會講到)

 // 這裏爲什麼要觸發過期檢查功能?
 // 答:有可能清理定時器下一次觸發的時間比新加的這個 item 存活時間要長,此時就必須及時更新清理定時器,
 // 保證到存活時間點能及時調用過期清理函數,將過期的 item 清理掉
 if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
  table.expirationCheck()
 }
}

主要的代碼邏輯看註釋,有不清楚或者有誤的地方,可以私我交流下。

有幾點需要注意的地方:

鎖的應用:1、上鎖 / 釋放鎖的時機:調用 addInternal() 方法之前,必須保證 table-mutex 已經上鎖,因爲方法裏有解鎖操作;2、保證鎖最小粒度,釋放鎖不應該放在調用回調函數之後,因爲有可能回調函數執行時間較長,其他 goroutine 獲取不到鎖造成不必要的等待,影響性能;

過期檢查函數調用的條件是什麼?添加 item 時爲什麼要做過期檢查?解析可以看註釋。

過期檢查

過期檢查的流程圖如下:

代碼如下:

func (table *CacheTable) expirationCheck() {
 table.Lock()
 if table.cleanupTimer != nil {   // 如果存在清理定時器,則先停止
  table.cleanupTimer.Stop()
 }
 if table.cleanupInterval > 0 {
  table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
 } else {
  table.log("Expiration check installed for table", table.name)
 }

 now := time.Now()
 smallestDuration := 0 * time.Second  // smallestDuration,所有 item 中最小的過期時長,初值爲 0,後面會更新
 for key, item := range table.items {
  // 獲取 item 的數據,使用讀寫鎖
  item.RLock()
  lifeSpan := item.lifeSpan
  accessedOn := item.accessedOn
  item.RUnlock()

  if lifeSpan == 0 {  // 存活時間等於 0 表示 item 一直存活,不會過期
   continue
  }
  if now.Sub(accessedOn) >= lifeSpan {    // 距離 item 上一次被訪問時間已經超過存活時間,則刪除 item
   table.deleteInternal(key)  // 調用內部刪除方法
  } else {
   // 通過對比找到所有 item 中最小的過期時長
   if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
    smallestDuration = lifeSpan - now.Sub(accessedOn)
   }
  }
 }

 // 根據 smallestDuration 設置清理週期,並設置一個定時器,觸發下一次過期清理動作
 table.cleanupInterval = smallestDuration
 if smallestDuration > 0 {
  table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
   go table.expirationCheck()    // 調用過期檢查
   // 這裏並不是循環調用,啓動一個新的 goroutine 後當前 goroutine 會退出,並不會引起 goroutine 泄漏
  })
 }
 table.Unlock()
}

可以看到 expirationCheck() 方法實現了一個動態的清理器,每次執行的時會找出最快過期的 item 的存活時長 smallestDuration,根據這個值定義一個定時器,到點執行清理函數,這樣往復循環。

需要注意的是,方法開始判斷了是否設置了定時器,如果已經設置就先停止。

這在什麼情況下會發生呢?

比如之前已經設置了清理定時器,將在 10s 後執行,此時新添加了一個 item,存活時長是 5s,此時就會觸發執行 expirationCheck() 方法,更新定時清理器。

expirationCheck() 方法調用了刪除方法,接着一起看下。

刪除

除了 item 過期時會自動調用刪除方法之外,cache2go 對外還提供了 Delete() 方法手動刪除 item。

流程圖:

代碼如下:

// 刪除 item,存在的話會返回 item,否則返回錯誤
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
 table.Lock()
 defer table.Unlock()

 return table.deleteInternal(key)
}


// 內部刪除方法
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
 r, ok := table.items[key]
 if !ok {   // 不存在
  return nil, ErrKeyNotFound
 }

 // 將需要的數據先拿出來,及時釋放鎖,保證鎖粒度最小化,後面如果有回調函數需要執行可能會比較耗時
 aboutToDeleteItem := table.aboutToDeleteItem
 table.Unlock()

 // Trigger callbacks before deleting an item from cache.
 if aboutToDeleteItem != nil {   // table 有刪除回調函數,則執行
  for _, callback := range aboutToDeleteItem {
   callback(r)
  }
 }

 r.RLock()
 defer r.RUnlock()
 if r.aboutToExpire != nil {  // item 有過期回調函數,則執行
  for _, callback := range r.aboutToExpire {
   callback(key)
  }
 }

 table.Lock()
 table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
 delete(table.items, key)  // 刪除

 return r, nil
}

刪除方法邏輯比較簡單,這裏涉及到調用兩處刪除回調函數,一處是在 table 定義的刪除回調函數,如果 item 也定義了刪除回調函數,也是需要執行的。

另外需要注意的是保證鎖的最小粒度。

數據獲取

cache2go 對外提供了 Value() 方法用於獲取 item,拿到 item 之後就可以調用 item 的方法,獲取 item 有關的信息,比如訪問時間 / 次數、存活時長、與 key 關聯的數據等。

// 提取 key 對應的 item 數據,如果緩存中不存在,如果設置了 data-loader 回調函數,則會嘗試去別的地方,比如數據庫、文件等,獲取數據並添加到緩存中
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
 table.RLock()
 r, ok := table.items[key]
 loadData := table.loadData
 table.RUnlock()

 if ok {
  r.KeepAlive()   // 更新訪問次數(accessCount) 和訪問時間(accessedOn),重新設置 accessedOn,表示該 item 還可以存活 lifeSpan 時長
  return r, nil
 }

 // 如果 item 不存在,則嘗試使用 data-loader 生成一個 item 並添加到 cache 裏
 if loadData != nil {
  item := loadData(key, args...)   // 返回一個 item
  if item != nil {
   table.Add(key, item.lifeSpan, item.data)  // 執行添加操作,將獲取到的數據添加到緩存中
   return item, nil
  }

  return nil, ErrKeyNotFoundOrLoadable
 }

 return nil, ErrKeyNotFound
}

可以看到當 item 存在時,會調用 KeepAlive() 方法更新 item 的訪問時間 (accessedOn),表示該 item 還可以存活 lifeSpan 時長,延長 item 的 “壽命”。

另外,Value() 函數里有個比較有意思的地方,如果設置了 data-loader 回調函數,並且 item 不存在時,會調用回調函數, 在回調函數里面可以寫自己想要實現的邏輯代碼,比如從數據庫、文件等獲取數據並添加到緩存裏。

data-loader 回調函數通過 SetDataLoader() 方法設置。

與 data-loader 有關的完整示例請關注公衆號【Golang 來啦】,後臺發送關鍵字 cache2go 獲取。

cache2go 的數據獲取邏輯與 go-cache 不同之處在於,go-cache 獲取數據時會判斷 item 的過期時間,如果已過期則認爲數據已經不存在。go-cache 爲什麼會判斷過期時間呢?因爲它的過期清理週期是固定的,有可能還沒到清理時間 item 已經過期了。而 cache2go 的清理週期是自動動態調整,能保證在過期時間點將已過期的 item 及時清理掉。

關於 go-cache 的源碼解讀可以看這篇文章

其他

上面提到的幾個功能是我們學習的重點,其他功能,比如對 table 設置添加回調函數、刪除回調函數等,比較簡單,大家可以拉源碼來看下,就不在這分析。

另外 table 還提供了一個方法 MostAccessed(),功能:將 item 按訪問次數倒序並返回前 count 個 item,裏面用到了 sort.Sort() 排序,感興趣的同學可以看下,有不清楚的歡迎私我交流。

總結

cache2go 的代碼裏不算多,結構分層清晰,函數功能職責明確。通過閱讀源碼,可以學習:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Q7YnXppGw-CLo6wA2oonyg