Go 實現 RabbitMQ 延遲任務詳解

【導讀】 本文介紹了 Go 語言配合 RabbitMQ 實現延遲任務的實現。

延遲任務在業務中是一個很常見的需求,比如:

  1. 訂單下單 15 分鐘之後,用戶沒有支付,則自動取消訂單

  2. 用戶做了某些操作,5 分鐘之後發短信提醒用戶

諸如此類的場景比比皆是,一種最常見的實現方式,就是開啓一個定時任務,然後一直輪詢數據庫,這種實現方式在數據量小的時候還好,但是數據量一旦過大,這輪詢數據庫就會給數據庫造成很大的壓力,此時全面掃表的實現方式就顯得不可靠了。

另外一種實現方式,就是用延遲隊列的方式來實現,但是 RabbitMQ 本身是沒有實現延遲隊列的,不過可以使用 TTL + 死信隊列的方式來實現延遲隊列。

消息的 TTL

TTL 全稱 Time To Live,即生存時間。消息的 TTL 也就是消息的生存時間。在 RabbitMQ 中設置 TTL 有兩種

如果兩者都設置,生存時間取兩者最小的那一個。這裏我們採用第二種,即爲每條消息設置 TTL

死信交換機 / 死信隊列

一個消息在滿足如下的條件的時候,就會變成 “死信”,並且能被投遞到死信交換機(Dead-Letter-Exchange),最後進入到死信交換機綁定的隊列,也稱死信隊列(Dead-Letter-Queue)

死信交換機和普通的交換機是沒有區別的,只是某一個設置死信交換機的隊列中有消息過期了,會自動觸發消息的轉發,發送到死信交換機中去,再由死信交換機轉發到死信隊列中。死信隊列也是一個普通的隊列,並沒有什麼其它特殊的。

延遲隊列的實現

接着來看看 TTL + 死信交換機是如何實現延遲隊列的

上面的流程就是實現延遲隊列的思路,比方說 15 分鐘取消訂單,那麼用戶下單之後,消息的 TTL 設置爲 15 分鐘,當消息在 Queue1 待的時間到了 15 分鐘,那麼就會被轉發到 Dead-Letter-Exchange,從而轉發到 Dead-Letter-Queue,最後被消費者消費,實現延遲任務。

先在 RabbitMQ 控制檯創建一個名爲 dlx 的交換機,作爲死信交換機,並綁定上一個 dlxQueue 隊列,作爲 Dead-Letter-Queue

// 生產者.go
package main

import (
    "github.com/streadway/amqp"
    "mq/fail"
)

func main() {
    conn, err := amqp.Dial("amqp://123:123@localhost:5672")
    fail.OnError(err)
    defer conn.Close()

    ch, err := conn.Channel()
    fail.OnError(err)
    defer ch.Close()

    args := amqp.Table{"x-dead-letter-exchange""dlx"} 
    q, err := ch.QueueDeclare("test", true, false, false, false, args) // 聲明一個test隊列,並設置隊列的死信交換機爲"dlx"

    body := "hello world1"
    for i := 0; i < 10; i++ {
        err = ch.Publish("", q.Name, false, false, amqp.Publishing{
            Body:       []byte(body),
            Expiration: "5000", // 設置TTL爲5秒
        })
        fail.OnError(err)
    }
}

啓動生產者,可以看到消息被投遞到 test 隊列中

5 秒之後,消息被轉發到 dlxQueue 隊列中

之後有一個消費者,專門處理這個 dlxQueu 隊列中的消息

// 消費者.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "mq/fail"
)

func main() {
    conn, err := amqp.Dial("amqp://123:123@localhost:5672")
    fail.OnError(err)

    c, err := conn.Channel()
    fail.OnError(err)

    msgs, err := c.Consume("dlxQueue""", true, false, false, false, nil) //監聽dlxQueue隊列
    fail.OnError(err)

    for d := range msgs {
        fmt.Printf("收到信息: %s\n", d.Body) // 收到消息,業務處理
    }
}

// 5秒之後,打印
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1

總結

使用 TTL + 死信交換機實現延遲任務還是非常方便的,除此之外還可以使用相關的插件 abbitmq-delayed-message-exchange,來實現延遲隊列,也是非常的方便。

Thanks!

轉自:

juejin.cn/post/6844904142155022344

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ri4Y8Pn0Y3y2kdnzb6AUmQ