程序員必會的任務調度實踐

1. 延時任務系統介紹

延時任務系統 (Delayed Job System) 是一種用於設置任務在將來的某個時刻自動觸發執行的機制。

**定義:**延時任務系統使作業 / 任務能夠在未來的某個預定義時間運行。簡單來說, 就是一種可以設置任務在未來執行的系統。

延時任務系統的主要特點是:

時間觸發: 根據預設的時間來自動觸發任務執行

可靠性: 具備容錯能力, 防止任務丟失

擴展性: 可以水平擴展, 提高吞吐量

靈活性: 可以自由配置執行邏輯

延時任務系統的主要應用場景:

訂單超時取消: 下單後如果超過 xx 分鐘未支付, 自動取消訂單

**短信 / 郵件驗證碼:**xx 分鐘內未使用驗證碼自動失效

商品秒殺: 定時開啓秒殺活動, 同一時間發起請求大量湧入

會員特權: 會員包年包月, 到期後自動失效

積分系統: 簽到、邀請獎勵, 定期發放

消息推送: 預約兌換碼, 定時推送兌換鏈接

2. Go 語言實現延時任務的方法

Go 語言中的標準庫和社區生態中提供了多種實現延時任務的方法。

(1) time.After()

time.After() 可以在指定的時間後發送一個值到返回的 chan 中。

package main
import (
   "fmt"
   "time"
)
func main() {
   c1 := time.After(time.Second * 5)
   <-c1
   fmt.Println("5 seconds later")
   c2 := time.After(time.Minute * 1)
   <-c2 
   fmt.Println("1 minute later")
}

time.After() 非常適合簡單的一次性延時任務。但由於返回一個 chan, 所以無法獲得狀態等信息。

(2) time.Tick()

time.Tick() 會每隔一定時間就發送一個事件到 chan 中, 可以實現定時執行。

package main
import (
   "fmt"
   "time"
)
func main() {
   ticker := time.NewTicker(time.Second * 2)
   i := 0
   for {
       <- ticker.C 
       i++
       fmt.Println("Tick", i) 
       if i == 5 {
           ticker.Stop()
           break
       }
   }
}

time.Tick() 適合要定期執行的任務場景。停止只需調用 ticker.Stop()。

(3) Worker Pools

使用 worker pool 方式可以更好地複用 goroutine, 而不是每次都啓動新的 goroutine。

package main
import (
   "fmt"
   "time"
)
func worker(id int) {
   for {
       fmt.Printf("Worker %d: started\n", id)
       time.Sleep(time.Second) 
   }
}
func main() {
   // 定義2個worker
   p := make(chan struct{}, 2) 
   // 啓動2個worker
   go worker(1)  
   p <- struct{}{}
   go worker(2)
   p <- struct{}{}
   // 停10秒後退出
   time.Sleep(time.Second * 10)
}

缺點是沒有統一的入口來發布任務。

(4) cronjobs

使用成熟的類似 cron 的第三方庫 (如 robfig/cron) 可以使得任務調度更加簡單。

package main
import (
   "github.com/robfig/cron"
   "fmt"
)
func task() {
   fmt.Println("Running task...") 
}
func main() {
   c := cron.New()
   c.AddFunc("@every 2s", task)
   c.Start()
   // 運行一段時間後退出
   time.Sleep(5 * time.Minute)
   c.Stop() 
}

基於 cron 語法可以方便地處理定時任務, 但是不支持延時任務。

綜上所說,Go 語言通過自帶的 time 包或第三方庫都可以實現延時任務, 後面將展示完整的基於 Redis Sorted Set 實現的延時任務系統。

3. 延時任務系統關鍵要素設計

開發一個可靠、可擴展的分佈式延時任務系統需要考慮一些關鍵要素:

(1) 任務創建

tasks 應該是一種通用的數據結構, 需要考慮版本、重試次數等數據信息。

接口設計應該簡單, 比如提供創建任務的函數:

// 定義任務結構
type Task struct {
   ID     string  
   Type   string      
   Payload map[string]string 
   // 其他元數據
}
func CreateTask(task *Task) error

(2) 任務存儲

需要選擇一個持久化存儲, 比如關係數據庫或者 Redis 等

存儲方式應該有利於任務的有序訪問和範圍掃描重點是查找操作需要高效。

(3) 任務調度

調度器需要定期掃描存儲, 查找已經到期需要執行的任務。

