基於 golang 從零到一實現時間輪算法

0 前言

近期計劃攻堅一個新的專題系列——如何基於 golang 從零到一實現 redis.

這裏選擇的學習素材是 hdt3213 大佬於 github 上開源的 godis 項目. 在大佬的實現中,充分利用了 golang 的特性,將 redis 存儲層由單線程模型轉爲併發模型,其中在實現數據的 expire 機制時,採用的是單機時間輪模型進行過期數據的刪除操作.

godis 項目開源地址:http://github.com/HDT3213/godis

godis 時間輪代碼:http://github.com/HDT3213/godis/blob/master/lib/timewheel/timewheel.go

實際上,在我之前分享個人項目——分佈式定時器 xtimer 中也有到了時間輪算法,這部分內容具有一定共性. 藉此機會,本期單獨對時間輪算法的原理和實踐內容進行梳理,並基於 golang 從零到一開源實現出一個單機版和 redis 分佈式版的時間輪,供大家一起交流探討.

個人開源的時間輪項目地址爲:http://github.com/xiaoxuxiansheng/timewheel

本期內容的目錄大綱如下:

1 時間輪原理

時間輪是定時任務調度系統中常用到的一種經典的算法模型. 有關時間輪算法的詳細概念介紹,可以參見論文:《Hashed and Hierarchical Time Wheels: Data Structures for the Efficient Implementation of a Time Facility》http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf 

接下來我也會從個人角度出發,談談我對於時間輪算法的一些淺顯理解.

1.1 時間輪概念

聊時間輪之前,先聊聊時間的概念.

首先時間是一維、單向的,我們可以用一條一維的時間軸將其具象化. 我們對時間軸進行刻度拆分,每個刻度對應一個時間範圍,那麼刻度拆分得越細,則表示的時間範圍越精確.

然而,我們知道時間是沒有盡頭的,因此這條一維時間軸的長度是無窮大的. 倘若我們想要建立一個數據結構去表達這條由一系列刻度聚合形成的時間軸,這個數據結構所佔用的空間也是無窮無盡的.

那麼,我們應該如何優化這個問題呢?此時,大家不妨低頭看一眼自己的手錶. 手錶或時鐘這類日常生活中用來關聯表達時間的工具,採用的是首尾銜接的環狀結構來替代無窮長度的一維時間軸,每當鐘錶劃過一圈,刻度從新回到零值,但是已有的時間進度會傳承往下.

本期所聊的時間輪算法採用的思路正是與之類似,下面我們梳理一下核心流程:

接下來,我們建立時間輪的掃描機制,就如同鐘錶中的指針一般,按照固定的時間節奏,沿着環形數組週而復始地持續向下掃描. 每當來到一個刻度時,則取出鏈表中輪次爲 0 的定時任務進行執行. 這就是時間輪算法的核心思路.

1.2 多級時間輪

接下來聊一聊時間輪中的等級制度與多級時間輪的概念.

首先捋一捋,時間輪每個週期輪次中,使用的數據結構容量與所表達的時間範圍之間的關係.

我們把時間輪中的每個刻度記爲一個 slot,每個 slot 表示的時間範圍記爲 t.

假設時間輪中總共包含 2m 個 slot,求問如何組織我們的時間輪數據結構,能夠使得時間輪每個輪次對應表達的時間範圍儘可能的長. (一個輪次對應的時間範圍越長,在時間流逝過程中輪次的迭代速度就越慢,於是每個 slot 對應的定時任務鏈表長度就越短,執行定時任務時的檢索效率就越高.)

這裏最簡單的方式就是進行採用一維縱向排列的方式,那麼能夠表達的時間範圍就是 2m * t,某個刻度對應的時間值就記爲 {slot_i}.

另一種思路是,我們在時間輪中建立一種等級秩序.

比如我們將 2m 個 slot 拆成兩個等級——level1 和 level2. 最終我們通過 {level1_slot}_{level2_slot} 的方式進行時間的表達.

