基於 CRON 庫擴展的分佈式 Crontab 的實現

0x00 前言

cron[1] 是一個用於管理定時任務的庫(單機),基於 Golang 實現 Linux 中 crontab 的功能

0x01 使用

Linux 的 crontab

crontab 基本格式:

# 文件格式說明
# ┌──分鐘(0 - 59)
# │ ┌──小時(0 - 23)
# │ │ ┌──日(1 - 31)
# │ │ │ ┌─月(1 - 12)
# │ │ │ │ ┌─星期(0 - 6,表示從週日到週六)
# │ │ │ │ │
# * * * * * 被執行的命令

基礎例子

用法極豐富,V3 版本也支持標準的 crontab 格式,具體用法細節可以參考 此文 [2]:

func main() {
    job := cron.New(
        cron.WithSeconds(), // 添加秒級別支持,默認支持最小粒度爲分鐘(如需秒級精度則必須設置)
    )
    // 每秒鐘執行一次
    job.AddFunc("* * * * * *", func() {
        fmt.Printf("task run: %v\n", time.Now())
    })
    job.Run()   // 啓動
}

其他典型的用法還有如下:

type cronJobDemo int

func (c cronJobDemo) Run() {
        fmt.Println("5s func trigger")
        return
}

func main() {
    c := cron.New(
            cron.WithSeconds(),
    )
    c.AddFunc("0 * * * *", func() { fmt.Println("Every hour on the half hour") })
    c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
    c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
    c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") }) // 容易理解的格式
    // 通過 AddJob 註冊
    var cJob cronJobDemo
    c.AddJob("@every 5s", cJob)
    c.Start()
    // c.Stop()

    select {}
}

0x02 代碼分析

核心數據結構

對於 cron 庫的整體邏輯,最關鍵的兩個數據結構就是 EntryCron

1、Job:抽象一個定時任務,cron 調度一個 Job,就去執行 JobRun() 方法

type Job interface {
    Run()
}

FuncJobFuncJob 實際就是一個 func() 類型,實現了 Run() 方法:

type FuncJob func()
func (f FuncJob) Run() { 
    f() 
}

在實際應用中,我們需要對 Job 結構做一些擴展,於是就有了 JobWrapper,使用修飾器機制加工 Job(傳入一個 Job,返回一個 Job),有點像 gin 中間件,包裝器可以在執行實際的 Job 前後添加一些邏輯,然後使用一個 Chain 將這些 JobWrapper 組合到一起。

比如給 Job 添加這樣一些屬性:

type JobWrapper func(Job) Job

type Chain struct {
  wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
  return Chain{c}
}

2、Chain 結構ChainJobWrapper 的數組,調用 Chain 對象的 Then(j Job) 方法應用這些 JobWrapper,返回最終的 Job

type Chain struct {
  wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
  return Chain{c}
}

func (c Chain) Then(j Job) Job {
  for i := range c.wrappers {
      // 注意:應用 JobWrapper 的順序
    j = c.wrappers[len(c.wrappers)-i-1]("len(c.wrappers "len(c.wrappers)-i-1")-i-1")
  }
  return j
}

3、Schedule:描述一個 job 如何循環執行的抽象,需要實現Next方法,此方法返回任務下次被調度的時間

// Schedule describes a job's duty cycle.
type Schedule interface {
 // Next returns the next activation time, later than the given time.
 // Next is invoked initially, and then each time the job is run.
 Next(time.Time) time.Time
}

Scheduler 的實例化結構有:

4、Entry 結構:抽象了一個 job 每當使用 AddJob 註冊一個定時調用策略,就會爲該策略生成唯一的 EntryEntry 裏會存儲被執行的時間、需要被調度執行的實體 Job

type Entry struct {
    ID EntryID          // job id,可以通過該 id 來刪除 job
    Schedule Schedule   // 用於計算 job 下次的執行時間
    Next time.Time      // job 下次執行時間
    Prev time.Time      // job 上次執行時間,沒執行過爲 0
    WrappedJob Job      // 修飾器加工過的 job
    Job Job             // 未經修飾的 job,可以理解爲 AddFunc 的第二個參數
}

