Go 實現 RabbitMQ 延遲任務詳解
【導讀】 本文介紹了 Go 語言配合 RabbitMQ 實現延遲任務的實現。
延遲任務在業務中是一個很常見的需求,比如:
-
訂單下單 15 分鐘之後,用戶沒有支付,則自動取消訂單
-
用戶做了某些操作,5 分鐘之後發短信提醒用戶
諸如此類的場景比比皆是,一種最常見的實現方式,就是開啓一個定時任務,然後一直輪詢數據庫,這種實現方式在數據量小的時候還好,但是數據量一旦過大,這輪詢數據庫就會給數據庫造成很大的壓力,此時全面掃表的實現方式就顯得不可靠了。
另外一種實現方式,就是用延遲隊列的方式來實現,但是 RabbitMQ 本身是沒有實現延遲隊列的,不過可以使用 TTL + 死信隊列的方式來實現延遲隊列。
消息的 TTL
TTL 全稱 Time To Live,即生存時間。消息的 TTL 也就是消息的生存時間。在 RabbitMQ 中設置 TTL 有兩種
-
第一種是聲明隊列的時候,在隊列的屬性中設置 TTL,這樣該隊列中的消息都會有相同的有效期
-
第二種是發送消息時給消息設置屬性,可以爲每條消息都設置不同的 TTL
如果兩者都設置,生存時間取兩者最小的那一個。這裏我們採用第二種,即爲每條消息設置 TTL
死信交換機 / 死信隊列
一個消息在滿足如下的條件的時候,就會變成 “死信”,並且能被投遞到死信交換機(Dead-Letter-Exchange),最後進入到死信交換機綁定的隊列,也稱死信隊列(Dead-Letter-Queue)
-
消息被拒絕而且 requeue=false
-
消息的 TTL 到了,即消息過期
-
隊列排滿了,排在前面的消息會被丟棄或者扔到死信路由上
死信交換機和普通的交換機是沒有區別的,只是某一個設置死信交換機的隊列中有消息過期了,會自動觸發消息的轉發,發送到死信交換機中去,再由死信交換機轉發到死信隊列中。死信隊列也是一個普通的隊列,並沒有什麼其它特殊的。
延遲隊列的實現
接着來看看 TTL + 死信交換機是如何實現延遲隊列的
上面的流程就是實現延遲隊列的思路,比方說 15 分鐘取消訂單,那麼用戶下單之後,消息的 TTL 設置爲 15 分鐘,當消息在 Queue1 待的時間到了 15 分鐘,那麼就會被轉發到 Dead-Letter-Exchange,從而轉發到 Dead-Letter-Queue,最後被消費者消費,實現延遲任務。
先在 RabbitMQ 控制檯創建一個名爲 dlx 的交換機,作爲死信交換機,並綁定上一個 dlxQueue 隊列,作爲 Dead-Letter-Queue
// 生產者.go
package main
import (
"github.com/streadway/amqp"
"mq/fail"
)
func main() {
conn, err := amqp.Dial("amqp://123:123@localhost:5672")
fail.OnError(err)
defer conn.Close()
ch, err := conn.Channel()
fail.OnError(err)
defer ch.Close()
args := amqp.Table{"x-dead-letter-exchange": "dlx"}
q, err := ch.QueueDeclare("test", true, false, false, false, args) // 聲明一個test隊列,並設置隊列的死信交換機爲"dlx"
body := "hello world1"
for i := 0; i < 10; i++ {
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
Body: []byte(body),
Expiration: "5000", // 設置TTL爲5秒
})
fail.OnError(err)
}
}
啓動生產者,可以看到消息被投遞到 test 隊列中
5 秒之後,消息被轉發到 dlxQueue 隊列中
之後有一個消費者,專門處理這個 dlxQueu 隊列中的消息
// 消費者.go
package main
import (
"fmt"
"github.com/streadway/amqp"
"mq/fail"
)
func main() {
conn, err := amqp.Dial("amqp://123:123@localhost:5672")
fail.OnError(err)
c, err := conn.Channel()
fail.OnError(err)
msgs, err := c.Consume("dlxQueue", "", true, false, false, false, nil) //監聽dlxQueue隊列
fail.OnError(err)
for d := range msgs {
fmt.Printf("收到信息: %s\n", d.Body) // 收到消息,業務處理
}
}
// 5秒之後,打印
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
總結
使用 TTL + 死信交換機實現延遲任務還是非常方便的,除此之外還可以使用相關的插件 abbitmq-delayed-message-exchange,來實現延遲隊列,也是非常的方便。
Thanks!
轉自:
juejin.cn/post/6844904142155022344
Go 開發大全
參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ri4Y8Pn0Y3y2kdnzb6AUmQ