我們給 level2 分配 m 個 slot,其中每個 slot 對應的時間範圍同樣爲 t. 而 level1 同樣也分配 m 個 slot,但是此時其中每個 slot 對應的時間範圍應該爲 m * t,因爲在 level1 中的 slot 確定時,level2 中還有 m 種 slot 的組合方式.

如此一來,這種組織方式下,時間輪單個輪次所能表達的時間範圍就是 m * m * t.

這裏探討的核心不是具體某級時間輪的時間範圍結果,而是拋出了一種多級時間輪的思路,從一到二是質變,從二到三、從三到四就僅僅是量變的問題,可以繼續復刻相同的思路.

回過頭來看,我們會發現日常使用的時間表達式正是採用了這樣一種多級時間輪的等級制度,比如當前的時刻爲:2023-09-23 15:50:00. 這本質上是一種通過 {year}-{month}-{date}-{hour}-{minute}-{second} 組成的 6 級時間輪等級結構.

後續在本文第 3 章探討如何基於 redis zset 實現時間輪的話題中,我們會進一步利用這種多級時間輪的思路,將每個任務首先基於前 5 級 {year}-{month}-{date}-{hour}-{minute} 的等級表達式進行分鐘級時間片的縱向拆分,最終在 1 分鐘範圍內進行定時任務的有序組織,保證在每次插入、刪除和檢索任務時,處理的數據量級能夠維持在分鐘級的數量,最大化地提高時間輪結果的處理性能.

2 單機版實現

聊完原理部分,下面我們一起進入實戰環節.

在本章中,我們會使用 golang 標準庫的定時器工具 time ticker 結合環狀數組的設計思路,實現一個單機版的單級時間輪.

2.1 核心類

2.1.1 時間輪

在對時間輪的類定義中,核心字段如下圖所示:

在幾個核心字段中:

在創建時間輪實例時,會通過一個異步的常駐 goroutine 執行定時任務的檢索、添加、刪除等操作,並通過幾個 channel 進行 goroutine 的執行邏輯和生命週期的控制:

// 單機版時間輪
type TimeWheel struct {
    // 單例工具,保證時間輪停止操作只能執行一次
    sync.Once
    // 時間輪運行時間間隔
    interval     time.Duration
    // 時間輪定時器
    ticker       *time.Ticker
    // 停止時間輪的 channel
    stopc        chan struct{}
    // 新增定時任務的入口 channel  
    addTaskCh    chan *taskElement
    // 刪除定時任務的入口 channel
    removeTaskCh chan string
    // 通過 list 組成的環狀數組. 通過遍歷環狀數組的方式實現時間輪
    // 定時任務數量較大,每個 slot 槽內可能存在多個定時任務,因此通過 list 進行組裝
    slots        []*list.List
    // 當前遍歷到的環狀數組的索引
    curSlot      int
    // 定時任務 key 到任務節點的映射,便於在 list 中刪除任務節點
    keyToETask   map[string]*list.Element
}

此處有幾個技術細節需要提及:

首先: 所謂環狀數組指的是邏輯意義上的. 在實際的實現過程中,會通過一個定長數組結合循環遍歷的方式,來實現這個邏輯意義上的 “環狀” 性質.

其次: 數組每一輪能表達的時間範圍是固定的. 每當在添加添加一個定時任務時,需要根據其延遲的相對時長推算出其所處的 slot 位置,其中可能跨遍歷輪次的情況,這時候需要額外通過定時任務中的 cycle 字段來記錄這一信息,避免定時任務被提前執行.

最後: 時間輪中一個 slot 可能需要掛載多筆定時任務,因此針對每個 slot,需要採用 golang 標準庫 container/list 中實現的雙向鏈表進行定時任務數據的存儲.

2.1.2 定時任務

下面是對一筆定時任務的類定義:

// 封裝了一筆定時任務的明細信息
type taskElement struct {
    // 內聚了定時任務執行邏輯的閉包函數
    task  func()
    // 定時任務掛載在環狀數組中的索引位置
    pos   int
    // 定時任務的延遲輪次. 指的是 curSlot 指針還要掃描過環狀數組多少輪,才滿足執行該任務的條件
    cycle int
    // 定時任務的唯一標識鍵
    key   string
}

