Go 構建基礎的事件調度器

當我們需要在一段時間後的特定時間或間隔運行任務時,我們需要使用任務調度系統來運行任務:例如發送電子郵件、推送通知、午夜關閉賬戶、清空表格等。

在本文中,我們將構建一個基本的事件調度程序,使用數據庫作爲持久層來調度事件在特定時間段運行,這將使我們瞭解事件調度系統的工作原理。

基本的工作機制是:

每當我們需要調度事件時,計劃作業就會添加到數據庫中以在特定時間運行。

另一個任務始終定期運行以檢查數據庫中的某些任務是否已過期, 如果在數據庫中發現已過期任務(輪詢)則運行計劃作業。

讓我們從創建用於存儲事件的數據庫(在 postgresql 中)開始:

CREATE TABLE IF NOT EXISTS "public"."jobs" (     
   "id"      SERIAL PRIMARY KEY,     
   "name"    varchar(50) NOT NULL,     
   "payload" text,     
   "runAt"   TIMESTAMP NOT NULL    
)

現在,我們來定義數據結構:

// Listeners has attached event listeners
type Listeners map[string]ListenFunc

// ListenFunc function that listens to events
type ListenFunc func(string)

// Event structure
type Event struct {
 ID      uint
 Name    string
 Payload string
}

還需要定義 Scheduler 結構,用於調度事件和運行偵聽器:

// Scheduler data structure
type Scheduler struct {
 db        *sql.DB
 listeners Listeners
}

// NewScheduler creates a new scheduler
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
 return Scheduler{
  db:        db,
  listeners: listeners,
 }
}

在第 8 行到第 13 行中,我們通過將 sql.DB 實例和初始偵聽器傳遞給調度程序來創建新的調度程序。

現在,我們實現調度函數,並將我們的事件插入到 jobs 表中:

// Schedule sechedules the provided events
func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
 log.Print("🚀 Scheduling event ", event, " to run at ", runAt)
 _, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt") VALUES ($1, $2, $3)`, event, payload, runAt)
 if err != nil {
  log.Print("schedule insert error: ", err)
 }
}

// AddListener adds the listener function to Listeners
func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
 s.listeners[event] = listenFunc
}

在 AddListener 函數中,我們爲事件分配監聽函數。

我們已經首先完成了添加 jobs 表。現在需要從數據庫中獲取已經過期的作業,執行然後刪除它們。

下面的函數實現顯示了我們如何檢查表中的過期事件並將事件序列化到 Event 結構中:

// checkDueEvents checks and returns due events
func (s Scheduler) checkDueEvents() []Event {
 events := []Event{}
 rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
 if err != nil {
  log.Print("💀 error: ", err)
  return nil
 }
 for rows.Next() {
  evt := Event{}
  rows.Scan(&evt.ID, &evt.Name, &evt.Payload)
  events = append(events, evt)
 }
 return events
}

第二步是調用從數據庫中找到的已註冊事件偵聽器,如下所示:

// callListeners calls the event listener of provided event
func (s Scheduler) callListeners(event Event) {
 eventFn, ok := s.listeners[event.Name]
 if ok {
  go eventFn(event.Payload)
  _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
  if err != nil {
   log.Print("💀 error: ", err)
  }
 } else {
  log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
 }

}// callListeners calls the event listener of provided event
func (s Scheduler) callListeners(event Event) {
 eventFn, ok := s.listeners[event.Name]
 if ok {
  go eventFn(event.Payload)
  _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
  if err != nil {
   log.Print("💀 error: ", err)
  }
 } else {
  log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
 }

}

在這裏,我們正在檢查是否有綁定的事件函數,如果找到則調用事件的監聽器函數。

第 6 行到第 9 行將從數據庫中刪除事件,以便在下次輪詢數據庫時不會再找到。

最後一步是(輪詢)檢查某個事件是否在給定時間間隔內過期。

