解析 Golang 定時任務庫 gron 設計和原理

從 cron 說起

在 Unix-like 操作系統中,有一個大家都很熟悉的 cli 工具,它能夠來處理定時任務,週期性任務,這就是: cron。 你只需要簡單的語法控制就能實現任意【定時】的語義。用法上可以參考一下這個 Crontab Guru Editor[1],做的非常精巧。

簡單說,每一個位都代表了一個時間維度,* 代表全集,所以,上面的語義是:在每天早上的 4 點 05 分觸發任務。

但 cron 畢竟只是一個操作系統級別的工具,如果定時任務失敗了,或者壓根沒啓動,cron 是沒法提醒開發者這一點的。並且,cron 和 正則表達式都有一種魔力,不知道大家是否感同身受,這裏引用同事的一句名言:

這世界上有些語言非常相似: shell 腳本, es 查詢的那個 dsl 語言, 定時任務的 crontab, 正則表達式. 他們相似就相似在每次要寫的時候基本都得重新現學一遍。

正巧,最近看到了 gron 這個開源項目,它是用 Golang 實現一個併發安全的定時任務庫。實現非常簡單精巧,代碼量也不多。今天我們就來一起結合源碼看一下,怎樣基於 Golang 的能力做出來一個【定時任務庫】。

gron

Gron provides a clear syntax for writing and deploying cron jobs.

gron[2] 是一個泰國小哥在 2016 年開源的作品,它的特點就在於非常簡單和清晰的語義來定義【定時任務】,你不用再去記 cron 的語法。我們來看下作爲使用者怎樣上手。

首先,我們還是一個 go get 安裝依賴:

$ go get github.com/roylee0704/gron

假設我們期望在【時機】到了以後,要做的工作是打印一個字符串,每一個小時執行一次,我們就可以這樣:

package main

import (
 "fmt"
 "time"
 "github.com/roylee0704/gron"
)

func main() {
 c := gron.New()
 c.AddFunc(gron.Every(1*time.Hour), func() {
  fmt.Println("runs every hour.")
 })
 c.Start()
}

非常簡單,而且即便是在 c.Start 之後我們依然可以添加新的定時任務進去。支持了很好的擴展性。

定時參數

注意到我們調用 gron.New().AddFunc() 時傳入了一個 gron.Every(1*time.Hour)

這裏其實你可以傳入任何一個 time.Duration,從而把調度間隔從 1 小時調整到 1 分鐘甚至 1 秒。

除此之外,gron 還很貼心地封裝了一個 xtime 包用來把常見的 time.Duration 封裝起來,這裏我們開箱即用。

import "github.com/roylee0704/gron/xtime"

gron.Every(1 * xtime.Day)
gron.Every(1 * xtime.Week)

很多時候我們不僅僅某個任務在當天運行,還希望是我們指定的時刻,而不是依賴程序啓動時間,機械地加 24 hour。gron 對此也做了很好的支持:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

我們只需指定 At("hh:mm") 就可以實現在指定時間執行。

源碼解析

這一節我們來看看 gron 的實現原理。

所謂定時任務,其實包含兩個層面:

  1. 觸發器。即我們希望這個任務在什麼時間點,什麼週期被觸發;

  2. 任務。即我們在觸發之後,希望執行的任務,類比到我們上面示例的 fmt.Println。

對這兩個概念的封裝和擴展是一個定時任務庫必須考慮的。

而同時,我們是在 Golang 的協程上跑程序的,意味着這會是一個長期運行的協程,否則你即便指定了【一個月後幹 XXX】這個任務,程序兩天後掛了,也就無法實現你的訴求了。

所以,我們還希望有一個 manager 的角色,來管理我們的一組【定時任務】,如何調度,什麼時候啓動,怎麼停止,啓動了以後還想加新任務是否支持。

Cron

在 gron 的體系裏,Cron 對象(我們上面通過 gron.New 創建出來的)就是我們的 manager,而底層的一個個【定時任務】則對應到 Cron 對象中的一個個 Entry:

// Cron provides a convenient interface for scheduling job such as to clean-up
// database entry every month.
//
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may also be started, stopped and the entries
// may be inspected.
type Cron struct {
 entries []*Entry
 running bool
 add     chan *Entry
 stop    chan struct{}
}

// New instantiates new Cron instant c.
func New() *Cron {
 return &Cron{
  stop: make(chan struct{}),
  add:  make(chan *Entry),
 }
}

我們觀察到,當調用 gron.New() 方法後,得到的是一個指向 Cron 對象的指針。此時只是初始化了 stop 和 add 兩個 channel,沒有啓動調度。