2.2 構造器

在創建時間輪的構造器函數中,需要傳入兩個入參:

初始化時間輪實例的過程中,會完成定時器 ticker 以及各個 channel 的初始化,並針對數組 中的各個 slot 進行初始化,每個 slot 位置都需要填充一個 list.

每個時間輪實例都會異步調用 run 方法,啓動一個常駐 goroutine 用於接收和處理定時任務.

// 創建單機版時間輪 slotNum——時間輪環狀數組長度  interval——掃描時間間隔
func NewTimeWheel(slotNum int, interval time.Duration) *TimeWheel {
    // 環狀數組長度默認爲 10
    if slotNum <= 0 {
        slotNum = 10
    }
    // 掃描時間間隔默認爲 1 秒
    if interval <= 0 {
        interval = time.Second
    }


    // 初始化時間輪實例
    t := TimeWheel{
        interval:     interval,
        ticker:       time.NewTicker(interval),
        stopc:        make(chan struct{}),
        keyToETask:   make(map[string]*list.Element),
        slots:        make([]*list.List, 0, slotNum),
        addTaskCh:    make(chan *taskElement),
        removeTaskCh: make(chan string),
    }
    for i := 0; i < slotNum; i++ {
        t.slots = append(t.slots, list.New())
    }
    
    // 異步啓動時間輪常駐 goroutine
    go t.run()
    return &t
}

2.3 啓動與停止

時間輪運行的核心邏輯位於 timeWheel.run 方法中,該方法會通過 for 循環結合 select 多路複用的方式運行,屬於 golang 中非常常見的異步編程風格.

goroutine 運行過程中需要從以下四類 channel 中接收不同的信號,並進行邏輯的分發處理:

此處值得一提的是,後續不論是創建、刪除還是檢索定時任務,都是通過這個常駐 goroutine 完成的,因此在訪問一些臨界資源的時候,不需要加鎖,因爲不存在併發訪問的情況

// 運行時間輪
func (t *TimeWheel) run() {
    defer func() {
        if err := recover(); err != nil {
            // ...
        }
    }()
 
    // 通過 for + select 的代碼結構運行一個常駐 goroutine 是常規操作
    for {
        select {
        // 停止時間輪
        case <-t.stopc:
            return
        // 接收到定時信號
        case <-t.ticker.C:
            // 批量執行定時任務
            t.tick()
        // 接收創建定時任務的信號
        case task := <-t.addTaskCh:
            t.addTask(task)
        // 接收到刪除定時任務的信號
        case removeKey := <-t.removeTaskCh:
            t.removeTask(removeKey)
        }
    }
}

時間輪提供了一個 Stop 方法,用於手動停止時間輪,回收對應的 goroutine 和 ticker 資源.

停止時間輪的操作是通過關閉 stopc channel 完成的,由於 channel 不允許被反覆關閉,因此這裏通過 sync.Once 保證該邏輯只被調用一次.

// 停止時間輪
func (t *TimeWheel) Stop() {
    // 通過單例工具,保證 channel 只能被關閉一次,避免 panic
    t.Do(func() {
        // 定製定時器 ticker
        t.ticker.Stop()
        // 關閉定時器運行的 stopc
        close(t.stopc)
    })
}

2.4 創建任務

創建一筆定時任務的核心步驟如下:

// 添加定時任務到時間輪中
func (t *TimeWheel) AddTask(key string, task func(), executeAt time.Time) {
    // 根據執行時間推算得到定時任務從屬的 slot 位置,以及需要延遲的輪次
    pos, cycle := t.getPosAndCircle(executeAt)
    // 將定時任務通過 channel 進行投遞
    t.addTaskCh <- &taskElement{
        pos:   pos,
        cycle: cycle,
        task:  task,
        key:   key,
    }
}
// 根據執行時間推算得到定時任務從屬的 slot 位置,以及需要延遲的輪次
func (t *TimeWheel) getPosAndCircle(executeAt time.Time) (int, int) {
    delay := int(time.Until(executeAt))
    // 定時任務的延遲輪次
    cycle := delay / (len(t.slots) * int(t.interval))
    // 定時任務從屬的環狀數組 index
    pos := (t.curSlot + delay/int(t.interval)) % len(t.slots)
    return pos, cycle
}
// 常駐 goroutine 接收到創建定時任務後的處理邏輯
func (t *TimeWheel) addTask(task *taskElement) {
    // 獲取到定時任務從屬的環狀數組 index 以及對應的 list
    list := t.slots[task.pos]
    // 倘若定時任務 key 之前已存在,則需要先刪除定時任務
    if _, ok := t.keyToETask[task.key]; ok {
        t.removeTask(task.key)
    }
    // 將定時任務追加到 list 尾部
    eTask := list.PushBack(task)
    // 建立定時任務 key 到將定時任務所處的節點
    t.keyToETask[task.key] = eTask
}

2.5 刪除任務

刪除一筆定時任務的核心步驟如下:

// 刪除定時任務,投遞信號
func (t *TimeWheel) RemoveTask(key string) {
    t.removeTaskCh <- key
}
// 時間輪常駐 goroutine 接收到刪除任務信號後,執行的刪除任務邏輯
func (t *TimeWheel) removeTask(key string) {
    eTask, ok := t.keyToETask[key]
    if !ok {
        return
    }
    // 將定時任務節點從映射 map 中移除
    delete(t.keyToETask, key)
    // 獲取到定時任務節點後,將其從 list 中移除
    task, _ := eTask.Value.(*taskElement)
    _ = t.slots[task.pos].Remove(eTask)
}

2.6 執行定時任務

最後來捋一下最核心的鏈路——檢索並批量執行定時任務的流程.

首先,每當接收到 ticker 信號時,會根據當前的 curSlot 指針,獲取到對應 slot 位置掛載的定時任務 list,調用 execute 方法執行其中的定時任務. 最後通過 circularIncr 方法推進 curSlot 指針向前移動.

// 常駐 goroutine 每次接收到定時信號後用於執行定時任務的邏輯
func (t *TimeWheel) tick() {
    // 根據 curSlot 獲取到當前所處的環狀數組索引位置,取出對應的 list
    list := t.slots[t.curSlot]
     // 在方法返回前,推進 curSlot 指針的位置,進行環狀遍歷
    defer t.circularIncr()
    // 批量處理滿足執行條件的定時任務
    t.execute(list)
}

在 execute 方法中,會對 list 中的定時任務進行遍歷:

// 執行定時任務,每次處理一個 list
func (t *TimeWheel) execute(l *list.List) {
    // 遍歷 list
    for e := l.Front(); e != nil; {
        // 獲取到每個節點對應的定時任務信息
        taskElement, _ := e.Value.(*taskElement)
        // 倘若任務還存在延遲輪次,則只對 cycle 計數器進行扣減,本輪不作任務的執行
        if taskElement.cycle > 0 {
            taskElement.cycle--
            e = e.Next()
            continue
        }


        // 當前節點對應定時任務已達成執行條件,開啓一個 goroutine 負責執行任務
        go func() {
            defer func() {
                if err := recover(); err != nil {
                    // ...
                }
            }()
            taskElement.task()
        }()


        // 任務已執行,需要把對應的任務節點從 list 中刪除
        next := e.Next()
        l.Remove(e)
        // 把任務 key 從映射 map 中刪除
        delete(t.keyToETask, taskElement.key)
        e = next
    }
}

在 circularIncr 方法中,呼應了環狀數組的邏輯處理方式:

// 每次 tick 後需要推進 curSlot 指針的位置,slots 在邏輯意義上是環狀數組,所以在到達尾部時需要從新回到頭部 
func (t *TimeWheel) circularIncr() {
    t.curSlot = (t.curSlot + 1) % len(t.slots)
}

3 分佈式版實現

本章我們討論一下,如何基於 redis 實現分佈式版本的時間輪,以貼合實際生產環境對分佈式定時任務調度系統的訴求.