對於間隔運行的任務,我們使用 time 庫的 ticker 函數,該函數將提供一個通道,該通道在提供的間隔內接收新的 tick

// CheckEventsInInterval checks the event in given interval
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
 ticker := time.NewTicker(duration)
 go func() {
  for {
   select {
   case <-ctx.Done():
    ticker.Stop()
    return
   case <-ticker.C:
    log.Println("⏰ Ticks Received...")
    events := s.checkDueEvents()
    for _, e := range events {
     s.callListeners(e)
    }
   }

  }
 }()
}

在第 7 行和第 10 行中,我們檢查上下文是否已關閉或 ticker通道是否正在接收新的 tick

在 11 行接收到 tick 後,我們檢查到期事件,然後調用所有事件的偵聽器函數。

下一步就是在 main.go 中,實際使用我們前面定義的那些函數,如下所示:

package main

import (
 "context"
 "log"
 "os"
 "os/signal"
 "time"

 "github.com/dipeshdulal/event-scheduling/customevents"
)

var eventListeners = Listeners{
 "SendEmail": customevents.SendEmail,
 "PayBills":  customevents.PayBills,
}

func main() {
 ctx, cancel := context.WithCancel(context.Background())

 interrupt := make(chan os.Signal, 1)
 signal.Notify(interrupt, os.Interrupt)

 db := initDBConnection()

 scheduler := NewScheduler(db, eventListeners)
 scheduler.CheckEventsInInterval(ctx, time.Minute)

 scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute))
 scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))

 go func() {
  for range interrupt {
   log.Println("\n❌ Interrupt received closing...")
   cancel()
  }
 }()

 <-ctx.Done()
}

在第 13 行到第 16 行中,我們將偵聽函數綁定到事件 SendEmail 和 PayBills上,以便在發生新事件時調用這些函數。

在 22 行 和 32 到 37 行中,我們添加了中斷信號 (os.Interrupt) 通道,當程序中發生中斷時,我們執行 19 行中的上下文取消函數。

從第 26 行到第 30 行,我們定義事件調度程序、運行輪詢函數並將在一分鐘後運行 SendEmail ,兩分鐘後運行 PayBills

程序的輸出將如下所示:

2021/01/16 11:58:49 💾 Seeding database with table...
2021/01/16 11:58:49 🚀 Scheduling event SendEmail to run at 2021-01-16 11:59:49.344904505 +0545 +0545 m=+60.004623549
2021/01/16 11:58:49 🚀 Scheduling event PayBills to run at 2021-01-16 12:00:49.34773798 +0545 +0545 m=+120.007457039
2021/01/16 11:59:49 ⏰ Ticks Received...
2021/01/16 11:59:49 📨 Sending email with data:  mail: nilkantha.dipesh@gmail.com
2021/01/16 12:00:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ⏰ Ticks Received...
2021/01/16 12:01:49 💲 Pay me a bill:  paybills: $4,000 bill
2021/01/16 12:02:49 ⏰ Ticks Received...
2021/01/16 12:03:49 ⏰ Ticks Received...
^C2021/01/16 12:03:57 
❌ Interrupt received closing...

從輸出中,我們可以看到 SendEmail 在一分鐘後觸發,事件 PayBills 在第二分鐘後觸發。

通過這種方式,我們構建了一個基本的事件調度系統,它將在一定時間間隔後調度事件。

這個例子只展示了事件調度程度的基本實現,並未覆蓋諸如:如果兩個輪詢間隔之間發生重疊,如何處理,如何不使用輪詢等。我們可以使用 rabbitmqkafka 等完成一個最終嚴謹的事件調度程度。

原文地址:https://medium.com/wesionary-team/building-basic-event-scheduler-in-go-134c19f77f84

原文作者:Dipesh Dulal

本文永久鏈接:https://github.com/gocn/translator/blob/master/2023/w15_Building_Basic_Event_Scheduler_in_Go.md

譯者:lsj1342

校對:cvley

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/XZ_O14JLyFX9ppbqVHastw