Go 語言實現延遲隊列

簡單 Go 語言延遲隊列思路:

package main
import (
  "fmt"
  "time"
)
// Message is a structure that holds the contents of the message.
type Message struct {
  ID     int
  Body   string
  Delay  time.Duration
}
// delayMessageQueue holds the queue of messages
type delayMessageQueue struct {
  queue []Message
}
func (d *delayMessageQueue) send(msg Message) {
  go func() {
    timer := time.NewTimer(msg.Delay)
    <-timer.C
    d.queue = append(d.queue, msg)
    fmt.Printf("Message %d sent\n", msg.ID)
  }()
}
func (d *delayMessageQueue) receive() {
  for {
    if len(d.queue) > 0 {
      msg := d.queue[0]
      fmt.Printf("Message %d received \n", msg.ID)
      d.queue = d.queue[1:] // Dequeue
    }
  end
}
func main() {
  dmq := &delayMessageQueue{}
  dmq.send(Message{ID: 1, Body: "Hello, World", Delay: 2 * time.Second})
  dmq.send(Message{ID: 2, Body: "Hello, Go", Delay: 1 * time.Second})
  // Keep the main function alive to let goroutines finish
  time.Sleep(5 * time.Second)
  dmq.receive()
}

我們定義了一個消息結構,包含消息 ID、消息內容和延遲。我們也定義了一個延遲消息隊列,它有兩個方法,一個發送消息,一個接收消息。

發送方法將消息放入一個 goroutine 中,然後用一個定時器等待指定的延遲時間。當定時器到達時,消息會添加到隊列中。

接收方法將持續檢查隊列,一旦發現隊列中有消息,就會打印消息內容並將其從隊列中移除。通過 time 到達指定的時間然後進行發送。每個 goroutine 都有自己的定時器,這是非常低效的。實際上,我們應該使用一個最小堆維護所有的定時器,並且只有一個 goroutine 在阻塞等待最早的定時器。如果有更早的定時器插入,喚醒那個 goroutine 並阻塞等待新的最早的定時器。

進階版本

其中所有延遲任務都由一個優先隊列(最小堆)進行維護,取出最早到達的任務進行處理。我給出一個簡版的示例,其他更多的細節如併發控制、錯誤處理等,您可以在實際開發中完善。

Go 語言標準庫container/heap提供了堆操作的實現,通過組合使用該包提供的heap.Pushheap.Pop,可以實現一個優先隊列。

package main
import (
  "container/heap"
  "fmt"
  "sync"
  "time"
)
// 優先隊列內部維護的隊列元素
type Item struct {
  value    string
  priority int64 // 延遲任務到期時間(用時間戳表示)
  index    int   // 隊列元素在堆中的索引
}
// 優先隊列:底層用最小堆實現
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
// 注意這裏比較的是任務的優先級,爲了讓離當前時間最近的任務在堆頂,我們讓比較結果顛倒
func (pq PriorityQueue) Less(i, j int) bool {
  return pq[i].priority < pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
  pq[i], pq[j] = pq[j], pq[i]
  pq[i].index = i
  pq[j].index = j
}
// 插入元素
func (pq *PriorityQueue) Push(x interface{}) {
  n := len(*pq)
  item := x.(*Item)
  item.index = n
  *pq = append(*pq, item)
}
// 刪除元素
func (pq *PriorityQueue) Pop() interface{} {
  old := *pq
  n := len(old)
  item := old[n-1]
  old[n-1] = nil  // avoid memory leak
  item.index = -1 // for safety
  *pq = old[0 : n-1]
  return item
}
var mutex sync.Mutex // 併發控制互斥鎖
// 設定好比cmpTime更晚的任務執行時間並加入堆
func addTask(pq *PriorityQueue, cmpTime int64, diff int64) {
  mutex.Lock() // 加鎖
  taskTime := cmpTime + diff*int64(time.Second)
  item := &Item{
    value:    fmt.Sprintf("任務%d", taskTime),
    priority: taskTime,
  }
  heap.Push(pq, item)
  mutex.Unlock() // 解鎖
}
// 從堆上取出時間最早的任務執行
func doTask(pq *PriorityQueue) {
  for {
    mutex.Lock()
    if len(*pq) == 0 {
      mutex.Unlock()
      continue // 堆空則不處理
    }
    // 堆非空,取出最早任務項
    item := heap.Pop(pq).(*Item)
    now := time.Now().Unix()
    if item.priority-now > 0 {
      // 未到執行時間,任務重新入堆
      heap.Push(pq, item)
    } else {
      // 執行任務
      fmt.Printf("%s 執行\n", item.value)
    }
    mutex.Unlock()
    // 防止doTask過於緊密,每次循環停頓1秒
    time.Sleep(1 * time.Second)
  }
}
func main() {
  pq := make(PriorityQueue, 0)
  heap.Init(&pq)
  // 啓動執行任務goroutine
  go doTask(&pq)
  now := time.Now().Unix()
  // 預設3個延遲任務,延遲時間分別爲8s, 2s, 5s
  addTask(&pq, now, 8)
  addTask(&pq, now, 2)
  addTask(&pq, now, 5)
  // 保持主進程
  time.Sleep(15 * time.Second)
}

在這個實現中,我們創建一個優先隊列(Priority Queue),優先級最高的任務(即最早執行的任務)總是位於堆的頂部,這樣我們可以確保總是首先處理最早執行的任務。當新的任務到來或者任務完成時,我們都用heap.Pushheap.Pop重新調整堆以保證最早執行的任務總是在堆頂。

關於更詳細的併發控制以及錯誤處理,這需要根據實際的業務需求進行對應的修改和處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/N1uVrh3cjN3SIVdI-ztOSQ