redis 版時間輪的實現思路是使用 redis 中的有序集合 sorted set(簡稱 zset) 進行定時任務的存儲管理,其中以每個定時任務執行時間對應的時間戳作爲 zset 中的 score,完成定時任務的有序排列組合.

zset 數據結構的 redis 官方文檔鏈接:https://redis.io/docs/data-types/sorted-sets/

這裏有兩個的技術細節需要提前和大家同步:

3.1 核心類

3.1.1 redis 時間輪

在 redis 版時間輪中有兩個核心類,第一個是關於時間輪的類定義:

// 基於 redis 實現的分佈式版時間輪
type RTimeWheel struct {
    // 內置的單例工具,用於保證 stopc 只被關閉一次
    sync.Once
    // redis 客戶端
    redisClient *redis.Client
    // http 客戶端. 在執行定時任務時需要使用到.
    httpClient  *thttp.Client
    // 用於停止時間輪的控制器 channel
    stopc       chan struct{
    // 觸發定時掃描任務的定時器 
    ticker      *time.Ticker
}

3.1.2 定時任務

定時任務的類型定義如下,其中包括定時任務的唯一鍵 key,以及執行定時任務回調時需要使用到的 http 協議參數.

// 使用方提交的每一筆定時任務
type RTaskElement struct {
    // 定時任務全局唯一 key
    Key         string            `json:"key"`
    // 定時任務執行時,回調的 http url
    CallbackURL string            `json:"callback_url"`
    // 回調時使用的 http 方法
    Method      string            `json:"method"`
    // 回調時傳遞的請求參數
    Req         interface{}       `json:"req"`
    // 回調時使用的 http 請求頭
    Header      map[string]string `json:"header"`
}

3.2 redis lua 使用事項

本項目使用的 redis 客戶端是我個人基於 golang-redis 客戶端 sdk——redigo 進一步封裝實現的,redigo 的開源地址爲: https://github.com/gomodule/redigo

此處主要在 redisClient 中封裝了一個 Eval 方法,便於使用 redis lua 腳本執行復合指令.

// Eval:執行 lua 腳本.
// src——lua 腳本內容 keyCount——key 的數量 keysAndArgs——由 key 和 args 組成的列表
func (c *Client) Eval(ctx context.Context, src string, keyCount int, keysAndArgs []interface{}) (interface{}, error) {
    args := make([]interface{}, 2+len(keysAndArgs))
    args[0] = src
    args[1] = keyCount
    copy(args[2:], keysAndArgs)


    // 從 redis 鏈接池中獲取一個連接
    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return -1, err
    }
    // 使用完成後將連接放回池子
    defer conn.Close()
    // 執行 lua 腳本
    return conn.Do("EVAL", args...)
}

lua 腳本是 redis 的高級功能,能夠保證針在單個 redis 節點內執行的一系列指令具備原子性,中途不會被其他操作者打斷.

redis lua 功能介紹:https://redis.io/docs/interact/programmability/eval-intro/

lua 語法教程:https://www.runoob.com/lua/lua-tutorial.html

此處之所以需要使用 lua 腳本,是因爲在實現時間輪的過程中,存在一系列本身不具備原子性但在業務流程中不可拆解的複合操作,需要由 lua 腳本賦予其原子性質.

在使用 lua 時,尤其需要注意的點是,只有操作的數據屬於單個 redis 節點時,才能保證其原子性. 然而在生產環境中,redis 通常採用縱向分治的集羣模式,這使得 key 不同的數據可能被分發在不同的 redis 節點上,此時 lua 腳本的性質就無法保證.

在使用 lua 腳本時,倘若一系列複合操作都是針對於同一個 key,那麼數據必然位於同一個節點,沒有任何疑議. 倘若我們在 lua 中涉及到對多個 key 的操作,那麼這些 key 對應的數據就可能從屬於不同的 redis 節點,此時 lua 腳本存在失效的風險.

針對這個問題,本項目採取的是定製的分區策略,來保證指定的 key 一定被分發到相同的 redis 節點上. 此處使用的方式是通過 "{}" 進行 hash_tag 的標識,所有擁有相同 hash_tag 的 key 都一定會被分發到相同的節點上.

