Go 定時任務筆記
Go Timer,Ticker,Sleep
time.Timer
time.Timer 是一個單次的定時器,在指定時間後觸發一次後就不再重複。雖然說觸發後不會再次觸發,但資源不會自己就釋放了,需要調用Stop()
方法來釋放資源否則還在內存中。在 Timer 結束後可以用Reset()
方法重置計時器 (要在 Stop() 之後或未觸發時使用)。基於這種特性 Timer 一般用於延遲執行或超時控制的場景,如 1. 網絡請求的超時控制 (如 context.WithTimeout(context.Backgroud(), x*time.Second) 是基於 time.Timer 實現的),2. 延遲執行服務等。
time.Ticker
time.Ticker 是按照固定時間間隔重複觸發的定時器,類似地使用ticker.Stop()
停止週期性觸發,也可以使用Reset()
動態調整 Ticker 的時間間隔,會將當前 Ticker 停止並設置新的時間間隔,時間間隔一定要大於 0 不然會 panic。基於 Ticker 的特性一般用於固定時間間隔執行重複的操作,如定時任務、心跳檢測等。
time.Sleep
time.Sleep 用於暫停當前協程執行一段時間的方法(使當前協程休眠,期間不佔用 CPU 資源);可以在測試的時候模擬網絡請求延遲。
// 創建每1s觸發一次的Ticker
ticker := time.NewTicker(1 * time.Second)
// 創建倒計時5s的timer
timer := time.NewTimer(5 * time.Second)
gofunc() {
log.Println("2s 開始")
time.Sleep(2 * time.Second)
log.Println("2s 結束")
}()
done := make(chanbool)
gofunc() {
for {
select {
case <-ticker.C:
log.Println("ticker 1s定時器")
case <-timer.C:
log.Println("5s到了")
ticker.Stop()
done <- true
return
}
}
}()
<-done
log.Println("done")
robfig/cron 庫
robfig/cron 概述
robfig/cron
是 Go 中主流的定時任務調度庫,能用於執行週期性任務。robig/cron
使用 Corn 表達式指定執行時間、能指定時區執行,任務默認在獨立的 gorutine 中運行,下面來簡單介紹一下。
首先要了解一下Cron
表達式,Cron
表達式由 5 或 6 個字段組成,5 字段表達式精確到分鐘、6 字段表達式精確到秒。從左到右分別是秒 / 分鐘 / 小時 / 日 / 月 / 周,
# 字段定義
┌───────────── 秒 (0-59)
│ ┌───────────── 分鐘 (0-59)
│ │ ┌───────────── 小時 (0-23)
│ │ │ ┌───────────── 日 (1-31)
│ │ │ │ ┌───────────── 月 (1-12 或 JAN-DEC)
│ │ │ │ │ ┌───────────── 周 (0-6 或 SUN-SAT)
│ │ │ │ │ │
│ │ │ │ │ │
* * * * * *
然後是特殊字符的含義:
-
*
是 "每" 的意思,因此* * * * *
就是每分鐘執行的意思。 -
/
是 "每隔" 的意思,如*/5 * * * *
就是每隔 5 分鐘執行的意思。 -
L
是 Last 最後一天的意思。如0 0 L * *
就是每個月的最後一天執行的意思, -
W
是 Weekday,最近的工作日的意思。如0 0 5W * *
就是每個月第 5 天最近的工作日。 -
#
是第幾個周幾的意思,如0 0 * * 1#2
表示第 2 個週一的意思。
如果只是常見定時用法 cron 也實現了部分的語法糖 (預定義時間表),如下表
robfig/cron 安裝與使用
- 使用指令安裝 cron:
go get github.com/robfig/cron/v3
一般來說使用 cron 有以下幾個步驟:
- 創建 Cron 調度器,調度器也有分幾種:分鐘級調度器, 秒級調度器, 日誌和錯誤處理的調度器等等。
// 基本調度器(分鐘級精度)
c := cron.New()
// 支持秒級精度的調度器
c := cron.New(cron.WithSeconds())
// 日誌和錯誤處理的調度器
c := cron.New(
cron.WithSeconds(),
cron.WithLogger(cron.DefaultLogger),
cron.WithChain(cron.Recover(cron.DefaultLogger)),
)
-
定義任務並將任務添加到調度器。有
AddFunc
和AddJob
兩種添加任務的方法。AddFunc
直接傳函數,AddJob
傳入Job
接口對象 (定義任務結構體)。AddFunc 用起來更簡單方便,AddJob 更有擴展性。func main(){ .... c.AddJob("* * * * * *", MyJob{ logger: log.Default(), }) .... } type MyJob struct { logger *log.Logger } // 實現Job接口的Run方法 func (j MyJob) Run() { j.logger.Println("GOGOGO , 開始執行job") }
-
啓動調度器。可以使用
c.Start()
或c.Run()
啓用調度器。一般使用c.Start()
這種非阻塞的方法,Run 方法運行會阻塞方式。(c.Start + select{} = c.Run 的意思) -
最後關閉調度器。
func main() {
// 創建一個新的調度器
c := cron.New(cron.WithSeconds())
// 添加任務
c.AddFunc("* * * * * *", Task)
c.Start()
defer c.Stop()
select {}
}
func Task() {
fmt.Println("執行任務 GOGOGO出發嘍:", time.Now())
}
一個簡易的定時任務調度器
我們可以基於 cron 庫、Gin 框架和 Mysql 做一個簡易的定時任務系統,通過 api 接口動態添加、修改和刪除,啓動或暫停定時任務,獲取每個定時任務的啓動。這裏只演示添加任務,將 MySQL 的任務添加到 cron 中執行一個打印時間的任務,再將其暫停 (剩餘部分都差不多了);大致的流程如下圖:
項目結構
router,controller,service,dao 層在 Gin 的項目中都很熟悉了,主要的區別是多了task
和scheduler
這兩部分,task
主要是將程序中的函數或腳本註冊,與 MySQL 中的 TaskName 一一對應。scheduler
負責將數據庫中的 Task 添加到 cron 中,控制任務啓動暫停或刪除。
.
├── controller #controller層
│ └── task.go
├── dao #操作數據
│ └── task.go
├── go.mod
├── go.sum
├── init #數據庫初始化
│ └── mysql.go
├── job # 放執行的函數和腳本
│ ├── fun
│ │ └── job.go
│ └── script
│ └── sayhi.sh
├── main.go
├── model
│ └── task.go
├── router #路由註冊
│ └── router.go
├── scheduler # cron調度器
│ └── scheduler.go
├── service #service層
│ └── task.go
└── tasks # task註冊
└── registry.go
數據庫表
任務表結構——存儲關於任務的 id、任務名稱、cron 表達式、執行命令、任務狀態。
type TbTasks struct {
ID uint `json:"id" `
Name string `json:"name" gorm:"column:name"`
CronExpr string `json:"cronExpr" gorm:"column:cron_expr"`
TaskType string `json:"taskType" gorm:"column:task_type"` // 任務類型,腳本/函數
TaskName string `json:"taskName" gorm:"column:task_name"` // 執行任務名稱
Status int `json:"status"`
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updatedAt" gorm:"column:updated_at"`
}
添加任務
就是正常流程,從 router->controller->service->dao 層 (將任務狀態寫成 2 未啓動),其中 service 層要對cron
表達式做校驗判斷其是否正確。
// model
type AddTaskReq struct {
Name string`json:"name"`
CronExpr string`json:"cronExpr"`
TaskType string`json:"taskType"`
TaskName string`json:"taskName"`
}
// 註冊路由
func RegisterTaskRouter(srv service.TaskService) *gin.Engine {
r := gin.Default()
tc := controller.NewTaskController(srv)
v1 := r.Group("/task")
{
// 任務部分
v1.POST("/add", tc.AddTask)
...
return r
}
//controller層
func (tc *TaskController) AddTask(c *gin.Context) {
req := model.AddTaskReq{}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
task := model.TbTasks{
Name: req.Name,
CronExpr: req.CronExpr,
TaskType: req.TaskType,
TaskName: req.TaskName,
}
if err := tc.srv.CreateTask(task); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"msg": "task added"})
}
// service層
func (t taskService) CreateTask(task model.TbTasks) (err error) {
// 對Cron表達式進行校驗,首先構建解析器
parser := cron.NewParser(
cron.SecondOptional | // 允許秒字段(可選)
cron.Minute | // 分鐘字段
cron.Hour | // 小時字段
cron.Dom | // 日期字段
cron.Month | // 月份字段
cron.Dow | // 星期字段
cron.Descriptor, // 支持描述符,如@daily, @weekly等
)
// 解析表達式
_, err = parser.Parse(task.CronExpr)
if err != nil {
log.Println("cron expr error: %v\n", err)
return err
}
task.Status = 0
if err = t.repo.InsertTask(task); err != nil {
return err
}
returnnil
}
// dao層
func (dao GromTaskDAO) InsertTask(task model.TbTasks) (err error) {
result := dao.DB.Table("tasks").Create(&task)
if result.Error != nil {
return result.Error
}
returnnil
}
啓動 cron 任務
這裏的邏輯是將 MySQL 中status
是 0 的任務當成是已啓動的任務,當程序啓動時都要添加到cron
中執行。
首先是對於 Task 的註冊,TaskExecutor 定義了寫到註冊表的任務要符合的類型,taskID
對應 MySQL 中任務的ID
。其中taskID
使用uint
類型主要是爲了避免傳入負數,其次是uint
的整數範圍更大。TaskResgistry
一是用 map 來存儲任務名稱與執行函數之間的映射,二是用讀寫鎖是因爲map
並不是併發安全,多個 goroutine 同時讀寫會 panic 掉。
// tasks/registry.go
type TaskExecutor func(taskID uint)error
type TaskRegistry struct {
executors map[string]TaskExecutor // 任務名稱與執行函數的映射
mu sync.RWMutex // 加讀寫鎖
}
func NewTaskRegistry() *TaskRegistry {
return &TaskRegistry{
executors: make(map[string]TaskExecutor),
}
}
func (tr *TaskRegistry) Register(taskName string, executor TaskExecutor) {
tr.mu.Lock()
defer tr.mu.Unlock()
tr.executors[taskName] = executor
}
// Get 獲取任務執行函數
func (tr *TaskRegistry) Get(taskName string) (TaskExecutor, bool) {
tr.mu.RLock()
defer tr.mu.RUnlock()
executor, exists := tr.executors[taskName]
return executor, exists
}
// 在主函數中註冊
func main(){
...
tr := tasks.NewTaskRegistry()
tr.Register("sayhi", fun.ExecTimeKeeping)
...
}
// ExecTimeKeeping函數定義
func ExecTimeKeeping(taskID uint) (err error) {
log.Println("正在執行報時任務", time.Now().Format("2006-01-02 15:04:05"))
returnnil
}
Task 註冊完成後就要看 cron 調度器了,這裏先用添加執行任務作爲例子,大致流程如下:
-
初始化一個 cron scheduler 調度器。(NewCronScheduler...)
-
在數據庫中找出狀態是 "啓動" 的任務,並檢查一遍
cron
表達式是否正確。 -
在註冊表中找對應的
TaskName
方法執行。
// scheduler.go
type CronScheduler struct {
cron *cron.Cron // cron調度器
taskDAO dao.TaskDAO // 與MySQL交互
registry *tasks.TaskRegistry //上面提到的註冊表
running bool // 調度器運行狀態
mu sync.RWMutex // 讀寫鎖
taskEntry map[uint]cron.EntryID // 任務ID -> Cron EntryID
}
// 創建Cron調度器
func NewCronScheduler(taskDAO dao.TaskDAO, registry *tasks.TaskRegistry) *CronScheduler {
return &CronScheduler{
cron: cron.New(cron.WithSeconds()),
taskDAO: taskDAO,
registry: registry,
running: false,
taskEntry: make(map[uint]cron.EntryID),
}
}
// 啓動調度器,加載正在執行的任務
func (s *CronScheduler) Start(ctx context.Context) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
returnnil
}
acTasklist, acErr := s.taskDAO.GetActiveTasks()
log.Println(acTasklist)
if acErr != nil {
log.Println("Get Active Tasks Error:", acErr)
return err
}
log.Println("Add Tasks:", len(acTasklist))
for _, task := range acTasklist {
if atErr := s.addTaskToCron(task); atErr != nil {
log.Println("Add Task Error:", err)
return err
}
log.Println(task.ID, task.Name, task.TaskType, task.CronExpr)
}
s.cron.Start()
s.running = true
gofunc() {
<-ctx.Done()
s.Stop()
}()
returnnil
}
// Stop 停止調度器
func (s *CronScheduler) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return
}
s.cron.Stop()
s.running = false
s.taskEntry = make(map[uint]cron.EntryID)
}
func (s *CronScheduler) RemoveTask(taskID uint) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
// 檢查任務是否存在
entryID, exists := s.taskEntry[taskID]
if !exists {
log.Println("任務不存在", taskID)
returnnil
}
s.cron.Remove(entryID)
delete(s.taskEntry, taskID) // 刪除map中的字段
log.Println("已停止任務", taskID)
returnnil
}
// 添加Task到Cron中
func (s *CronScheduler) addTaskToCron(task *model.TbTasks) (err error) {
// 創建cron解析器
parser := cron.NewParser(
cron.SecondOptional |
cron.Minute |
cron.Hour |
cron.Dom |
cron.Month |
cron.Dow |
cron.Descriptor,
)
schedule, err := parser.Parse(task.CronExpr)
if err != nil {
log.Println("cron expr error: %v\n", err)
return err
}
// 獲取執行的 TaskName
executor, exists := s.registry.Get(task.TaskName)
if !exists {
log.Println("沒有這個任務", task.TaskName)
return
}
entryID := s.cron.Schedule(schedule, cron.FuncJob(func() {
s.execTask(task, executor)
}))
s.taskEntry[task.ID] = entryID
returnnil
}
// 執行任務
func (s *CronScheduler) execTask(task *model.TbTasks, exec tasks.TaskExecutor) {
log.Println("執行任務", task.Name)
//task.Status = 1
if err := s.taskDAO.UpdateTask(task); err != nil {
log.Println("Update Task Error:", err)
return
}
if err := exec(task.ID); err != nil {
log.Println("exec Task Error:", err)
return
}
}
然後是暫停任務,首先在註冊表檢查一下有沒有這個任務,然後用cron
刪除掉該任務,最後再清理 Task 註冊表、修改 MySQL 中該任務的狀態。
// servuce層
func (s *CronScheduler) RemoveTask(taskID uint) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
// 檢查任務是否存在
entryID, exists := s.taskEntry[taskID]
if !exists {
log.Println("任務不存在", taskID)
returnnil
}
s.cron.Remove(entryID)
delete(s.taskEntry, taskID) // 刪除map中的字段
log.Println("已停止任務", taskID)
returnnil
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YL08X940pqxx8t72y68DJA