5、Cron結構 [5]:關於 Cron 結構,有一些細節,entries 爲何設計爲一個指針 slice

// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
    entries   []*Entry          // 所有 Job 集合
    chain     Chain             // 裝飾器鏈
    stop      chan struct{}     // 停止信號
    add       chan *Entry       // 用於異步增加 Entry
    remove    chan EntryID      // 用於異步刪除 Entry
    snapshot  chan chan []Entry
    running   bool              // 是否正在運行
    logger    Logger
    runningMu sync.Mutex        // 運行時鎖
    location  *time.Location    // 時區相關
    parser    Parser            // Cron 解析器
    nextID    EntryID
    jobWaiter sync.WaitGroup    // 併發控制,正在運行的 Job
}

entries 成員

剛纔說到 entries 爲何設計爲指針 slice,原因在於 cron 核心邏輯中,每次循環開始時都會對 Cron.entries 進行排序,排序字段依賴於每個 Entry 結構的 Next 成員,排序依賴於下面的原則:

  1. 按照觸發時間正向排序,越先觸發的越靠前

  2. IsZero 的任務向後面排

  3. 由於可能存在相同週期的任務 Job,所以排序是不穩定的

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry

func (s byTime) Len() int      { return len(s) }
func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTime) Less(i, j int) bool {
 // Two zero times should return false.
 // Otherwise, zero is "greater" than any other time.
 // (To sort it at the end of the list.)
 if s[i].Next.IsZero() {
  return false
 }
 if s[j].Next.IsZero() {
  return true
 }
    // 排序的原則,s[i] 比 s[j] 先觸發
 return s[i].Next.Before(s[j].Next)
}

0x03 內置 JobWrapper 介紹

Recover:捕捉 panic,避免進程異常退出

此 wrapper 比較好理解,在執行內層的 Job 邏輯前,添加 recover() 調用。如果 Job.Run() 執行過程中有 panic。這裏的 recover() 會捕獲到,輸出調用堆棧

// cron.go
func Recover(logger Logger) JobWrapper {
  return func(j Job) Job {
    return FuncJob(func() {
      defer func() {
        if r := recover(); r != nil {
          const size = 64 << 10
          buf := make([]byte, size)
          buf = buf[:runtime.Stack(buf, false)]
          err, ok := r.(error)
          if !ok {
            err = fmt.Errorf("%v", r)
          }
          logger.Error(err, "panic""stack""...\n"+string(buf))
        }
      }()
      j.Run()
    })
  }
}

DelayIfStillRunning

實現了已有任務運行推遲的邏輯。核心是通過一個(任務共用的)互斥鎖 sync.Mutex,每次執行任務前獲取鎖,執行結束之後釋放鎖。所以在上一個任務結束前,下一個任務獲取鎖會阻塞,從而保證的任務的串行執行。

// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    var mu sync.Mutex
    return FuncJob(func() {
      start := time.Now()
      // 下一個任務阻塞等待獲取鎖
      mu.Lock()
      defer mu.Unlock()
      if dur := time.Since(start); dur > time.Minute {
        logger.Info("delay""duration", dur)
      }
      j.Run()
    })
  }
}

SkipIfStillRunning

DelayIfStillRunning 機制不一樣,該方法是跳過執行,通過無緩衝 channel 機制實現。執行任務時,從通道中取值,如果成功,執行,否則跳過。執行完成之後再向通道中發送一個值,確保下一個任務能執行。初始發送一個值到通道中,保證第一個任務的執行。

func SkipIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    // 定義一個無緩衝 channel
    var ch = make(chan struct{}, 1)
    ch <- struct{}{}
    return FuncJob(func() {
      select {
      case v := <-ch:
        j.Run()
        ch <- v
      default:
        logger.Info("skip")
      }
    })
  }
}

0x04 核心方法分析

AddJob 方法