該分區策略可參見 redis 官方文檔:https://redis.io/commands/cluster-keyslot/

指令示例如下:

> CLUSTER KEYSLOT somekey
(integer) 11058
> CLUSTER KEYSLOT foo{hash_tag}
(integer) 2515
> CLUSTER KEYSLOT bar{hash_tag}
(integer) 2515

3.3 構造器

在構造時間輪實例時,使用方需要注入 redis 客戶端以及 http 客戶端.

在初始化流程中,ticker 爲 golang 標準庫實現的定時器,定時器的執行時間間隔固定爲 1 s. 此外會異步運行 run 方法,啓動一個常駐 goroutine,生命週期會通過 stopc channel 進行控制.

// 構造 redis 實現的分佈式時間輪
func NewRTimeWheel(redisClient *redis.Client, httpClient *thttp.Client) *RTimeWheel {
    // 創建時間輪實例
    r := RTimeWheel{
        // 創建定時器,每隔 1 s 執行一次
        ticker:      time.NewTicker(time.Second),
        redisClient: redisClient,
        httpClient:  httpClient,
        stopc:       make(chan struct{}),
    }


    // 異步啓動時間輪
    go r.run()
    return &r
}

3.4 啓動與停止

時間輪常駐 goroutine 運行流程同樣通過 for + select 的形式運行:

// 運行時間輪
func (r *RTimeWheel) run() {
    // 通過 for + select 的代碼結構運行一個常駐 goroutine 是常規操作
    for {
        select {
        // 接收到終止信號,則退出 goroutine
        case <-r.stopc:
            return
        // 每次接收到來自定時器的信號,則批量掃描並執行定時任務
        case <-r.ticker.C:
            // 每次 tick 獲取任務
            go r.executeTasks()
        }
    }
}

停止時間輪的 Stop 方法通過關閉 stopc 保證常駐 goroutine 能夠及時退出.

// 停止時間輪
func (r *RTimeWheel) Stop() {
    // 基於單例工具,保證 stopc 只能被關閉一次
    r.Do(func() {
        // 關閉 stopc,使得常駐 goroutine 停止運行
        close(r.stopc)
        // 終止定時器 ticker
        r.ticker.Stop()
    })
}

3.5 創建任務

在創建定時任務時,每筆定時任務需要根據其執行的時間找到從屬的分鐘時間片.

定時任務真正的存儲邏輯定義在一段 lua 腳本中,通過 redis 客戶端的 Eval 方法執行.

// 添加定時任務
func (r *RTimeWheel) AddTask(ctx context.Context, key string, task *RTaskElement, executeAt time.Time) error {
    // 前置對定時任務的參數進行校驗
    if err := r.addTaskPrecheck(task); err != nil {
        return err
    }


    task.Key = key
    // 將定時任務序列化成字節數組
    taskBody, _ := json.Marshal(task)
    // 通過執行 lua 腳本,實現將定時任務添加 redis zset 中. 本質上底層使用的是 zadd 指令.
    _, err := r.redisClient.Eval(ctx, LuaAddTasks, 2, []interface{}{
        // 分鐘級 zset 時間片
        r.getMinuteSlice(executeAt),
        // 標識任務刪除的集合
        r.getDeleteSetKey(executeAt),
        // 以執行時刻的秒級時間戳作爲 zset 中的 score
        executeAt.Unix(),
        // 任務明細
        string(taskBody),
        // 任務 key,用於存放在刪除集合中
        key,
    })
    return err
}

下面展示的是獲取分鐘級定時任務有序表 minuteSlice 以及已刪除任務集合 deleteSet 的細節.

此處呼應了 3.3 小節,通過以分鐘級表達式作爲 {hash_tag} 的方式,確保 minuteSlice 和 deleteSet 一定會分發到相同的 redis 節點之上,進一步保證 lua 腳本的原子性能夠生效.