Entry

重頭戲來了,Cron 裏面的 []* Entry 其實就代表了一組【定時任務】,每個【定時任務】可以簡化理解爲 < 觸發器,任務> 組成的一個 tuple。

// Entry consists of a schedule and the job to be executed on that schedule.
type Entry struct {
 Schedule Schedule
 Job      Job

 // the next time the job will run. This is zero time if Cron has not been
 // started or invalid schedule.
 Next time.Time

 // the last time the job was run. This is zero time if the job has not been
 // run.
 Prev time.Time
}

// Schedule is the interface that wraps the basic Next method.
//
// Next deduces next occurring time based on t and underlying states.
type Schedule interface {
 Next(t time.Time) time.Time
}

// Job is the interface that wraps the basic Run method.
//
// Run executes the underlying func.
type Job interface {
 Run()
}

除了這兩個核心依賴外,Entry 結構還包含了【前一次執行時間點】和【下一次執行時間點】,這個目前可以忽略,只是爲了輔助代碼用。

按照時間排序

// byTime is a handy wrapper to chronologically sort entries.
type byTime []*Entry

func (b byTime) Len() int      { return len(b) }
func (b byTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

// Less reports `earliest` time i should sort before j.
// zero time is not `earliest` time.
func (b byTime) Less(i, j int) bool {

 if b[i].Next.IsZero() {
  return false
 }
 if b[j].Next.IsZero() {
  return true
 }

 return b[i].Next.Before(b[j].Next)
}

這裏是對 Entry 列表的簡單封裝,因爲我們可能同時有多個 Entry 需要調度,處理的順序很重要。這裏實現了 sort 的接口, 有了 Len(), Swap(), Less() 我們就可以用 sort.Sort() 來排序了。

此處的排序策略是按照時間大小。

新增定時任務

我們在示例裏面出現過調用 AddFunc() 來加入一個 gron.Every(xxx) 這樣一個【定時任務】。其實這是給用戶提供的簡單封裝。

// JobFunc is an adapter to allow the use of ordinary functions as gron.Job
// If f is a function with the appropriate signature, JobFunc(f) is a handler
// that calls f.
//
// todo: possibly func with params? maybe not needed.
type JobFunc func()

// Run calls j()
func (j JobFunc) Run() {
 j()
}


// AddFunc registers the Job function for the given Schedule.
func (c *Cron) AddFunc(s Schedule, j func()) {
 c.Add(s, JobFunc(j))
}

// Add appends schedule, job to entries.
//
// if cron instant is not running, adding to entries is trivial.
// otherwise, to prevent data-race, adds through channel.
func (c *Cron) Add(s Schedule, j Job) {

 entry := &Entry{
  Schedule: s,
  Job:      j,
 }

 if !c.running {
  c.entries = append(c.entries, entry)
  return
 }
 c.add <- entry
}

JobFunc 實現了我們上一節提到的 Job 接口,基於此,我們就可以讓用戶直接傳入一個 func() 就 ok,內部轉成 JobFunc,再利用通用的 Add 方法將其加入到 Cron 中即可。

注意,這裏的 Add 方法就是新增定時任務的核心能力了,我們需要觸發器 Schedule,任務 Job。並以此來構造出一個定時任務 Entry。

若 Cron 實例還沒啓動,加入到 Cron 的 entries 列表裏就 ok,隨後啓動的時候會處理。但如果已經啓動了,就直接往 add 這個 channel 中塞,走額外的新增調度路徑。

啓動和停止

// Start signals cron instant c to get up and running.
func (c *Cron) Start() {
 c.running = true
 go c.run()
}


// Stop halts cron instant c from running.
func (c *Cron) Stop() {

 if !c.running {
  return
 }
 c.running = false
 c.stop <- struct{}{}
}

我們先 high level 地看一下一個 Cron 的啓動和停止。

ok,有了這個心裏預期,我們來看看 c.run() 裏面幹了什麼事:

var after = time.After


// run the scheduler...
//
// It needs to be private as it's responsible of synchronizing a critical
// shared state: `running`.
func (c *Cron) run() {

 var effective time.Time
 now := time.Now().Local()

 // to figure next trig time for entries, referenced from now
 for _, e := range c.entries {
  e.Next = e.Schedule.Next(now)
 }

 for {
  sort.Sort(byTime(c.entries))
  if len(c.entries) > 0 {
   effective = c.entries[0].Next
  } else {
   effective = now.AddDate(15, 0, 0) // to prevent phantom jobs.
  }

  select {
  case now = <-after(effective.Sub(now)):
   // entries with same time gets run.
   for _, entry := range c.entries {
    if entry.Next != effective {
     break
    }
    entry.Prev = now
    entry.Next = entry.Schedule.Next(now)
    go entry.Job.Run()
   }
  case e := <-c.add:
   e.Next = e.Schedule.Next(time.Now())
   c.entries = append(c.entries, e)
  case <-c.stop:
   return // terminate go-routine.
  }
 }
}

重點來了,看看我們是如何把上面 Cron, Entry, Schedule, Job 串起來的。

整體實現還是非常簡潔的,大家可以感受一下。

Schedule

前面其實我們暫時將觸發器的複雜性封裝在 Schedule 接口中了,但怎麼樣實現一個 Schedule 呢?

尤其是注意,我們還支持 At 操作,也就是指定 Day,和具體的小時,分鐘。回憶一下:

gron.Every(30 * xtime.Day).At("00:00")
gron.Every(1 * xtime.Week).At("23:59")

這一節我們就來看看,gron.Every 幹了什麼事,又是如何支持 At 方法的。

// Every returns a Schedule reoccurs every period p, p must be at least
// time.Second.
func Every(p time.Duration) AtSchedule {

 if p < time.Second {
  p = xtime.Second
 }

 p = p - time.Duration(p.Nanoseconds())%time.Second // truncates up to seconds

 return &periodicSchedule{
  period: p,
 }
}

gron 的 Every 函數接受一個 time.Duration,返回了一個 AtSchedule 接口。我待會兒會看,這裏注意,Every 裏面是會把【秒】級以下給截掉。

我們先來看下,最後返回的這個 periodicSchedule 是什麼:

type periodicSchedule struct {
 period time.Duration
}

// Next adds time t to underlying period, truncates up to unit of seconds.
func (ps periodicSchedule) Next(t time.Time) time.Time {
 return t.Truncate(time.Second).Add(ps.period)
}

// At returns a schedule which reoccurs every period p, at time t(hh:ss).
//
// Note: At panics when period p is less than xtime.Day, and error hh:ss format.
func (ps periodicSchedule) At(t string) Schedule {
 if ps.period < xtime.Day {
  panic("period must be at least in days")
 }

 // parse t naively
 h, m, err := parse(t)

 if err != nil {
  panic(err.Error())
 }

 return &atSchedule{
  period: ps.period,
  hh:     h,
  mm:     m,
 }
}

// parse naively tokenises hours and minutes.
//
// returns error when input format was incorrect.
func parse(hhmm string) (hh int, mm int, err error) {

 hh = int(hhmm[0]-'0')*10 + int(hhmm[1]-'0')
 mm = int(hhmm[3]-'0')*10 + int(hhmm[4]-'0')

 if hh < 0 || hh > 24 {
  hh, mm = 0, 0
  err = errors.New("invalid hh format")
 }
 if mm < 0 || mm > 59 {
  hh, mm = 0, 0
  err = errors.New("invalid mm format")
 }

 return
}

可以看到,所謂 periodicSchedule 就是一個【週期性觸發器】,只維護一個 time.Duration 作爲【週期】。

periodicSchedule 實現 Next 的方式也很簡單,把秒以下的截掉之後,直接 Add(period),把週期加到當前的 time.Time 上,返回新的時間點。這個大家都能想到。

重點在於,對 At 能力的支持。我們來關注下 func (ps periodicSchedule) At(t string) Schedule 這個方法

ok,這一步只是拿到了材料,那具體怎樣處理呢?這個還是得繼續往下走,看看 atSchedule 結構幹了什麼:

type atSchedule struct {
 period time.Duration
 hh     int
 mm     int
}

// reset returns new Date based on time instant t, and reconfigure its hh:ss
// according to atSchedule's hh:ss.
func (as atSchedule) reset(t time.Time) time.Time {
 return time.Date(t.Year(), t.Month(), t.Day(), as.hh, as.mm, 0, 0, time.UTC)
}

// Next returns **next** time.
// if t passed its supposed schedule: reset(t), returns reset(t) + period,
// else returns reset(t).
func (as atSchedule) Next(t time.Time) time.Time {
 next := as.reset(t)
 if t.After(next) {
  return next.Add(as.period)
 }
 return next
}

其實只看這個 Next 的實現即可。我們從 periodSchedule 那裏獲取了三個屬性。

在調用 Next 方法時,先做 reset,根據原有 time.Time 的年,月,日,以及用戶輸入的 At 中的小時,分鐘,來構建出來一個 time.Time 作爲新的時間點。

此後判斷是在哪個週期,如果當前週期已經過了,那就按照下個週期的時間點返回。

到這裏,一切就都清楚了,如果我們不用 At 能力,直接 gron.Every(xxx),那麼直接就會調用

t.Truncate(time.Second).Add(ps.period)

拿到一個新的時間點返回。

而如果我們要用 At 能力,指定當天的小時,分鐘。那就會走到 periodicSchedule.At 這裏,解析出【小時】和【分鐘】,最後走 Next 返回 reset 之後的時間點。

這個和 gron.Every 方法返回的 AtSchedule 接口其實是完全對應的:

// AtSchedule extends Schedule by enabling periodic-interval & time-specific setup
type AtSchedule interface {
 At(t string) Schedule
 Schedule
}

直接就有一個 Schedule 可以用,但如果你想針對天級以上的 duration 指定時間,也可以走 At 方法,也會返回一個 Schedule 供我們使用。

擴展性

gron 裏面對於所有的依賴也都做成了【依賴接口而不是實現】。Cron 的 Add 函數的入參也是兩個接口,這裏可以隨意替換:func (c *Cron) Add(s Schedule, j Job)

最核心的兩個實體依賴 Schedule, Job 都可以用你自定義的實現來替換掉。

如實現一個新的 Job:

type Reminder struct {
 Msg string
}

func (r Reminder) Run() {
  fmt.Println(r.Msg)
}

事實上,我們上面提到的 periodicSchedule 以及 atSchedule 就是 Schedule 接口的具體實現。我們也完全可以不用 gron.Every,而是自己寫一套新的 Schedule 實現。只要實現 Next(p time.Duration) time.Time 即可。

我們來看一個完整用法案例:

package main

import (
 "fmt"
 "github.com/roylee0704/gron"
 "github.com/roylee0704/gron/xtime"
)

type PrintJob struct{ Msg string }

func (p PrintJob) Run() {
 fmt.Println(p.Msg)
}

func main() {

 var (
  // schedules
  daily     = gron.Every(1 * xtime.Day)
  weekly    = gron.Every(1 * xtime.Week)
  monthly   = gron.Every(30 * xtime.Day)
  yearly    = gron.Every(365 * xtime.Day)

  // contrived jobs
  purgeTask = func() { fmt.Println("purge aged records") }
  printFoo  = printJob{"Foo"}
  printBar  = printJob{"Bar"}
 )

 c := gron.New()

 c.Add(daily.At("12:30"), printFoo)
 c.AddFunc(weekly, func() { fmt.Println("Every week") })
 c.Start()

 // Jobs may also be added to a running Gron
 c.Add(monthly, printBar)
 c.AddFunc(yearly, purgeTask)

 // Stop Gron (running jobs are not halted).
 c.Stop()
}

經典寫法 - 控制退出

這裏我們還是要聊一下 Cron 裏控制退出的經典寫法。我們把其他不相關的部分清理掉,只留下核心代碼:

type Cron struct {
 stop    chan struct{}
}

func (c *Cron) Stop() {
 c.stop <- struct{}{}
}

func (c *Cron) run() {

 for {
  select {
  case <-c.stop:
   return // terminate go-routine.
  }
 }
}

空結構體能夠最大限度節省內存,畢竟我們只是需要一個信號。核心邏輯用 for + select 的配合,這樣當我們需要結束時可以立刻響應。非常經典,建議大家日常有需要的時候採用。

結語

gron 整體代碼其實只在 cron.go 和 schedule.go 兩個文件,合起來代碼不過 300 行,非常精巧,基本沒有冗餘,擴展性很好,是非常好的入門材料。

不過,作爲一個 cron 的替代品,其實 gron 還是有自己的問題的。簡單講就是,如果我重啓了一個 EC2 實例,那麼我的 cron job 其實也還會繼續執行,這是落盤的,操作系統級別的支持。

但如果我執行 gron 的進程掛掉了,不好意思,那就完全涼了。你只有重啓,然後再把所有任務加回來纔行。而我們既然要用 gron,是很有可能定一個幾天後,幾個星期後,幾個月後這樣的觸發器的。誰能保證進程一直活着呢?連機子本身都可能重啓。

所以,我們需要一定的機制來保證 gron 任務的可恢復性,將任務落盤,持久化狀態信息,算是個思考題,這裏大家可以考慮一下怎麼做。

參考資料

[1]

Crontab Guru Editor: https://crontab.guru/

[2]

gron: https://crontab.guru/

轉自:

https://juejin.cn/post/7132715360293716004

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bkvbST8x4cXLHDM545OabQ