基於 CRON 庫擴展的分佈式 Crontab 的實現
0x00 前言
cron[1] 是一個用於管理定時任務的庫(單機),基於 Golang 實現 Linux 中 crontab 的功能
0x01 使用
Linux 的 crontab
crontab 基本格式:
# 文件格式說明
# ┌──分鐘(0 - 59)
# │ ┌──小時(0 - 23)
# │ │ ┌──日(1 - 31)
# │ │ │ ┌─月(1 - 12)
# │ │ │ │ ┌─星期(0 - 6,表示從週日到週六)
# │ │ │ │ │
# * * * * * 被執行的命令
基礎例子
用法極豐富,V3 版本也支持標準的 crontab
格式,具體用法細節可以參考 此文 [2]:
func main() {
job := cron.New(
cron.WithSeconds(), // 添加秒級別支持,默認支持最小粒度爲分鐘(如需秒級精度則必須設置)
)
// 每秒鐘執行一次
job.AddFunc("* * * * * *", func() {
fmt.Printf("task run: %v\n", time.Now())
})
job.Run() // 啓動
}
其他典型的用法還有如下:
type cronJobDemo int
func (c cronJobDemo) Run() {
fmt.Println("5s func trigger")
return
}
func main() {
c := cron.New(
cron.WithSeconds(),
)
c.AddFunc("0 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") }) // 容易理解的格式
// 通過 AddJob 註冊
var cJob cronJobDemo
c.AddJob("@every 5s", cJob)
c.Start()
// c.Stop()
select {}
}
0x02 代碼分析
核心數據結構
對於 cron 庫的整體邏輯,最關鍵的兩個數據結構就是 Entry
和 Cron
1、Job
:抽象一個定時任務,cron 調度一個 Job
,就去執行 Job
的 Run()
方法
type Job interface {
Run()
}
FuncJob
:FuncJob
實際就是一個 func()
類型,實現了 Run()
方法:
type FuncJob func()
func (f FuncJob) Run() {
f()
}
在實際應用中,我們需要對 Job
結構做一些擴展,於是就有了 JobWrapper
,使用修飾器機制加工 Job(傳入一個 Job
,返回一個 Job
),有點像 gin 中間件,包裝器可以在執行實際的 Job 前後添加一些邏輯,然後使用一個 Chain
將這些 JobWrapper
組合到一起。
比如給 Job
添加這樣一些屬性:
-
在
Job
回調方法中捕獲panic
異常 -
如果
Job
上次運行還未結束,推遲本次執行 -
如果
Job
上次運行還未結束,跳過本次執行 -
記錄每個
Job
的執行情況
type JobWrapper func(Job) Job
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
2、Chain
結構Chain
是 JobWrapper
的數組,調用 Chain
對象的 Then(j Job)
方法應用這些 JobWrapper
,返回最終的 Job
:
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
// 注意:應用 JobWrapper 的順序
j = c.wrappers[len(c.wrappers)-i-1](j "len(c.wrappers "len(c.wrappers)-i-1")-i-1")
}
return j
}
3、Schedule
:描述一個 job 如何循環執行的抽象,需要實現Next
方法,此方法返回任務下次被調度的時間
// Schedule describes a job's duty cycle.
type Schedule interface {
// Next returns the next activation time, later than the given time.
// Next is invoked initially, and then each time the job is run.
Next(time.Time) time.Time
}
Scheduler
的實例化結構有:
-
ConstantDelaySchedule
:實現 [3] -
SpecSchedule
:實現 [4],默認選擇,提供了對 Cron 表達式的解析能力
4、Entry
結構:抽象了一個 job 每當使用 AddJob
註冊一個定時調用策略,就會爲該策略生成唯一的 Entry
,Entry
裏會存儲被執行的時間、需要被調度執行的實體 Job
type Entry struct {
ID EntryID // job id,可以通過該 id 來刪除 job
Schedule Schedule // 用於計算 job 下次的執行時間
Next time.Time // job 下次執行時間
Prev time.Time // job 上次執行時間,沒執行過爲 0
WrappedJob Job // 修飾器加工過的 job
Job Job // 未經修飾的 job,可以理解爲 AddFunc 的第二個參數
}
5、Cron
結構 [5]:關於 Cron
結構,有一些細節,entries
爲何設計爲一個指針 slice
?
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry // 所有 Job 集合
chain Chain // 裝飾器鏈
stop chan struct{} // 停止信號
add chan *Entry // 用於異步增加 Entry
remove chan EntryID // 用於異步刪除 Entry
snapshot chan chan []Entry
running bool // 是否正在運行
logger Logger
runningMu sync.Mutex // 運行時鎖
location *time.Location // 時區相關
parser Parser // Cron 解析器
nextID EntryID
jobWaiter sync.WaitGroup // 併發控制,正在運行的 Job
}
entries 成員
剛纔說到 entries
爲何設計爲指針 slice
,原因在於 cron 核心邏輯中,每次循環開始時都會對 Cron.entries
進行排序,排序字段依賴於每個 Entry
結構的 Next
成員,排序依賴於下面的原則:
-
按照觸發時間正向排序,越先觸發的越靠前
-
IsZero
的任務向後面排 -
由於可能存在相同週期的任務 Job,所以排序是不穩定的
// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
func (s byTime) Len() int { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
if s[i].Next.IsZero() {
return false
}
if s[j].Next.IsZero() {
return true
}
// 排序的原則,s[i] 比 s[j] 先觸發
return s[i].Next.Before(s[j].Next)
}
0x03 內置 JobWrapper 介紹
Recover:捕捉 panic,避免進程異常退出
此 wrapper 比較好理解,在執行內層的 Job 邏輯前,添加 recover() 調用。如果 Job.Run() 執行過程中有 panic。這裏的 recover() 會捕獲到,輸出調用堆棧
// cron.go
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
})
}
}
DelayIfStillRunning
實現了已有任務運行推遲的邏輯。核心是通過一個(任務共用的)互斥鎖 sync.Mutex
,每次執行任務前獲取鎖,執行結束之後釋放鎖。所以在上一個任務結束前,下一個任務獲取鎖會阻塞,從而保證的任務的串行執行。
// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
// 下一個任務阻塞等待獲取鎖
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
})
}
}
SkipIfStillRunning
和 DelayIfStillRunning
機制不一樣,該方法是跳過執行,通過無緩衝 channel 機制實現。執行任務時,從通道中取值,如果成功,執行,否則跳過。執行完成之後再向通道中發送一個值,確保下一個任務能執行。初始發送一個值到通道中,保證第一個任務的執行。
func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
// 定義一個無緩衝 channel
var ch = make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
select {
case v := <-ch:
j.Run()
ch <- v
default:
logger.Info("skip")
}
})
}
}
0x04 核心方法分析
AddJob 方法
AddJob
方法通過兩種方法將任務節點 entry 添加到 Cron.entries
中:
-
初始化時,直接
append
-
運行狀態下,通過 channel 方式異步添加,避免加鎖
// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
if !c.running {
// 直接加
c.entries = append(c.entries, entry)
} else {
// 異步
c.add <- entry
}
return entry.ID
}
run 方法
run 方法
cron 的核心 run()
方法的實現如下,這個是很經典的 for-select
異步處理模型,避免的對 entries
加鎖,非常值得借鑑。其核心有如下幾點:
- 一個定時任務(集)的實現,內部採用排序數組,取數組首位元素的時間作爲
timer
觸發時間(感覺可以優化爲最小堆?)
-
每個
entry
都包含了該entry
下一次執行的絕對時間,本輪執行完成後立即計算下一輪時間,等待下次循環時排序更新 -
每次循環開始對
cron.entries
按下次執行時間升序排序,只需要對第一個entry
啓動定時器即可 -
定時器事件觸發時,輪詢
cron.entries
裏需要執行的entries
直到第一個不滿足條件的,由於數組是升序,後面無需再遍歷 -
同時,第一個定時器處理結束開啓下次定時器時,也只需要更新執行過的
cron.entries
的Next
(下次執行時間),不需要更新所有的cron.entries
-
Cron
內部數據結構的維護,採用channel
實現無鎖機制,缺點是可能會有誤差(ms 級),不過在此項目是能夠容忍的,以Job
異步添加爲例(運行中添加
entry
,走異步方式,有duration
的延遲):
-
某個
Job
之間的delta
差,可能多出了duration
的延遲,可以容忍 -
定時器實現裏,會掃描所有當前時間之前的
cron.entries
來執行,增加了容錯
func (c *Cron) run() {
c.logger.Info("start")
// 初始化,計算每個 Job 下次的執行時間
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 在 dead loop,進行任務調度
for {
// 根據下一次的執行時間,對所有 Job 排序
sort.Sort(byTime(c.entries))
// 計時器,用於沒有任務可調度時的阻塞操作
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// 無任務可調度,設置計時器到一個很大的值,把下面的 for 阻塞住
timer = time.NewTimer(100000 * time.Hour)
} else {
// 有任務可調度了,計時器根據第一個可調度任務的下次執行時間設置
// 排過序,所以第一個肯定是最先被執行的
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
// 有 Job 到了執行時間
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// 檢查所有 Job,執行到時的任務
for _, e := range c.entries {
// 可能存在相同時間出發的任務
if e.Next.After(now) || e.Next.IsZero() {
// 後面都不需要遍歷了!
break
}
// 執行 Job 的 func()
c.startJob(e.WrappedJob)
// 保存上次執行時間
e.Prev = e.Next
// 設置 Job 下次的執行時間
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 添加新 Job
case newEntry := <-c.add:
timer.Stop() // 必須注意,這裏停止定時器,避免內存泄漏!
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
// 獲取所有 Job 的快照
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
// 停止調度
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
// 根據 entryId 刪除一個 Job
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}
}
上述的代碼的核心流程如下圖:
image
0x05 小結
本文分析了基於 Golang 實現的單機定時任務庫。
0x06 參考
-
golang cron v3 定時任務 [6]
-
v3-repo[7]
-
Go 每日一庫之 cron[8]
-
GO 編程模式:修飾器 [9]
參考資料
[1]
cron: https://github.com/robfig/cron/
[2]
此文: https://segmentfault.com/a/1190000023029219
[3]
實現: https://github.com/robfig/cron/blob/v3/constantdelay.go
[4]
實現: https://pandaychen.github.io/2021/10/05/A-GOLANG-CRONTAB-V3-BASIC-INTRO/
[5]
結構: https://github.com/robfig/cron/blob/v3/cron.go#L13
[6]
golang cron v3 定時任務: https://blog.cugxuan.cn/2020/06/04/Go/golang-cron-v3/
[7]
v3-repo: https://github.com/robfig/cron/tree/v3
[8]
Go 每日一庫之 cron: https://segmentfault.com/a/1190000023029219
[9]
GO 編程模式:修飾器: https://coolshell.cn/articles/17929.html
歡迎關注 Go 生態。生態君會不定期分享 Go 語言生態相關內容。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/MvmFjPKIhO-B6bO8DzN_zA