AddJob 方法通過兩種方法將任務節點 entry 添加到 Cron.entries 中:

  1. 初始化時,直接 append

  2. 運行狀態下,通過 channel 方式異步添加,避免加鎖

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
 schedule, err := c.parser.Parse(spec)
 if err != nil {
  return 0, err
 }
 return c.Schedule(schedule, cmd), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
 c.runningMu.Lock()
 defer c.runningMu.Unlock()
 c.nextID++
 entry := &Entry{
  ID:         c.nextID,
  Schedule:   schedule,
  WrappedJob: c.chain.Then(cmd),
  Job:        cmd,
 }
 if !c.running {
        // 直接加
  c.entries = append(c.entries, entry)
 } else {
        // 異步
  c.add <- entry
 }
 return entry.ID
}

run 方法

run 方法

cron 的核心 run() 方法的實現如下,這個是很經典的 for-select 異步處理模型,避免的對 entries 加鎖,非常值得借鑑。其核心有如下幾點:

  1. 一個定時任務(集)的實現,內部採用排序數組,取數組首位元素的時間作爲timer觸發時間(感覺可以優化爲最小堆?)
  1. Cron內部數據結構的維護,採用channel實現無鎖機制,缺點是可能會有誤差(ms 級),不過在此項目是能夠容忍的,以 Job

    異步添加爲例(運行中添加entry,走異步方式,有duration的延遲):

func (c *Cron) run() {
    c.logger.Info("start")

    // 初始化,計算每個 Job 下次的執行時間
    now := c.now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule""now", now, "entry", entry.ID, "next", entry.Next)
    }

    // 在 dead loop,進行任務調度
    for {
        // 根據下一次的執行時間,對所有 Job 排序
        sort.Sort(byTime(c.entries))

        // 計時器,用於沒有任務可調度時的阻塞操作
        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // 無任務可調度,設置計時器到一個很大的值,把下面的 for 阻塞住
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            // 有任務可調度了,計時器根據第一個可調度任務的下次執行時間設置
            // 排過序,所以第一個肯定是最先被執行的
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            // 有 Job 到了執行時間
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake""now", now)
                // 檢查所有 Job,執行到時的任務
                for _, e := range c.entries {
                    // 可能存在相同時間出發的任務
                    if e.Next.After(now) || e.Next.IsZero() {
                        // 後面都不需要遍歷了!
                        break
                    }
                    // 執行 Job 的 func()
                    c.startJob(e.WrappedJob)

                    // 保存上次執行時間
                    e.Prev = e.Next
                    // 設置 Job 下次的執行時間
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                }

            // 添加新 Job
            case newEntry := <-c.add:
                timer.Stop()        // 必須注意,這裏停止定時器,避免內存泄漏!
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

            // 獲取所有 Job 的快照
            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            // 停止調度
            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            // 根據 entryId 刪除一個 Job
            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed""entry", id)
            }

            break
        }
    }
}

上述的代碼的核心流程如下圖:

image

0x05 小結

本文分析了基於 Golang 實現的單機定時任務庫。

0x06 參考

參考資料

[1]

cron: https://github.com/robfig/cron/

[2]

此文: https://segmentfault.com/a/1190000023029219

[3]

實現: https://github.com/robfig/cron/blob/v3/constantdelay.go

[4]

實現: https://pandaychen.github.io/2021/10/05/A-GOLANG-CRONTAB-V3-BASIC-INTRO/

[5]

結構: https://github.com/robfig/cron/blob/v3/cron.go#L13

[6]

golang cron v3 定時任務: https://blog.cugxuan.cn/2020/06/04/Go/golang-cron-v3/

[7]

v3-repo: https://github.com/robfig/cron/tree/v3

[8]

Go 每日一庫之 cron: https://segmentfault.com/a/1190000023029219

[9]

GO 編程模式:修飾器: https://coolshell.cn/articles/17929.html

歡迎關注 Go 生態。生態君會不定期分享 Go 語言生態相關內容。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MvmFjPKIhO-B6bO8DzN_zA