Go 定時任務筆記

Go Timer,Ticker,Sleep

time.Timer

time.Timer 是一個單次的定時器,在指定時間後觸發一次後就不再重複。雖然說觸發後不會再次觸發,但資源不會自己就釋放了,需要調用Stop()方法來釋放資源否則還在內存中。在 Timer 結束後可以用Reset()方法重置計時器 (要在 Stop() 之後或未觸發時使用)。基於這種特性 Timer 一般用於延遲執行或超時控制的場景,如 1. 網絡請求的超時控制 (如 context.WithTimeout(context.Backgroud(), x*time.Second) 是基於 time.Timer 實現的),2. 延遲執行服務等。

time.Ticker

time.Ticker 是按照固定時間間隔重複觸發的定時器,類似地使用ticker.Stop()停止週期性觸發,也可以使用Reset()動態調整 Ticker 的時間間隔,會將當前 Ticker 停止並設置新的時間間隔,時間間隔一定要大於 0 不然會 panic。基於 Ticker 的特性一般用於固定時間間隔執行重複的操作,如定時任務、心跳檢測等。

time.Sleep

    time.Sleep 用於暫停當前協程執行一段時間的方法(使當前協程休眠,期間不佔用 CPU 資源);可以在測試的時候模擬網絡請求延遲。

    // 創建每1s觸發一次的Ticker
    ticker := time.NewTicker(1 * time.Second)

    // 創建倒計時5s的timer
    timer := time.NewTimer(5 * time.Second)

    gofunc() {
        log.Println("2s 開始")
        time.Sleep(2 * time.Second)
        log.Println("2s 結束")
    }()

    done := make(chanbool)

    gofunc() {
        for {
            select {
            case <-ticker.C:
                log.Println("ticker 1s定時器")
            case <-timer.C:
                log.Println("5s到了")
                ticker.Stop()
                done <- true
                return

            }
        }
    }()

    <-done
    log.Println("done")

robfig/cron 庫

robfig/cron 概述

robfig/cron是 Go 中主流的定時任務調度庫,能用於執行週期性任務。robig/cron使用 Corn 表達式指定執行時間、能指定時區執行,任務默認在獨立的 gorutine 中運行,下面來簡單介紹一下。

首先要了解一下Cron表達式,Cron表達式由 5 或 6 個字段組成,5 字段表達式精確到分鐘、6 字段表達式精確到秒。從左到右分別是秒 / 分鐘 / 小時 / 日 / 月 / 周,

# 字段定義
┌───────────── 秒 (0-59)
│ ┌───────────── 分鐘 (0-59)
│ │ ┌───────────── 小時 (0-23)
│ │ │ ┌───────────── 日 (1-31)
│ │ │ │ ┌───────────── 月 (1-12 或 JAN-DEC)
│ │ │ │ │ ┌───────────── 周 (0-6 或 SUN-SAT)
│ │ │ │ │ │
│ │ │ │ │ │
* * * * * *

然後是特殊字符的含義:

如果只是常見定時用法 cron 也實現了部分的語法糖 (預定義時間表),如下表

SzOL0F

robfig/cron 安裝與使用

  1.  創建 Cron 調度器,調度器也有分幾種:分鐘級調度器, 秒級調度器, 日誌和錯誤處理的調度器等等。
// 基本調度器(分鐘級精度)
c := cron.New()

// 支持秒級精度的調度器
c := cron.New(cron.WithSeconds())

// 日誌和錯誤處理的調度器
c := cron.New(
    cron.WithSeconds(),
    cron.WithLogger(cron.DefaultLogger),
    cron.WithChain(cron.Recover(cron.DefaultLogger)),
)
  1.  定義任務並將任務添加到調度器。有AddFuncAddJob兩種添加任務的方法。AddFunc直接傳函數,AddJob傳入Job接口對象 (定義任務結構體)。AddFunc 用起來更簡單方便,AddJob 更有擴展性。

    func main(){
      ....
          c.AddJob("* * * * * *", MyJob{
            logger: log.Default(),
            })
        ....
    }
        
    type MyJob struct {
        logger *log.Logger
    }
        
    // 實現Job接口的Run方法
    func (j MyJob) Run() {
        j.logger.Println("GOGOGO , 開始執行job")
    }
    

  1. 啓動調度器。可以使用c.Start()c.Run()啓用調度器。一般使用c.Start()這種非阻塞的方法,Run 方法運行會阻塞方式。(c.Start + select{} = c.Run 的意思)

  2. 最後關閉調度器。

func main() {
  // 創建一個新的調度器
    c := cron.New(cron.WithSeconds())

    // 添加任務
    c.AddFunc("* * * * * *", Task)
    c.Start()
    defer c.Stop()
    select {}
}

func Task() {
    fmt.Println("執行任務 GOGOGO出發嘍:", time.Now())
}

一個簡易的定時任務調度器

我們可以基於 cron 庫、Gin 框架和 Mysql 做一個簡易的定時任務系統,通過 api 接口動態添加、修改和刪除,啓動或暫停定時任務,獲取每個定時任務的啓動。這裏只演示添加任務,將 MySQL 的任務添加到 cron 中執行一個打印時間的任務,再將其暫停 (剩餘部分都差不多了);大致的流程如下圖:

