基於 golang 從零到一實現時間輪算法
0 前言
近期計劃攻堅一個新的專題系列——如何基於 golang 從零到一實現 redis.
這裏選擇的學習素材是 hdt3213 大佬於 github 上開源的 godis 項目. 在大佬的實現中,充分利用了 golang 的特性,將 redis 存儲層由單線程模型轉爲併發模型,其中在實現數據的 expire 機制時,採用的是單機時間輪模型進行過期數據的刪除操作.
godis 項目開源地址:http://github.com/HDT3213/godis
godis 時間輪代碼:http://github.com/HDT3213/godis/blob/master/lib/timewheel/timewheel.go
實際上,在我之前分享個人項目——分佈式定時器 xtimer 中也有到了時間輪算法,這部分內容具有一定共性. 藉此機會,本期單獨對時間輪算法的原理和實踐內容進行梳理,並基於 golang 從零到一開源實現出一個單機版和 redis 分佈式版的時間輪,供大家一起交流探討.
個人開源的時間輪項目地址爲:http://github.com/xiaoxuxiansheng/timewheel
本期內容的目錄大綱如下:
1 時間輪原理
時間輪是定時任務調度系統中常用到的一種經典的算法模型. 有關時間輪算法的詳細概念介紹,可以參見論文:《Hashed and Hierarchical Time Wheels: Data Structures for the Efficient Implementation of a Time Facility》http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf
接下來我也會從個人角度出發,談談我對於時間輪算法的一些淺顯理解.
1.1 時間輪概念
聊時間輪之前,先聊聊時間的概念.
首先時間是一維、單向的,我們可以用一條一維的時間軸將其具象化. 我們對時間軸進行刻度拆分,每個刻度對應一個時間範圍,那麼刻度拆分得越細,則表示的時間範圍越精確.
然而,我們知道時間是沒有盡頭的,因此這條一維時間軸的長度是無窮大的. 倘若我們想要建立一個數據結構去表達這條由一系列刻度聚合形成的時間軸,這個數據結構所佔用的空間也是無窮無盡的.
那麼,我們應該如何優化這個問題呢?此時,大家不妨低頭看一眼自己的手錶. 手錶或時鐘這類日常生活中用來關聯表達時間的工具,採用的是首尾銜接的環狀結構來替代無窮長度的一維時間軸,每當鐘錶劃過一圈,刻度從新回到零值,但是已有的時間進度會傳承往下.
本期所聊的時間輪算法採用的思路正是與之類似,下面我們梳理一下核心流程:
-
• 建立一個環狀數據結構
-
• 每個刻度對應一個時間範圍
-
• 創建定時任務時,根據距今的相對時長,推算出需要向後推移的刻度值
-
• 倘若來到環形數組的結尾,則重新從起點開始計算,但是記錄時把執行輪次數加 1
-
• 一個刻度可能存在多筆定時任務,所以每個刻度需要掛載一個定時任務鏈表
接下來,我們建立時間輪的掃描機制,就如同鐘錶中的指針一般,按照固定的時間節奏,沿着環形數組週而復始地持續向下掃描. 每當來到一個刻度時,則取出鏈表中輪次爲 0 的定時任務進行執行. 這就是時間輪算法的核心思路.
1.2 多級時間輪
接下來聊一聊時間輪中的等級制度與多級時間輪的概念.
首先捋一捋,時間輪每個週期輪次中,使用的數據結構容量與所表達的時間範圍之間的關係.
我們把時間輪中的每個刻度記爲一個 slot,每個 slot 表示的時間範圍記爲 t.
假設時間輪中總共包含 2m 個 slot,求問如何組織我們的時間輪數據結構,能夠使得時間輪每個輪次對應表達的時間範圍儘可能的長. (一個輪次對應的時間範圍越長,在時間流逝過程中輪次的迭代速度就越慢,於是每個 slot 對應的定時任務鏈表長度就越短,執行定時任務時的檢索效率就越高.)
這裏最簡單的方式就是進行採用一維縱向排列的方式,那麼能夠表達的時間範圍就是 2m * t,某個刻度對應的時間值就記爲 {slot_i}.
另一種思路是,我們在時間輪中建立一種等級秩序.
比如我們將 2m 個 slot 拆成兩個等級——level1 和 level2. 最終我們通過 {level1_slot}_{level2_slot} 的方式進行時間的表達.
我們給 level2 分配 m 個 slot,其中每個 slot 對應的時間範圍同樣爲 t. 而 level1 同樣也分配 m 個 slot,但是此時其中每個 slot 對應的時間範圍應該爲 m * t,因爲在 level1 中的 slot 確定時,level2 中還有 m 種 slot 的組合方式.
如此一來,這種組織方式下,時間輪單個輪次所能表達的時間範圍就是 m * m * t.
這裏探討的核心不是具體某級時間輪的時間範圍結果,而是拋出了一種多級時間輪的思路,從一到二是質變,從二到三、從三到四就僅僅是量變的問題,可以繼續復刻相同的思路.
回過頭來看,我們會發現日常使用的時間表達式正是採用了這樣一種多級時間輪的等級制度,比如當前的時刻爲:2023-09-23 15:50:00. 這本質上是一種通過 {year}-{month}-{date}-{hour}-{minute}-{second} 組成的 6 級時間輪等級結構.
後續在本文第 3 章探討如何基於 redis zset 實現時間輪的話題中,我們會進一步利用這種多級時間輪的思路,將每個任務首先基於前 5 級 {year}-{month}-{date}-{hour}-{minute} 的等級表達式進行分鐘級時間片的縱向拆分,最終在 1 分鐘範圍內進行定時任務的有序組織,保證在每次插入、刪除和檢索任務時,處理的數據量級能夠維持在分鐘級的數量,最大化地提高時間輪結果的處理性能.
2 單機版實現
聊完原理部分,下面我們一起進入實戰環節.
在本章中,我們會使用 golang 標準庫的定時器工具 time ticker 結合環狀數組的設計思路,實現一個單機版的單級時間輪.
2.1 核心類
2.1.1 時間輪
在對時間輪的類定義中,核心字段如下圖所示:
在幾個核心字段中:
-
• slots——類似於時鐘的錶盤
-
• curSlot——類似於時鐘的指針
-
• ticker 是使用 golang 標準庫的定時器工具,類似於驅動指針運轉的齒輪
在創建時間輪實例時,會通過一個異步的常駐 goroutine 執行定時任務的檢索、添加、刪除等操作,並通過幾個 channel 進行 goroutine 的執行邏輯和生命週期的控制:
-
• stopc:用於停止 goroutine
-
• addTaskCh:用於接收創建定時器指令
-
• removeTaskCh:用於接收刪除定時任務的指令
// 單機版時間輪
type TimeWheel struct {
// 單例工具,保證時間輪停止操作只能執行一次
sync.Once
// 時間輪運行時間間隔
interval time.Duration
// 時間輪定時器
ticker *time.Ticker
// 停止時間輪的 channel
stopc chan struct{}
// 新增定時任務的入口 channel
addTaskCh chan *taskElement
// 刪除定時任務的入口 channel
removeTaskCh chan string
// 通過 list 組成的環狀數組. 通過遍歷環狀數組的方式實現時間輪
// 定時任務數量較大,每個 slot 槽內可能存在多個定時任務,因此通過 list 進行組裝
slots []*list.List
// 當前遍歷到的環狀數組的索引
curSlot int
// 定時任務 key 到任務節點的映射,便於在 list 中刪除任務節點
keyToETask map[string]*list.Element
}
此處有幾個技術細節需要提及:
首先: 所謂環狀數組指的是邏輯意義上的. 在實際的實現過程中,會通過一個定長數組結合循環遍歷的方式,來實現這個邏輯意義上的 “環狀” 性質.
其次: 數組每一輪能表達的時間範圍是固定的. 每當在添加添加一個定時任務時,需要根據其延遲的相對時長推算出其所處的 slot 位置,其中可能跨遍歷輪次的情況,這時候需要額外通過定時任務中的 cycle 字段來記錄這一信息,避免定時任務被提前執行.
最後: 時間輪中一個 slot 可能需要掛載多筆定時任務,因此針對每個 slot,需要採用 golang 標準庫 container/list 中實現的雙向鏈表進行定時任務數據的存儲.
2.1.2 定時任務
下面是對一筆定時任務的類定義:
-
• key:每個定時任務的全局唯一標識鍵
-
• task:包含了定時任務執行邏輯的閉包函數
-
• pos:定時任務在環形數組所處的位置,即數組的索引 index
-
• cycle:定時任務的延遲輪次. 時間輪的 curSlot 指針每完成一整輪的數組遍歷,所有定時任務的 cycle 指數都需要減 1. 當定時任務 cycle 指數爲 0 時,代表該任務在當前遍歷輪次執行.
// 封裝了一筆定時任務的明細信息
type taskElement struct {
// 內聚了定時任務執行邏輯的閉包函數
task func()
// 定時任務掛載在環狀數組中的索引位置
pos int
// 定時任務的延遲輪次. 指的是 curSlot 指針還要掃描過環狀數組多少輪,才滿足執行該任務的條件
cycle int
// 定時任務的唯一標識鍵
key string
}
2.2 構造器
在創建時間輪的構造器函數中,需要傳入兩個入參:
-
• slotNum:由使用方指定 slot 的個數,默認爲 10
-
• interval:由使用方指定每個 slot 對應的時間範圍,默認爲 1 秒
初始化時間輪實例的過程中,會完成定時器 ticker 以及各個 channel 的初始化,並針對數組 中的各個 slot 進行初始化,每個 slot 位置都需要填充一個 list.
每個時間輪實例都會異步調用 run 方法,啓動一個常駐 goroutine 用於接收和處理定時任務.
// 創建單機版時間輪 slotNum——時間輪環狀數組長度 interval——掃描時間間隔
func NewTimeWheel(slotNum int, interval time.Duration) *TimeWheel {
// 環狀數組長度默認爲 10
if slotNum <= 0 {
slotNum = 10
}
// 掃描時間間隔默認爲 1 秒
if interval <= 0 {
interval = time.Second
}
// 初始化時間輪實例
t := TimeWheel{
interval: interval,
ticker: time.NewTicker(interval),
stopc: make(chan struct{}),
keyToETask: make(map[string]*list.Element),
slots: make([]*list.List, 0, slotNum),
addTaskCh: make(chan *taskElement),
removeTaskCh: make(chan string),
}
for i := 0; i < slotNum; i++ {
t.slots = append(t.slots, list.New())
}
// 異步啓動時間輪常駐 goroutine
go t.run()
return &t
}
2.3 啓動與停止
時間輪運行的核心邏輯位於 timeWheel.run 方法中,該方法會通過 for 循環結合 select 多路複用的方式運行,屬於 golang 中非常常見的異步編程風格.
goroutine 運行過程中需要從以下四類 channel 中接收不同的信號,並進行邏輯的分發處理:
-
• stopc:停止時間輪,使得當前 goroutine 退出
-
• ticker:接收到 ticker 的信號說明時間由往前推進了一個 interval,則需要批量檢索並執行當前 slot 中的定時任務. 並推進指針 curSlot 往前偏移
-
• addTaskCh:接收創建定時任務的指令
-
• removeTaskCh:接收刪除定時任務的指令
此處值得一提的是,後續不論是創建、刪除還是檢索定時任務,都是通過這個常駐 goroutine 完成的,因此在訪問一些臨界資源的時候,不需要加鎖,因爲不存在併發訪問的情況
// 運行時間輪
func (t *TimeWheel) run() {
defer func() {
if err := recover(); err != nil {
// ...
}
}()
// 通過 for + select 的代碼結構運行一個常駐 goroutine 是常規操作
for {
select {
// 停止時間輪
case <-t.stopc:
return
// 接收到定時信號
case <-t.ticker.C:
// 批量執行定時任務
t.tick()
// 接收創建定時任務的信號
case task := <-t.addTaskCh:
t.addTask(task)
// 接收到刪除定時任務的信號
case removeKey := <-t.removeTaskCh:
t.removeTask(removeKey)
}
}
}
時間輪提供了一個 Stop 方法,用於手動停止時間輪,回收對應的 goroutine 和 ticker 資源.
停止時間輪的操作是通過關閉 stopc channel 完成的,由於 channel 不允許被反覆關閉,因此這裏通過 sync.Once 保證該邏輯只被調用一次.
// 停止時間輪
func (t *TimeWheel) Stop() {
// 通過單例工具,保證 channel 只能被關閉一次,避免 panic
t.Do(func() {
// 定製定時器 ticker
t.ticker.Stop()
// 關閉定時器運行的 stopc
close(t.stopc)
})
}
2.4 創建任務
創建一筆定時任務的核心步驟如下:
-
• 使用方往 addTaskCh 中投遞定時任務,由常駐 goroutine 接收定時任務
-
• 根據執行時間,推算出定時任務所處的 slot 位置以及需要延遲的輪次 cycle
-
• 將定時任務包裝成一個 list node,追加到對應 slot 位置的 list 尾部
-
• 以定時任務唯一鍵爲 key,list node 爲 value,在 keyToETask map 中建立映射關係,方便後續刪除任務時使用
// 添加定時任務到時間輪中
func (t *TimeWheel) AddTask(key string, task func(), executeAt time.Time) {
// 根據執行時間推算得到定時任務從屬的 slot 位置,以及需要延遲的輪次
pos, cycle := t.getPosAndCircle(executeAt)
// 將定時任務通過 channel 進行投遞
t.addTaskCh <- &taskElement{
pos: pos,
cycle: cycle,
task: task,
key: key,
}
}
// 根據執行時間推算得到定時任務從屬的 slot 位置,以及需要延遲的輪次
func (t *TimeWheel) getPosAndCircle(executeAt time.Time) (int, int) {
delay := int(time.Until(executeAt))
// 定時任務的延遲輪次
cycle := delay / (len(t.slots) * int(t.interval))
// 定時任務從屬的環狀數組 index
pos := (t.curSlot + delay/int(t.interval)) % len(t.slots)
return pos, cycle
}
// 常駐 goroutine 接收到創建定時任務後的處理邏輯
func (t *TimeWheel) addTask(task *taskElement) {
// 獲取到定時任務從屬的環狀數組 index 以及對應的 list
list := t.slots[task.pos]
// 倘若定時任務 key 之前已存在,則需要先刪除定時任務
if _, ok := t.keyToETask[task.key]; ok {
t.removeTask(task.key)
}
// 將定時任務追加到 list 尾部
eTask := list.PushBack(task)
// 建立定時任務 key 到將定時任務所處的節點
t.keyToETask[task.key] = eTask
}
2.5 刪除任務
刪除一筆定時任務的核心步驟如下:
-
• 使用方往 removeTaskCh 中投遞刪除任務的 key,由常駐 goroutine 接收處理
-
• 從 keyToETask map 中,找到該任務對應的 list node
-
• 從 keyToETask map 中移除該組 kv 對
-
• 從對應 slot 的 list 中移除該 list node
// 刪除定時任務,投遞信號
func (t *TimeWheel) RemoveTask(key string) {
t.removeTaskCh <- key
}
// 時間輪常駐 goroutine 接收到刪除任務信號後,執行的刪除任務邏輯
func (t *TimeWheel) removeTask(key string) {
eTask, ok := t.keyToETask[key]
if !ok {
return
}
// 將定時任務節點從映射 map 中移除
delete(t.keyToETask, key)
// 獲取到定時任務節點後,將其從 list 中移除
task, _ := eTask.Value.(*taskElement)
_ = t.slots[task.pos].Remove(eTask)
}
2.6 執行定時任務
最後來捋一下最核心的鏈路——檢索並批量執行定時任務的流程.
首先,每當接收到 ticker 信號時,會根據當前的 curSlot 指針,獲取到對應 slot 位置掛載的定時任務 list,調用 execute 方法執行其中的定時任務. 最後通過 circularIncr 方法推進 curSlot 指針向前移動.
// 常駐 goroutine 每次接收到定時信號後用於執行定時任務的邏輯
func (t *TimeWheel) tick() {
// 根據 curSlot 獲取到當前所處的環狀數組索引位置,取出對應的 list
list := t.slots[t.curSlot]
// 在方法返回前,推進 curSlot 指針的位置,進行環狀遍歷
defer t.circularIncr()
// 批量處理滿足執行條件的定時任務
t.execute(list)
}
在 execute 方法中,會對 list 中的定時任務進行遍歷:
-
• 對於 cycle > 0 的定時任務,說明當前還未達到執行條件,需要將其 cycle 值減 1,留待後續輪次再處理
-
• 對於 cycle = 0 的定時任務,開啓一個 goroutine ,執行其中的閉包函數 task,並將其從 list 和 map 中移除
// 執行定時任務,每次處理一個 list
func (t *TimeWheel) execute(l *list.List) {
// 遍歷 list
for e := l.Front(); e != nil; {
// 獲取到每個節點對應的定時任務信息
taskElement, _ := e.Value.(*taskElement)
// 倘若任務還存在延遲輪次,則只對 cycle 計數器進行扣減,本輪不作任務的執行
if taskElement.cycle > 0 {
taskElement.cycle--
e = e.Next()
continue
}
// 當前節點對應定時任務已達成執行條件,開啓一個 goroutine 負責執行任務
go func() {
defer func() {
if err := recover(); err != nil {
// ...
}
}()
taskElement.task()
}()
// 任務已執行,需要把對應的任務節點從 list 中刪除
next := e.Next()
l.Remove(e)
// 把任務 key 從映射 map 中刪除
delete(t.keyToETask, taskElement.key)
e = next
}
}
在 circularIncr 方法中,呼應了環狀數組的邏輯處理方式:
// 每次 tick 後需要推進 curSlot 指針的位置,slots 在邏輯意義上是環狀數組,所以在到達尾部時需要從新回到頭部
func (t *TimeWheel) circularIncr() {
t.curSlot = (t.curSlot + 1) % len(t.slots)
}
3 分佈式版實現
本章我們討論一下,如何基於 redis 實現分佈式版本的時間輪,以貼合實際生產環境對分佈式定時任務調度系統的訴求.
redis 版時間輪的實現思路是使用 redis 中的有序集合 sorted set(簡稱 zset) 進行定時任務的存儲管理,其中以每個定時任務執行時間對應的時間戳作爲 zset 中的 score,完成定時任務的有序排列組合.
zset 數據結構的 redis 官方文檔鏈接:https://redis.io/docs/data-types/sorted-sets/
這裏有兩個的技術細節需要提前和大家同步:
-
• 分鐘級時間分片: 爲了避免產生 redis 大 key 問題,此處採用本文 1.2 小節中提到的多級時間輪等級制度,以分鐘的維度進行時間片的縱向劃分,每個分鐘級時間片對應一個獨立的 zset 有序表,保證每次執行任務時處理的數據規模僅爲分鐘的量級
-
• 惰性刪除機制: 爲了簡化刪除定時任務的流程. 在使用方指定刪除定時任務時,我們不直接從 zset 中刪除數據,而是額外記錄一個已刪除任務的 set. 後續在檢索定時任務時,通過使用 set 進行定時任務的過濾,實現定時任務的惰性刪除.
3.1 核心類
3.1.1 redis 時間輪
在 redis 版時間輪中有兩個核心類,第一個是關於時間輪的類定義:
-
• redisClient:定時任務的存儲是基於 redis zset 實現的,因此需要內置一個 redis 客戶端,這部分在 3.2 小節展開;
-
• httpClient:定時任務執行時,是通過請求使用方預留回調地址的方式實現的,因此需要內置一個 http 客戶端
-
• channel × 2:ticker 和 stopc 對應爲 golang 標準庫定時器以及停止 goroutine 的控制器
// 基於 redis 實現的分佈式版時間輪
type RTimeWheel struct {
// 內置的單例工具,用於保證 stopc 只被關閉一次
sync.Once
// redis 客戶端
redisClient *redis.Client
// http 客戶端. 在執行定時任務時需要使用到.
httpClient *thttp.Client
// 用於停止時間輪的控制器 channel
stopc chan struct{
// 觸發定時掃描任務的定時器
ticker *time.Ticker
}
3.1.2 定時任務
定時任務的類型定義如下,其中包括定時任務的唯一鍵 key,以及執行定時任務回調時需要使用到的 http 協議參數.
// 使用方提交的每一筆定時任務
type RTaskElement struct {
// 定時任務全局唯一 key
Key string `json:"key"`
// 定時任務執行時,回調的 http url
CallbackURL string `json:"callback_url"`
// 回調時使用的 http 方法
Method string `json:"method"`
// 回調時傳遞的請求參數
Req interface{} `json:"req"`
// 回調時使用的 http 請求頭
Header map[string]string `json:"header"`
}
3.2 redis lua 使用事項
本項目使用的 redis 客戶端是我個人基於 golang-redis 客戶端 sdk——redigo 進一步封裝實現的,redigo 的開源地址爲: https://github.com/gomodule/redigo
此處主要在 redisClient 中封裝了一個 Eval 方法,便於使用 redis lua 腳本執行復合指令.
// Eval:執行 lua 腳本.
// src——lua 腳本內容 keyCount——key 的數量 keysAndArgs——由 key 和 args 組成的列表
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
args := make([]interface{}, 2+len(keysAndArgs))
args[0] = src
args[1] = keyCount
copy(args[2:], keysAndArgs)
// 從 redis 鏈接池中獲取一個連接
conn, err := c.pool.GetContext(ctx)
if err != nil {
return -1, err
}
// 使用完成後將連接放回池子
defer conn.Close()
// 執行 lua 腳本
return conn.Do("EVAL", args...)
}
lua 腳本是 redis 的高級功能,能夠保證針在單個 redis 節點內執行的一系列指令具備原子性,中途不會被其他操作者打斷.
redis lua 功能介紹:https://redis.io/docs/interact/programmability/eval-intro/
lua 語法教程:https://www.runoob.com/lua/lua-tutorial.html
此處之所以需要使用 lua 腳本,是因爲在實現時間輪的過程中,存在一系列本身不具備原子性但在業務流程中不可拆解的複合操作,需要由 lua 腳本賦予其原子性質.
在使用 lua 時,尤其需要注意的點是,只有操作的數據屬於單個 redis 節點時,才能保證其原子性. 然而在生產環境中,redis 通常採用縱向分治的集羣模式,這使得 key 不同的數據可能被分發在不同的 redis 節點上,此時 lua 腳本的性質就無法保證.
在使用 lua 腳本時,倘若一系列複合操作都是針對於同一個 key,那麼數據必然位於同一個節點,沒有任何疑議. 倘若我們在 lua 中涉及到對多個 key 的操作,那麼這些 key 對應的數據就可能從屬於不同的 redis 節點,此時 lua 腳本存在失效的風險.
針對這個問題,本項目採取的是定製的分區策略,來保證指定的 key 一定被分發到相同的 redis 節點上. 此處使用的方式是通過 "{}" 進行 hash_tag 的標識,所有擁有相同 hash_tag 的 key 都一定會被分發到相同的節點上.
該分區策略可參見 redis 官方文檔:https://redis.io/commands/cluster-keyslot/
指令示例如下:
> CLUSTER KEYSLOT somekey
(integer) 11058
> CLUSTER KEYSLOT foo{hash_tag}
(integer) 2515
> CLUSTER KEYSLOT bar{hash_tag}
(integer) 2515
3.3 構造器
在構造時間輪實例時,使用方需要注入 redis 客戶端以及 http 客戶端.
在初始化流程中,ticker 爲 golang 標準庫實現的定時器,定時器的執行時間間隔固定爲 1 s. 此外會異步運行 run 方法,啓動一個常駐 goroutine,生命週期會通過 stopc channel 進行控制.
// 構造 redis 實現的分佈式時間輪
func NewRTimeWheel(redisClient *redis.Client, httpClient *thttp.Client) *RTimeWheel {
// 創建時間輪實例
r := RTimeWheel{
// 創建定時器,每隔 1 s 執行一次
ticker: time.NewTicker(time.Second),
redisClient: redisClient,
httpClient: httpClient,
stopc: make(chan struct{}),
}
// 異步啓動時間輪
go r.run()
return &r
}
3.4 啓動與停止
時間輪常駐 goroutine 運行流程同樣通過 for + select 的形式運行:
-
• 接收到 stopc 信號時,goroutine 退出,時間輪停止運行
-
• 接收到 ticker 信號時,開啓一個異步 goroutine 用於執行當前批次的定時任務
// 運行時間輪
func (r *RTimeWheel) run() {
// 通過 for + select 的代碼結構運行一個常駐 goroutine 是常規操作
for {
select {
// 接收到終止信號,則退出 goroutine
case <-r.stopc:
return
// 每次接收到來自定時器的信號,則批量掃描並執行定時任務
case <-r.ticker.C:
// 每次 tick 獲取任務
go r.executeTasks()
}
}
}
停止時間輪的 Stop 方法通過關閉 stopc 保證常駐 goroutine 能夠及時退出.
// 停止時間輪
func (r *RTimeWheel) Stop() {
// 基於單例工具,保證 stopc 只能被關閉一次
r.Do(func() {
// 關閉 stopc,使得常駐 goroutine 停止運行
close(r.stopc)
// 終止定時器 ticker
r.ticker.Stop()
})
}
3.5 創建任務
在創建定時任務時,每筆定時任務需要根據其執行的時間找到從屬的分鐘時間片.
定時任務真正的存儲邏輯定義在一段 lua 腳本中,通過 redis 客戶端的 Eval 方法執行.
// 添加定時任務
func (r *RTimeWheel) AddTask(ctx context.Context, key string, task *RTaskElement, executeAt time.Time) error {
// 前置對定時任務的參數進行校驗
if err := r.addTaskPrecheck(task); err != nil {
return err
}
task.Key = key
// 將定時任務序列化成字節數組
taskBody, _ := json.Marshal(task)
// 通過執行 lua 腳本,實現將定時任務添加 redis zset 中. 本質上底層使用的是 zadd 指令.
_, err := r.redisClient.Eval(ctx, LuaAddTasks, 2, []interface{}{
// 分鐘級 zset 時間片
r.getMinuteSlice(executeAt),
// 標識任務刪除的集合
r.getDeleteSetKey(executeAt),
// 以執行時刻的秒級時間戳作爲 zset 中的 score
executeAt.Unix(),
// 任務明細
string(taskBody),
// 任務 key,用於存放在刪除集合中
key,
})
return err
}
下面展示的是獲取分鐘級定時任務有序表 minuteSlice 以及已刪除任務集合 deleteSet 的細節.
此處呼應了 3.3 小節,通過以分鐘級表達式作爲 {hash_tag} 的方式,確保 minuteSlice 和 deleteSet 一定會分發到相同的 redis 節點之上,進一步保證 lua 腳本的原子性能夠生效.
- • 獲取定時任務有序表 key 的方法:
func (r *RTimeWheel) getMinuteSlice(executeAt time.Time) string {
return fmt.Sprintf("xiaoxu_timewheel_task_{%s}", util.GetTimeMinuteStr(executeAt))
}
- • 獲取刪除任務集合 key 的方法:
func (r *RTimeWheel) getDeleteSetKey(executeAt time.Time) string {
return fmt.Sprintf("xiaoxu_timewheel_delset_{%s}", util.GetTimeMinuteStr(executeAt))
}
下面展示一下創建定時任務流程中 lua 腳本的執行邏輯:
const (
// 添加任務時,如果存在刪除 key 的標識,則將其刪除
// 添加任務時,根據時間(所屬的 min)決定數據從屬於哪個分片{}
LuaAddTasks = `
-- 獲取的首個 key 爲 zset 的 key
local zsetKey = KEYS[1]
-- 獲取的第二個 key 爲標識已刪除任務 set 的 key
local deleteSetKey = KEYS[2]
-- 獲取的第一個 arg 爲定時任務在 zset 中的 score
local score = ARGV[1]
-- 獲取的第二個 arg 爲定時任務明細數據
local task = ARGV[2]
-- 獲取的第三個 arg 爲定時任務唯一鍵,用於將其從已刪除任務 set 中移除
local taskKey = ARGV[3]
-- 每次添加定時任務時,都直接將其從已刪除任務 set 中移除,不管之前是否在 set 中
redis.call('srem',deleteSetKey,taskKey)
-- 調用 zadd 指令,將定時任務添加到 zset 中
return redis.call('zadd',zsetKey,score,task)
`
)
3.6 刪除任務
刪除定時任務的方式是將定時任務追加到分鐘級的已刪除任務 set 中. 之後在檢索定時任務時,會根據這個 set 對定時任務進行過濾,實現惰性刪除機制.
// 從 redis 時間輪中刪除一個定時任務
func (r *RTimeWheel) RemoveTask(ctx context.Context, key string, executeAt time.Time) error {
// 執行 lua 腳本,將被刪除的任務追加到 set 中.
_, err := r.redisClient.Eval(ctx, LuaDeleteTask, 1, []interface{}{
r.getDeleteSetKey(executeAt),
key,
})
return err
}
lua 執行邏輯如下:
const(
// 刪除定時任務 lua 腳本
LuaDeleteTask = `
-- 獲取標識刪除任務的 set 集合的 key
local deleteSetKey = KEYS[1]
-- 獲取定時任務的唯一鍵
local taskKey = ARGV[1]
-- 將定時任務唯一鍵添加到 set 中
redis.call('sadd',deleteSetKey,taskKey)
-- 倘若是 set 中的首個元素,則對 set 設置 120 s 的過期時間
local scnt = redis.call('scard',deleteSetKey)
if (tonumber(scnt) == 1)
then
redis.call('expire',deleteSetKey,120)
end
return scnt
) `
3.7 執行定時任務
在執行定時任務時,會通過 getExecutableTasks 方法批量獲取到滿足執行條件的定時任務 list,然後併發調用 execute 方法完成定時任務的回調執行.
// 批量執行定時任務
func (r *RTimeWheel) executeTasks() {
defer func() {
if err := recover(); err != nil {
// log
}
}()
// 併發控制,保證 30 s 之內完成該批次全量任務的執行,及時回收 goroutine,避免發生 goroutine 泄漏
tctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// 根據當前時間條件掃描 redis zset,獲取所有滿足執行條件的定時任務
tasks, err := r.getExecutableTasks(tctx)
if err != nil {
// log
return
}
// 併發執行任務,通過 waitGroup 進行聚合收口
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
// shadow
task := task
go func() {
defer func() {
if err := recover(); err != nil {
}
wg.Done()
}()
// 執行定時任務
if err := r.executeTask(tctx, task); err != nil {
// log
}
}()
}
wg.Wait()
}
3.8 檢索定時任務
最後介紹一下,如何根據當前時間獲取到滿足執行條件的定時任務列表:
-
• 每次檢索時,首先根據當前時刻,推算出所從屬的分鐘級時間片
-
• 然後獲得當前的秒級時間戳,作爲 zrange 指令檢索的 score 範圍
-
• 調用 lua 腳本,同時獲取到已刪除任務 set 以及 score 範圍內的定時任務 list.
-
• 通過 set 過濾掉被刪除的任務,然後返回滿足執行條件的定時任務
func (r *RTimeWheel) getExecutableTasks(ctx context.Context) ([]*RTaskElement, error) {
now := time.Now()
// 根據當前時間,推算出其從屬的分鐘級時間片
minuteSlice := r.getMinuteSlice(now)
// 推算出其對應的分鐘級已刪除任務集合
deleteSetKey := r.getDeleteSetKey(now)
nowSecond := util.GetTimeSecond(now)
// 以秒級時間戳作爲 score 進行 zset 檢索
score1 := nowSecond.Unix()
score2 := nowSecond.Add(time.Second).Unix()
// 執行 lua 腳本,本質上是通過 zrange 指令結合秒級時間戳對應的 score 進行定時任務檢索
rawReply, err := r.redisClient.Eval(ctx, LuaZrangeTasks, 2, []interface{}{
minuteSlice, deleteSetKey, score1, score2,
})
if err != nil {
return nil, err
}
// 結果中,首個元素對應爲已刪除任務的 key 集合,後續元素對應爲各筆定時任務
replies := gocast.ToInterfaceSlice(rawReply)
if len(replies) == 0 {
return nil, fmt.Errorf("invalid replies: %v", replies)
}
deleteds := gocast.ToStringSlice(replies[0])
deletedSet := make(map[string]struct{}, len(deleteds))
for _, deleted := range deleteds {
deletedSet[deleted] = struct{}{}
}
// 遍歷各筆定時任務,倘若其存在於刪除集合中,則跳過,否則追加到 list 中返回,用於後續執行
tasks := make([]*RTaskElement, 0, len(replies)-1)
for i := 1; i < len(replies); i++ {
var task RTaskElement
if err := json.Unmarshal([]byte(gocast.ToString(replies[i])), &task); err != nil {
// log
continue
}
if _, ok := deletedSet[task.Key]; ok {
continue
}
tasks = append(tasks, &task)
}
return tasks, nil
}
lua 腳本的執行邏輯如下:
(
// 掃描 redis 時間輪. 獲取分鐘範圍內,已刪除任務集合 以及在時間上達到執行條件的定時任務進行返回
LuaZrangeTasks = `
-- 第一個 key 爲存儲定時任務的 zset key
local zsetKey = KEYS[1]
-- 第二個 key 爲已刪除任務 set 的 key
local deleteSetKey = KEYS[2]
-- 第一個 arg 爲 zrange 檢索的 score 左邊界
local score1 = ARGV[1]
-- 第二個 arg 爲 zrange 檢索的 score 右邊界
local score2 = ARGV[2]
-- 獲取到已刪除任務的集合
local deleteSet = redis.call('smembers',deleteSetKey)
-- 根據秒級時間戳對 zset 進行 zrange 檢索,獲取到滿足時間條件的定時任務
local targets = redis.call('zrange',zsetKey,score1,score2,'byscore')
-- 檢索到的定時任務直接從時間輪中移除,保證分佈式場景下定時任務不被重複獲取
redis.call('zremrangebyscore',zsetKey,score1,score2)
-- 返回的結果是一個 table
local reply = {}
-- table 的首個元素爲已刪除任務集合
reply[1] = deleteSet
-- 依次將檢索到的定時任務追加到 table 中
for i, v in ipairs(targets) do
reply[#reply+1]=v
end
return reply
`
)
4 總結
本期和大家探討了如何基於 golang 從零到一實現時間輪算法,通過原理結合源碼,詳細展示了單機版和 redis 分佈式版時間輪的實現方式.
最後我們展望一下,時間輪算法在工程實踐中具體能得到怎樣的應用呢?
此前我基於 redis zset 時間輪的模型實現了一個分佈式定時任務調度系統 xtimer,大家感興趣的話可以閱讀一下我之前分享的這篇文章:基於協程池架構實現的分佈式定時器 XTimer
該項目源碼已於 github 開源:http://github.com/xiaoxuxiansheng/xtimer
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/0HwuYTTe9cdT46advEZe0w