掃描間隔需要可以配置, 精確到毫秒。

超時時長可以作爲任務的一部分持久化存儲。

(4) 任務執行

Worker 進程負責監聽任務存儲, 彈出到期任務並執行相關業務邏輯。

如果任務失敗應該重試, 一般設置最大重試次數。

執行的任務狀態、時間、結果等需要更新存儲。

(5) 任務結果處理

執行成功的任務從存儲中刪除。

失敗的任務根據重試策略進行重試或備份。

回調機制可以通知相關服務。

(6) 容錯處理

考慮分佈式場景下可能出現的問題:

Clock Synchronization: 不同機器間時鐘不同步導致調度不準。

Data Inconsistency: 任務狀態 Replication 不一致。

Job Deduplication: 避免任務重複執行。

Resharding: 動態擴容時保證高可用。

4. 基於 Redis 的延時任務系統實現

基於 Redis 的 Sorted Set 數據結構, 可以簡單實現一個延時任務系統。

(1) 使用 Redis Sorted Set 存儲任務

ZADD tasks <timestamp1> <payload1>
ZADD tasks <timestamp2> <payload2>

時間戳是 Score,payload 就是需要延時執行的任務數據。

(2) 調度器定期掃描 Sorted Set

調度器維護一個指針 lastScanTimestamp, 每次掃描從這個時間點開始掃描:

ZRANGEBYSCORE tasks lastScanTimestamp 當前時間戳

掃描出的任務即爲已到期需要執行的任務。

(3) Worker 從 Set 中彈出執行

Worker 可以監聽任務已到期需要執行的 Redis Pub/Sub 頻道。

根據調度器發送的到期任務, 彈出該任務, 執行任務處理邏輯, 最後刪除已處理任務。

(4) 任務執行結果更新

如果任務執行成功, 直接從 Sorted Set 中刪除該任務。

如果執行失敗, 根據重試策略更新 Set 中任務的其他元數據, 例如重試次數等。

5. 性能優化方法

(1) 調整任務掃描間隔

掃描間隔直接影響任務延遲精確性。scanInterval 越小則延遲會越小, 但是 Redis 壓力會上升。

需要根據任務數量級調整間隔, 比如初期可以 100ms, 後期每增大 10 倍調整到 1s 等。

(2) 根據業務類型優化存儲

如果有大量具有相同超時時長的任務, 可以考慮爲他們創建獨立的 Sorted Set, 從而減少掃描範圍提升效率。

(3) 流控

大流量時爲了保護後端服務, 可以通過限流限速來控制生產入庫速度。例如線上業務可以按照每個用戶最多產生 2 個任務 / 秒來限速。

(4) 擴容機制

可以基於預警指標來自動擴容, 比如當每秒待處理任務數超過 10000 時, 自動增加一臺 Worker 節點。

6. 遇到的常見問題及解決方法

(1) 時鐘回撥

如果系統時鐘回撥, 可能導致任務重複執行。解決方法是每個任務維護一個唯一 ID,Worker 處理時檢查重試字段來避免重複執行。

(2) 分佈式下任務協同

分佈式場景下要確保同一任務只會被唯一一個 Worker 執行, 使用 SETNX 選取待處理任務可以防止重複執行。

(3) 任務丟失和重複執行

網絡抖動、重啓等都可能導致結果丟失或多次寫 Redis。這種情況使用唯一 ID + 重試次數也可以有效解決。

(4) 任務堆積處理

可以設置失敗任務的最大重試歷史, 如果超過此歷史則丟入獨立的 dead queues 中由人工處理。避免堆積的任務佔用存儲和計算資源。

7. 總結

(1) 延時任務系統的意義

延時任務系統是很多業務場景下的基礎通用服務, 包含了調度機制、容錯能力、執行框架等, 可以顯著簡化業務開發。

(2) Go 語言實現的優勢

Go 語言本身的高併發、簡潔和性能使其很適合構建後臺服務。利用 Go 語言開發的延時任務系統性能好, 互聯網公司的延時任務場景能夠承受大規模吞吐。

(3) 展望

可以預見隨着微服務架構和流量不斷增長, 對任務系統的可擴展性、隔離性和運維的要求會越來越高。基於 Kubernetes 等裝箱式部署, 以及 Serverless 架構的任務系統會是發展方向。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GZ7lrPuEH01qT8s5eU_Z2w