項目結構

router,controller,service,dao 層在 Gin 的項目中都很熟悉了,主要的區別是多了taskscheduler這兩部分,task主要是將程序中的函數或腳本註冊,與 MySQL 中的 TaskName 一一對應。scheduler負責將數據庫中的 Task 添加到 cron 中,控制任務啓動暫停或刪除。

.
├── controller #controller層
│   └── task.go
├── dao  #操作數據
│   └── task.go
├── go.mod
├── go.sum
├── init #數據庫初始化
│   └── mysql.go
├── job # 放執行的函數和腳本
│   ├── fun
│   │   └── job.go
│   └── script
│       └── sayhi.sh
├── main.go 
├── model
│   └── task.go  
├── router #路由註冊
│   └── router.go
├── scheduler # cron調度器
│   └── scheduler.go
├── service #service層
│   └── task.go
└── tasks # task註冊
    └── registry.go

數據庫表

任務表結構——存儲關於任務的 id、任務名稱、cron 表達式、執行命令、任務狀態。

type TbTasks struct {
    ID        uint      `json:"id" `
    Name      string    `json:"name" gorm:"column:name"`
    CronExpr  string    `json:"cronExpr" gorm:"column:cron_expr"`
    TaskType  string    `json:"taskType" gorm:"column:task_type"` // 任務類型,腳本/函數 
    TaskName  string    `json:"taskName" gorm:"column:task_name"` // 執行任務名稱
    Status    int       `json:"status"`
    CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
    UpdatedAt time.Time `json:"updatedAt" gorm:"column:updated_at"`
}

添加任務

就是正常流程,從 router->controller->service->dao 層 (將任務狀態寫成 2 未啓動),其中 service 層要對cron表達式做校驗判斷其是否正確。

// model 
type AddTaskReq struct {
    Name     string`json:"name"`
    CronExpr string`json:"cronExpr"`
    TaskType string`json:"taskType"`
    TaskName string`json:"taskName"`
}

// 註冊路由
func RegisterTaskRouter(srv service.TaskService) *gin.Engine {
  r := gin.Default()
    tc := controller.NewTaskController(srv)
    v1 := r.Group("/task")
    {
        // 任務部分
        v1.POST("/add", tc.AddTask)
  ...
   return r
 }

//controller層
func (tc *TaskController) AddTask(c *gin.Context) {
    req := model.AddTaskReq{}
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }
    task := model.TbTasks{
        Name:     req.Name,
        CronExpr: req.CronExpr,
        TaskType: req.TaskType,
        TaskName: req.TaskName,
    }
    if err := tc.srv.CreateTask(task); err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    c.JSON(http.StatusOK, gin.H{"msg": "task added"})
}

// service層
func (t taskService) CreateTask(task model.TbTasks) (err error) {
// 對Cron表達式進行校驗,首先構建解析器
    parser := cron.NewParser(
        cron.SecondOptional | // 允許秒字段(可選)
            cron.Minute | // 分鐘字段
            cron.Hour | // 小時字段
            cron.Dom | // 日期字段
            cron.Month | // 月份字段
            cron.Dow | // 星期字段
            cron.Descriptor, // 支持描述符,如@daily, @weekly等
    )
    
// 解析表達式
    _, err = parser.Parse(task.CronExpr)
    if err != nil {
        log.Println("cron expr error: %v\n", err)
        return err
    }

    task.Status = 0
    if err = t.repo.InsertTask(task); err != nil {
        return err
    }
    returnnil
}

// dao層
func (dao GromTaskDAO) InsertTask(task model.TbTasks) (err error) {
    result := dao.DB.Table("tasks").Create(&task)
    if result.Error != nil {
        return result.Error
    }
    returnnil
}

啓動 cron 任務

這裏的邏輯是將 MySQL 中status是 0 的任務當成是已啓動的任務,當程序啓動時都要添加到cron中執行。

首先是對於 Task 的註冊,TaskExecutor 定義了寫到註冊表的任務要符合的類型,taskID對應 MySQL 中任務的ID。其中taskID使用uint類型主要是爲了避免傳入負數,其次是uint的整數範圍更大。TaskResgistry一是用 map 來存儲任務名稱與執行函數之間的映射,二是用讀寫鎖是因爲map並不是併發安全,多個 goroutine 同時讀寫會 panic 掉。

// tasks/registry.go
type TaskExecutor func(taskID uint)error

type TaskRegistry struct {
    executors map[string]TaskExecutor // 任務名稱與執行函數的映射
    mu        sync.RWMutex                        // 加讀寫鎖
}

func NewTaskRegistry() *TaskRegistry {
    return &TaskRegistry{
        executors: make(map[string]TaskExecutor),
    }
}

func (tr *TaskRegistry) Register(taskName string, executor TaskExecutor) {
    tr.mu.Lock()
    defer tr.mu.Unlock()
    tr.executors[taskName] = executor
}