func (r *RTimeWheel) getMinuteSlice(executeAt time.Time) string {
    return fmt.Sprintf("xiaoxu_timewheel_task_{%s}", util.GetTimeMinuteStr(executeAt))
}
func (r *RTimeWheel) getDeleteSetKey(executeAt time.Time) string {
    return fmt.Sprintf("xiaoxu_timewheel_delset_{%s}", util.GetTimeMinuteStr(executeAt))
}

下面展示一下創建定時任務流程中 lua 腳本的執行邏輯:

const (
    // 添加任務時,如果存在刪除 key 的標識,則將其刪除
    // 添加任務時,根據時間(所屬的 min)決定數據從屬於哪個分片{}
    LuaAddTasks = `
       -- 獲取的首個 key 爲 zset 的 key
       local zsetKey = KEYS[1]
       -- 獲取的第二個 key 爲標識已刪除任務 set 的 key
       local deleteSetKey = KEYS[2]
       -- 獲取的第一個 arg 爲定時任務在 zset 中的 score
       local score = ARGV[1]
       -- 獲取的第二個 arg 爲定時任務明細數據
       local task = ARGV[2]
       -- 獲取的第三個 arg 爲定時任務唯一鍵,用於將其從已刪除任務 set 中移除
       local taskKey = ARGV[3]
       -- 每次添加定時任務時,都直接將其從已刪除任務 set 中移除,不管之前是否在 set 中
       redis.call('srem',deleteSetKey,taskKey)
       -- 調用 zadd 指令,將定時任務添加到 zset 中
       return redis.call('zadd',zsetKey,score,task)
    `
)

3.6 刪除任務

刪除定時任務的方式是將定時任務追加到分鐘級的已刪除任務 set 中. 之後在檢索定時任務時,會根據這個 set 對定時任務進行過濾,實現惰性刪除機制.

// 從 redis 時間輪中刪除一個定時任務
func (r *RTimeWheel) RemoveTask(ctx context.Context, key string, executeAt time.Time) error {
    // 執行 lua 腳本,將被刪除的任務追加到 set 中.
    _, err := r.redisClient.Eval(ctx, LuaDeleteTask, 1, []interface{}{
        r.getDeleteSetKey(executeAt),
        key,
    })
    return err
}

lua 執行邏輯如下:

const(    
    // 刪除定時任務 lua 腳本
    LuaDeleteTask = `
       -- 獲取標識刪除任務的 set 集合的 key
       local deleteSetKey = KEYS[1]
       -- 獲取定時任務的唯一鍵
       local taskKey = ARGV[1]
       -- 將定時任務唯一鍵添加到 set 中
       redis.call('sadd',deleteSetKey,taskKey)
       -- 倘若是 set 中的首個元素,則對 set 設置 120 s 的過期時間
       local scnt = redis.call('scard',deleteSetKey)
       if (tonumber(scnt) == 1)
       then
           redis.call('expire',deleteSetKey,120)
       end
       return scnt
)    `

3.7 執行定時任務

在執行定時任務時,會通過 getExecutableTasks 方法批量獲取到滿足執行條件的定時任務 list,然後併發調用 execute 方法完成定時任務的回調執行.

// 批量執行定時任務
func (r *RTimeWheel) executeTasks() {
    defer func() {
        if err := recover(); err != nil {
            // log
        }
    }()


    // 併發控制,保證 30 s 之內完成該批次全量任務的執行,及時回收 goroutine,避免發生 goroutine 泄漏
    tctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
    defer cancel()
    // 根據當前時間條件掃描 redis zset,獲取所有滿足執行條件的定時任務
    tasks, err := r.getExecutableTasks(tctx)
    if err != nil {
        // log
        return
    }


    // 併發執行任務,通過 waitGroup 進行聚合收口
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        // shadow
        task := task
        go func() {
            defer func() {
                if err := recover(); err != nil {
                }
                wg.Done()
            }()
            // 執行定時任務
            if err := r.executeTask(tctx, task); err != nil {
                // log
            }
        }()
    }
    wg.Wait()
}

3.8 檢索定時任務

最後介紹一下,如何根據當前時間獲取到滿足執行條件的定時任務列表:

func (r *RTimeWheel) getExecutableTasks(ctx context.Context) ([]*RTaskElement, error) {
    now := time.Now()
    // 根據當前時間,推算出其從屬的分鐘級時間片
    minuteSlice := r.getMinuteSlice(now)
    // 推算出其對應的分鐘級已刪除任務集合
    deleteSetKey := r.getDeleteSetKey(now)
    nowSecond := util.GetTimeSecond(now)
    // 以秒級時間戳作爲 score 進行 zset 檢索
    score1 := nowSecond.Unix()
    score2 := nowSecond.Add(time.Second).Unix()
    // 執行 lua 腳本,本質上是通過 zrange 指令結合秒級時間戳對應的 score 進行定時任務檢索
    rawReply, err := r.redisClient.Eval(ctx, LuaZrangeTasks, 2, []interface{}{
        minuteSlice, deleteSetKey, score1, score2,
    })
    if err != nil {
        return nil, err
    }


    // 結果中,首個元素對應爲已刪除任務的 key 集合,後續元素對應爲各筆定時任務
    replies := gocast.ToInterfaceSlice(rawReply)
    if len(replies) == 0 {
        return nil, fmt.Errorf("invalid replies: %v", replies)
    }


    deleteds := gocast.ToStringSlice(replies[0])
    deletedSet := make(map[string]struct{}, len(deleteds))
    for _, deleted := range deleteds {
        deletedSet[deleted] = struct{}{}
    }


    // 遍歷各筆定時任務,倘若其存在於刪除集合中,則跳過,否則追加到 list 中返回,用於後續執行
    tasks := make([]*RTaskElement, 0, len(replies)-1)
    for i := 1; i < len(replies); i++ {
        var task RTaskElement
        if err := json.Unmarshal([]byte(gocast.ToString(replies[i]))&task); err != nil {
            // log
            continue
        }


        if _, ok := deletedSet[task.Key]; ok {
            continue
        }
        tasks = append(tasks, &task)
    }


    return tasks, nil
}

lua 腳本的執行邏輯如下:

(    
    // 掃描 redis 時間輪. 獲取分鐘範圍內,已刪除任務集合 以及在時間上達到執行條件的定時任務進行返回
    LuaZrangeTasks = `
       -- 第一個 key 爲存儲定時任務的 zset key
       local zsetKey = KEYS[1]
       -- 第二個 key 爲已刪除任務 set 的 key
       local deleteSetKey = KEYS[2]
       -- 第一個 arg 爲 zrange 檢索的 score 左邊界
       local score1 = ARGV[1]
       -- 第二個 arg 爲 zrange 檢索的 score 右邊界
       local score2 = ARGV[2]
       -- 獲取到已刪除任務的集合
       local deleteSet = redis.call('smembers',deleteSetKey)
       -- 根據秒級時間戳對 zset 進行 zrange 檢索,獲取到滿足時間條件的定時任務
       local targets = redis.call('zrange',zsetKey,score1,score2,'byscore')
       -- 檢索到的定時任務直接從時間輪中移除,保證分佈式場景下定時任務不被重複獲取
       redis.call('zremrangebyscore',zsetKey,score1,score2)
       -- 返回的結果是一個 table
       local reply = {}
       -- table 的首個元素爲已刪除任務集合
       reply[1] = deleteSet
       -- 依次將檢索到的定時任務追加到 table 中
       for i, v in ipairs(targets) do
           reply[#reply+1]=v
       end
       return reply
    `
)

4 總結

本期和大家探討了如何基於 golang 從零到一實現時間輪算法,通過原理結合源碼,詳細展示了單機版和 redis 分佈式版時間輪的實現方式.

最後我們展望一下,時間輪算法在工程實踐中具體能得到怎樣的應用呢?

此前我基於 redis zset 時間輪的模型實現了一個分佈式定時任務調度系統 xtimer,大家感興趣的話可以閱讀一下我之前分享的這篇文章:基於協程池架構實現的分佈式定時器 XTimer

該項目源碼已於 github 開源:http://github.com/xiaoxuxiansheng/xtimer

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0HwuYTTe9cdT46advEZe0w