Go 語言實現延遲隊列
簡單 Go 語言延遲隊列思路:
package main
import (
"fmt"
"time"
)
// Message is a structure that holds the contents of the message.
type Message struct {
ID int
Body string
Delay time.Duration
}
// delayMessageQueue holds the queue of messages
type delayMessageQueue struct {
queue []Message
}
func (d *delayMessageQueue) send(msg Message) {
go func() {
timer := time.NewTimer(msg.Delay)
<-timer.C
d.queue = append(d.queue, msg)
fmt.Printf("Message %d sent\n", msg.ID)
}()
}
func (d *delayMessageQueue) receive() {
for {
if len(d.queue) > 0 {
msg := d.queue[0]
fmt.Printf("Message %d received \n", msg.ID)
d.queue = d.queue[1:] // Dequeue
}
end
}
func main() {
dmq := &delayMessageQueue{}
dmq.send(Message{ID: 1, Body: "Hello, World", Delay: 2 * time.Second})
dmq.send(Message{ID: 2, Body: "Hello, Go", Delay: 1 * time.Second})
// Keep the main function alive to let goroutines finish
time.Sleep(5 * time.Second)
dmq.receive()
}
我們定義了一個消息結構,包含消息 ID、消息內容和延遲。我們也定義了一個延遲消息隊列,它有兩個方法,一個發送消息,一個接收消息。
發送方法將消息放入一個 goroutine 中,然後用一個定時器等待指定的延遲時間。當定時器到達時,消息會添加到隊列中。
接收方法將持續檢查隊列,一旦發現隊列中有消息,就會打印消息內容並將其從隊列中移除。通過 time 到達指定的時間然後進行發送。每個 goroutine 都有自己的定時器,這是非常低效的。實際上,我們應該使用一個最小堆維護所有的定時器,並且只有一個 goroutine 在阻塞等待最早的定時器。如果有更早的定時器插入,喚醒那個 goroutine 並阻塞等待新的最早的定時器。
進階版本
其中所有延遲任務都由一個優先隊列(最小堆)進行維護,取出最早到達的任務進行處理。我給出一個簡版的示例,其他更多的細節如併發控制、錯誤處理等,您可以在實際開發中完善。
Go 語言標準庫container/heap
提供了堆操作的實現,通過組合使用該包提供的heap.Push
和heap.Pop
,可以實現一個優先隊列。
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// 優先隊列內部維護的隊列元素
type Item struct {
value string
priority int64 // 延遲任務到期時間(用時間戳表示)
index int // 隊列元素在堆中的索引
}
// 優先隊列:底層用最小堆實現
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
// 注意這裏比較的是任務的優先級,爲了讓離當前時間最近的任務在堆頂,我們讓比較結果顛倒
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// 插入元素
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
// 刪除元素
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
var mutex sync.Mutex // 併發控制互斥鎖
// 設定好比cmpTime更晚的任務執行時間並加入堆
func addTask(pq *PriorityQueue, cmpTime int64, diff int64) {
mutex.Lock() // 加鎖
taskTime := cmpTime + diff*int64(time.Second)
item := &Item{
value: fmt.Sprintf("任務%d", taskTime),
priority: taskTime,
}
heap.Push(pq, item)
mutex.Unlock() // 解鎖
}
// 從堆上取出時間最早的任務執行
func doTask(pq *PriorityQueue) {
for {
mutex.Lock()
if len(*pq) == 0 {
mutex.Unlock()
continue // 堆空則不處理
}
// 堆非空,取出最早任務項
item := heap.Pop(pq).(*Item)
now := time.Now().Unix()
if item.priority-now > 0 {
// 未到執行時間,任務重新入堆
heap.Push(pq, item)
} else {
// 執行任務
fmt.Printf("%s 執行\n", item.value)
}
mutex.Unlock()
// 防止doTask過於緊密,每次循環停頓1秒
time.Sleep(1 * time.Second)
}
}
func main() {
pq := make(PriorityQueue, 0)
heap.Init(&pq)
// 啓動執行任務goroutine
go doTask(&pq)
now := time.Now().Unix()
// 預設3個延遲任務,延遲時間分別爲8s, 2s, 5s
addTask(&pq, now, 8)
addTask(&pq, now, 2)
addTask(&pq, now, 5)
// 保持主進程
time.Sleep(15 * time.Second)
}
在這個實現中,我們創建一個優先隊列(Priority Queue),優先級最高的任務(即最早執行的任務)總是位於堆的頂部,這樣我們可以確保總是首先處理最早執行的任務。當新的任務到來或者任務完成時,我們都用heap.Push
和heap.Pop
重新調整堆以保證最早執行的任務總是在堆頂。
關於更詳細的併發控制以及錯誤處理,這需要根據實際的業務需求進行對應的修改和處理。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/N1uVrh3cjN3SIVdI-ztOSQ