// Get 獲取任務執行函數
func (tr *TaskRegistry) Get(taskName string) (TaskExecutor, bool) {
    tr.mu.RLock()
    defer tr.mu.RUnlock()
    executor, exists := tr.executors[taskName]
    return executor, exists
}

// 在主函數中註冊
func main(){
  ...
  tr := tasks.NewTaskRegistry()
    tr.Register("sayhi", fun.ExecTimeKeeping)
    ...
}

// ExecTimeKeeping函數定義
func ExecTimeKeeping(taskID uint) (err error) {
    log.Println("正在執行報時任務", time.Now().Format("2006-01-02 15:04:05"))
    returnnil
}

Task 註冊完成後就要看 cron 調度器了,這裏先用添加執行任務作爲例子,大致流程如下:

  1.  初始化一個 cron scheduler 調度器。(NewCronScheduler...)

  2.  在數據庫中找出狀態是 "啓動" 的任務,並檢查一遍cron表達式是否正確。

  3.  在註冊表中找對應的TaskName方法執行。

// scheduler.go
type CronScheduler struct {
    cron      *cron.Cron                        // cron調度器
    taskDAO   dao.TaskDAO                        // 與MySQL交互
    registry  *tasks.TaskRegistry  //上面提到的註冊表
    running   bool                                    // 調度器運行狀態
    mu        sync.RWMutex                    // 讀寫鎖
    taskEntry map[uint]cron.EntryID // 任務ID -> Cron EntryID
}

// 創建Cron調度器
func NewCronScheduler(taskDAO dao.TaskDAO, registry *tasks.TaskRegistry) *CronScheduler {
    return &CronScheduler{
        cron:      cron.New(cron.WithSeconds()),
        taskDAO:   taskDAO,
        registry:  registry,
        running:   false,
        taskEntry: make(map[uint]cron.EntryID),
    }
}

// 啓動調度器,加載正在執行的任務
func (s *CronScheduler) Start(ctx context.Context) (err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.running {
        returnnil
    }

    acTasklist, acErr := s.taskDAO.GetActiveTasks()
    log.Println(acTasklist)
    if acErr != nil {
        log.Println("Get Active Tasks Error:", acErr)
    return err
    }

    log.Println("Add Tasks:", len(acTasklist))
    for _, task := range acTasklist {
        if atErr := s.addTaskToCron(task); atErr != nil {
            log.Println("Add Task Error:", err)
            return err
        }
        log.Println(task.ID, task.Name, task.TaskType, task.CronExpr)
    }

    s.cron.Start()
    s.running = true

    gofunc() {
        <-ctx.Done()
        s.Stop()
    }()

    returnnil

}

// Stop 停止調度器
func (s *CronScheduler) Stop() {
    s.mu.Lock()
    defer s.mu.Unlock()

    if !s.running {
        return
    }

    s.cron.Stop()
    s.running = false
    s.taskEntry = make(map[uint]cron.EntryID)
}

func (s *CronScheduler) RemoveTask(taskID uint) (err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 檢查任務是否存在
    entryID, exists := s.taskEntry[taskID]
    if !exists {
        log.Println("任務不存在", taskID)
        returnnil
    }
    s.cron.Remove(entryID)
    delete(s.taskEntry, taskID) // 刪除map中的字段

    log.Println("已停止任務", taskID)
    returnnil
}

// 添加Task到Cron中
func (s *CronScheduler) addTaskToCron(task *model.TbTasks) (err error) {

    // 創建cron解析器
    parser := cron.NewParser(
        cron.SecondOptional |
            cron.Minute |
            cron.Hour |
            cron.Dom |
            cron.Month |
            cron.Dow |
            cron.Descriptor,
    )

    schedule, err := parser.Parse(task.CronExpr)
    if err != nil {
        log.Println("cron expr error: %v\n", err)
        return err
    }

    // 獲取執行的 TaskName
    executor, exists := s.registry.Get(task.TaskName)
    if !exists {
        log.Println("沒有這個任務", task.TaskName)
        return
    }

    entryID := s.cron.Schedule(schedule, cron.FuncJob(func() {
        s.execTask(task, executor)
    }))

    s.taskEntry[task.ID] = entryID
    returnnil
}

// 執行任務
func (s *CronScheduler) execTask(task *model.TbTasks, exec tasks.TaskExecutor) {
    log.Println("執行任務", task.Name)
    //task.Status = 1
    if err := s.taskDAO.UpdateTask(task); err != nil {
        log.Println("Update Task Error:", err)
        return
    }

    if err := exec(task.ID); err != nil {
        log.Println("exec Task Error:", err)
        return
    }
}

然後是暫停任務,首先在註冊表檢查一下有沒有這個任務,然後用cron刪除掉該任務,最後再清理 Task 註冊表、修改 MySQL 中該任務的狀態。

// servuce層
func (s *CronScheduler) RemoveTask(taskID uint) (err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 檢查任務是否存在
    entryID, exists := s.taskEntry[taskID]
    if !exists {
        log.Println("任務不存在", taskID)
        returnnil
    }
    s.cron.Remove(entryID)
    delete(s.taskEntry, taskID) // 刪除map中的字段

    log.Println("已停止任務", taskID)
    returnnil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YL08X940pqxx8t72y68DJA